Rx Workshop: Programming the Cloud

Learn about the special challenges of distributed reactive applications and how to use distributed schedulers to write powerful reactive programs.

Download the Challenge

Follow the Discussion

  • I got this to work, although I'm not sure if I could have made the action into an anonymous method?

    public IDisposable Subscribe(IObserver<R> observer)
    {
        GenerateState state = new GenerateState
        {
            condition = this.condition,
            current = this.initial,
            iterate = this.iterate,
            observer = observer,
            resultSelector = this.resultSelector
        };
    
        Action<GenerateState, Action<GenerateState>> action = (_state, self) =>
        {
            if (_state.condition(_state.current))
            {
                var result = _state.resultSelector(_state.current);
                _state.observer.OnNext(result);
                _state.current = _state.iterate(_state.current);
    
                self(_state);
            }
            else
                _state.observer.OnCompleted();
        };
    
        return scheduler.Schedule(state, action);
    }
    

  • dmarsh Knee draggin'
    Great stuff. Have you guys spoken with the compiler teams about potentially just adding the SerializableAttribute to closure classes and/or anonymous types in their next versions? Having to manually introduce an MBV wrapper class for scenarios where multiple values need to be passed in seems like it's going to turn into an exercise in typing.
  • Aaron Stainback (AceHack)

    // requires observer != null
    // ensures result != null
    public IDisposable Subscribe(IObserver<R> observer)
    {
        var generateState = new GenerateState()
        {
            condition = condition,
            current = initial,
            iterate = iterate,
            observer = observer,
            resultSelector = resultSelector,
        };
        return scheduler.Schedule(generateState, (state, self) =>
        {
            if (state.condition(state.current))
            {
                var result = state.resultSelector(state.current);
                state.observer.OnNext(result);
                state.current = state.iterate(state.current);
                self(state);
            }
            else
                state.observer.OnCompleted();
        });
    }
    

    This is what I came up with. It runs on an AppDomainScheduler named scheduler. I want to build one of these schedulers to send remote Entity Framework queries to different machines, using WCF underneath. Is this possible? Are there any good getting-started guides?

    Also, I ran the Code Contracts static checker and it complains that I might return null. I tried a few things, such as returning Disposable.Empty if the IDisposable returned from Schedule is null, but that failed too. What is the correct way to satisfy the contract here? Thanks so much.

  • Mouhammad Fakhoury

    Thanks for the intro series on Rx; the videos are very helpful! I was very excited to start using it, and the first thing I tried failed miserably! I did not have enough examples showing how to hook up Rx to an svc service.

    I exposed an IEnumerable from an svc service and tried to subscribe to it from a client. I got an error saying the service had abnormally disconnected.

    Can you show us an example of how to hook up an Rx client to a web service?
    Thanks!

  • gt Rx is sweet

    @dmarsh: good point - the compiler could figure out serializable & MBR types

    I wonder why the remote queries weren't using the QObservable "provider", where expressions & values are remoted rather than type references...

  • Bnaya

    The following challenge solution uses the
    (T state, Func<IScheduler,T,IDisposable> f) signature.
    It is more complex than the Action signature,
    but may be better for real cloud scenarios.

    // Needs the System.Diagnostics namespace for Trace, plus the Rx scheduler and disposable namespaces.
    class GenerateObservable<T, R> : IObservable<R>
    {
        private IScheduler scheduler;
        private GenerateState _state;

        public GenerateObservable(
            T initial, Func<T, bool> condition,
            Func<T, T> iterate,
            Func<T, R> resultSelector,
            IScheduler scheduler)
        {
            _state = new GenerateState
            {
                current = initial,
                condition = condition,
                iterate = iterate,
                resultSelector = resultSelector
            };
            this.scheduler = scheduler;
        }

        public IDisposable Subscribe(IObserver<R> observer)
        {
            // Each subscriber gets its own copy of the state.
            var state = _state.Clone(observer);

            Func<IScheduler, GenerateState, IDisposable> _f =
                (scheduler_, state_) =>
                {
                    Func<IScheduler, GenerateState, IDisposable> f_ = null;
                    f_ = (scd, st) =>
                    {
                        string domainName = string.Format("Domain = {0}", AppDomain.CurrentDomain.FriendlyName);
                        Trace.WriteLine(domainName);
                        if (st.condition(st.current))
                        {
                            var result = st.resultSelector(st.current);
                            st.observer.OnNext(result);
                            st.current = st.iterate(st.current);
                            // Schedule the next step exactly once; scheduling on both
                            // scd and Scheduler.Immediate would run every step twice.
                            scd.Schedule(st, f_);
                            //Scheduler.Immediate.Schedule(st, f_);
                        }
                        else
                            st.observer.OnCompleted();

                        return Disposable.Empty;
                    };
                    return f_(scheduler_, state_);
                };

            return scheduler.Schedule<GenerateState>(state, _f);
        }

        [Serializable]
        class GenerateState
        {
            public T current;
            public Func<T, bool> condition;
            public Func<T, T> iterate;
            public Func<T, R> resultSelector;
            public IObserver<R> observer;

            #region Clone

            public GenerateState Clone(IObserver<R> observer)
            {
                var state = new GenerateState
                {
                    current = current,
                    condition = condition,
                    iterate = iterate,
                    resultSelector = resultSelector,
                    observer = observer
                };

                return state;
            }

            #endregion // Clone
        }
    }

  • Madu Alikor

    Could somebody update this series to reflect the changes in the Rx framework, especially all the changes around the IScheduler interfaces?

  • Aggelos Biboudis (biboudis) What happens in the monad, stays in the monad

    I endorse @dmarsh's thought!

  • I think Erlang is much, much simpler... this stuff... it simply smells from a mile away :)

    I know that it's possible to do it, but is it easy stuff? Can you explain it to a junior quickly?

    What you are trying to do with this Azure scheduler is similar to what Erlang can do out of the box. You have a problem passing stuff across boundaries; in Erlang you can send new code to execute on a remote machine and do upgrades on the fly, and I don't expect that behavior here. Passing a message across processes on a single node is almost indistinguishable from passing it across processes on different nodes. The only form of cross-process communication in Erlang is message passing, and the whole VM and the language semantics are built on that notion.

    This code is self-explanatory:

    Pid ! {self(), Message}.

    or

    {process_name, Node} ! {self(), Message}.

    You see the code and you immediately say, "ah, got it"; the only thing you have to know is that "!" sends a message. Whereas in your example: OK, I have to remember that we go remote, that we have to pass values in a certain way, that this work should be done on a remote scheduler, etc., etc. It is not simple... there is a very long way ahead of you... Locally Rx is great stuff, and I'm fascinated by it, but this Azure scheduler stuff is, to put it simply, over the top.
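
Several comments in this thread turn on the same point: values that cross a scheduler boundary have to be marshalable, and compiler-generated closure classes are not marked [Serializable]. The following is a minimal sketch of how one might check that with reflection. It does not use Rx at all; CounterState, ClosureCheck, and MakeCapturingCondition are hypothetical names introduced only for this illustration, and the check only looks for the attribute rather than performing any actual remoting.

```csharp
using System;

// Hypothetical state holder, explicitly marked serializable so that a
// remoting-style scheduler could marshal it by value.
[Serializable]
public sealed class CounterState
{
    public int Current;
    public int Limit;
}

public static class ClosureCheck
{
    // True when the type itself carries [Serializable].
    public static bool IsMarkedSerializable(Type type) =>
        type.IsDefined(typeof(SerializableAttribute), inherit: false);

    // Returns a lambda that captures 'limit', so the compiler hoists it
    // into a generated closure class (exposed as the delegate's Target).
    public static Func<int, bool> MakeCapturingCondition(int limit) =>
        current => current < limit;

    public static void Main()
    {
        Func<int, bool> condition = MakeCapturingCondition(10);
        Type closureType = condition.Target.GetType();

        Console.WriteLine("closure class: " + closureType.Name);
        Console.WriteLine("closure marked serializable: " + IsMarkedSerializable(closureType));
        Console.WriteLine("state marked serializable: " + IsMarkedSerializable(typeof(CounterState)));
    }
}
```

Under these assumptions, the explicit state class is the one that carries the attribute, which is why the solutions in the thread pass a GenerateState object into Schedule instead of relying on captured locals.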
