GroupBy then ObserveOn loses items

Developer https://www.devze.com 2023-03-13 03:40 Source: Internet
Try this in LINQPad:

Observable
    .Range(0, 10)
    .GroupBy(x => x % 3)
    .ObserveOn(Scheduler.NewThread)
    .SelectMany(g => g.Select(x => g.Key + " " + x))
    .Dump()

The results are clearly non-deterministic, but in every case I fail to receive all 10 items. My current theory is that the items are going through the grouped observable unobserved as the pipeline marshals to the new thread.


LINQPad doesn't know that you're running all of these threads. It reaches the end of the code immediately (remember, Rx statements don't always run synchronously; that's the idea!), waits a few milliseconds, then finishes by tearing down the AppDomain along with any threads that haven't caught up yet. Try adding a Thread.Sleep at the end to give the new threads time to catch up.
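As an alternative to a fixed Thread.Sleep, the caller can block until the pipeline signals completion. The sketch below assumes a current System.Reactive package, where NewThreadScheduler.Default stands in for the older Scheduler.NewThread; the helper name RunAndWait and the 5-second timeout are made up for illustration. Waiting only rules out early teardown of the threads. It does not by itself guarantee that all 10 items arrive.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper: runs the question's pipeline and returns how many
// items were observed before the sequence completed (or the wait timed out).
int RunAndWait()
{
    using var done = new ManualResetEventSlim();
    var received = 0;

    using var subscription = Observable
        .Range(0, 10)
        .GroupBy(x => x % 3)
        .ObserveOn(NewThreadScheduler.Default) // assumption: modern name for Scheduler.NewThread
        .SelectMany(g => g.Select(x => g.Key + " " + x))
        .Subscribe(
            line =>
            {
                Interlocked.Increment(ref received);
                Console.WriteLine(line);
            },
            ex => { Console.Error.WriteLine(ex); done.Set(); },
            () => done.Set());

    // Block the calling thread until OnCompleted/OnError, with an arbitrary timeout.
    done.Wait(TimeSpan.FromSeconds(5));
    return Volatile.Read(ref received);
}

Console.WriteLine("received: " + RunAndWait());
```

If the count printed here is still below 10, early termination is not the whole story.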

As an aside, Scheduler.NewThread is a very inefficient scheduler. EventLoopScheduler (which creates exactly one thread) or Scheduler.TaskPool (which uses the TPL pool, as if you had created a Task for each item) are much more efficient. Of course, in this case, since you only have 10 items, Scheduler.Immediate is the best!
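A minimal sketch of swapping in an EventLoopScheduler, again assuming a current System.Reactive package. The helper name RunOnEventLoop is hypothetical, and GroupBy is left out on purpose so that the example only illustrates the scheduler choice.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper: delivers all notifications on one dedicated thread.
IList<int> RunOnEventLoop()
{
    // EventLoopScheduler owns a single thread; dispose it when finished.
    using var scheduler = new EventLoopScheduler();

    return Observable
        .Range(0, 10)
        .ObserveOn(scheduler) // every OnNext is marshaled to the same thread
        .ToList()             // collect the whole sequence into one list
        .Wait();              // block until the sequence completes
}

Console.WriteLine(string.Join(", ", RunOnEventLoop()));
```

In current Rx.NET releases the task pool equivalent is, as far as I know, TaskPoolScheduler.Default; it can be passed to ObserveOn in the same way.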


The problem here appears to be a timing gap between GroupBy emitting a new group and the subscription to that group actually being established on the new thread. If you increase the number of iterations from 10 to 100, you should start seeing some results after a while.

Also, if you change the GroupBy to .Where(x => x % 3 == 0), you will likely notice that no values are lost, because there are no dynamically created group observables that need new observers subscribed to them.
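If the timing explanation is right, one way to test it is to move ObserveOn inside the SelectMany selector, so that each group is subscribed to synchronously at the moment GroupBy emits it and only the items are marshaled to another thread. This is a sketch under that assumption rather than a confirmed fix; RunGrouped is a hypothetical helper name, and NewThreadScheduler.Default assumes a current System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper: same query as the question, with ObserveOn applied
// per group instead of to the stream of groups.
IList<string> RunGrouped()
{
    return Observable
        .Range(0, 10)
        .GroupBy(x => x % 3)
        .SelectMany(g => g
            .ObserveOn(NewThreadScheduler.Default) // marshal items, not the group itself
            .Select(x => g.Key + " " + x))
        .ToList()
        .Wait(); // block until every group has completed
}

foreach (var line in RunGrouped())
{
    Console.WriteLine(line);
}
```

The order across groups remains non-deterministic, but with this arrangement I would expect all 10 items to be present.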
