DB as MQ: Reliable Task Distribution with a POC

If you wish to see my POC, please find it at https://github.com/mrkagelui/alfred. I welcome feedback and issues!

Our products often face periodic heavy lifting duties: telcos sending bills, payroll systems doling out salaries, banks spitting out statements. In essence, once every week/month, these applications read data (such as user accounts) off a source, then perform laborious processing for each entry. This is a race against time to ensure those paychecks don’t arrive with more dust than dollars! How should we go about architecting this?

First Instinct: Involve a Message Queue!

When it comes to “task distribution”, many would immediately throw a message queue into the mix. The resulting architecture may resemble this simple diagram:

Now we can scale the subscriber horizontally, making the processing parallel. Done! Time to move on to the next design… but is it?

Functional Complexities with a Queue

There are two important principles of robust architecture design:

  1. Assume the process can crash at any time;
  2. Assume unreliable network communication.

With these in mind, we can see some issues with our publisher. Firstly, if it crashes after sending some of the messages but not all, it will resend everything upon restart, which means some messages would be sent more than once. To mitigate this, the publisher needs some form of persistence, such as a database, to remember its progress.

However, it is tricky to coordinate the database and the queue, for example:

  1. If the publisher simply stores the information in the DB and then sends it to the queue, it can crash between the two steps; in the worst case, this leaves some messages in the DB that were never sent, i.e., they will never be published;
  2. We may leverage database transactions, i.e., (a) begin tx; (b) insert the message in the DB; (c) send the message; (d) commit the tx. But the pitfall is that the subscriber might receive the message before the tx is committed; if it needs to access the record in the DB, it would not find it under usual isolation settings;
  3. 2PC (two-phase commit) is complicated, if possible at all, to implement between your choice of DB and MQ.

For these reasons, it is good to implement some form of the transactional outbox pattern. In a nutshell, the publisher persists the outgoing message in the DB, and a separate relay polls the DB regularly and performs the actual publishing, making the architecture look like this:
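
To make the outbox idea concrete in code, here is a minimal sketch of the publisher side in Go with Postgres-style placeholders. The billing_tasks and outbox tables (and their columns) are illustrative assumptions, not the POC’s schema; the point is simply that the business record and the outgoing message are written in one transaction, so either both exist or neither does.

package outbox

import (
    "context"
    "database/sql"
)

// publishViaOutbox writes the business record and the outgoing message in a
// single transaction. A separate relay process later polls the outbox table,
// publishes unpublished rows to the MQ, and marks them as published.
func publishViaOutbox(ctx context.Context, db *sql.DB, userID string, payload []byte) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit has succeeded

    if _, err := tx.ExecContext(ctx,
        `INSERT INTO billing_tasks (user_id, status) VALUES ($1, 'pending')`, userID); err != nil {
        return err
    }
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO outbox (payload, published_at) VALUES ($1, NULL)`, payload); err != nil {
        return err
    }
    return tx.Commit()
}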

Unfortunately, even with this complexity, we still won’t eliminate the possibility of duplicate sending. For example, the relay might not receive the MQ’s acknowledgement even though the message was successfully sent (unreliable network, remember?) and resend it; or it might crash before receiving the response and resend upon restart.

This brings us to the other side of the architecture: since we can’t avoid duplicate messages, the subscriber must be idempotent, i.e., it must remember the completed or ongoing jobs so that even if it sees a duplicate message, it won’t perform the same work again. (An employer probably doesn’t want to pay the same employee more than once in a month!) Now we have something like this diagram:

(Of course, the subscriber can use the same DB as the publisher.)

Phew. If you think it can’t get crazier than this, think again: the complexity doesn’t end here. The actual heavy lifting often involves third-party API calls, PDF rendering, user notifications, etc., making it long and error-prone. Consequently, we need to differentiate retries from duplicate processing.

For example, simply inserting an “in_progress” record in the DB and discarding the message upon seeing such a record won’t work, because if the subscriber crashes in the middle of the heavy lifting, the task is stuck in limbo forever.

One mitigation is to include a timestamp check for the record: define a timeout, say a minute, and upon receiving a message, look into the DB:

  • If no such record exists: the message has not been seen before, so proceed;
  • If it does, but it was created more than a minute ago: the heavy lifting must have crashed, so proceed;
  • If it does, and it was created within the last minute: the heavy lifting is still in progress.

(For this to work, we need to ensure the MQ’s retry delay is longer than a minute.)

The last bit of intricacy lies in what to do when it’s in progress. This happens even if you set the retry properly, as the queue may use “at-least-once” delivery semantics. When the message is delivered twice to different subscriber instances, with this mitigation the second instance will see an “in_progress” record. If it acknowledges/discards the message, you are at the mercy of fate: if the first instance crashes while handling it, the message will appear handled when it actually wasn’t. For this reason, it is better to requeue or delay the message upon seeing a duplicate, even if it is slightly less efficient.
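
A minimal subscriber-side sketch of this logic, assuming a hypothetical job_runs table with a unique job_id column (Postgres syntax; none of these names come from the POC). It collapses the three cases above into a single atomic statement and tells the caller to requeue the message when another instance still appears to be working:

package worker

import (
    "context"
    "database/sql"
    "errors"
)

// ErrInProgress signals that the message should be requeued or delayed
// rather than acknowledged, as discussed above.
var ErrInProgress = errors.New("job already in progress")

// claimJob claims jobID for this instance. It inserts a run record if none
// exists, or refreshes it if the existing one is older than a minute (a
// previous attempt presumably crashed). If the record is fresher than a
// minute, another instance is still working and we back off.
func claimJob(ctx context.Context, db *sql.DB, jobID string) error {
    var claimed string
    err := db.QueryRowContext(ctx, `
        INSERT INTO job_runs (job_id, started_at) VALUES ($1, now())
        ON CONFLICT (job_id) DO UPDATE SET started_at = now()
        WHERE job_runs.started_at < now() - interval '1 minute'
        RETURNING job_id`, jobID).Scan(&claimed)
    if errors.Is(err, sql.ErrNoRows) {
        return ErrInProgress // a fresh record exists: let the MQ redeliver later
    }
    return err
}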

Other Complexities with a Queue

Depending on the actual technology chosen, there might be many other configurations in play when scaling the solution, not to mention numerous implementation details behind the scenes, such as node replication and failover. All of these must work correctly to ensure reliability.


Better Solution: DB as MQ

With all these complexities, we have to ask: is it really necessary?

Thankfully no, as these jobs do not need to be picked up immediately. We can actually ditch the queue completely and utilize an RDBMS, with its row-level locks, to accomplish this! The flow looks like this.

The resulting architecture diagram is much simplified.

The key idea is this query:

SELECT ...
FROM jobs
LIMIT 1
FOR UPDATE SKIP LOCKED        

"LIMIT 1" instructs the DB to return at most one job; "FOR UPDATE" means to lock this row, which means only this transaction can now write to the row; by "SKIP LOCKED", we won’t wait for others to release the lock before picking the row. If it is necessary to enforce an order of processing, you can throw an "ORDER BY" to it (but don’t forget to add an index for efficiency!).

With this, the DB “assigns” one job to the routine holding the transaction. Jobs will be picked up “exactly once”, because no two transactions can lock the same row at the same time. And thanks to SKIP LOCKED, there is no lock contention, minimizing any performance penalty.

Note that when this query returns no rows, it’s not yet safe to exit, because it could also mean that other workers are still processing some jobs and they could fail. We should only exit when all records are explicitly completed.
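
Putting it together, each worker can loop over something like the sketch below. The jobs table, its status column, and the work callback are illustrative assumptions rather than the POC’s actual schema; the essential part is that the heavy lifting happens inside the transaction that holds the row lock.

package worker

import (
    "context"
    "database/sql"
)

// processOne claims one pending job, performs the heavy lifting while the
// transaction (and thus the row lock) is held, and marks the job done.
// It returns sql.ErrNoRows when no unlocked pending job is available.
func processOne(ctx context.Context, db *sql.DB, work func(ctx context.Context, id int64) error) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // releases the row lock if anything below fails

    var id int64
    if err := tx.QueryRowContext(ctx, `
        SELECT id FROM jobs
        WHERE status = 'pending'
        LIMIT 1
        FOR UPDATE SKIP LOCKED`).Scan(&id); err != nil {
        return err // sql.ErrNoRows: nothing unlocked and pending right now
    }

    if err := work(ctx, id); err != nil {
        return err // the rollback leaves the job 'pending' for another worker
    }

    if _, err := tx.ExecContext(ctx,
        `UPDATE jobs SET status = 'done' WHERE id = $1`, id); err != nil {
        return err
    }
    return tx.Commit()
}

A worker would call processOne in a loop and, upon sql.ErrNoRows, check whether every job is marked done before exiting, since rows locked by other workers are invisible to this query (the caveat above).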

With so few moving parts, there are far fewer configuration issues or incorrect assumptions that could wake you up at night.

From the architecture, it is clear that the probable bottleneck is the maximum number of connections the DB can support, and that depends on its memory, which is fairly affordable nowadays.

What if the process crashes midway?

Good question! It is important to always wrap a status update in a database transaction and let the database roll back when a timeout is reached. (PostgreSQL has idle_in_transaction_session_timeout for this; MySQL has innodb_lock_wait_timeout and innodb_rollback_on_timeout, for example.) With this, in the worst case, no job will be lost in limbo and no intermediate task will be skipped: upon transaction timeout, the row is released for another transaction to pick up, and any unsaved work will be retried.
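
As a PostgreSQL-flavoured sketch (the helper name and the one-minute value are assumptions; the timeout should exceed your slowest processing step), the setting can be applied per worker connection:

package worker

import (
    "context"
    "database/sql"
)

// newWorkerConn pins one connection from the pool and caps how long a
// transaction on it may sit idle. If a worker hangs or loses contact
// mid-transaction, PostgreSQL terminates the session after this period,
// rolling the transaction back and releasing its row locks so another
// worker can claim the job.
func newWorkerConn(ctx context.Context, db *sql.DB) (*sql.Conn, error) {
    conn, err := db.Conn(ctx) // a dedicated connection, so the SET sticks to it
    if err != nil {
        return nil, err
    }
    if _, err := conn.ExecContext(ctx,
        `SET idle_in_transaction_session_timeout = '60s'`); err != nil {
        conn.Close()
        return nil, err
    }
    return conn, nil
}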

What if network communication fails?

Due to the much simplified architecture, there are far fewer network calls, and they are protected by timeouts and retries.

Note, though, that for the same reasons discussed above, it is inevitable that downstream APIs might be called more than once in specific situations (e.g., the process crashes after calling the API but before committing the status update to the DB).

For this reason, that API should be idempotent: it should accept a key and not perform duplicate work upon seeing the same key again.

Correspondingly, this application should employ some form of “write-ahead log”: it should generate such a key (usually a UUID) and commit it to the DB, then call the API with this key. If the call fails, retry by reading the key back and sending it along in the subsequent call.
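
A sketch of that write-ahead-then-call sequence, assuming a hypothetical payout endpoint that honours an Idempotency-Key header and an illustrative api_calls table with a unique job_id column; a retry after a crash reads back and reuses the same key:

package payout

import (
    "bytes"
    "context"
    "database/sql"
    "fmt"
    "net/http"

    "github.com/google/uuid"
)

// payOnce commits an idempotency key for the job before calling the
// downstream API, so that a retry after a crash reuses the same key and the
// (idempotent) API can dedupe the request on its side.
func payOnce(ctx context.Context, db *sql.DB, jobID int64, body []byte) error {
    // Generate a fresh key, but keep the existing one if a previous attempt
    // already committed a key for this job ("write-ahead").
    key := uuid.NewString()
    if err := db.QueryRowContext(ctx, `
        INSERT INTO api_calls (job_id, idempotency_key) VALUES ($1, $2)
        ON CONFLICT (job_id) DO UPDATE SET job_id = EXCLUDED.job_id
        RETURNING idempotency_key`, jobID, key).Scan(&key); err != nil {
        return err
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        "https://payments.example.com/payouts", bytes.NewReader(body)) // hypothetical endpoint
    if err != nil {
        return err
    }
    req.Header.Set("Idempotency-Key", key)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("payout API returned %s", resp.Status)
    }
    return nil
}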

Does it really work?

I have put together a POC at https://github.com/mrkagelui/alfred. You only need Go and Docker to take it for a spin!

Moreover, this idea isn’t novel at all; some teams have allegedly been doing it for years.


Summary

When dealing with task distribution that can tolerate some initial delay, it is not necessary to use an MQ. Using a DB with row-level locks can be much more robust, reliable, efficient, and cost-effective.

I hope this helps you. Let me know what you think in the comments!

Moeen Mohsin

Staff Software Engineer @FoodPanda/DeliveryHero

9 months ago

Great read. Thanks for sharing. Here is a two-cent enhancement I would like to recommend, as I got the chance to implement the same thing a few months back: periodically update the state of the heavy lifting in the DB for recoverability in case a defined failure state is reached; it further reduces the chances of duplicate operations. So state and status columns were helpful. Cheers
