Create an IObservable and return the result of a cached async operation immediately

Developer https://www.devze.com 2023-01-29 05:20 Source: Internet
I'm using Reactive Extensions to call an async method, and I want to cache the result and return it for subsequent calls to the method.

How can I create an Observable instance, return it, and provide the data (cachedResult) that the subscriber requires?

public IObservable<Bar> GetBars(int pageIndex, int pageSize)
{
   var @params = new object[] { pageIndex, pageSize };
   var cachedResult = _cache.Get(@params);
   if (cachedResult != null)
   {
      // How do I create an Observable instance and return the 'cachedResult'...
      return ...
   }

   var observable = new BaseObservable<Bar>();
   _components.WithSsoToken(_configuration.SsoToken)
      .Get(@params)
      .Select(Map)
      .Subscribe(c =>
                     {
                          _cache.Add(@params, c);
                          observable.Publish(c);
                          observable.Completed();
                     }, exception =>
                     {
                          observable.Failed(exception);
                          observable.Completed();
                     });

       return observable;
}


I believe you are looking for Observable.Return:

return Observable.Return((Bar)cachedResult);
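
To make the behaviour of Observable.Return concrete, here is a minimal sketch that assumes the System.Reactive package is referenced. ReturnDemo, FromCache and Record are hypothetical names used only for illustration, and a string stands in for the question's Bar type.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, not part of the question's code.
public static class ReturnDemo
{
    // Wraps a value that is already available in an observable
    // that emits it once and then completes.
    public static IObservable<string> FromCache(string cachedValue)
    {
        return Observable.Return(cachedValue);
    }

    // Subscribes and records every notification in arrival order.
    public static List<string> Record(IObservable<string> source)
    {
        var events = new List<string>();
        source.Subscribe(
            value => events.Add("OnNext: " + value),
            error => events.Add("OnError: " + error.Message),
            () => events.Add("OnCompleted"));
        return events;
    }
}
```

A subscriber to FromCache("bar") sees one OnNext carrying "bar" followed by OnCompleted, which is the same shape of sequence the non-cached path produces for a single result.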

On an unrelated note:

  • There's no need to return a BaseObservable<T>. You should return a Subject<T>, as it does what your implementation is doing but is thread safe (you should also call .AsObservable() on the return value so it can't be cast back).
  • You can use Do to add the value to the cache as a side effect. The version below keeps your Subscribe-based structure and swaps in a Subject<Bar>:

var observable = new Subject<Bar>();
_components.WithSsoToken(_configuration.SsoToken)
    .Get(@params)
    .Select(Map)
    .Subscribe(c =>
    {
        _cache.Add(@params, c);
        observable.OnNext(c);
        observable.OnCompleted();
    }, exception =>
    {
        observable.OnError(exception);
    });

return observable.AsObservable();
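
The Do operator mentioned above does not appear in the snippet, so here is a rough sketch of how it might be used, assuming the System.Reactive package. BarPageSource and its fetch delegate are hypothetical stand-ins for the question's _components and _cache, and a string stands in for Bar.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;

// Hypothetical wrapper; the names here are illustrative assumptions.
public sealed class BarPageSource
{
    // Stand-in for the question's _cache, keyed by (pageIndex, pageSize).
    private readonly ConcurrentDictionary<(int, int), string> _cache =
        new ConcurrentDictionary<(int, int), string>();

    // Stand-in for the question's remote call chain.
    private readonly Func<int, int, IObservable<string>> _fetch;

    public BarPageSource(Func<int, int, IObservable<string>> fetch)
    {
        _fetch = fetch;
    }

    public IObservable<string> GetBars(int pageIndex, int pageSize)
    {
        var key = (pageIndex, pageSize);

        // Cache hit: hand back the stored value as a one-item sequence.
        if (_cache.TryGetValue(key, out var cached))
        {
            return Observable.Return(cached);
        }

        // Cache miss: Do stores each value as a side effect while
        // passing it through unchanged to the subscriber.
        return _fetch(pageIndex, pageSize).Do(bar => _cache[key] = bar);
    }
}
```

One difference from the Subject version is worth noting: here nothing is fetched until someone subscribes, and subscribers that arrive before the value is cached may each trigger their own fetch if the underlying sequence is cold.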


Conveniently, I've written a class that implements this pattern for you. Check it out:

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs

var cache = new ObservableAsyncMRUCache<int, int>(
    x => Observable.Return(x * 10).Delay(TimeSpan.FromSeconds(1)) /* Return an IObservable here */,
    100 /*items to cache*/,
    5 /* max in-flight, important for web calls */
    );

IObservable<int> futureResult = cache.AsyncGet(10);

futureResult.Subscribe(Console.WriteLine);
>>> 100

Some tricky things that it handles correctly:

  • It caches the last n items and throws away items that aren't being used.
  • It ensures that no more than a configured number of requests are in flight at the same time. If you don't do this, you can easily spawn thousands of web calls when the cache is empty.
  • If you ask for the same item twice in a row, the first call initiates the request and the second call waits on the first one instead of spawning an identical request, so you won't end up querying data redundantly.
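
The third point, sharing one in-flight request between callers, can be sketched roughly as follows. This is not the ObservableAsyncMRUCache implementation; InFlightCache is a hypothetical class written for illustration, assuming the System.Reactive package, and it leaves out both eviction and the in-flight limit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration of request de-duplication only.
public sealed class InFlightCache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Lazy<IObservable<TValue>>> _entries =
        new ConcurrentDictionary<TKey, Lazy<IObservable<TValue>>>();

    private readonly Func<TKey, IObservable<TValue>> _fetch;

    public InFlightCache(Func<TKey, IObservable<TValue>> fetch)
    {
        _fetch = fetch;
    }

    public IObservable<TValue> Get(TKey key)
    {
        // Lazy ensures Start runs at most once per key, even if
        // several threads ask for the same key at the same moment.
        return _entries
            .GetOrAdd(key, k => new Lazy<IObservable<TValue>>(() => Start(k)))
            .Value;
    }

    private IObservable<TValue> Start(TKey key)
    {
        // AsyncSubject remembers the final value and replays it,
        // along with completion, to subscribers that arrive later.
        var result = new AsyncSubject<TValue>();
        _fetch(key).Subscribe(result);
        return result.AsObservable();
    }
}
```

Callers that ask for the same key while the request is pending all receive the single result once it completes, and later callers get the stored value straight away. A failed request is also remembered in this sketch, so a real cache would need a policy for dropping errored entries.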