Introduction - The Use Case of Polling

One of the most common things done by a web application is to call a server endpoint to invoke some action. The action could mean buying a product, registering an account or adding an article to the database. Most of the time, such actions are short running. By short running I mean that they complete within the duration of a single HTTP request and the server can send a response indicating whether the action succeeded.

However, not all actions are short running. Sometimes, the action may require some significant computations or even manual input. To name a few examples:

  • Image recognition on an uploaded photograph
  • ID document verification by an actual human
  • Heavy calculation on a large data set

In such cases, the action may not finish before the request times out. As a result, there is no way of informing the user whether the action succeeded or not (unless you notify them some other way, e.g. by e-mail).

Polling Explained

There are several solutions to this problem, but in this post we’ll focus on polling. Polling only works if the API is designed for it. The endpoint that invokes the action should support at least two HTTP methods, plus an optional third:

  • POST for initiating the action
  • GET for checking the status of the action (and getting the results if available)
  • (optionally) DELETE for cancelling an already started action

Given such an endpoint, polling can be described by the following steps:

  1. The frontend sends a POST request to the endpoint to initiate the action. The server replies with an identifier representing the action.
  2. The frontend sends a GET request to the endpoint, passing the identifier as a parameter. The server replies with a status of the action - either in progress or done.
  3. The frontend repeats the GET request every X milliseconds for as long as the status is not done.
  4. The last response to the GET request should contain the actual results.
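To make the four steps concrete before bringing in RxJS, here is a minimal imperative sketch using async/await. Everything in it is an assumption made for illustration: the PollingApi interface, the status values and the in-memory createFakeApi stand in for a real server and are not part of any actual API.

```typescript
// Hypothetical response shape; the field names are assumptions for this sketch.
interface StatusResponse<T> {
  status: "inProgress" | "done";
  result?: T;
}

// The two mandatory operations of a polling-friendly endpoint.
interface PollingApi<T> {
  start(): Promise<{ id: string }>; // POST: initiate the action
  getStatus(id: string): Promise<StatusResponse<T>>; // GET: check the status
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function pollUntilDone<T>(
  api: PollingApi<T>,
  intervalMs: number
): Promise<T | undefined> {
  const { id } = await api.start(); // step 1: initiate and get the identifier
  let response = await api.getStatus(id); // step 2: first status check
  while (response.status !== "done") {
    await sleep(intervalMs); // step 3: wait, then check again
    response = await api.getStatus(id);
  }
  return response.result; // step 4: the last response carries the result
}

// In-memory stand-in for a server: reports "done" on the `doneAfter`-th status check.
function createFakeApi(doneAfter: number) {
  let statusCalls = 0;
  const api: PollingApi<string> = {
    start: async (): Promise<{ id: string }> => ({ id: "job-1" }),
    getStatus: async (): Promise<StatusResponse<string>> => {
      statusCalls += 1;
      return statusCalls < doneAfter
        ? { status: "inProgress" }
        : { status: "done", result: "positive" };
    },
  };
  return { api, getStatusCalls: () => statusCalls };
}
```

Calling pollUntilDone(createFakeApi(3).api, 1000) would resolve with the fake result after three status checks. Note that this version has no error handling and no way to stop the loop from the outside.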

Implementing Polling with RxJS

As you can see, the polling algorithm consists of several steps, all of them being asynchronous. It is exactly the kind of scenario where RxJS shows its usefulness. Let’s see how to implement polling with RxJS in just a few lines of code.

In this example we’re working with an API for analysing the sentiment of a long piece of text. At a high level, given a string the API returns a boolean value indicating whether the sentiment of the text is positive or negative. Since the analysis can take up to a few minutes, the API is asynchronous and requires polling. The exact shape of the API is as follows. You can find a very basic implementation of this API here.

  • POST request to /analyze with an example payload of { message: "text to be analyzed" } starts an analysis job and returns its identifier
  • GET request to /analyze/{jobId} returns the status of the job and, optionally, the result
  • POST request to /analyze/{jobId}/cancel cancels the job
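The code in this post refers to the types AnalyzePostResponse and AnalyzeGetResponse without showing them. A plausible sketch of those types is given below. The fields id, status and result are the ones the code reads, but their exact types, the mapping of true to positive sentiment, and the URL helper functions are assumptions made for this illustration.

```typescript
// Assumed response shapes; only the field names are implied by the article's code.
interface AnalyzePostResponse {
  id: string; // assumption: the job identifier is a string
}

interface AnalyzeGetResponse {
  id: string;
  status: "inProgress" | "finished";
  result?: boolean; // assumption: true = positive sentiment; absent while in progress
}

// Hypothetical helpers that build the three URLs listed above.
const analyzeUrl = (baseUrl: string): string => `${baseUrl}/analyze`;
const jobUrl = (baseUrl: string, jobId: string): string =>
  `${analyzeUrl(baseUrl)}/${jobId}`;
const cancelUrl = (baseUrl: string, jobId: string): string =>
  `${jobUrl(baseUrl, jobId)}/cancel`;

// Narrowing helper: a finished job is expected to carry a boolean result.
function isFinished(
  response: AnalyzeGetResponse
): response is AnalyzeGetResponse & { result: boolean } {
  return response.status === "finished" && typeof response.result === "boolean";
}

// Example values matching the shapes above.
const examplePost: AnalyzePostResponse = { id: "42" };
const exampleGet: AnalyzeGetResponse = { id: examplePost.id, status: "inProgress" };
```

With these definitions, isFinished(exampleGet) is false, and jobUrl(base, examplePost.id) yields the address that the GET requests would target.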

Given such an API, let’s implement a simple polling flow with RxJS.

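import { ajax } from "rxjs/ajax";
import { repeat, switchMap, takeWhile } from "rxjs/operators";

// url, inputText, setResult and the response types are defined elsewhere in the app
const startAnalysis = () => {
  ajax
    // start the analysis job
    .post<AnalyzePostResponse>(`${url}/analyze`, { message: inputText })
    .pipe(
      switchMap(({ response }) =>
        // check the status of the job
        ajax.getJSON<AnalyzeGetResponse>(`${url}/analyze/${response.id}`).pipe(
          // send the GET request again 1 second after each response
          repeat({ delay: 1000 }),
          // stop once the status is no longer "inProgress"; the final response is emitted too
          takeWhile((response) => response.status === "inProgress", true)
        )
      )
    )
    .subscribe({
      next: (response) => setResult(response.result),
      error: console.error,
    });
};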

Let’s analyze this code line by line:

  1. First, we create an Observable using ajax.post. This Observable sends a POST request to ${url}/analyze when subscribed. It emits a single item - the response - and immediately completes.
  2. Next, for each item emitted by that Observable (and we know that there will be just one), we switch to a new Observable. This new Observable sends a GET request to ${url}/analyze/${response.id} and emits the response (just once).
  3. We apply the repeat operator to that Observable. This operator resubscribes to the source Observable once it completes. Thanks to the delay parameter, it will only resubscribe after waiting 1 second. Each new subscription will cause the source Observable to send a new GET request.
  4. As of now, GET requests would keep happening forever, so we need to tell our Observable when to stop. The takeWhile operator keeps emitting for as long as the incoming items satisfy the passed condition. Once an item violates the condition, the operator emits that item as well (thanks to true passed as the second parameter) and completes.
  5. Finally, we subscribe to the resulting Observable. We expect this Observable to emit a response with inProgress status every second for some time, then emit a single response with finished status and then complete.
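The second argument of takeWhile is easy to overlook, so here is a plain-array analogy of what it changes. This is not RxJS code: takeWhileInclusive is a hypothetical helper written for this illustration, and the list of responses is made up.

```typescript
// Plain-array analogy of takeWhile(predicate, true): keep items while the
// predicate holds, and also keep the first item that fails it.
function takeWhileInclusive<T>(items: T[], predicate: (item: T) => boolean): T[] {
  const taken: T[] = [];
  for (const item of items) {
    taken.push(item);
    if (!predicate(item)) {
      break; // the failing item was already added, so it is included
    }
  }
  return taken;
}

// Made-up sequence of responses, as if the server were polled four times.
const polledResponses = [
  { status: "inProgress" },
  { status: "inProgress" },
  { status: "finished", result: true },
  { status: "inProgress" }, // never reached: polling stops at "finished"
];

const emittedResponses = takeWhileInclusive(
  polledResponses,
  (response) => response.status === "inProgress"
);

// Logs the first three statuses: inProgress, inProgress, finished
console.log(emittedResponses.map((response) => response.status));
```

Without the inclusive behaviour, the finished response would be dropped and the subscriber would never receive the result of the analysis.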

You may be wondering why repeat and takeWhile operators are in a “nested” pipe instead of right after switchMap. In other words, could we change this code to:

.pipe(
  switchMap(...),
  repeat(...),
  takeWhile(...)
);

The repeat operator resubscribes to its source Observable once that source completes. Placed right after switchMap, its source would be the whole chain, which starts with the Observable returned by ajax.post. Resubscribing to it would send a new POST request to ${url}/analyze on every repetition, which is not what we want. Therefore, we need to apply repeat to the Observable returned by ajax.getJSON.
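The difference can be shown with a deliberately simplified, synchronous model. It is not RxJS: a cold source is modelled as a plain function whose side effects run on every call, and repeatCold is a hypothetical stand-in for repeat that calls its source a fixed number of times. The request strings are made up for the illustration.

```typescript
// A "cold" source modelled as a function: calling it is like subscribing,
// and every call re-runs its side effects.
type Cold<T> = () => T;

// Model of repeat: "resubscribe" to the source `times` times in total.
function repeatCold<T>(source: Cold<T>, times: number): Cold<T[]> {
  return () => {
    const values: T[] = [];
    for (let i = 0; i < times; i++) {
      values.push(source());
    }
    return values;
  };
}

// Returns the list of requests "sent" for a given placement of repeat.
function simulate(placement: "nested" | "outer"): string[] {
  const requests: string[] = [];
  const post: Cold<string> = () => {
    requests.push("POST /analyze");
    return "job-1";
  };
  const get = (id: string): Cold<string> => () => {
    requests.push(`GET /analyze/${id}`);
    return "inProgress";
  };

  if (placement === "nested") {
    // repeat applied to the GET source only: one POST, then three GETs
    const id = post();
    repeatCold(get(id), 3)();
  } else {
    // repeat applied to the whole chain: every repetition starts with a new POST
    const chain: Cold<string> = () => get(post())();
    repeatCold(chain, 3)();
  }
  return requests;
}
```

In this model, simulate("nested") records a single POST followed by three GETs, while simulate("outer") records three POST and GET pairs, which corresponds to starting three separate analysis jobs.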

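Once you hit the Analyze button, you can observe the requests in the Network tab of Chrome DevTools: a POST request followed by a GET request every second. Don’t worry if the POST call appears twice - the first entry is a CORS preflight request, which the browser sends with the OPTIONS method.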

Polling with Cancellation

This solution works well, but let’s see what happens when the user quickly clicks multiple times on the Analyze button.

The result is multiple polling sessions running concurrently. This is pretty bad, since each analysis is computationally costly and puts unnecessary load on the backend.

Let’s improve our solution to handle this scenario better. What’s needed here is cancellation. There are actually two aspects of cancellation:

  • stop polling the endpoint to avoid unnecessary network traffic
  • stop the actual analysis on the backend to avoid the computational cost

We’ll address the first aspect by introducing a cancelSubject. We’ll emit from this subject whenever polling should be cancelled. The nested polling stream will complete whenever cancelSubject emits. For the latter, we’ll need a dedicated endpoint for cancelling the operation. Unless it is exposed by the server, we won’t be able to address this concern.

import { Subject } from "rxjs";
import { ajax } from "rxjs/ajax";
import { repeat, switchMap, takeUntil, takeWhile } from "rxjs/operators";

// Make sure to memoize it if you're using React
const cancelSubject = new Subject<void>();
// assuming that we store the result somewhere using [result, setResult]

const startAnalysis = () => {
  // cancel the previous analysis (if any) before starting a new one
  cancelAnalysis();
  ajax
    .post<AnalyzePostResponse>(`${url}/analyze`, { message: inputText })
    .pipe(
      switchMap(({ response }) =>
        ajax.getJSON<AnalyzeGetResponse>(`${url}/analyze/${response.id}`).pipe(
          repeat({ delay: 1000 }),
          takeWhile((response) => response.status === "inProgress", true),
          // stop polling as soon as cancelSubject emits
          takeUntil(cancelSubject)
        )
      )
    )
    .subscribe({
      next: (response) => setResult(response),
      error: console.error,
    });
};

const cancelAnalysis = () => {
  // stop the polling stream
  cancelSubject.next();
  // ask the backend to stop the analysis if it is still running
  if (result?.status === "inProgress") {
    ajax.post(`${url}/analyze/${result.id}/cancel`).subscribe();
    setResult(undefined);
  }
};

We introduced a new function - cancelAnalysis. It does two things:

  • emit on the cancelSubject which will cause the nested polling stream to complete thanks to the takeUntil operator
  • send a cancellation request to the backend

cancelAnalysis is called from startAnalysis, but it can also be called explicitly, for example if you want to allow the user to stop the analysis at any point in time.
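For comparison, the same two aspects of cancellation can be sketched imperatively. This is only an illustration under stated assumptions: CancelToken, CancellableApi, pollWithCancel and cancelJob are hypothetical names invented here, with the token playing the role that cancelSubject and takeUntil play in the RxJS version.

```typescript
// Hypothetical token; calling cancel() plays the role of cancelSubject.next().
class CancelToken {
  private cancelled = false;
  cancel(): void {
    this.cancelled = true;
  }
  isCancelled(): boolean {
    return this.cancelled;
  }
}

type PollStatus = "inProgress" | "finished";

// Assumed client-side view of the API: a status check and a cancel call.
interface CancellableApi {
  getStatus(id: string): Promise<PollStatus>;
  cancel(id: string): Promise<void>;
}

const pause = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the job finishes or the token is cancelled.
async function pollWithCancel(
  api: CancellableApi,
  id: string,
  token: CancelToken,
  intervalMs: number
): Promise<"finished" | "cancelled"> {
  while (!token.isCancelled()) {
    const jobStatus = await api.getStatus(id);
    if (token.isCancelled()) {
      break; // ignore a response that arrives after cancellation
    }
    if (jobStatus === "finished") {
      return "finished";
    }
    await pause(intervalMs);
  }
  return "cancelled";
}

// The two aspects of cancellation: stop polling, then stop the backend job.
async function cancelJob(
  api: CancellableApi,
  id: string,
  token: CancelToken
): Promise<void> {
  token.cancel(); // aspect 1: no more GET requests
  await api.cancel(id); // aspect 2: ask the server to stop the analysis
}
```

The token has to be created per polling session and threaded through every call, which is bookkeeping that the RxJS version delegates to takeUntil.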

Now you can observe the flow of server requests in the “fast-clicking” scenario. We can see that there is only a single polling session at a time and that the previous session always gets cancelled before a new session starts.

Summary

In this article I explained when to use polling and how to implement it in RxJS. This is a great example of a problem that can be easily solved with RxJS but would require a more complex solution if done imperatively.