Part 1: We need channels! A gentle introduction to communicating sequential processes.
CSP is a formal language for describing patterns of interaction in concurrent systems. It's used in Go, Crystal, Clojure's core.async and a couple of other places. The idea at its core is not complicated, but it offers some interesting capabilities. Surprisingly enough, it is not really popular in JavaScript. I have been exploring this pattern recently and here are my findings.
This blog post is part of a series introducing the Riew library, a reactive library that aims to solve communication and synchronization issues between your view and business logic.
- We need channels! A gentle introduction to CSP.
- Riew - reactive view basics
- Riew - reactive view in patterns
(At the beginning of this article we define a problem that is solved first with the Mediator and then with the PubSub pattern. After that we introduce CSP as a concept and show how it solves the same problem. If you want a straight intro to CSP, feel free to jump here.)
Table of contents:
- The test case
- Brute-force approach
- The mediator pattern
- The publisher-subscriber pattern
- Communication vs synchronization
- Communicating sequential processes (CSP)
- Final words
The test case
Let's say that we have two functions - A and B. The first one knows something that the second one needs in order to do its job. The result of the second one, B, is required by the first one to continue working. These two functions depend on each other. A acts a bit like a parent of B. We can say that A will probably trigger B by passing what it knows and will consume what B returns. So, in such a situation we have to think about two things - communication and synchronization. We have data to pass back and forth and we have logic that needs to run in a specific order.
Brute-force approach
Let's add some more details to the case. We will assume that the goal of this example is to calculate a Fibonacci sequence number, where our sequence looks like this: 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89. Our function A receives the index and B knows how to calculate the number behind that index in the sequence. We will also assume that B does the calculation asynchronously. We will start with the following:
async function A(n) {
  const value = await B(n);
  console.log(`Fibonacci number for index ${n} is ${value}`);
}
function B(n) {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  return new Promise(done => {
    setTimeout(() => done(fibonacci(n)), 1000);
  });
}
A(10);
If we run this code we will get a delay of 1 second and will see "Fibonacci number for index 10 is 89".
Of course this brute-force approach doesn't really scale. That's because of the direct call of B inside A. These two functions are now tightly coupled. We can't write such code everywhere because we will end up with a giant net of calls which is definitely difficult to maintain and extend. Let's take a step back and think about what exactly these two functions do.
A: I know the index. I just need someone to calculate the actual Fibonacci number. I will then print it to the console.
B: I know how to calculate a Fibonacci number. I just need the index and some time to do it.
We can conclude that A and B could be completely separate entities and still function properly.
The mediator pattern
What if we put something next to A and B? This thing will coordinate the work between the two functions so that they can work in isolation.
const Mediator = {
  async run(index) {
    const number = await B(index);
    A(index, number);
  },
};
Now A doesn't need to call B because that's the job of the Mediator. It simply consumes the result of B's work.
async function A(index, value) {
  console.log(`Fibonacci number for index ${index} is ${value}`);
}
function B(n) {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  return new Promise(done => {
    setTimeout(() => done(fibonacci(n)), 1000);
  });
}
Mediator.run(10);
That is slightly better because A has no idea about the existence of B. We also have a layer where we can switch B with a more performant implementation of the Fibonacci sequence.
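As a rough sketch of what such a swap might look like, here is a B that computes the number in a loop instead of recursively. The helper name fibonacciIterative is hypothetical and not part of the original example. It keeps the same convention (indexes 0 and 1 both give 1) and the same one-second delay, so A and the Mediator do not need to change.

```javascript
// Hypothetical helper: iterative Fibonacci using the article's convention
// that index 0 and index 1 both map to 1.
function fibonacciIterative(num) {
  let previous = 1; // value at index 0
  let current = 1; // value at index 1
  for (let i = 2; i <= num; i++) {
    [previous, current] = [current, previous + current];
  }
  return current;
}

// Same signature and promise-based result as the original B.
function B(n) {
  return new Promise(done => {
    setTimeout(() => done(fibonacciIterative(n)), 1000);
  });
}

// A and the Mediator are unchanged: only B was replaced.
function A(index, value) {
  console.log(`Fibonacci number for index ${index} is ${value}`);
}

const Mediator = {
  async run(index) {
    const number = await B(index);
    A(index, number);
  },
};

Mediator.run(10);
```

Because only the Mediator references B, the replacement stays invisible to A.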
As very often happens though, the advantage of a particular pattern becomes its disadvantage in the long term. The presence of a mediator layer makes us put a lot of things there. It becomes complex and suddenly knows a lot about the system. The Mediator type of object gets into serious intimacy with most of the entities in our system and that's not good.
What if there is still a mediator, but it is blind and knows nothing about our components? Something that only orchestrates the processes.
The publisher-subscriber pattern
There is another pattern called publisher-subscriber (PubSub) which again brings a layer between the elements in our system and successfully decouples them. The problem that we see in the mediator pattern simply doesn't exist here because we have no direct references to entities.
The publisher and the subscriber in this pattern have access to a central bus (some people call it a channel). Through that central bus they exchange messages. There are other patterns, like the observer pattern, where the publisher of the message knows specifically who is going to receive it, but in the PubSub pattern the publisher does not know who, if anyone, will receive it. We simply put a message on the bus and whoever is interested in it picks it up from there.
Let's transform our Fibonacci example to use the PubSub pattern. Both functions need to change completely. We will keep only the fibonacci function inside B.
function A(n) {
  Bus.subscribe('FIBONACCI_NUMBER', (index, number) => {
    console.log(`Fibonacci number for index ${index} is ${number}`);
  });
  Bus.send('CALCULATE_FIBONACCI', n);
}
function B() {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  Bus.subscribe('CALCULATE_FIBONACCI', n => {
    setTimeout(() => {
      Bus.send('FIBONACCI_NUMBER', n, fibonacci(n));
    }, 1000);
  });
}
B();
A(10);
We will see the implementation of the Bus in a bit, but before that let's focus on how A and B use it. A needs the Fibonacci number, so it subscribes to a message called FIBONACCI_NUMBER. It doesn't know who is sending it. It just knows that it will come with an index and a number. Then it makes its request by sending out CALCULATE_FIBONACCI. Again, it doesn't know who will receive it. On the other side we have the function B. It waits for a CALCULATE_FIBONACCI message, calculates the Fibonacci number and sends the result in a FIBONACCI_NUMBER message. Same as function A, function B is absolutely blind to the outside world. It only knows about the Bus. And here is what that bit looks like:
const Bus = {
  _subscribers: {},
  subscribe(type, callback) {
    if (!this._subscribers[type]) this._subscribers[type] = [];
    this._subscribers[type].push(callback);
  },
  send(type, ...payload) {
    if (this._subscribers[type]) {
      this._subscribers[type].forEach(s => s(...payload));
    }
  }
};
We have that internal structure _subscribers that keeps track of the message listeners. The send function then distributes the messages to the relevant callbacks. Of course there are some edge cases that we are not covering. For example, what happens when we try to subscribe to the same message twice, or what if we want to unsubscribe? Those questions can be answered at the Bus level. We just need to provide more APIs. What is more interesting is the problems of the PubSub pattern as a whole. Similarly to the mediator pattern, the high degree of decoupling comes with a price. Here are a few points to consider:
- We have no visibility into the status of a message. Was it delivered successfully or not?
- We may encounter malicious publishers. Imagine that you have a clash of message names. Our subscribers will receive the messages in the wrong order and potentially with the wrong payload. On the other side, there may be a subscriber that receives a message which is not meant for it.
- A small utility like the Bus above may become really complex if we decide to support more features. Think about adding namespaces. This will drastically change the API and the internal structure.
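The two edge cases mentioned earlier (subscribing twice and unsubscribing) could be handled at the Bus level roughly like this. This is only a sketch under two assumptions that are not part of the original Bus: a callback that is already registered is ignored, and subscribe returns a function that removes the subscription. The received array and the onNumber callback are hypothetical names used for the demonstration.

```javascript
// Sketch of an extended Bus. Assumptions (not in the original snippet):
// duplicate callbacks are ignored and subscribe() returns an unsubscribe function.
const Bus = {
  _subscribers: {},
  subscribe(type, callback) {
    if (!this._subscribers[type]) this._subscribers[type] = [];
    const list = this._subscribers[type];
    // Registering the same callback twice has no effect.
    if (!list.includes(callback)) list.push(callback);
    // The returned function removes this callback from the listeners.
    return () => {
      this._subscribers[type] = this._subscribers[type].filter(s => s !== callback);
    };
  },
  send(type, ...payload) {
    if (this._subscribers[type]) {
      this._subscribers[type].forEach(s => s(...payload));
    }
  }
};

const received = [];
const onNumber = (index, number) => received.push(`${index}:${number}`);

const unsubscribe = Bus.subscribe('FIBONACCI_NUMBER', onNumber);
Bus.subscribe('FIBONACCI_NUMBER', onNumber); // ignored, already registered

Bus.send('FIBONACCI_NUMBER', 10, 89); // delivered once
unsubscribe();
Bus.send('FIBONACCI_NUMBER', 3, 3); // nobody is listening anymore

console.log(received); // [ '10:89' ]
```

Even this small addition changes the return value of subscribe, which hints at how quickly the API surface grows.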
Communication vs synchronization
Both the mediator and PubSub patterns are widely used today. And indeed, if we don't overuse them, we may achieve a decently designed system. If we however look back at the beginning of this article, we will see that by applying these patterns we were only solving the communication problem but not the synchronization one. We successfully decoupled the two functions but we still have to manually synchronize their work. Look at the PubSub transformation of A and B. At the end we have:
B();
A(10);
If we swap these two lines and run A first, the result will be different. That is because A will trigger its request for calculation but B is not listening yet. This, I think, is the elephant in the room which no one is talking about. The complexity in our applications very often comes not from communication issues but from synchronization ones.
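To make the ordering problem concrete, here is a small self-contained sketch of the swapped calls. Two things are assumptions made for the sake of a quick check: the one-second delay inside B is dropped, and the output is collected in a hypothetical logs array instead of being printed directly.

```javascript
const Bus = {
  _subscribers: {},
  subscribe(type, callback) {
    if (!this._subscribers[type]) this._subscribers[type] = [];
    this._subscribers[type].push(callback);
  },
  send(type, ...payload) {
    if (this._subscribers[type]) {
      this._subscribers[type].forEach(s => s(...payload));
    }
  }
};

const logs = []; // hypothetical: collects what A would print

function A(n) {
  Bus.subscribe('FIBONACCI_NUMBER', (index, number) => {
    logs.push(`Fibonacci number for index ${index} is ${number}`);
  });
  Bus.send('CALCULATE_FIBONACCI', n);
}

function B() {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  // Delay removed (assumption) so the outcome is visible immediately.
  Bus.subscribe('CALCULATE_FIBONACCI', n => {
    Bus.send('FIBONACCI_NUMBER', n, fibonacci(n));
  });
}

A(10); // the request goes out while nobody is subscribed, so it is lost
B(); // B starts listening too late

console.log(logs.length); // 0
```

The message is not queued anywhere, so A waits forever for an answer that was never requested from B's point of view.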
A needs the Fibonacci number from B before printing to the console. As we saw above, our code needs to be written in a specific way to respect these needs. Wouldn't it be nice if we could just run A and B in a random order and have a mechanism which synchronizes them? Well, there is 👉 communicating sequential processes.
Communicating sequential processes (CSP)
We already learned about the PubSub pattern. In there we have a central bus which receives our messages and broadcasts them to subscribers. Think about the CSP as a PubSub pattern but with many buses. We will call them channels. We can put a message on the channel and on the other side there is someone to take it. Similar to what we saw so far with one significant difference - you can't put to the channel if there is no one to take and you can't take unless there is someone to put.
Let's again use our A and B functions and try implementing this idea. We have to be able to create multiple channels and each one of them should have put and take methods.
function createChannel() {
  return {
    put(data) { ... },
    take() { ... }
  };
}
We may want to put data into the channel but if there is no taker the operation should be blocking. Or, in JavaScript terms, calling put() needs to return something that will allow us to pause the current function. A promise will do the job because we can await it. That promise will be resolved when there is a take() call. So, we need internal structures to keep the pending puts and pending takes and deal with them later.
function createChannel() {
  const puts = [];
  const takes = [];
  return {
    put(data) { ... },
    take() { ... }
  };
}
Now, let's implement the put operation. We will assume that the items in the takes array are functions that expect the data sent to the channel.
put(data) {
  return new Promise(resolvePut => {
    if (takes.length > 0) {
      takes.shift()(data);
      resolvePut();
    } else {
      puts.push(() => {
        resolvePut();
        return data;
      });
    }
  });
}
As we said above, this is potentially a blocking operation so we return a promise. Then we check if there is a pending take. If yes, we pull it out from the takes array and pass the data. The job is done and the data is consumed, so we resolve the promise right away. However, if there are no takes we add an entry into the puts array. It's a function which, when called, resolves the promise and returns the data pushed to the channel.
The take operation is slightly simpler. Again we start by returning a promise:
take() {
  return new Promise(resolveTake => {
    if (puts.length > 0) {
      resolveTake(puts.shift()());
    } else {
      takes.push(resolveTake);
    }
  });
}
Then we check if there are pending puts. If that's the case, we pull from the beginning of the puts array and run it. It returns the data pushed to the channel and we use it to resolve the promise. If there are no puts, we register a pending take.
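Put together, the pieces described above might look like the following. The createChannel body is assembled from the put and take snippets as written. The producer and consumer functions and the events array are hypothetical additions, there only to show that a put pauses its caller until a take arrives.

```javascript
function createChannel() {
  const puts = []; // pending puts: functions that resolve the put and return its data
  const takes = []; // pending takes: resolve functions waiting for data

  return {
    put(data) {
      return new Promise(resolvePut => {
        if (takes.length > 0) {
          takes.shift()(data); // hand the data to the waiting taker
          resolvePut();
        } else {
          puts.push(() => {
            resolvePut();
            return data;
          });
        }
      });
    },
    take() {
      return new Promise(resolveTake => {
        if (puts.length > 0) {
          resolveTake(puts.shift()()); // releases the waiting put as well
        } else {
          takes.push(resolveTake);
        }
      });
    }
  };
}

const channel = createChannel();
const events = []; // hypothetical: records the order of what happens

async function producer() {
  events.push('before put');
  await channel.put('hello'); // pauses here until someone takes
  events.push('after put');
}

async function consumer() {
  const message = await channel.take();
  events.push(`took ${message}`);
}

const pending = producer(); // no taker yet, so producer is paused at the put
console.log(events); // [ 'before put' ]

Promise.all([pending, consumer()]).then(() => {
  console.log(events.length); // 3
});
```

The producer only moves past its put once the consumer shows up, which is the synchronization the Bus could not provide.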
With that we have a factory function that creates channels. Let's use it in the context of our Fibonacci example.
const channel = createChannel();
async function A(index) {
  await channel.put(index);
  const value = await channel.take();
  console.log(`Fibonacci number for index ${index} is ${value}`);
}
async function B() {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  const index = await channel.take();
  setTimeout(() => {
    channel.put(fibonacci(index));
  }, 1000);
}
A(10);
B();
We can compare this code with the one used in the PubSub section and we will see that both versions are fundamentally the same. Function A in both cases sends out the index and waits for the Fibonacci number to come. Function B waits for the index, calculates the number and sends it out. There is one big difference though - everything in the CSP variant is synchronized. Everything works even if we run A first and B second. The result is still "Fibonacci number for index 10 is 89". We can swap them and we will have the same sentence in the console. We can even go further and write
A(10);
B();
B();
A(0);
A(3);
B();
and we will get the calculations and the logs correctly:
Fibonacci number for index 10 is 89
Fibonacci number for index 0 is 1
Fibonacci number for index 3 is 3
No matter how we position the calls we will have both functions working together and printing correct answers. That is because the channel synchronizes the puts and the takes. Go here poet.krasimir.now.sh/e/zvPpnQ1RPSJ to play with the code and see it in action.
If we run the same sequence of A/B calls within the PubSub section we'll get a really weird response. We'll create new subscriptions every time we run the functions. The data put on and taken from the bus (channel) will be out of sync. If we want to get the same response we'll have to complicate our implementation a lot.
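Here is a sketch of that same call sequence against the Bus from the PubSub section. As before, the one-second delay in B is dropped and the output goes into a hypothetical logs array, both assumptions made so the run can be inspected right away. With the timeout in place the exact number of lines differs, but the picture is the same.

```javascript
const Bus = {
  _subscribers: {},
  subscribe(type, callback) {
    if (!this._subscribers[type]) this._subscribers[type] = [];
    this._subscribers[type].push(callback);
  },
  send(type, ...payload) {
    if (this._subscribers[type]) {
      this._subscribers[type].forEach(s => s(...payload));
    }
  }
};

const logs = []; // hypothetical: collects what A would print

function A(n) {
  // Every call adds one more listener that reacts to every result.
  Bus.subscribe('FIBONACCI_NUMBER', (index, number) => {
    logs.push(`Fibonacci number for index ${index} is ${number}`);
  });
  Bus.send('CALCULATE_FIBONACCI', n);
}

function B() {
  const fibonacci = num => {
    if (num <= 1) return 1;
    return fibonacci(num - 1) + fibonacci(num - 2);
  };
  // Every call adds one more calculator that answers every request.
  Bus.subscribe('CALCULATE_FIBONACCI', n => {
    Bus.send('FIBONACCI_NUMBER', n, fibonacci(n));
  });
}

A(10);
B();
B();
A(0);
A(3);
B();

console.log(logs.length); // 10
console.log(logs.some(line => line.includes('index 10'))); // false
```

Instead of three lines we get ten, the results for indexes 0 and 3 are repeated, and the request for index 10 is never answered.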
Final words
I'm very excited about CSP concepts coming more to JavaScript. There are definitely benefits and we already saw some of them.
- The CSP channel can be used as a communication tool.
- The channels can synchronize the processes in our application.
It is also interesting to see how this pattern changes the developer experience. If nothing else, asynchronous processes can be written as if they were synchronous. CSP also gives us more control over what goes where, so it should be better in terms of system visibility.