Aggregating Results Across Sequential RxJS Requests
Published on April 23, 2021
Author: Will Yoxall
Something in RxJS that comes up often is the question of how to best aggregate the results of a number of sequential requests - the out-of-box solution feels more complicated than it should be for something that is a fairly basic requirement.
In this article we will show some options for how to achieve this using standard RxJS. This will explain our motivation for implementing a utility function (concatJoin) to make the intention of the code more explicit by hiding the mechanics of maintaining the aggregate result. This function is available in the npm package rxjs-concat-join.
To illustrate the problem, let’s start with a simple sequence of 3 requests (a “request” being an observable that just emits 1 value and completes such as an http request). Here is a basic implementation:
return request1.pipe(
mergeMap(result1=>request2(result1)),
mergeMap(result2=>request3(result2)),
);
Note that each request is based on the results of the previous request, which is typically why we do them in sequence rather than in parallel. There are other reasons though why requests that could otherwise be in parallel might be done in sequence, such as reducing server load, progress reporting and ability for the user to cancel, or because of underlying dependencies (eg for related database updates).
In this article we will show some options for how to achieve this using standard RxJS. This will explain our motivation for implementing a utility function (concatJoin) to make the intention of the code more explicit by hiding the mechanics of maintaining the aggregate result. This function is available in the npm package rxjs-concat-join.
To illustrate the problem, let’s start with a simple sequence of 3 requests (a “request” being an observable that just emits 1 value and completes such as an http request). Here is a basic implementation:
return request1.pipe(
mergeMap(result1=>request2(result1)),
mergeMap(result2=>request3(result2)),
);
Note that each request is based on the results of the previous request, which is typically why we do them in sequence rather than in parallel. There are other reasons though why requests that could otherwise be in parallel might be done in sequence, such as reducing server load, progress reporting and ability for the user to cancel, or because of underlying dependencies (eg for related database updates).
Note also the use of mergeMap. One question that arises with this pattern is what form of xMap to use? This is discussed in detail in a later section, but the short answer is that the question is spurious: it makes no difference which is used. This is illustrated by the fact we can use concat instead (if we aren’t interested in the interim values) without having to choose between different xMap types:
concat(request1, request2, request3).pipe(last());
The last() at the end is required because concat emits every value emitted by the inner observables. In fact, this pattern without the last() is interesting if we do want to emit a value as each request completes, such as when tracking progress.
And if we only want to gather the results at the end, the concat pattern can be used with toArray instead of last():
concat(request1, request2, request3).pipe(toArray());
So far that’s all fairly straightforward. Now we come to the specific challenge: how would we change this so that any request in the sequence can use, as input, the result of any of the previous requests?
In essence this challenge boils down to aggregating the results as we go down the pipe, so that the results of all previous requests are available at any point, and the aggregate of all results is output from the sequence as a whole.
concat(request1, request2, request3).pipe(last());
The last() at the end is required because concat emits every value emitted by the inner observables. In fact, this pattern without the last() is interesting if we do want to emit a value as each request completes, such as when tracking progress.
And if we only want to gather the results at the end, the concat pattern can be used with toArray instead of last():
concat(request1, request2, request3).pipe(toArray());
So far that’s all fairly straightforward. Now we come to the specific challenge: how would we change this so that any request in the sequence can use, as input, the result of any of the previous requests?
In essence this challenge boils down to aggregating the results as we go down the pipe, so that the results of all previous requests are available at any point, and the aggregate of all results is output from the sequence as a whole.