Understanding the observable type

October 29, 2016

The Observable type is used to model push-based data sources. Observables are compositional and lazy: they can be composed with higher-order combinators, and they do not start emitting data until an observer has subscribed.

Observables are everywhere: we can find them in various codebases and specs, from Cycle.js to Angular 2, from Horizon to Falcor, from MobX to the ECMAScript proposal, you name it. If you followed ReactiveConf in Bratislava this week, I’m sure you noticed how observables were mentioned in almost every talk. I started playing with them a couple of weeks ago as I began using Cycle.js (a functional and reactive JavaScript framework for cleaner code). Observables cannot be ignored, so let’s try to wrap our heads around them.

As Ben Lesh wrote: If you really want to understand observable, you could simply write one. In this post, I’ll try to do exactly that.


A basic observable

An observable, boiled down to its smallest parts, is no more than a specific type of function with a specific purpose. ~ @benlesh

So what is the purpose of an observable? Simply connecting an observer to a producer.

const Observable = function(producer) {
  this.subscribe = producer
}

In the above code that contract is perhaps not that clear yet. Where does the observer come into play? To understand that let’s explore the anatomy of a producer.

The producer

We can imagine the producer as being the observable’s own firehose. In fact, as the name suggests, it produces and emits data. Every time the producer emits a value, that value gets pushed to the observer. The contract that makes this work requires the observer to have 3 methods: ‘next’, ‘error’ and ‘complete’.

Let’s see a very basic producer that will simply emit 2 strings (‘Hello’ and ‘World’) before signaling its completion.

const helloWorldProducer = function(observer) {
  observer.next('Hello')
  observer.next('World')
  observer.complete()
}

Now that the contract between a producer and an observable is more explicit and visible, we can go ahead and create an observable using our helloWorldProducer as its data source.

const observableHelloWorld = new Observable(helloWorldProducer)

The observer

Before watching our shiny new observable in action, we still need to dissect the observer. As mentioned earlier, an observer is nothing more than an object with 3 methods: ‘next’, ‘error’ and ‘complete’.

const observer = {
  next(value) { console.log(value) },
  error(err) { console.log('Error: ', err) },
  complete() { console.log('Done') }
}

Subscribing

To put it all together, we just have to subscribe to our observable, passing in the observer:

observableHelloWorld.subscribe(observer)
// -> "Hello"
// -> "World"
// -> "Done"

Play with the above code on jsBin
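The laziness mentioned at the start of the post can be checked directly with this minimal Observable. The following is only a sketch: the `log` array and the `lazyObservable` name are introduced here for illustration, and the Observable constructor is repeated so the snippet runs on its own. It records the order of events to show that the producer does not run when the observable is created, only when something subscribes.

```javascript
// Same minimal Observable as above, repeated so the snippet is self-contained.
const Observable = function (producer) {
  this.subscribe = producer
}

// Hypothetical helper for this example: records what happens, in order.
const log = []

const lazyObservable = new Observable(function (observer) {
  log.push('producer started')
  observer.next('Hello')
  observer.complete()
})

// Nothing has been emitted yet: the producer has not been called.
log.push('observable created')

lazyObservable.subscribe({
  next(value) { log.push('next: ' + value) },
  error(err) { log.push('error: ' + err) },
  complete() { log.push('complete') }
})

console.log(log)
// -> [ 'observable created', 'producer started', 'next: Hello', 'complete' ]
```

Note that with this implementation every call to subscribe runs the producer again from the start.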


Handling errors

So far our code doesn’t handle errors. Let’s try to fix this:

const helloWorldProducer = function(observer) {
  try {
    observer.next('Hello')
    // let's simulate something going wrong...
    throw new Error('Oups')
    observer.next('World')
    observer.complete()
  } catch(err) {
    observer.error(err)
  }
}

Let’s create the observable again with this producer, subscribe, and see what happens:

observableHelloWorld.subscribe(observer)
// -> "Hello"
// -> "Error: [object Error]"

Play with the above code on jsBin

It is worth noting that an observer doesn’t have to implement all of the methods.

‘next’, ‘error’ and ‘complete’ are all actually optional. You don’t have to handle every value, or errors or completions. You might just want to handle one or two of those things. ~ @benlesh
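With the bare-bones Observable from this post, a producer that calls a method the observer did not define would throw a TypeError. One possible way to make the methods truly optional, sketched here under the assumption that we may change the constructor, is to fill in the missing ones with no-ops before handing the observer to the producer. The `noop` and `toSafeObserver` names are made up for this example.

```javascript
const noop = () => {}

// Hypothetical helper: returns an observer that always has all three methods.
// Missing methods are replaced by no-ops; existing ones keep their `this`.
const toSafeObserver = function (partial = {}) {
  const pick = (name) =>
    typeof partial[name] === 'function' ? partial[name].bind(partial) : noop
  return {
    next: pick('next'),
    error: pick('error'),
    complete: pick('complete')
  }
}

const Observable = function (producer) {
  // Wrap the observer once, so producers never need to check for methods.
  this.subscribe = (observer) => producer(toSafeObserver(observer))
}

const observableHelloWorld = new Observable(function (observer) {
  observer.next('Hello')
  observer.next('World')
  observer.complete() // safe even though the observer below has no complete()
})

const received = []
observableHelloWorld.subscribe({
  next(value) { received.push(value) }
})

console.log(received)
// -> [ 'Hello', 'World' ]
```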


Unsubscribing

Subscribing to our observable doesn’t return anything yet. Normally we would want to get back a dispose function that lets us unsubscribe from the observable, so let’s add this in.

But first, in order to better understand why this could be useful, let’s refactor our hello world producer, adding a bit of asynchrony to it. Let’s transform it into a producer that will keep emitting an incremental counter every second:

const intervalProducer = function(observer) {
  let counter = 0
  const timer = setInterval(() => {
    try {
      observer.next(counter++)
    } catch(err) {
      observer.error(err)
    }
  }, 1000)
}

I guess you can now see why being able to unsubscribe from our observable is so important. Subscribing to an observable made with a producer like the one we just defined will keep pushing data to us forever, with no way for us to stop it.

To solve this, let’s make the producer return an object with an unsubscribe function:

const intervalProducer = function(observer) {
  let counter = 0

  const unsubscribe = function() {
    clearInterval(timer)
  }

  const timer = setInterval(() => {
    try {
      observer.next(counter++)
    } catch(err) {
      unsubscribe() // <- on error we also want to unsubscribe
      observer.error(err)
    }
  }, 1000)

  return {
    unsubscribe
  }
}

We can now subscribe and, at any time we want, unsubscribe:

const observableInterval = new Observable(intervalProducer)

const sub = observableInterval.subscribe(observer)

setTimeout(() => {
  sub.unsubscribe()
}, 6000)

// -> 0
// -> 1
// -> 2
// -> 3
// -> 4
// -> 5

Play with the above code on jsBin


Recap

We saw how the Observable constructor initializes a new Observable object.

As per the tc39 proposal:

The subscriber argument must be a function object. It is called each time the subscribe method of the Observable object is invoked. The subscriber function is called with a wrapped observer object and may optionally return a function which will cancel the subscription.

Finally, the observer object wraps 3 functions: ‘next’, ‘error’, and ‘complete’.
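The "wrapped observer" and the optional cancel function described in the proposal text can be sketched roughly as follows. This is a simplified illustration and not the proposal’s actual algorithm: the names `closed`, `cleanup` and `close` are made up here, and subscribe returns an object with an unsubscribe method, in the same style as the interval example. The idea is that once the subscription is closed (by an error, a completion or an unsubscribe) nothing else reaches the observer, and the cleanup function runs at most once.

```javascript
const Observable = function (subscriber) {
  this.subscribe = function (observer) {
    let closed = false
    let cleanup // set once the subscriber function has returned

    const close = function () {
      if (closed) return
      closed = true
      if (typeof cleanup === 'function') cleanup()
    }

    // The wrapped observer guards every call with the `closed` flag.
    const wrapped = {
      next(value) {
        if (!closed && observer.next) observer.next(value)
      },
      error(err) {
        if (closed) return
        close()
        if (observer.error) observer.error(err)
      },
      complete() {
        if (closed) return
        close()
        if (observer.complete) observer.complete()
      }
    }

    cleanup = subscriber(wrapped)
    // Edge case: the subscriber finished synchronously, before we knew `cleanup`.
    if (closed && typeof cleanup === 'function') cleanup()

    return { unsubscribe: close }
  }
}

const events = []
const source = new Observable(function (observer) {
  observer.next(1)
  observer.complete()
  observer.next(2) // ignored: the subscription is already closed
  return () => events.push('cleanup')
})

source.subscribe({
  next(value) { events.push('next ' + value) },
  complete() { events.push('complete') }
})

console.log(events)
// -> [ 'next 1', 'complete', 'cleanup' ]
```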

If you prefer the subscribe(fn, fn, fn) signature like the one you’ll find in RxJS, we could easily map the callbacks accordingly:

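const helloWorldProducer = function(next, err=()=>{}, complete=()=>{}) {
  // accept either an observer object or up to three callbacks
  const observer = (typeof next !== 'function') ?
    next :
    { next, error: err, complete } // the observer contract expects 'error', not 'err'

  observer.next('Hello')
  observer.next('World')
  observer.complete()
}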

This allows us to subscribe by simply passing one or more callbacks:

observableHelloWorld.subscribe(
  (value) => console.log(value)
)
// -> "Hello"
// -> "World"

Play with the above code on jsBin
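One drawback of doing the mapping inside the producer is that every producer would have to repeat it. A possible alternative, sketched here under the assumption that we are free to change the Observable constructor, is to normalize the arguments once inside subscribe so that producers always receive a plain observer object. The `noop` and `toObserver` names are hypothetical helpers for this example.

```javascript
const noop = () => {}

// Hypothetical helper: accepts either an observer object or up to three callbacks.
const toObserver = function (nextOrObserver, error = noop, complete = noop) {
  return typeof nextOrObserver === 'function'
    ? { next: nextOrObserver, error, complete }
    : nextOrObserver
}

const Observable = function (producer) {
  // Normalize once here; the producer always sees an observer object.
  this.subscribe = (...args) => producer(toObserver(...args))
}

// The producer is the original one from the start of the post, unchanged.
const helloWorldProducer = function (observer) {
  observer.next('Hello')
  observer.next('World')
  observer.complete()
}

const observableHelloWorld = new Observable(helloWorldProducer)
const output = []

// Callback style
observableHelloWorld.subscribe((value) => output.push('callback: ' + value))

// Observer object style
observableHelloWorld.subscribe({
  next(value) { output.push('object: ' + value) },
  error(err) { output.push('error: ' + err) },
  complete() { output.push('object: done') }
})

console.log(output)
// -> [ 'callback: Hello', 'callback: World',
//      'object: Hello', 'object: World', 'object: done' ]
```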


In the next posts, we’ll go through

  • Implementing different observables, like ‘fromArray’ or ‘fromEvent’
  • Understanding Hot vs Cold Observables and multicasting
  • Implementing some of the basic operators you could find in a stream library like ‘rxjs’

Resources worth checking:

Many thanks go to André Staltz, Jafar Husain, Ben Lesh and all the great people who wrote great articles and produced great resources that are helping me better understand the topic. I’m still fresh on the subject, so I have probably misunderstood something or got some things wrong. If so, please do let me know.

Further must-read resources that I highly recommend:


All the articles of the “Understanding the observable type” series:

  1. Implementing the Observable type from scratch
  2. Creating observables out of arrays, events and promises
  3. Composing observables streams