Entries:
Comments:
Posts:

Loading User Information from Channel 9

Something went wrong getting user information from Channel 9

Latest Achievement:

Loading User Information from MSDN

Something went wrong getting user information from MSDN

Visual Studio Achievements

Latest Achievement:

Loading Visual Studio Achievements

Something went wrong getting the Visual Studio Achievements

Rx Workshop: Schedulers

Download

Right click “Save as…”

Learn about the use of schedulers to parameterize concurrency in Rx and to test applications using virtual time.

Download the Challenge

Tags:

Follow the Discussion

  • Jordan TerrellJordan Terrell

    FYI: The IScheduler interface in the released library is different than the one shown in this video. Probably because it was filmed prior to the change - just worth noting.

    Great stuff - thanks Rx team!

  • CKoenigCbrAInK Carsten

    Well - here is my try.

    I have to say I've got some problems with this. First it took me a horrible long time to realise that Subject can be used as an Observable-Source you can publish values to. And even worse is the way I have to use the Schedule/OnNext - mess.

    Don't know if there is any better way, but why was the way suggested by the video droped?

            IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
            {
                // Create an observable source of stock quotes
                var sub = new Subject<StockQuote>();
    
                foreach (var quote in quotes)
                {
                    var quote1 = quote;
                    scheduler.Schedule(quote.Date, () => sub.OnNext(quote1));
                }
    
                return sub;
            }
    
            IObservable<object> Query(IObservable<StockQuote> quotes)
            {
                // Write a query to grab the Microsoft "MSFT" stock quotes and output the closing price
                // HINT: Make sure you include a property in the result which has a type of DateTime
                return quotes.Where(q => q.Symbol == "MSFT").Select(q => new {q.Date, q.Close, q.High, q.Low, q.Open});
            }

  • okh397okh397

    there's another way, without a subject, although it's not prettier (but was easier to find for me anyway):

    return Observable.Create<StockQuote>(x =>
    {
    foreach (StockQuote stockQuote in quotes)
    {
    var myStockQuote = stockQuote;
    scheduler.Schedule(stockQuote.Date, () => x.OnNext(myStockQuote));
    }

    return x.OnCompleted;
    });

  • CKoenigCbrAInK Carsten

    Indeed this might be the indented way - thanks.

    But boths of theses seems to me like "breaking the pattern" - if we use a concrete scheduler in the definition of the Observable-Source then what about SubscribeOn (the one with the IScheduler overload)?

    What I had expected was something like Create with "Action<ISubscriber>" or something like

    public static IObservable<tData> ToObservable<tData>(this IEnumerable<Tuple<DateTimeOffset, tData>> source)
    {
    /* feed the data (snd) into a IObsevable and use the fst component for the scheduler, whatever it might be */
    }

  • ChevalN2Cheval Why not null?

    It runs and should do as the method explanations indicate but doesn't delay each quote as expected (unless I don't use the scheduler?), but I shouldn't think you need to for each over it like the other attempts above...

    Query Code:

    return from n in quotes.Where(p => p.Symbol == "MSFT")
      select new { date = n.Date, close = n.Close.ToString() };


    GetQuotes Code:

    var timer = Observable.Interval(TimeSpan.FromSeconds(5), scheduler).ObserveOn(this);
    return quotes.ToObservable().Zip(timer, (quoteList, timerList) => quoteList);

  • CKoenigCbrAInK Carsten

    Hi - you never have to "foreach" - you can allways use some LINQ-syntax to do the same (and tools like ReSharper even have some automatic code-conversation between the two ways) - it's just a matter of taste and the way the code might look if you use Select/Aggregate/whatever to make it LINQish.

  •       I am getting the following exception       
    Specified argument was out of the range of valid values.
     
    Any Ideas??..
            IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
            {
                // TODO: Create an observable source of stock quotes
                // HINT: Use both the scheduler and the quotes and think about how to create sources which are like events          
    
              return quotes.ToObservable<StockQuote>().ObserveOn(scheduler);
            }
    
            IObservable<object> Query(IObservable<StockQuote> quotes)
            {
                // TODO: Write a query to grab the Microsoft "MSFT" stock quotes and output the closing price
                // HINT: Make sure you include a property in the result which has a type of DateTime
    
              var filteredQuotes = from quote in quotes
                                   where quote.Symbol == "MSFT"
                                   select quote;
    
              return filteredQuotes;
            }
  • Hi Rachakondo,

    look at the implementation of MyHistorcalScheduler.Run(). ToObservable() will schedule all StockQuotes right away with a DueTime of 0 (because you ignore the StockQuote.Data value completely). AdvanceTo() then throws.

    I am not sure what is the right implementation of GetQuotes(). I am guessing that

    • when called it should not block but return right away (with the IObservable<StockQuote>), regardless whether iterating over the sequence of stock quotes blocks or not
    • when called several times it should produce the sequence serveral times from the beginning
    • it should call OnNext of the observer at StockQuote.Date measured in the clock time of the passed scheduler
    • one call of OnNext should be finished before calling the next time (might conflict with the point obove)

    So here is my (maybe over-complicated solution):

            IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
            {
                return Observable.Create<StockQuote>(observer =>
                    {
                        var quotesEnumerator = quotes.GetEnumerator();
                        if (quotesEnumerator.MoveNext())
                        {
                            return scheduler.Schedule(new DateTimeOffset(quotesEnumerator.Current.Date), self =>
                            {
                                observer.OnNext(quotesEnumerator.Current);
                                if (quotesEnumerator.MoveNext())
                                {
                                    self(new DateTimeOffset(quotesEnumerator.Current.Date));
                                }
                                else
                                {
                                    observer.OnCompleted();
                                    quotesEnumerator.Dispose();
                                }
                            });
                        }
                        else
                        {
                            observer.OnCompleted();
                            return quotesEnumerator;
                        }
                    });
            }
    
            IObservable<object> Query(IObservable<StockQuote> quotes)
            {
                return quotes.Where(quote => quote.Symbol == "MSFT").Select(quote => new { Date = quote.Date, Price = quote.Close });
            }
    

    It will replay the stock quotes at the times given in StockQuote.Date. The MyHistoricalScheduler will replay it as fast as possible.

    Is that what was intended?

    My graph behaves quite strangley, though.

    I only get what was intended (I think) when I sort the StockQuotes by increasing date before passing them to GetQuotes().

    Bernd

  • I had to change

    var open = decimal.Parse(elements[1]);
    var high = decimal.Parse(elements[2]);
    var low = decimal.Parse(elements[3]);
    var close = decimal.Parse(elements[4]);
    var volume = long.Parse(elements[5]);
    

    to

    var open = decimal.Parse(elements[1], CultureInfo.InvariantCulture);
    var high = decimal.Parse(elements[2], CultureInfo.InvariantCulture);
    var low = decimal.Parse(elements[3], CultureInfo.InvariantCulture);
    var close = decimal.Parse(elements[4], CultureInfo.InvariantCulture);
    var volume = long.Parse(elements[5], CultureInfo.InvariantCulture);
    
     

    in StockQuotes.cs.

  • IanGIanG IanG

    Here's an approach that worked for me:

        IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
        {
            var subject = new Subject<StockQuote>();
            quotes.ToObservable().Subscribe(
                q => scheduler.Schedule(q.Date, () => subject.OnNext(q)));
    
            return subject;
        }
    

    But although CbrAlnK advocates avoiding foreach, doing something more "LINQish", the problem is that this is really just a foreach in disguise. I think the foreach-based solutions are arguably better, because they reveal what's really happening.

    For example, one unfortunate aspect of this whole lab is that it requires the entire schedule to be loaded up before you Run the scheduler. I don't think it's entirely obvious, but a) that code snippet above evaluates the callback (the one that calls scheduler.Schedule(...)) once for every quote inside the call to Subscribe, and b) the example only works because of that synchronous execution of the quotes observable. In other words, this only works because someEnumerable.ToObservable().Subscribe is just an obscure way of writing a foreach loop.

    I was hoping to be able to do something that felt a bit more in keeping with Rx. What I wanted was some way to iterate over a sequence (either in IEnumerable<T> or IObservable<T> form) that was able to select a scheduling time out of that sequence as it generated an observable. So I wanted to be able to write this sort of thing:

        return FeedIntoScheduler(quotes, scheduler, q => q.Date);
    

    The theory there being that I could take an IEnumerable<T> and convert it into an IObservable<T> via a scheduler, where the schedule timings were provided by the Date property of the stock quote. I figured this might be waht Observable.Generate was for, and it sort of it:

        IObservable<T> FeedIntoScheduler<T>(IEnumerable<T> source, IScheduler scheduler, Func<T, DateTime> timeSelector)
        {
            return Observable.Generate(
                new { Enumerator = source.GetEnumerator(), Done = new BoolBox() },
                state =>
                    {
                        if (!state.Done.Value)
                        {
                            bool available = state.Enumerator.MoveNext();
                            if (!available)
                            {
                                state.Enumerator.Dispose();
                                state.Done.Value = true;
                                return false;
                            }
                        }
                        return true;
                    },
                state =>
                    {
                        return state;
                    },
                state => state.Enumerator.Current,
                state => timeSelector(state.Enumerator.Current),
                scheduler);
    
        }   
    // where BoolBox (which enables mutable state in the accumulator, which we need   
    // because Generate rather unhelpfully doesn't fit the enumeration idiom) 
    private class BoolBox
      {
          public bool Value { get; set; }
      }
    
    

    But there's a problem here. The actual work of kicking off the observable is done via the scheduler you pass, and it will pick whatever time the scheduler claims "Now" is. And it turns out that this will crash the historical scheduler provided in the challenge - it's incapable of handling "now" work items. You can fix this by changing the call to AdvanceTo to this:

        if (dt > Now)
        {
            AdvanceTo(dt);
        }
        else
        {
            next.Invoke();
        }
    

    But then a new problem emerges: it turns out the data is in reverse chronological order. So the first item that this schedules is the most recent one, causing the virtual clock to run right to the end, messing things up for the remaining items

    So we have to implement GetQuotes like this:

        IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
        {
            var quotesInOrder = from quote in quotes orderby quote.Date ascending select quote;
            return FeedIntoScheduler(quotesInOrder, scheduler, q => q.Date);
        }
    

    In conjunction with the FeedIntoScheduler method and the fix to MyHistoricalScheduler, this appears to work. It feels in keeping with the sort of shape I thought I was supposed to be getting for the solution - my GetQuotes builds an IObservable<T> from an IEnumerable<T>, feeding it into a scheduler according to a clearly-defined schedule.

    However, this can't be the "right" solution. For one thing, I had to fix the scheduler to deal with a scenario it wasn't built to deal with. Moreover, this is now very much more complex than the foreach versions originall submitted, and with very little discernable benefit.

    About the one advantage I could think of for this is was that it might work with an infinite sequence - the observable generator is working lazily whereas the foreach versions iterate through the entire thing before even starting the scheduler. For that to work, the enumerable source needs to produce items in order, because the "orderby" query in GetQuotes has to go - orderby can only work on a finite sequence.

    I modified the quote source to feed the items out of the file in asecending date order, and then to just make up an infinite stream of quotes thereafter. And that does actually appear to work. So perhaps this is a scenario for which this much more complex approach is "right".

    I think rab36's solution is in much the same spirit, but using Create instead of Generate. I see that the Create-based solution also ends up requiring the input to be sorted to work correctly.

  • AnonymousAnonymous

    Hi. You use ObserveOn() in the video for a windows form, but how do you do the same for WPF?

  • AnonymousAnonymous

    Got it. Thanks for the great vids!
    ObserveOn(new System.Windows.Threading.DispatcherSynchronizationContext(GUIElement.Dispatcher))

  • l bl b

    Why is this returning the whole sequence of quotes when using First()?

    IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
    {
    Subject<StockQuote> sub = new Subject<StockQuote>();
    IObservable<StockQuote> ret = sub;
    quotes.ToObservable().Do(q =>
    scheduler.Schedule(q.Date, () => sub.OnNext(q))
    ).First(); //???????

    return ret;
    }

  • MaxMax

    I know I am a bit late, but I just found out about Rx .NET few weeks ago.

    My solution is following:

    IObservable<StockQuote> GetQuotes(IScheduler scheduler, IEnumerable<StockQuote> quotes)
    {
    var subject = new Subject<StockQuote>();
    quotes.ToObservable().Subscribe((quote) =>
    {
    scheduler.Schedule(new DateTimeOffset(quote.Date), () => subject.OnNext(quote));
    });
    return subject;
    }

    IObservable<object> Query(IObservable<StockQuote> quotes)
    {
    var res = from quote in quotes
    where quote.Symbol == "MSFT"
    select new { quote.Date, ClosingPrice = quote.Close };
    return res;
    }

    it works like charm ....

    I LOVE Rx!!!

Remove this comment

Remove this thread

close

Comments Closed

Comments have been closed since this content was published more than 30 days ago, but if you'd like to continue the conversation, please create a new thread in our Forums,
or Contact Us and let us know.