Rx: Controlling the frequency of events

One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it.  This can also be a weakness, though, as you wade through the list of sometimes strangely-named methods looking for something in particular.  I recently wanted an operator that would have the following effect on an event stream:

[Marble diagram: input events that arrive too frequently are delayed so that the output stream never exceeds one event per period]

In other words, if events get raised too often then delay them so that the output stream has a defined maximum frequency of events over time. 

My first attempt at an implementation was to use the built-in Throttle operator, although I had a nagging feeling that it doesn’t quite do what I expected.  That feeling proved correct … Throttle will throw away events if they are too frequent.  In the end I resorted to building my own operator, which I call Regulate.  It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace SharpFellows.RxUtils
{
    public static class ObservableExtensions
    {
        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
        {
            return Regulate(observable, duration, Scheduler.TaskPool);
        }

        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
        {
            var regulator = new ObservableRegulator<T>(duration, scheduler);

            return Observable.Create<T>(observer => observable.Subscribe(obj => regulator.ProcessItem(obj, observer)));
        }

        private class ObservableRegulator<T>
        {
            private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
            private readonly object _lastEntryLock = new object();

            private readonly TimeSpan _duration;
            private readonly IScheduler _scheduler;

            public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
            {
                _duration = duration;
                _scheduler = scheduler;
            }

            public void ProcessItem(T val, IObserver<T> observer)
            {
                var canBroadcastNow = false;
                var nextEntryTime = DateTimeOffset.MaxValue;
                lock (_lastEntryLock)
                {
                    var now = DateTimeOffset.Now;
                    if (now.Subtract(_lastEntry) > _duration)
                    {
                        _lastEntry = now;
                        canBroadcastNow = true;
                    }
                    else
                    {
                        // Reserve the next available slot and deliver the item then
                        _lastEntry = _lastEntry.Add(_duration);
                        nextEntryTime = _lastEntry;
                    }
                }

                if (canBroadcastNow)
                {
                    observer.OnNext(val);
                }
                else
                {
                    _scheduler.Schedule(nextEntryTime, () => observer.OnNext(val));
                }
            }
        }
    }
}

The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):

return service.GetIrregularEvents()
              .Regulate(TimeSpan.FromSeconds(1));

FYI, my use of this is to slowly drip-feed items onto a data-bound UI.  Each new item triggers an animation and I don’t want five items starting to animate simultaneously.  Drop me an email or leave a comment if you find other uses for it!
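To sanity-check the operator, a throwaway console harness like the following shows the pacing.  This is an illustrative sketch, not code from the original post: it assumes the Regulate extension above is in scope, and the exact timings printed will depend on the scheduler.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using SharpFellows.RxUtils;

internal static class RegulateDemo
{
    private static void Main()
    {
        var started = DateTimeOffset.Now;

        // Five events arrive in quick succession; Regulate should pace them
        // so that consecutive outputs are at least 500ms apart.
        Observable.Range(1, 5)
                  .Regulate(TimeSpan.FromMilliseconds(500))
                  .Subscribe(i => Console.WriteLine("{0} at +{1:F0}ms",
                      i, DateTimeOffset.Now.Subtract(started).TotalMilliseconds));

        Thread.Sleep(3000); // keep the process alive while scheduled events drip out
    }
}
```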

A random walk in Rx

A random walk can be defined as “a stochastic process consisting of a sequence of changes each of whose characteristics (as magnitude or direction) is determined by chance”.  Or to put it another way, from a starting point, keep adding (or subtracting) random numbers … the resulting stream of numbers is a random walk.

Now I recently needed to demonstrate some graphing capability and decided that I would plot a continuous random walk.  Here was my first attempt:

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1);
    }
    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}

This did OK until I added a second graph and noticed something odd: each graph was different, even though my IoC container (MEF) was configured to create this service as a singleton (CreationPolicy.Shared).  Then I realised that the two graphs subscribing to the Tick property were each generating their own stream of data, since Observable.Interval does not start until someone subscribes to it (and it starts again, from scratch, for each additional subscriber).  So my second attempt hinged on the Publish method, which allows an observable stream to be reused:

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IConnectableObservable<double> _randomWalk;
    private IDisposable _connection;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                .Publish();
    }
    public IObservable<double> Tick
    {
        get
        {
            if(_connection == null)
                _connection = _randomWalk.Connect();
            return _randomWalk;
        }
    }
}

The Tick property has gotten a bit ugly here, and I wasn’t quite sure why I had this disposable instance kicking around.  It turned out that while my two charts now plotted the same stream of numbers, the numbers didn’t stop even when I closed all my charts.  #Fellow Andy pointed me in the direction of the RefCount method, and so here is my final code (which is neater and works nicely):

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                .Publish()
                                .RefCount();
    }

    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}
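A quick way to convince yourself the final version really shares one walk is to subscribe twice and compare.  This is a hypothetical snippet, not from the original post: both subscribers should print identical (random) values, and the underlying Interval stops once both subscriptions are disposed.

```csharp
var service = new RandomTickDataService();

// Both subscriptions attach to the same published sequence; RefCount
// connects on the first Subscribe and disconnects after the last Dispose.
var chart1 = service.Tick.Subscribe(d => Console.WriteLine("chart 1: {0:F3}", d));
var chart2 = service.Tick.Subscribe(d => Console.WriteLine("chart 2: {0:F3}", d));

Thread.Sleep(2000);

chart1.Dispose();
chart2.Dispose(); // last unsubscribe: the interval timer stops ticking
```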

Rx: Detecting the end of a rapid series of events

Storing application state and user preferences is a pretty common requirement.  And knowing when to save them can be tricky in a distributed application (such as a Silverlight client) … save too often and you will use too much bandwidth just for saving preferences; save too infrequently or irregularly and users might not get their preferences saved at all.

I have a service for holding and saving user preferences, which is a pretty common situation.  It is notified when the saved preferences are dirty (e.g. as the user changes values on a form) and it has to decide when to actually collect and save all the values.  My service uses the Reactive Framework (Rx) internally, and Andy (a fellow Rx junkie) insisted I post this:

public class UserPreferenceService : IUserPreferenceService
{
    private readonly TimeSpan _cooloffPeriod = TimeSpan.FromSeconds(10);
    private ISubject<string> _changeNotification;
    
    public void PreferencesAreStale()
    {
        _changeNotification.OnNext( null );
    }

    private void BeginListeningToChangeNotifications()
    {
        _changeNotification = new Subject<string>();

        // We apply a timestamp to all changes.  Then we delay these changes and recombine them with the latest changes.
        //      Comparing these timestamps allows us to work out if there have been any subsequent changes during the
        //      delay period.  We allow some leeway (50ms) in the timestamps, since the two observable streams use
        //      deferred execution and so can get timestamped slightly differently.

        var timestampedChanges = _changeNotification.Timestamp();
        var delayedChanges = timestampedChanges.Delay( _cooloffPeriod );

        timestampedChanges
            .CombineLatest( delayedChanges, ( latest, delayed ) => latest.Timestamp.Subtract( delayed.Timestamp ) )
            .Where( timeDifference => timeDifference.TotalMilliseconds < 50 )
            .Subscribe( _ => PersistPreferences() );
    }

    private void PersistPreferences()
    {
        // Collects and saves preferences
    }
}


I think the comments are pretty clear with regard to the implementation details, but the overall result is that preferences get saved 10 seconds after the last update in a rapid series, and no sooner.  Updates coming in more frequently than once every 10 seconds do not trigger a save operation.
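Worth noting: for this particular “save after the dust settles” requirement, the built-in Throttle operator would arguably do the job in one line, since despite its name it behaves like a debounce, emitting only the last value once the stream has been quiet for the given duration.  The hand-rolled version above keeps the timestamp comparison explicit, but a sketch of the alternative (using the same fields as the service above) might look like:

```csharp
_changeNotification
    .Throttle(_cooloffPeriod)              // emits only after 10s of silence
    .Subscribe(_ => PersistPreferences());
```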

November 21 2010

Composing away from friction


The reason it burns

Separation of concerns is key to flexible development and to adding new features without friction. If you need to modify an existing piece of code away from its initial and current intention, it’s probably time to rethink your design. I recently came across this when putting together NBehave’s VS2010 plugin: initially the Gherkin parser’s sole responsibility was to supply classifications for the syntax highlighting. However, as we progressed it became evident that it was going to need to support other language features such as IntelliSense and glyphs.

It looked something like this:

public class GherkinParser : IListener, IClassifier
{
    private IList<ClassificationSpan> _classifications;

    [Import]
    private IClassificationRegistry ClassificationRegistry { get; set; }

    public void Initialise(ITextBuffer buffer)
    {
        buffer.Changed += (sender, args) => Parse(buffer.CurrentSnapshot.GetText());
    }

    private void Parse(string text)
    {
        try
        {
            var languageService = new LanguageService();
            ILexer lexer = languageService.GetLexer(text, this);
            lexer.Scan(new StringReader(text));
        }
        catch (LexingException)
        {
            /* Ignore mid-typing parsing errors until we provide red line support */
        }
    }

    public void Feature(Token keyword, Token name)
    {
        // Some complex processing on keyword and name omitted for clarity.
        AddClassification(keyword, name, ClassificationRegistry.Feature);
    }

    public void Scenario(Token keyword, Token name)
    {
        // Some complex processing on keyword and name omitted for clarity.
        AddClassification(keyword, name, ClassificationRegistry.Scenario);
    }

    private void AddClassification(Token keyword, Token name, IClassificationType classificationType)
    {
        // Some complex processing of text positions omitted for clarity.
        _classifications.Add(keyword.ToClassificationSpan(classificationType));
    }

    public IList<ClassificationSpan> GetClassificationSpans(SnapshotSpan span)
    {
        return _classifications;
    }
}

This code is grossly simplified, but it gets across the idea that the parser’s sole reason for being is keyword classification. To add new features which depend on parsing but aren’t related to syntax highlighting, we would need to edit the parser, and that violates the Single Responsibility Principle. To make this code more flexible and extensible we need to do some work:

  1. Make the parser’s sole responsibility handling the buffer’s events.
  2. Format the events in a way easily consumable by future features.
  3. Publish when it’s parsing or idle.

So let’s tackle these one by one and then move onto how we are going to consume this new format and make new features.

Dousing the fire

Effectively what I see this particular class doing is consuming the lexer’s messages and republishing them in a way that is more consumable for this particular application. The Reactive Extensions were built for this type of scenario, so let’s begin by consuming the buffer’s events:

IObservable<IEvent<TextContentChangedEventArgs>> fromEvent =
    Observable.FromEvent<TextContentChangedEventArgs>(
        handler => textBuffer.Changed += handler,
        handler => textBuffer.Changed -= handler);

_inputListener = fromEvent
    .Sample(TimeSpan.FromSeconds(1))
    .Select(event1 => event1.EventArgs.After)
    .Subscribe(Parse);

In a few lines, we reduce the number of messages produced by the user typing quickly in Visual Studio, select the part of each message we need (the text after the user’s change), and subscribe to the resulting feed. Now that we are consuming the events, we need to publish them…

private Subject<ParserEvent> _parserEvents;

public IObservable<ParserEvent> ParserEvents
{
    get { return _parserEvents; }
}


This makes it easy for any features that need to consume data from the parser to pick up the events. ParserEvent is a simple DTO with the message-specific data inside. Pushing data to the subscribers is now as simple as:

public void Scenario(Token keyword, Token name)
{
    _parserEvents.OnNext(new ParserEvent(ParserEventType.Scenario)
    {
        Keyword = keyword.Content,
        Title = name.Content,
        Line = keyword.Position.Line,
        Snapshot = _snapshot
    });
}

Great: this has nothing to do with classifications or syntax highlighting. The parser is now fairly generic, and hopefully we won’t need to make any major changes to it for a while. To satisfy the last point, letting subscribers know when we are parsing, we simply create a new subject and push to it when we are working:

public IObservable<bool> IsParsing
{
    get { return _isParsing; }
}

private void Parse(ITextSnapshot snapshot)
{
    _isParsing.OnNext(true);
    _snapshot = snapshot;

    try
    {
        var languageService = new LanguageService();
        ILexer lexer = languageService.GetLexer(snapshot.GetText(), this);
        lexer.Scan(new StringReader(snapshot.GetText()));
    }
    catch (LexingException) { }
    finally
    {
        _isParsing.OnNext(false);
    }
}
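Consumers can then react to parser activity however they like; for example, driving a status indicator.  This is a hypothetical subscriber, not part of the plugin code, and `statusBar` is an assumed UI element:

```csharp
_parser.IsParsing
       .DistinctUntilChanged() // only care about transitions, not repeated pushes
       .Subscribe(parsing => statusBar.Text = parsing ? "Parsing…" : "Ready");
```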


Now that the separation is complete, we can take a look at moving the classifications to a new model.

Putting the house back together

Having one large class performing all the classifications was a little too cumbersome, and I also wanted to add classifications incrementally with minimal disruption, so I decided to use composition to facilitate the separation and aggregation of these component parts. Each part of the language has its own classifier, and we use MEF to pull in the available classifiers and delegate the processing to them.

[ImportMany]
public IEnumerable<IGherkinClassifier> Classifiers { get; set; }


_listeners.Add(_parser
    .ParserEvents
    .Select(parserEvent => Classifiers
        .With(list => list.FirstOrDefault(classifier => classifier.CanClassify(parserEvent)))
        .Return(gherkinClassifier => gherkinClassifier.Classify(parserEvent), _noClassificationsFound))
    .Subscribe((spans => _spans.AddRange(spans))));


The ImportMany attribute allows us to bring in a collection of classifiers from our assembly, or from wherever we told the container to look for possible exports. Then we subscribe to the parser’s observable stream of parser events and pass each event to the classifiers.

Now each classifier handles a single classification, and it’s obvious to new developers what each file does.

[Export(typeof(IGherkinClassifier))]
public class FeatureClassifier : GherkinClassifierBase
{
    public override bool CanClassify(ParserEvent parserEvent)
    {
        return parserEvent.EventType == ParserEventType.Feature;
    }

    public override void RegisterClassificationDefinitions()
    {
        Register(parserEvent => GetKeywordSpan(parserEvent));
        Register(parserEvent => GetTitleSpan(parserEvent, ClassificationRegistry.FeatureTitle));
        Register(parserEvent => GetDescriptionSpan(parserEvent.Line, parserEvent.Description, parserEvent.Snapshot));   
    }
}

The solution structure also now reveals a lot about the functionality on offer and where things should sit.

[Image: solution structure, with one classifier per file]

So when it comes to adding new visual studio language features, we should hopefully have a much easier and friction-free time.
