开发者

Get previous element in IObservable without re-evaluating the sequence

开发者 https://www.devze.com 2022-12-30 05:40 出处:网络
In an IObservable sequence (in Reactive Extensions for .NET), I\'d like to get the value of the previous and current elements so that I can compare them.I found an example online similar to below whic

In an IObservable sequence (in Reactive Extensions for .NET), I'd like to get the value of the previous and current elements so that I can compare them. I found an example online similar to below which accomplishes the task:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

It w开发者_开发知识库orks fine except that it evaluates the sequence twice, which I would like to avoid. You can see that it is being evaluated twice with this code:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

The output shows twice as many of the debug lines as there are elements in the sequence.

I understand why this happens, but so far I haven't found an alternative that doesn't evaluate the sequence twice. How can I combine the previous and current with only one sequence evaluation?


There's a better solution to this I think, that uses Observable.Scan and avoids the double subscription:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

I've written this up on my blog here: http://www.zerobugbuild.com/?p=213

Addendum

A further modification allows you to work with arbitrary types more cleanly by using a result selector:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}


@James World addendum looks great to me, if not for Tuple<>, which I almost always dislike: "Was .Item1 the previous? Or was it the current one? I can't remember. And what's the first argument to the selector, was it the previous item?".

For that part I liked @dcstraw definition of a dedicated ItemWithPrevious<T>. So there you go, putting the two together (hopefully I did not mix up previous with current) with some renaming and facilities:

public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source, 
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}

public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

public class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}


Evaluating twice is an indicator of a Cold observable. You can turn it to a Hot one by using .Publish():

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();


If you only need to access the previous element during subscription, this is probably the simplest thing that will work. (I'm sure there's a better way, maybe a buffer operator on IObservable? The documentation is pretty sparse at the moment, so I can't really tell you.)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs is just a stand-in for the type of your event's argument.


It turns out you can use a variable to hold the previous value and refer to it and reassign it within the chain of IObservable extensions. This even works within a helper method. With the code below I can now call CombineWithPrevious() on my IObservable to get a reference to the previous value, without re-evaluating the sequence.

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消