// handcrafted observables
// not recommended
void Main()
{
var numbers = new NumbersObservable(5);
var subscription = numbers.Subscribe(new ConsoleObserver<int>("numbers"));
var numbersExt = new NumbersObservable(5);
numbersExt.SubscribeConsole("numbersExt");
}
public class NumbersObservable : IObservable<int>
{
private readonly int _amount;
public NumbersObservable(int amount)
{
_amount = amount;
}
public IDisposable Subscribe(IObserver<int> observer)
{
for (int i = 0; i < _amount; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
return Disposable.Empty;
}
}
public class ConsoleObserver<T> : IObserver<T>
{
private readonly string _name;
public ConsoleObserver(string name = "")
{
_name = name;
}
public void OnNext(T value)
{
Console.WriteLine("{0} - OnNext({1})", _name, value);
}
public void OnError(Exception error)
{
Console.WriteLine("{0} - OnError", _name);
Console.WriteLine("\t {0}", error);
}
public void OnCompleted()
{
Console.WriteLine("{0} - OnCompleted", _name);
}
}
public static class Extensions
{
public static IDisposable SubscribeConsole<T>(this IObservable<T> observable, string name = "")
{
return observable.Subscribe(new ConsoleObserver<T>(name));
}
}
// Observable.Create()
void Main()
{
// using System.Reactive.Linq
var observable = Observable.Create<int>(observer =>
{
for (int i = 0; i < _amount; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
return Disposable.Empty;
});
}