The “mission statement” for the Rx library can be summarised as follows:
Rx is a library for composing asynchronous and event-based programs using observable collections.
In practical terms, Rx provides a Linq-like API for handling events, allowing the developer to aggregate, filter and query them in a concise and familiar fashion.
Within the next few sections, I will provide a very brief overview of the Rx library, with the rest of the article focusing on practical and fun examples. For a much more detailed tutorial, I would recommend reading “DEVHOL202 – Curing the asynchronous blues with the Reactive Extensions for .NET”.
Probably the easiest way to gain a quick understanding of Rx is to compare it with Linq. The following example shows some simple logic that finds all the odd numbers in a short sequence:
List<int> numbers = new List<int>()
{
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10
};
foreach (int number in numbers)
{
    if (number % 2 == 1)
    {
        Debug.WriteLine(number);
    }
}
Running the above code produces the following output:
1
3
5
7
9
We can rewrite this simple algorithm using the Linq Where operator as follows:
var oddNumbers = numbers.Where(n => n % 2 == 1);
foreach (int number in oddNumbers)
{
    Debug.WriteLine(number);
}
The Linq version of our code to find the odd numbers produces the same result as the ‘manual’ approach, but the way in which it is executed is quite different. The Linq query is constructed by applying the Where operator to our source data, but at this point the condition has not yet been evaluated against the source. If we expand the foreach loop, we can see that it uses an enumerator obtained from the result of our query:
IEnumerable<int> oddNumbers = numbers.Where(n => n % 2 == 1);
IEnumerator<int> enumerator = oddNumbers.GetEnumerator();
while (enumerator.MoveNext())
{
    Debug.WriteLine(enumerator.Current);
}
(NOTE: The expansion actually adds a try / finally block; see the C# language reference.)
Each call to the MoveNext method on the enumerator is ‘pulling’ data from our query, with the condition being evaluated on each element of the source as and when it is needed. This ‘pull’ model results in deferred execution (or lazy evaluation, depending on your preferred terminology) of the query. In the above example, our IEnumerable source is a list of fixed size, although in more general terms it is simply a source of data from which we can pull items, and it does not have to have a fixed size.
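One way to see deferred execution for yourself is to change the source after the query has been built but before it is enumerated. The following is a minimal sketch rather than code from the original article; the class name DeferredExecutionSketch and its Run method are hypothetical, and Console.WriteLine is used in place of Debug.WriteLine so that it runs as a plain console program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class, not part of Linq or Rx.
public static class DeferredExecutionSketch
{
    // Returns the items seen when the query is finally enumerated.
    public static List<int> Run()
    {
        var numbers = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        // Building the query does not evaluate the condition yet.
        IEnumerable<int> oddNumbers = numbers.Where(n => n % 2 == 1);

        // Modify the source after the query has been defined.
        numbers.Add(11);

        // Enumerating now pulls the items, so the newly added 11 is included.
        return oddNumbers.ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
        // prints "1, 3, 5, 7, 9, 11"
    }
}
```

Because the condition is only evaluated as items are pulled, the value added after the query was created still appears in the result.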
We can achieve exactly the same result, creating a list of odd numbers, using the Rx library as follows:
List<int> numbers = new List<int>()
{
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10
};
var observableNumbers = numbers.ToObservable();
var oddNumbers = observableNumbers.Where(n => n % 2 == 1);
oddNumbers.Subscribe(number => Debug.WriteLine(number));
Again, the output of the above code is exactly the same as the Linq equivalent. The ToObservable extension method returns an IObservable, which is the Rx analogue of the IEnumerable interface in that it is a source of items. The Rx library defines many Linq-like extension methods for manipulating IObservable sources of data; if you explore the above code via Intellisense, you will find many familiar methods (Where, Select, Max, SelectMany...).
The Rx library also defines the IObserver interface, which is an analogue of the IEnumerator interface that is ‘hidden’ by the foreach syntax. IObservable has a Subscribe method to which you provide an IObserver. The observable source will invoke the OnNext method on the IObserver as items are pushed. Often the IObserver is hidden by the Subscribe extension methods on IObservable, which create an IObserver instance for you; your delegate is then invoked whenever the observable source calls OnNext.
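To make the relationship between the two interfaces more concrete, here is a minimal sketch that implements both by hand using only the System.IObservable<T> and System.IObserver<T> interfaces. It is an illustration under stated assumptions, not how Rx itself is implemented: RecordingObserver, OddNumberSource and ObserverSketch are hypothetical names, and the source simply pushes every odd number to the observer as soon as it subscribes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(int value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError(" + error.Message + ")");
    public void OnCompleted() => Log.Add("OnCompleted");
}

// Hypothetical, minimal observable: pushes the odd numbers of a fixed
// sequence to each observer at the moment it subscribes.
public sealed class OddNumberSource : IObservable<int>
{
    private readonly IEnumerable<int> numbers;

    public OddNumberSource(IEnumerable<int> numbers)
    {
        this.numbers = numbers;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (int n in numbers)
        {
            if (n % 2 == 1)
            {
                observer.OnNext(n); // the source drives execution
            }
        }
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    // Nothing to unsubscribe from, because all items were already pushed.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

public static class ObserverSketch
{
    public static void Main()
    {
        var observer = new RecordingObserver();
        IDisposable subscription =
            new OddNumberSource(new[] { 1, 2, 3, 4, 5 }).Subscribe(observer);
        subscription.Dispose();

        foreach (string entry in observer.Log)
        {
            Console.WriteLine(entry);
        }
        // prints OnNext(1), OnNext(3), OnNext(5), OnCompleted on separate lines
    }
}
```

In everyday Rx code you would rarely write an observer class like this; the Subscribe overload that accepts a delegate builds the observer for you.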

So, we have seen the similarities between Linq and Rx, but what are the differences?
The key difference is in what ‘drives’ the execution. With Linq, we iterate over the result of the query, ‘pulling’ the items from the IEnumerable source. With Rx, as soon as the subscription to our IObservable source is made, the items are ‘pushed’ to our subscriber.
The above trivial example was selected in order to highlight the similarities between Rx and Linq, and the net result is exactly the same. Where Rx comes into its own is in the extension methods it provides to create IObservable sources from events or asynchronous web service calls, allowing the developer to apply familiar Linq-style operations. We’ll look at a slightly more interesting example next, using Rx to handle and manipulate events.
You can create an observable source from an event via the FromEvent factory method, where you supply the event source (in our case a TextBox) and the name of the event. When you subscribe to this source, your code is executed each time the event is fired.
var textChangedSource = Observable.FromEvent<TextChangedEventArgs>
    (searchTextBox, "TextChanged");
textChangedSource.Subscribe(e => Debug.WriteLine(((TextBox)e.Sender).Text));
Typing in the text ‘reactive’ yields the following output:
r
re
rea
reac
react
reacti
reactiv
reactive
NOTE: If the hard-coded event string in the above example offends, there is a slightly more cumbersome FromEvent overload which allows you to specify add and remove handler actions explicitly.
Whilst the above example is not terribly exciting, now that we have the event packaged as an observable, we can perform Linq-style queries. In the following example, a Select projection operator is used to create an observable which contains just the text of the TextBox. This is then transformed via a second Select to create an observable which provides length changed ‘events’.
Extracted from: http://www.codeproject.com/Articles/132966/Exploring-Reactive-Extensions-Rx-through-Twitter-a