开发者

Reactive Framework (RX) and dealing with events Asynchronously

开发者 https://www.devze.com 2023-01-06 03:29 出处:网络
So I\'m just playing around with RX and learning it. I started playing with Events, and wanted to know how to subscribe to events, and process the results in batches asynchronously. Allow me to explai

So I'm just playing around with RX and learning it. I started playing with Events, and wanted to know how to subscribe to events, and process the results in batches asynchronously. Allow me to explain with code:

Simple class that raises events:

public class EventRaisingClass
{
   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...
}

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int data)
    {
        this.SomeArg = data;
    }

    public int SomeArg { get; private set; }
}

Then my Main:

public static void Main(string[] args)
{
    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...

}

As you can see in my comments, when you just call subscribe like that, all the subsc开发者_如何学Cription code happens synchronously. Is there a way out of the box using RX to have the Subscribe be called on different threads whenever there's a new batch of events to work on?


bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn is to specify the schedule on which so-called "subscription side effects" are happening. For example, your observable can open a file each time somebody subscribes.

ObserveOn is to specify the schedule on which the call to the observer will happen every time when there is a new value. In practice, it is used more often than SubscribeOn.


I believe you're looking for SubscribeOn or ObserveOn, passing an IScheduler. There are several schedulers built-in under System.Concurrency; some of them use whatever thread is current, and others use specific threads.

This video has more info on the scheduler concept.

The Rx team also recently released a hands-on labs document which is the closest thing to a tutorial right now.

0

精彩评论

暂无评论...
验证码 换一张
取 消