@Scale is an initiative to build a journal about problems that occur at scale, at Imaginea or elsewhere. I am trying to make this blog post, and the series, as beginner-friendly as possible.

The first part of this series talks about Task Queues.

Table of contents

  1. Introduction
  2. Internals
    • Basics
    • AMQP Fundamentals
      • Exchanges
      • Bindings
      • Queues
  3. Ecosystem
  4. Use-cases
    • Distributed Processing
    • Deferred Delivery
  5. Conclusion

Introduction to Task Queues

Task Queues (also known as Job Queues or Message Queues) are a mechanism to distribute/dispatch “tasks” or “jobs” among “workers”, with the purpose of executing them asynchronously.

Right from the inception of AMQP in 2003, Task Queues have been the backbone of most data/task pipelines and are used in production by many companies including but not limited to:

  • Slack
  • Uber
  • Pinterest
  • Redislabs
  • Airbnb
  • 99designs
  • MongoDB Inc.
  • Google
  • Microsoft

and many more.

Common use-cases include:

  • Video Encoding & Decoding
  • Sending Emails
  • Resizing Pictures
  • Search Engine Indexing
  • Processing Bulk Updates
  • Any task which can be executed asynchronously

Let’s now understand task queues in depth.

Task Queue Internals

Basics

Although there are no standardized architectures for task queues, most of them are expected to support:

  • Asynchronous communication
  • Parallel processing
  • Event-driven protocols
  • Multi-platform Execution

Let’s look at the architecture of celery, a popular distributed processing library written in Python:

Celery uses RabbitMQ or Redis as a message broker. App1 and App2, also known as “producers” or “publishers”, use celery’s library APIs to register logical blocks of code as “tasks”. Worker1 and Worker2, also known as “consumers”, consume these tasks and optionally store any results in a “result backend”. The “broker” (in effect, a task queue) receives tasks (written using celery) encapsulated as messages from producers and routes them to consumers.

Even though 2 producers and 2 consumers is a simple use case (real-world deployments may have far more producers and consumers), a series of messages sent to and fro can lead to complexities (read more about it here). Let’s also not presume that the producers, consumers, and the broker all reside on the same machine. Managing messages in such cases is not as simple as storing them in a data store as a queue.

A few other gotchas when implementing such a messaging system are:

  • Detecting poison messages. A malformed message, or a task that requires access to resources that aren’t available, can cause a task to fail. The system should prevent such messages from being returned to the queue, and instead capture and store their details elsewhere so that they can be analyzed if necessary.
  • Ensuring reliability of the messaging system. A reliable messaging system is needed to guarantee that after the application enqueues a message it won’t be lost. This is essential for ensuring that all messages are delivered at least once.
  • Scaling the messaging system. In a large-scale solution, a single message queue could be overwhelmed by the number of messages and become a bottleneck in the system. In this situation, consider partitioning the messaging system to send messages from specific producers to a particular queue, or use load balancing to distribute messages across multiple message queues.
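One common mitigation for poison messages is a per-message retry count: after a fixed number of failed deliveries, the message is parked in a dead-letter store instead of being requeued. A broker-agnostic sketch in plain Python (the names and structure are illustrative, not any real broker's API):

```python
from collections import deque

MAX_RETRIES = 3

queue = deque()    # main queue of (message, retry_count) pairs
dead_letters = []  # poison messages parked for later analysis

def publish(message):
    queue.append((message, 0))

def consume(handler):
    """Pop one message; requeue on failure, dead-letter after MAX_RETRIES."""
    message, retries = queue.popleft()
    try:
        handler(message)
    except Exception:
        if retries + 1 >= MAX_RETRIES:
            dead_letters.append(message)  # park the poison message
        else:
            queue.append((message, retries + 1))
```

RabbitMQ implements this idea natively with dead-letter exchanges, as discussed later in this post.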

AMQP Fundamentals

RabbitMQ, one of the most popular message brokers, solves many of the above problems. It natively supports AMQP (Advanced Message Queuing Protocol), a messaging protocol that includes features like reliable queuing, topic-based publish-and-subscribe messaging, flexible routing, transactions, and security.

When a producer publishes tasks to RabbitMQ, messages containing metadata related to the task are published to exchanges, a terminal of sorts for all messages. Exchanges then distribute message copies to queues using rules called bindings. Once the messages are inside a queue, they can be dispatched to consumers.

Depending on the dispatch mechanism, these queues can be categorized as:

  • Push Queue: Push queues dispatch requests at a reliable, steady rate. They guarantee reliable task execution. Because you can control the rate at which tasks are sent from the queue, you can control the workers’ scaling behavior and hence your costs.
  • Pull Queue: Pull queues do not dispatch tasks at all. They depend on other worker services to “lease” tasks from the queue on their own initiative. Pull queues give you more power and flexibility over when and where tasks are processed, but they also require you to do more process management. When a task is leased, the leasing worker declares a deadline. Before the deadline arrives, the worker must complete the task and delete it; otherwise the task queue service will allow another worker to lease it.

Usage of the above depends mostly on the use case. For example, pull queues can be used for a pub-sub or a competing consumer pattern.
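The lease-and-deadline mechanics of a pull queue can be sketched in plain Python (illustrative only; real services expose this behavior through an API):

```python
import time

class PullQueue:
    """Toy pull queue: workers lease tasks for a fixed duration."""
    def __init__(self):
        self.tasks = {}   # task_id -> payload
        self.leases = {}  # task_id -> lease expiry timestamp

    def lease(self, lease_seconds, now=None):
        """Lease the first task that is unleased or whose lease expired."""
        now = time.time() if now is None else now
        for task_id, payload in self.tasks.items():
            if self.leases.get(task_id, 0) <= now:
                self.leases[task_id] = now + lease_seconds
                return task_id, payload
        return None  # nothing available to lease right now

    def delete(self, task_id):
        """Called by the worker once the task is completed."""
        self.tasks.pop(task_id)
        self.leases.pop(task_id, None)
```

If a worker crashes before calling `delete`, the lease simply expires and another worker can pick the task up.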

AMQP supports message acknowledgements for when networks are unreliable or tasks fail. When a message is delivered to a consumer, the consumer notifies the broker, either automatically or whenever the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).
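With pika, RabbitMQ's Python client, manual acknowledgements look roughly like this (a sketch; the queue name, connection details, and `handle` function are assumptions, and it requires `pip install pika` plus a local RabbitMQ):

```python
def handle(body):
    # Application logic for one message; raise to signal failure.
    return body.decode().upper()

def main():
    import pika  # imported here so the sketch is importable without pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="work")

    def on_message(ch, method, properties, body):
        try:
            handle(body)
            # Explicit ack: only now will the broker remove the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            # Negative ack: requeue for another delivery attempt.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    channel.basic_consume(queue="work", on_message_callback=on_message)
    channel.start_consuming()

if __name__ == "__main__":
    main()
```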

In certain situations, for example, when a message cannot be routed, messages may be returned to publishers, dropped, or, if the broker implements an extension, placed into a so-called “dead letter queue”. Publishers choose how to handle situations like this by publishing messages using certain parameters.

Exchanges

Exchanges are AMQP entities where messages are sent. Exchanges take a message and route it into zero or more queues. The routing algorithm used depends on the exchange type and rules called bindings. AMQP brokers provide four exchange types:

  • Direct exchange: Delivers messages to queues based on a message routing key. Ideal for the unicast routing of messages (although they can be used for multicast routing as well). Here is how it works:
    1. A queue binds to the exchange with a routing key K
    2. When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R
    3. Often used to distribute tasks between multiple workers (instances of the same application) in a round robin manner. When doing so, it is important to understand that, in AMQP, messages are load balanced between consumers and not between queues.
  • Fanout exchange: Routes messages to all of the queues that are bound to it. If N queues are bound to a fanout exchange, when a new message is published to that exchange a copy of the message is delivered to all N queues. Ideal for broadcast routing of messages. Because a fanout exchange delivers a copy of a message to every queue bound to it, it suits use cases involving broadcasting updates/information to many users in near real-time (news sites, for example).
  • Topic exchange: Routes messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to the exchange. It’s often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages. They differ from direct exchanges in that routing keys are matched against a pattern rather than compared for equality.
  • Headers exchange: Designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. The attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding. These can be looked upon as “direct exchanges on steroids”. Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary), for example.

Exchanges can be durable or transient. Durable exchanges survive broker restart whereas transient exchanges do not (they have to be re-declared when broker comes back online). Not all scenarios and use cases require exchanges to be durable.
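To make topic-exchange matching concrete, here is an illustrative (not RabbitMQ's actual) implementation of its pattern rules, where `*` matches exactly one dot-separated word and `#` matches zero or more words:

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a topic pattern."""
    def match(p, k):
        if not p:
            return not k  # pattern exhausted: match only if key is too
        if p[0] == "#":
            # '#' matches zero or more words.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        # '*' matches exactly one word; otherwise words must be equal.
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))
```

For example, a queue bound with `kern.*` receives `kern.critical` but not `kern.critical.disk`, while `kern.#` receives both.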

Bindings

Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types. The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter.

Having this layer of indirection enables routing scenarios that are impossible or very hard to implement by publishing directly to queues, and also eliminates a certain amount of duplicated work for application developers.

If an AMQP message cannot be routed to any queue (for example, because there are no bindings for the exchange it was published to), it is either dropped or returned to the publisher, depending on message attributes the publisher has set.
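The binding logic of a direct exchange, including what happens to unroutable messages, can be sketched in plain Python (illustrative names, not a broker API):

```python
class DirectExchange:
    """Toy direct exchange: binding key must equal the routing key."""
    def __init__(self):
        self.bindings = {}    # routing key -> list of bound queues
        self.unroutable = []  # messages with no matching binding

    def bind(self, queue, routing_key):
        self.bindings.setdefault(routing_key, []).append(queue)

    def publish(self, routing_key, message):
        queues = self.bindings.get(routing_key, [])
        if not queues:
            # No binding: drop or return to publisher, depending on
            # message attributes; here we just record the return.
            self.unroutable.append(message)
            return
        for queue in queues:
            queue.append(message)  # each bound queue gets a copy
```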

Queues

Queues in the AMQP model are very similar to queues in other message and task-queueing systems: they store messages that are consumed by applications. Queues share some properties with exchanges, but also have some additional properties:

  • Name
  • Durable (the queue will survive a broker restart by persisting to disk)
  • Exclusive (used by only one connection and the queue will be deleted when that connection closes)
  • Auto-delete (the queue is deleted when its last consumer unsubscribes, provided it has had at least one consumer)
  • Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)
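With pika, these properties map directly onto `queue_declare` parameters (a sketch; the queue name, TTL, and length limit are assumptions):

```python
def declare_work_queue(channel):
    # Optional broker-specific arguments: a 60-second message TTL
    # and a cap on queue length.
    arguments = {"x-message-ttl": 60000, "x-max-length": 10000}
    channel.queue_declare(
        queue="work",       # name
        durable=True,       # survives a broker restart
        exclusive=False,    # usable by more than one connection
        auto_delete=False,  # keep the queue when consumers unsubscribe
        arguments=arguments,
    )
    return arguments

if __name__ == "__main__":
    import pika  # requires `pip install pika` and a local RabbitMQ
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    declare_work_queue(connection.channel())
```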

Although that was only a gentle introduction, we can now see how RabbitMQ uses AMQP’s entities to solve the problems above: poison messages are handled with dead-letter queues, reliability comes from durable exchanges and queues, and producers and consumers can scale horizontally without worrying about how tasks are distributed.

The Ecosystem

If you are wondering why we spoke about celery in our first example and immediately jumped to RabbitMQ, fret not. Celery uses RabbitMQ because it cannot really work without a message broker. It is important to note that celery’s value is in providing a flexible API and patterns to quickly scale up, even across multiple machines. There’s nothing stopping anyone from writing their “own” celery (in fact, a few companies have done just that; Uber’s cherami is a fine example).

Let’s look at other popular task queues:

Brokers         | Implementation Language | Clients Supported
----------------|-------------------------|---------------------------
RabbitMQ        | Erlang                  | Python, Java, Node.js & Go
Amazon SQS      | Java                    | Python, Java, Node.js & Go
Apache Qpid     | Java, C++               | Java, C++
Redis           | C                       | Python, Java, Node.js & Go
Apache ActiveMQ | Java                    | Python, Java, Node.js & Go
MSMQ            | C++                     | C++, C#
Cherami         | Go                      | Go

Let’s also look at popular libraries for using them:

Language | Library      | Broker Support              | Project Link
---------|--------------|-----------------------------|----------------------------------------------
Python   | Celery       | RabbitMQ, Redis, Amazon SQS | http://celeryproject.org/
Python   | Huey         | Redis                       | https://huey.readthedocs.io/en/latest/
Python   | RQ           | Redis                       | http://python-rq.org/
Python   | Dramatiq     | RabbitMQ, Redis             | https://dramatiq.io
Go       | Machinery    | RabbitMQ, Redis             | https://github.com/RichardKnop/machinery
Ruby     | Sidekiq      | Redis                       | http://sidekiq.org/
Ruby     | Delayed::Job | Database (ActiveRecord)     | https://github.com/collectiveidea/delayed_job
Ruby     | Resque       | Redis                       | https://github.com/resque/resque
Java     | Jesque       | Redis                       | https://github.com/gresrun/jesque
Node.js  | Bee Queue    | Redis                       | https://github.com/bee-queue/bee-queue
Node.js  | Kue          | Redis                       | http://automattic.github.io/kue/

Use-cases

Depending on the problem we are trying to solve, its scale, and the existing tech stack in use by the project/organization, we can decide on a message broker and a library that supports it. But how it is used usually falls into one of the following categories:

  • Distributed Processing: Use-cases where we need to parallelize a long-running task which is either CPU- or IO-bound in order to reduce its execution time.
  • Deferred Delivery: Use-cases where we can process a task at a later point.

Let’s use celery and RabbitMQ to showcase how these use-cases can be executed. Familiarity with Python is a plus for the following section; however, as Python code is largely self-documenting, you shouldn’t face any problems understanding the nuances of the code below.

The following code snippet to install celery assumes you have installed Anaconda for Python and added it to your PATH.
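The install snippet itself is not shown in the original, but it amounts to a single pip command (run from the Anaconda environment on your PATH):

```shell
pip install celery
```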

Follow the instructions here to setup RabbitMQ on your system.

Distributed Processing

One of the primary use-cases of message brokers is to break down a set of sequentially executed tasks (which are not dependent on each other) into parallel tasks. For example:

You can run them like below:
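The run commands are not shown in the original; assuming the tasks live in a hypothetical module `tasks.py` exposing a task named `add`, they would look roughly like this:

```shell
# Terminal 1: start a worker that consumes tasks from the broker
celery -A tasks worker --loglevel=info

# Terminal 2: enqueue a task and print its AsyncResult
python -c "import tasks; print(tasks.add.delay(2, 3))"
```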

Sometimes, you might get counter-intuitive results, like the non-parallelized version taking less time than the one parallelized with celery. That’s because each message’s life-cycle (from task to broker to acknowledgement) adds overhead that the sequential version does not pay.

Parallelizing a more complex task, say generating the first 10000 Fibonacci numbers, can be a great exercise to understand celery (and its internals) and to get your hands dirty with it.

Deferred Delivery

Another common use case for message brokers is running tasks in the background. In fact, this is the primary use case for celery (and one of the reasons for its existence). Let’s say that when you sign up for a new service, the service needs to send an email with some details. This doesn’t have to execute immediately, and ideally the user can log in right away. If we are using a web server that is blocking in nature (as opposed to something like Node.js or Tornado), executing it immediately could also affect response times for other users.

Let’s use django as an example to illustrate how celery (backed by a message broker) can solve this problem:

Check this link if you are interested in how celery can execute periodic tasks.

You can use any popular message broker/library mentioned here to achieve the same results shown above, although they may differ in the feature set provided.

Conclusion

Have you been identifying use-cases where YOU can apply the concepts of asynchronous task distribution? If yes, then you are moving in the right direction.

In the next part of this series, we will be talking about Numerical Computing.