Learning Observable By Building Observable

I’ve since rewritten the piece and moved it to my personal blog here:

https://benlesh.com/posts/learning-observable-by-building-observable/

Observable is just a function that takes an observer and returns a function

If you really want to understand observable, you could simply write one. It’s not as hard as it sounds, honestly. An observable, boiled down to its smallest parts, is no more than a specific type of function with a specific purpose.

Shape:

  • A function
  • That accepts an observer: An object with `next`, `error` and `complete` methods on it.
  • And returns a cancellation function

Purpose:

To connect the observer to something that produces values (a producer), and return a means to tear down that connection to the producer. The observer is really a registry of handlers that can be pushed values over time.

Basic Implementation:

function myObservable(observer) {
  const datasource = new DataSource();
  datasource.ondata = (e) => observer.next(e);
  datasource.onerror = (err) => observer.error(err);
  datasource.oncomplete = () => observer.complete();
  return () => {
    datasource.destroy();
  };
}
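
The snippet above relies on a `DataSource` that is not defined here. To make it runnable, here is a minimal sketch that assumes a hand-driven stand-in: the `DataSource` class below, its `emit` and `finish` methods, and the `DataSource.latest` handle are all hypothetical and exist only for illustration.

```javascript
// Hypothetical stand-in for DataSource: driven by hand so the flow is easy to follow.
class DataSource {
  constructor() {
    this.destroyed = false;
    DataSource.latest = this; // lets the demo reach the instance created inside myObservable
  }
  emit(value) {
    if (!this.destroyed && this.ondata) this.ondata(value);
  }
  finish() {
    if (!this.destroyed && this.oncomplete) this.oncomplete();
  }
  destroy() {
    this.destroyed = true;
  }
}

function myObservable(observer) {
  const datasource = new DataSource();
  datasource.ondata = (e) => observer.next(e);
  datasource.onerror = (err) => observer.error(err);
  datasource.oncomplete = () => observer.complete();
  return () => {
    datasource.destroy();
  };
}

const received = [];
const unsubscribe = myObservable({
  next: (x) => received.push(x),
  error: (err) => received.push('error: ' + err),
  complete: () => received.push('done'),
});

DataSource.latest.emit(1);
DataSource.latest.emit(2);
unsubscribe();             // tears down the connection to the producer
DataSource.latest.emit(3); // ignored: the data source has been destroyed

console.log(received); // [1, 2]
```

Calling the observable is just calling a function, and the function it returns is the only thing needed to cancel.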

Safe Observers: Make Observers Great Again

When we talk about RxJS or reactive programming, observables generally get top billing. But the observer implementation is actually the workhorse of this type of reactive programming. Observables are inert. They’re just functions. They sit there until you `subscribe` to them, they set up your observer, and they’re done, back to being boring old functions waiting to be called. The observers, on the other hand, stay active and listen for events from your producers.

Observer Guarantees

  1. If you pass an Observer that doesn’t have all of the methods implemented, that’s okay.
  2. You don’t want to call `next` after a `complete` or an `error`.
  3. You don’t want anything called if you’ve unsubscribed.
  4. Calls to `complete` and `error` need to call unsubscription logic.
  5. If your `next`, `complete` or `error` handler throws an exception, you want to call your unsubscription logic so you don’t leak resources.
  6. `next`, `error` and `complete` are all actually optional. You don’t have to handle every value, or errors or completions. You might just want to handle one or two of those things.

To enforce these guarantees, wrap the observer you were given in a SafeObserver:

function myObservable(observer) {
  const safeObserver = new SafeObserver(observer);
  const datasource = new DataSource();
  datasource.ondata = (e) => safeObserver.next(e);
  datasource.onerror = (err) => safeObserver.error(err);
  datasource.oncomplete = () => safeObserver.complete();

  // Teardown logic the SafeObserver runs when it unsubscribes.
  safeObserver.unsub = () => {
    datasource.destroy();
  };

  return safeObserver.unsubscribe.bind(safeObserver);
}
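
The `SafeObserver` class used above is not shown in this section. What follows is one possible sketch of it, written to satisfy the guarantees listed earlier. The field names `destination` and `isUnsubscribed` are assumptions made for this example; only `unsub` and `unsubscribe` come from the code above.

```javascript
class SafeObserver {
  constructor(destination) {
    this.destination = destination; // the (possibly partial) observer being wrapped
    this.isUnsubscribed = false;
  }

  next(value) {
    // Nothing is delivered after unsubscribe, complete, or error.
    if (this.isUnsubscribed || !this.destination.next) return;
    try {
      this.destination.next(value);
    } catch (err) {
      this.unsubscribe(); // a throwing handler must not leak resources
      throw err;
    }
  }

  error(err) {
    if (this.isUnsubscribed) return;
    try {
      if (this.destination.error) this.destination.error(err);
    } finally {
      this.unsubscribe(); // errors always trigger teardown
    }
  }

  complete() {
    if (this.isUnsubscribed) return;
    try {
      if (this.destination.complete) this.destination.complete();
    } finally {
      this.unsubscribe(); // completion always triggers teardown
    }
  }

  unsubscribe() {
    if (this.isUnsubscribed) return; // run teardown at most once
    this.isUnsubscribed = true;
    if (this.unsub) this.unsub();
  }
}

const log = [];
const safe = new SafeObserver({ next: (x) => log.push(x) }); // no error or complete handler
safe.unsub = () => log.push('teardown');

safe.next(1);
safe.complete();    // no handler to call, but teardown still runs
safe.next(2);       // ignored: already completed
safe.unsubscribe(); // no second teardown

console.log(log); // [1, 'teardown']
```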

Designing Observable: Ergonomic Observer Safety

Having observables as a class/object enables us to easily apply a SafeObserver to passed anonymous observers (and handler functions if you like the `subscribe(fn, fn, fn)` signature in RxJS) and provide better ergonomics for the developer-user. By handling the creation of a SafeObserver inside Observable’s `subscribe` implementation, Observables can again be defined in the simplest possible way:

const myObservable = new Observable((observer) => {
const datasource = new DataSource();
datasource.ondata = (e) => observer.next(e);
datasource.onerror = (err) => observer.error(err);
datasource.oncomplete = () => observer.complete();
return () => {
datasource.destroy();
};
});
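
The `Observable` class itself is not defined in this section, so here is a rough sketch of how `subscribe` could wrap the observer. It assumes a compact `SafeObserver`; the `_subscribe` field, the `_terminate` helper, and the handling of a source that finishes synchronously are choices made for this example, not necessarily how RxJS does it.

```javascript
// Compact SafeObserver: ignores calls after termination and runs teardown once.
class SafeObserver {
  constructor(destination) {
    this.destination = destination;
    this.isUnsubscribed = false;
  }
  next(value) {
    if (this.isUnsubscribed || !this.destination.next) return;
    try {
      this.destination.next(value);
    } catch (err) {
      this.unsubscribe();
      throw err;
    }
  }
  error(err) { this._terminate('error', err); }
  complete() { this._terminate('complete'); }
  _terminate(method, arg) {
    if (this.isUnsubscribed) return;
    try {
      if (this.destination[method]) this.destination[method](arg);
    } finally {
      this.unsubscribe();
    }
  }
  unsubscribe() {
    if (this.isUnsubscribed) return;
    this.isUnsubscribed = true;
    if (this.unsub) this.unsub();
  }
}

class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe; // the plain "observer in, teardown out" function
  }
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    const teardown = this._subscribe(safeObserver);
    if (safeObserver.isUnsubscribed) {
      // The source already finished synchronously, so tear down right away.
      if (teardown) teardown();
    } else {
      safeObserver.unsub = teardown;
    }
    return safeObserver.unsubscribe.bind(safeObserver);
  }
}

const log = [];
const numbers = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // dropped by the SafeObserver
  return () => log.push('teardown');
});

numbers.subscribe({
  next: (x) => log.push(x),
  complete: () => log.push('done'),
});

console.log(log); // [1, 2, 'done', 'teardown']
```

The function passed to the constructor never has to think about safety; `subscribe` takes care of it for every observer.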

Operators: Also Just Functions

An “operator” in RxJS is little more than a function that takes a source observable and returns a new observable that will subscribe to that source observable when you subscribe to it. We can implement a basic, standalone operator like this:

function map(source, project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}
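
To see the standalone `map` in action, here is a small usage sketch. It assumes a deliberately bare `Observable` class (no SafeObserver wrapping, to keep the example short) and a hypothetical synchronous `source`.

```javascript
// Bare-bones Observable for illustration only: no observer safety.
class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
}

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    };
    // Subscribing to the result subscribes to the source.
    return source.subscribe(mapObserver);
  });
}

// Hypothetical source that emits two values synchronously, then completes.
const source = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {};
});

const out = [];
map(source, (x) => x * 10).subscribe({
  next: (x) => out.push(x),
  error: (err) => out.push('error: ' + err),
  complete: () => out.push('done'),
});

console.log(out); // [10, 20, 'done']
```

Nothing happens when `map` is called; the source is only subscribed to once the returned observable is.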

Designing Observable: Making Operator Chains Pretty

If we were to have all of our operators implemented as standalone functions like the example above, chaining our operators gets a bit ugly:

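map(map(myObservable, (x) => x + 1), (x) => x + 2);

A `pipe` helper that applies a list of operators in order reads better, provided each operator returns a function that takes the source:

pipe(myObservable, map(x => x + 1), map(x => x + 2));

Ideally, though, we’d like to chain with methods:

myObservable.map(x => x + 1).map(x => x + 2);

Since Observable is already a class, we can support that by adding operators to its prototype:

Observable.prototype.map = function (project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return this.subscribe(mapObserver);
  });
};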
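
The `pipe(...)` form shown above is not implemented in this section. One way to sketch it is as a reduce over the operators, assuming each operator is curried: it takes its arguments first and returns a function of the source. The bare `Observable` class and the curried `map` below are assumptions made for this example.

```javascript
// Bare-bones Observable for illustration only: no observer safety.
class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
}

// Curried map: takes the projection, returns a function from source to new observable.
const map = (project) => (source) =>
  new Observable((observer) =>
    source.subscribe({
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  );

// Feed the source through each operator in order, left to right.
function pipe(source, ...operators) {
  return operators.reduce((current, operator) => operator(current), source);
}

const myObservable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {};
});

const results = [];
pipe(myObservable, map((x) => x + 1), map((x) => x + 2)).subscribe({
  next: (x) => results.push(x),
  error: (err) => { throw err; },
  complete: () => results.push('done'),
});

console.log(results); // [4, 5, 'done']
```

The prototype approach gives the same result with method chaining; `pipe` simply avoids touching `Observable.prototype`.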

TLDR: An observable is a function that takes an observer and returns a function

Keep in mind, after reading everything above, that all of this was designed around a simple function. An observable is a function that takes an observer and returns a function. Nothing more, nothing less. If you write a function that takes an observer and returns a function, is it async or sync? Neither. It’s a function. The behavior of any function depends entirely on how it’s implemented. So when dealing with an Observable, treat it like a function reference you’re passing around, not some magic, stateful alien type. When you’re building your operator chains, what you’re really doing is composing a function that sets up a chain of observers that are linked together and pass values through to your observer.
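
To make the sync-or-async point concrete, here is a small sketch with two plain functions of the same shape. The names `syncValues`, `delayedValues`, and `makeObserver` are hypothetical, and the 10 ms delay is arbitrary.

```javascript
// Emits everything before the call returns.
function syncValues(observer) {
  observer.next('a');
  observer.next('b');
  observer.complete();
  return () => {};
}

// Emits later, from a timer; the returned function cancels the timer.
function delayedValues(observer) {
  const id = setTimeout(() => {
    observer.next('a');
    observer.complete();
  }, 10);
  return () => clearTimeout(id);
}

const log = [];
const makeObserver = (label) => ({
  next: (x) => log.push(`${label}:${x}`),
  error: (err) => log.push(`${label}:error`),
  complete: () => log.push(`${label}:done`),
});

syncValues(makeObserver('sync'));
const cancel = delayedValues(makeObserver('async'));
log.push('after subscribe');

// So far only the synchronous function has delivered anything.
console.log(log); // ['sync:a', 'sync:b', 'sync:done', 'after subscribe']
```

Both functions have the same signature. Whether values arrive before or after the call returns is purely a matter of what the function body does, and calling `cancel()` before the timer fires would prevent the delayed values from arriving at all.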

RxJS Lead. Views are my own
