// handcrafted observables
// not recommended
void Main()
{
    var numbers = new NumbersObservable(5);
    var subscription = numbers.Subscribe(new ConsoleObserver<int>("numbers"));
    // All notifications were already delivered inside Subscribe; dispose the
    // returned handle anyway so the subscription is explicitly released.
    subscription.Dispose();

    var numbersExt = new NumbersObservable(5);
    numbersExt.SubscribeConsole("numbersExt");
}

/// <summary>
/// Hand-written observable that pushes the integers 0 .. amount-1 to each
/// subscriber and then signals completion.
/// </summary>
public class NumbersObservable : IObservable<int>
{
    private readonly int _amount;

    /// <summary>Creates the observable.</summary>
    /// <param name="amount">
    /// How many consecutive integers (starting at 0) to emit. A value of zero
    /// or less emits nothing before completion.
    /// </param>
    public NumbersObservable(int amount)
    {
        _amount = amount;
    }

    /// <summary>
    /// Pushes every value and the completion notification to
    /// <paramref name="observer"/> before returning.
    /// </summary>
    /// <param name="observer">Receiver of the notifications.</param>
    /// <returns>
    /// <c>Disposable.Empty</c>; because emission has finished by the time this
    /// method returns, there is nothing left to unsubscribe from.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observer"/> is <c>null</c>.
    /// </exception>
    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Fail fast with a descriptive exception instead of a
        // NullReferenceException from the first OnNext/OnCompleted call.
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        for (int i = 0; i < _amount; i++)
        {
            observer.OnNext(i);
        }

        observer.OnCompleted();
        return Disposable.Empty;
    }
}

/// <summary>
/// Observer that writes every notification it receives to the console,
/// prefixed with a caller-supplied name.
/// </summary>
/// <typeparam name="T">Type of the observed values.</typeparam>
public class ConsoleObserver<T> : IObserver<T>
{
    private readonly string _name;

    /// <summary>Creates the observer.</summary>
    /// <param name="name">Label printed in front of every notification.</param>
    public ConsoleObserver(string name = "")
    {
        _name = name;
    }

    /// <summary>Writes "<c>name - OnNext(value)</c>" to the console.</summary>
    public void OnNext(T value)
    {
        Console.WriteLine("{0} - OnNext({1})", _name, value);
    }

    /// <summary>
    /// Writes "<c>name - OnError</c>" followed by the exception on a second,
    /// tab-indented line.
    /// </summary>
    public void OnError(Exception error)
    {
        Console.WriteLine("{0} - OnError", _name);
        Console.WriteLine("\t {0}", error);
    }

    /// <summary>Writes "<c>name - OnCompleted</c>" to the console.</summary>
    public void OnCompleted()
    {
        Console.WriteLine("{0} - OnCompleted", _name);
    }
}

/// <summary>Convenience helpers for subscribing to observables.</summary>
public static class Extensions
{
    /// <summary>
    /// Subscribes a new <see cref="ConsoleObserver{T}"/> to
    /// <paramref name="observable"/>.
    /// </summary>
    /// <param name="observable">Sequence to subscribe to.</param>
    /// <param name="name">Label passed to the console observer.</param>
    /// <returns>The subscription handle returned by the observable.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> is <c>null</c>.
    /// </exception>
    public static IDisposable SubscribeConsole<T>(this IObservable<T> observable, string name = "")
    {
        // An extension method can be invoked on a null reference; report that
        // as an argument error rather than a NullReferenceException.
        if (observable == null)
        {
            throw new ArgumentNullException(nameof(observable));
        }

        return observable.Subscribe(new ConsoleObserver<T>(name));
    }
}
// Observable.Create()
void Main()
{
    // using System.Reactive.Linq

    // Number of consecutive integers (starting at 0) the sequence emits.
    // The original lambda read an undeclared `_amount`, which does not exist
    // in this scope and prevented the snippet from compiling.
    const int amount = 5;

    // The delegate below is the subscribe logic: it pushes 0 .. amount-1 to
    // the observer it is given, then signals completion. It returns
    // Disposable.Empty because emission is finished by the time it returns.
    var observable = Observable.Create<int>(observer =>
    {
        for (int i = 0; i < amount; i++)
        {
            observer.OnNext(i);
        }

        observer.OnCompleted();
        return Disposable.Empty;
    });
}