开发者

Catching exceptions which may be thrown from a Subscription OnNext Action

开发者 https://www.devze.com 2023-03-30 21:39 出处:网络
I\'m somewhat new to Rx.NET.Is it possible to catch an exception which may be thrown by any of the subscribers?Take the following...

I'm somewhat new to Rx.NET. Is it possible to catch an exception which may be thrown by any of the subscribers? Take the following...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

Currently I'm catching on a per subscription basis with an instance of the following. The implementation of which just uses a ManualResetEvent to wake up a waiting thread.

开发者_运维问答public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

and using it like so...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

I feel like there must be some better way. Are all of the error handling capabilities in Rx.NET specifically for dealing with errors observables?

EDIT: Per request, my implementation is https://gist.github.com/1409829 (interface and implementation separated into different assemblies in prod code). Feedback is welcome. This may seem silly, but I'm using castle windsor to manage many different Rx subscribers. This exception catcher is registered with the container like this

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

It would then be used like this where observable is instance of IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);


The built-in Observable operators do not do what you are asking for by default (much like events), but you could make an extension method that would do this.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.Create<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Then any observable could be wrapped by this method to get the behavior you described.

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号