I'm using reactive extensions to call a async method and I want to cache the result and return this for subsequent calls to the method.
How can I create an Observable instance, return it and provided the data (cacheResult) the subscribe requires?
public IObservable<Bar> GetBars(int pageIndex, int pageSize)
{
var @params = new object[] { pageIndex, pageSize };
var cachedResult = _cache.Get(@params);
if (cachedResult != null)
{
// How do I create a Observable instance and return the 'cacheResult'...
return ...
}
var observable = new BaseObservable<Bar>();
_components.WithSsoToken(_configuration.SsoToken)
.Get(@params)
.Select(Map)
.Subscribe(c =>
{
_cache.Add(@params, c);
observable.Publish(c);
observable.Completed();
}, exception =>
{
开发者_如何学运维 observable.Failed(exception);
observable.Completed();
});
return observable;
}
I believe you are looking for Observable.Return
:
return Observable.Return((Bar)cachedResult);
On an unrelated note:
- There's no need to return a
BaseObservable<T>
. You should return aSubject<T>
as it does what your implementation is doing but is thread safe (you should also call.AsObservable()
on the return value to it can't be cast back). - You use
Do
to add the value to the cache:
var observable = new Subject<Bar>();
_components.WithSsoToken(_configuration.SsoToken)
.Get(@params)
.Select(Map)
.Subscribe(c =>
{
_cache.Add(@params, c);
observable.OnNext(c);
observable.OnCompleted();
}, exception =>
{
observable.OnError(exception);
});
return observable.AsObservable();
Conveniently, I've written a class that does this pattern for you, check it out:
https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs
var cache = new ObservableAsyncMRUCache<int, int>(
x => Observable.Return(x*10).Delay(1000) /* Return an IObservable here */,
100 /*items to cache*/,
5 /* max in-flight, important for web calls */
);
IObservable<int> futureResult = cache.AsyncGet(10);
futureResult.Subscribe(Console.WriteLine);
>>> 100
Some tricky things that it handles correctly:
- It caches the last n items and throws away items that aren't being used
- It ensures that no more than n items are running at the same time - if you don't do this, you can easily spawn out thousands of web calls if the cache is empty
- If you ask for the same item twice in a row, the first request will initiate a request, and the second call will wait on the first one instead of spawning an identical request, so you won't end up querying data redundantly.
精彩评论