In this post I'll try to demystify ReactiveX's Observables, how they can be hot or cold, and how to turn one into another. By the way, it doesn't have anything to do with Katy Perry's 2008 song.
What is an Observable?
An Observable is an object that emits objects and can be subscribed by an observer (or subscriber). When an observer subscribes to an Observable, the observer must specify a reaction that will be triggered when the Observable emits a value. This behavior allows a system to have concurrent actions as it won't need to block until values are received. Observables may be hot or cold, the difference between them being that they start emitting objects at different times.
What are cold Observables?
A cold Observable waits until an observer subscribes to it to start emitting. Observables are lazy by default, meaning they only execute when an observer subscribes to it. This behavior allows an observer to observe the entire sequence of emitted values.
import { Observable, interval } from 'rxjs';
const observable$ = new Observable((observer) => {
const interval$ = interval(1000);
interval$.subscribe((value: number) => {
observer.next(value);
});
});
observable$.subscribe((value) => {
this.logger.log('Subcription #1 - Received:', value);
});
setTimeout(() => {
observable$.subscribe((value) => {
this.logger.log('Subcription #2 - Received:', value);
});
}, 2500);
// Logs
// Subcription #1 - Received: 0
// Subcription #1 - Received: 1
// Subcription #1 - Received: 2
// Subcription #2 - Received: 0
// Subcription #1 - Received: 3
// Subcription #2 - Received: 1
// Subcription #1 - Received: 4
// Subcription #2 - Received: 2
The example above illustrates the behavior of a cold Observable. Notice that the sequence of values received in the second subscription is totally independent from the first one - this is called unicasting.
What are hot Observables?
A hot Observable will start emitting as soon as it's created, and as such the values it emits are produced outside of it. A hot Observable's emitted objects are shared between its subscribers - this is called multicasting. Because of this behavior, an observer may start observing the sequence somewhere in the middle of it.
import { Observable, Subject, interval } from 'rxjs';
const subject$ = new Subject();
interval(1000).subscribe((value: number) => {
subject$.next(value);
});
const observable$ = new Observable((observer) => {
subject$.subscribe((value: number) => {
observer.next(value);
});
});
observable$.subscribe((value) => {
this.logger.log('Subcription #1 - Received:', value);
});
setTimeout(() => {
observable$.subscribe((value) => {
this.logger.log('Subcription #2 - Received:', value);
});
}, 2500);
// Logs
// Subcription #1 - Received: 0
// Subcription #1 - Received: 1
// Subcription #1 - Received: 2
// Subcription #2 - Received: 2
// Subcription #1 - Received: 3
// Subcription #2 - Received: 3
// Subcription #1 - Received: 4
// Subcription #2 - Received: 4
The example above illustrates the behavior of a hot Observable. Notice that the second subscription set of received values starts with "2", meaning that it started observing in the middle of the sequence.
Turning a cold Observable hot
To turn a cold Observable hot we use a special kind of Observable named Subject. Subjects work both as Observables and observers: it can subscribe to other Observables and reemit the items it observes, and also emit new objects.
createHotObservable<T>(coldObservable: Observable<T>) {
const subject$ = new Subject();
coldObservable.subscribe(subject$);
// Returns teardown logic, i.e. function called when the observable is unsubscribed
return new Observable((observer) => {
const sub = subject$.subscribe(observer);
return () => {
sub.unsubscribe();
};
});
}
This method doesn't track the source subscription, which means it is possible to have a subscription open when unnecessary. For that, we'll keep a reference count for how many subscriptions there are for our hot Observable, and when that counter reaches zero unsubscribe to the source.
Making a hot Observable cold
Let's assume there's a function websocket
which creates a connection using WebSockets to an endpoint as soon as it's called and returns a hot Observable. If we wish to turn this Observable to cold, we can't call this function and pass the result to a function as we did in the previous section. Instead, we will create an Observable factory, i.e. a function that returns an Observable.
const observableFactory = () => websocket();
We can pass this factory to the method below to create a cold Observable from a hot one.
createColdObservable<T>(observableFactory: () => Observable<T>): Observable<T> {
return new Observable((subscriber) => {
const subscription = observableFactory().subscribe(subscriber);
return () => {
subscription.unsubscribe();
};
});
}
Using the result of this function, the (previously) hot Observable will only start emitting values when it is subscribed.
Final thoughts
ReactiveX creates a whole plethora of possibilities for asynchronous operations in a system. Observables may seem confusing and complex at first, but don't give up on them, they're worthwhile learning.
Sources: