
Celery workflows

2014 July 13
by Richard Knop

Celery is a great asynchronous task/job queue framework. It allows you to create distributed systems where tasks (execution units) are executed concurrently on multiple workers using multiprocessing. It also supports scheduling and scales really well, since you can scale workers horizontally.

Celery is great at firing both synchronous tasks and (one of its main strengths) asynchronous tasks such as sending emails, processing credit card payments, or writing transactions to a general ledger.

However, Celery offers much more. One of its most useful features is the ability to chain multiple tasks together to create workflows.

Task Callbacks

Let’s create a few simple tasks for demonstration purposes:

from celery import shared_task


@shared_task
def add(x, y): 
    return x + y


@shared_task
def multiply(x, y): 
    return x * y


@shared_task
def tsum(numbers): 
    return sum(numbers)

A very simple example of linking two tasks would be:

add.apply_async((5, 5), link=add.s(35))

Which would result in:

(5 + 5) + 35
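
The link callback works because add.s(35) creates a signature: a partially applied task invocation that receives the parent task’s result as its remaining argument. Outside of Celery, the same idea can be sketched with plain functools.partial (a conceptual analogy only, not the Celery API):

```python
from functools import partial

def add(x, y):
    return x + y

# add.s(35) is roughly "add with one argument pre-bound";
# the parent task's return value fills the remaining slot.
callback = partial(add, 35)

# Celery passes (5 + 5) into the callback, giving (5 + 5) + 35
result = callback(add(5, 5))
print(result)  # 45
```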

You can also define an error callback. Let’s create a simple error handling task:

from celery import shared_task
from celery.result import AsyncResult


@shared_task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))

You could then write:

add.apply_async((5, 5), link_error=error_handler.s())

This is useful for sending an email notification about a system error, or for logging exceptions for later debugging.

Both callbacks and error callbacks can be expressed as a list:

add.apply_async((5, 5), link=[add.s(35), multiply.s(2)])

The result of the first task would then be passed to both callbacks, so you would get:

(5 + 5) + 35

and

(5 + 5) * 2

If you don’t want to pass the result of the first task to its callback, you can create an immutable callback with .si(). This can be useful when you have a piece of logic you want to execute after the task but do not need its return value.

add.apply_async((2, 2), link=multiply.si(4, 4))

Next, let’s look at some more complex workflow primitives Celery offers.

The Primitives

The first primitive I will show you is the group. Groups are used when you want to execute any number of tasks in parallel.

from celery import group
result = group(add.s(i, i) for i in xrange(10))()
result.get(timeout=1)

Would result in a list of results:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

The next primitive is the chain. A chain defines a set of tasks to be executed one after another in a synchronous manner.

result = (multiply.s(5, 5) | add.s(4) | multiply.s(8))()
result.get()

Would give you equivalent of:

((5 * 5) + 4) * 8 = 29 * 8 = 232
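
Since each return value is piped into the next signature, a chain behaves like plain left-to-right function composition. This conceptual sketch (not Celery code) checks the arithmetic above:

```python
def multiply(x, y):
    return x * y

def add(x, y):
    return x + y

# multiply.s(5, 5) | add.s(4) | multiply.s(8) composes left to right:
result = multiply(add(multiply(5, 5), 4), 8)
print(result)  # 232
```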

Another very useful primitive is the chord. A chord lets you define a header and a body. The header is a list of tasks to be executed in parallel; the body is a callback to be executed after all tasks in the header have run. The callback in the body will receive a list of the return values of all tasks in the header.

from celery import chord


result = chord((add.s(i, i) for i in xrange(10)), tsum.s())()
result.get()

Would result in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] being passed to the tsum task, which would add all the numbers together, giving 90 as the result; basically:

sum([0, 2, 4, 6, 8, 10, 12, 14, 16, 18])

There are a couple more primitives. Map and starmap work similarly to the built-in Python map function.

tsum.map([range(10), range(100)])

Will result in:

[45, 4950]

Starmap allows you to send arguments as *args:

add.starmap(zip(range(10), range(10)))

Will result in:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Chunks let you split a long list of arguments into subsets, resulting in a task called multiple times with a smaller chunk of arguments.

# a list of 1000 tuple pairs [(0, 0), (1, 1), ..., (999, 999)]
items = zip(xrange(1000), xrange(1000))
add.chunks(items, 10)
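
What chunking does to the argument list can be sketched in plain Python: the sequence is sliced into groups of at most n items, and the task then runs once per group (a conceptual sketch only; Celery handles the actual dispatch):

```python
def chunk(items, n):
    """Split items into lists of at most n elements."""
    items = list(items)
    return [items[i:i + n] for i in range(0, len(items), n)]

items = list(zip(range(1000), range(1000)))
groups = chunk(items, 10)

print(len(groups))    # 100 groups of 10 pairs each
print(groups[0][:3])  # [(0, 0), (1, 1), (2, 2)]
```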

This was a very short introduction to workflows in Celery. There is much more flexibility in defining workflows; I haven’t properly touched on error handling or the many different options you have.

There are some limitations as well, though. For instance, chaining chords together is not possible, as far as I know.

Finally, let me say that Celery is an essential part of every Python programmer’s repertoire. If you haven’t used it yet, you should definitely take a look.

It can be used for simple use cases such as charging credit cards asynchronously and sending emails in the background, as well as for more sophisticated things like workflows, or even as middleware in service oriented architectures.

Using Celery as middleware in SOA

2014 May 13
by Richard Knop

When creating a service oriented architecture, one of the most important decisions to make is usually what protocol to use for inter service communication.

I would like to propose an SOA based on RabbitMQ and Celery as a middleware between specific services and architecture components.

Let’s say the architecture consists of two layers:

  • edge
  • application

The edge is just a thin publicly accessible HTTP layer exposing RESTful endpoints. This is usually just a server with an Nginx reverse proxy (or multiple servers behind a load balancer) and something to route URL requests and make calls to the services that handle all the business logic.

The application layer contains all business logic. It usually consists of several services (login service, wallet service, payment service etc) deployed in a VPC (virtual private cloud).

The question is, how should the edge communicate with different services? And how should the services communicate between each other?

A simple and very common solution is for the services to expose their own RESTful APIs so the edge server can trigger service calls via standard HTTP.

One of the drawbacks of this solution is that the edge server and all application servers now need additional configuration (DNS, IP addresses etc). Also, the services need to be deployed behind their own load balancers in order to scale, and they need to run their own web server (Nginx and an application server such as uWSGI or Green Unicorn).

I prefer to use the AMQP protocol instead. This solution solves the problems mentioned above. For example, I am using RabbitMQ and Celery in my latest project as a middleware. I have two RabbitMQ clusters:

  • one for synchronous blocking tasks
  • one for asynchronous non blocking tasks

All my services are running as Celery daemons on application servers. There is no need for complex configuration (only the URL of the message broker is needed) and no need to run a web server. Also, load balancers are no longer needed, as the RabbitMQ cluster uses round robin to distribute messages to available workers.

The edge server only publishes messages to the correct queue. To make this work as expected, I wrote a clever routing Celery configuration which routes tasks to the correct queues.

I am using a separate exchange for each service. Each service runs as two Celery daemons (sync and async) to group synchronous and asynchronous tasks together. Here is how it’s done:

from celery import Celery
import re


# http://docs.celeryproject.org/en/latest/userguide/routing.html
class Router(object):
    def route_for_task(self, task, args=None, kwargs=None):
        parts = task.split('.')
        if re.match(r'^mp[a-z_]+\.sync\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.sync',
            }
        elif re.match(r'^mp[a-z_]+\.async\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.async',
            }
        return None


def _get_celery_queues():
    services = [
        'mplogin',
        'mpwallet',
        'mpledger',
    ]

    queues = {}
    for service in services:
        queues[service + '.sync'] = {
            'binding_key': service + '.sync.#',
            'exchange': service,
            'exchange_type': 'topic',
            'delivery_mode': 1, # transient messages, not written to disk
        }
        queues[service + '.async'] = {
            'binding_key': service + '.async.#',
            'exchange': service,
            'exchange_type': 'topic'
        }

    return queues


class CeleryConfig(object):
    CELERY_ROUTES = (Router(),)

    #: Only add pickle to this list if your broker is secured
    #: from unwanted access (see userguide/security.html)
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'
    CELERY_TIMEZONE = 'UTC'
    CELERY_ENABLE_UTC = True
    CELERY_BACKEND = 'amqp'

    # Replicate queues to all nodes in the cluster
    CELERY_QUEUE_HA_POLICY = 'all'

    # http://docs.celeryproject.org/en/latest/userguide/tasks.html#disable-rate-limits-if-they-re-not-used
    CELERY_DISABLE_RATE_LIMITS = True

    CELERY_QUEUES = _get_celery_queues()
    BROKER_HEARTBEAT = 10
    BROKER_HEARTBEAT_CHECKRATE = 2.0
    BROKER_POOL_LIMIT = 0


def celery_apps_factory(app_type, sync_broker_url, async_broker_url, service_name):
    protocol = 'pyamqp' if app_type == 'SUBSCRIBER' else 'librabbitmq'

    broker_url_sync = protocol + '://' + sync_broker_url
    broker_url_async = protocol + '://' + async_broker_url

    sync_app = Celery(service_name + '.sync_app', broker=broker_url_sync)
    sync_app.config_from_object(CeleryConfig)

    async_app = Celery(service_name + '.async_app', broker=broker_url_async)
    async_app.config_from_object(CeleryConfig)
    async_app.conf.CELERY_IGNORE_RESULT = True

    return sync_app, async_app

In the routing configuration above, the SOA platform has a common prefix mp (mp = my platform…. just an example). Every service is therefore prefixed with mp (e.g. mplogin would be the name of the login service).

A task name consists of the service name (the same as the exchange name), the word “sync” or “async”, and a task name, all separated by full stops. For example:

  • mplogin.sync.register

Is a task to register a new user. If you split it by full stops, you can say that:

  • mplogin is the name of the exchange
  • mplogin.sync is the name of the queue
  • register is the name of the task (service method)
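
Since route_for_task is plain Python, the naming convention can be checked directly without a broker. Repeating the Router class from above so the snippet is self-contained:

```python
import re


class Router(object):
    def route_for_task(self, task, args=None, kwargs=None):
        parts = task.split('.')
        if re.match(r'^mp[a-z_]+\.sync\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.sync',
            }
        elif re.match(r'^mp[a-z_]+\.async\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.async',
            }
        return None


router = Router()

# a sync task routes to the service's .sync queue
print(router.route_for_task('mplogin.sync.register'))
# {'routing_key': 'mplogin.sync.register', 'queue': 'mplogin.sync'}

# a task that doesn't match either pattern falls through to the defaults
print(router.route_for_task('unknown.task'))  # None
```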

Here is an example definition of the register task:

sync_app, async_app = celery_apps_factory(
    app_type='SUBSCRIBER',
    sync_broker_url=settings.BROKER_URL_SYNC,
    async_broker_url=settings.BROKER_URL_ASYNC,
    service_name='mplogin',
)

@sync_app.task(name='mplogin.sync.register')
def register(user_obj):
    return user_service.register(user_obj)

To register a new user, you could then call the task from the edge server:

login_sync_app, login_async_app = celery_apps_factory(
    app_type='PUBLISHER',
    sync_broker_url=settings.BROKER_URL_SYNC,
    async_broker_url=settings.BROKER_URL_ASYNC,
    service_name='mplogin',
)

login_sync_app.send_task(
    'mplogin.sync.register',
    kwargs={
        'user_obj': json_obj,
    },
).get()

I hope at least somebody will find this useful :)

Create a readonly user in Postgres

2014 April 16
by Richard Knop

A read-only user is quite useful for backups, reporting and so on.

Assuming you have databases foo_db and bar_db and want to create a readonly user for them called backup_user with password qux94874:

CREATE USER backup_user WITH ENCRYPTED PASSWORD 'qux94874';
GRANT CONNECT ON DATABASE foo_db to backup_user;
GRANT CONNECT ON DATABASE bar_db to backup_user;
\c foo_db
GRANT USAGE ON SCHEMA public to backup_user;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO backup_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO backup_user;
\c bar_db
GRANT USAGE ON SCHEMA public to backup_user;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO backup_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO backup_user;
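
One caveat: the grants above only cover tables and sequences that already exist. For tables created later, you would also need to set default privileges (run in each database, as the role that will create the new tables):

```sql
-- applies to objects created from now on by the current role
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO backup_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON SEQUENCES TO backup_user;
```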

Round half to even in Go

2013 November 24
by Richard Knop

Rounding half to even is used a lot when dealing with financial transactions, accounting and so on.

I have recently used it when implementing a double entry general ledger for one of the hottest startups in London.

Here is my implementation in Go:

import "math"

// http://en.wikipedia.org/wiki/Rounding#Round_half_to_even
func roundHalfToEven(f float64) float64 {
        integer, fraction := math.Modf(f)
        f = integer
        if 0.5 == math.Abs(fraction) {
                if 0 != math.Mod(f, 2) {
                        f += math.Copysign(1, integer)
                }
        } else {
                f += math.Copysign(float64(int(fraction+math.Copysign(0.5, fraction))), fraction)
        }
        return f
}

And here are unit tests:

import "testing"

func TestRoundHalfToEven(t *testing.T) {

        testCases := []struct {
                numberToRound  float64
                expectedResult float64
        }{
                {
                        // 23.5 =~ 24
                        numberToRound:  23.5,
                        expectedResult: 24,
                },
                {
                        // 24.5 =~ 24
                        numberToRound:  24.5,
                        expectedResult: 24,
                },
                {
                        // 7.58 =~ 8
                        numberToRound:  7.58,
                        expectedResult: 8,
                },
                {
                        // 7.46 =~ 7
                        numberToRound:  7.46,
                        expectedResult: 7,
                },
                {
                        // 5 =~ 5
                        numberToRound:  5,
                        expectedResult: 5,
                },
                {
                        // -23.5 =! -24
                        numberToRound:  -23.5,
                        expectedResult: -24,
                },
                {
                        // -24.5 =~ -24
                        numberToRound:  -24.5,
                        expectedResult: -24,
                },
                {
                        // -7.58 =~ -8
                        numberToRound:  -7.58,
                        expectedResult: -8,
                },
                {
                        // -7.46 =~ -7
                        numberToRound:  -7.46,
                        expectedResult: -7,
                },
                {
                        // -5 =~ -5
                        numberToRound:  -5,
                        expectedResult: -5,
                },
                {
                        // 0 =~ 0
                        numberToRound:  0,
                        expectedResult: 0,
                },
        }

        for i, tc := range testCases {
                actualResult := roundHalfToEven(tc.numberToRound)
                if actualResult != tc.expectedResult {
                        t.Errorf("%v rounded half to even should be %v, instead of %v (%v)", tc.numberToRound, tc.expectedResult, actualResult, i)
                }
        }

}

Permutation algorithm

2013 October 25
by Richard Knop

You might remember me writing several articles explaining different sorting algorithms. I might come back to that series, as there are a few sorting algorithms I haven’t gone through yet.

But I wanted to do something else now: an interesting algorithm to find all permutations of a string.

Let’s say you have string “bar”. The steps are as follows:

  1. If the string is just one letter, return it.
  2. Remove the first letter of the string and find all permutations of the new string. Do this recursively.
  3. For each found permutation, insert the removed letter from the previous step at every single position. Add each of these strings as a result.
  4. Return the array of results.

Here is an implementation in JavaScript:

function permutations(word) {
    if (word.length <= 1) {
        return [word];
    }
 
    var perms = permutations(word.slice(1, word.length)),
        char = word[0],
        result = [],
        i;
    perms.forEach(function (perm) {
        for (i = 0; i < perm.length + 1; i += 1) {
            result.push(perm.slice(0, i) + char + perm.slice(i, perm.length));
        }
    });
 
    return result;
}

My new iPad game

2013 September 14
by Richard Knop

My new iPad game (School of Alchemy) has just been approved by Apple. You can download it from the App Store.

It was quite a fun project. I learned a lot about iOS web views and JavaScript / HTML5. The game is funny and features beautiful retina display optimised images; I’m sure you will like it.

Here are a few screenshots:

School of Alchemy #1

School of Alchemy #2

School of Alchemy #3

School of Alchemy #4

School of Alchemy #5

How To Properly Inherit Methods From Prototype

2013 June 29
by Richard Knop

JavaScript uses prototypal inheritance instead of the class-based inheritance we are all used to from Java or C#. This can be difficult to understand at first. Consider the following simple example:

function Mammal(config) {
    this.config = config;
    console.log(this.config);
}

function Cat(config) {
    // call parent constructor
    Mammal.call(this, config);
}
// inherit all methods from Mammal's prototype
Cat.prototype = new Mammal();

var felix = new Cat({
    "name": "Felix"
});

We have a simple inheritance pattern here. Mammal is a parent class which accepts config as a constructor argument. Cat extends Mammal’s prototype. Everything looks OK, but when you run this code in the browser you will see this in the console:

undefined fiddle.jshell.net/_display/:23
Object {name: "Felix"}

The problem is that we are calling Mammal’s constructor twice. The first time is when we inherit all methods from Mammal’s prototype into the Cat object; the second time is when creating an instance of Cat.

The correct way of inheriting would be:

function Mammal(config) {
    this.config = config;
    console.log(this.config);
}

function Cat(config) {
    // call parent constructor
    Mammal.call(this, config);
}
// inherit all methods from Mammal's prototype
Cat.prototype = Object.create(Mammal.prototype);

var felix = new Cat({
    "name": "Felix"
});

Build Pipeline Plugin’s Main Weakness

2013 June 24
by Richard Knop

Build Pipeline Plugin is one of my favorite Jenkins plugins. It allows you to build a pipeline out of several jobs by defining upstream and downstream jobs. This way you can create a process consisting of multiple steps, from running unit tests, various code quality tools (lint, mess detector, copy paste detector etc) and BDD tests, through to deployment to different environments (integration, functional test, production).

This allows you to manage the whole journey of an application, from committing to version control (Git, SVN etc) to deployment on production servers. In addition, the Build Pipeline Plugin also represents this journey in a very nice visual way.

The only big issue I have with this plugin relates to manually triggered parametrized jobs. When you have a parametrized job which requires a manual submission (deployment to staging or production certainly does), it should ask you to enter the required parameters when you trigger the job from the pipeline view. This doesn’t happen; it triggers the build without any parameters.

This is quite annoying and has forced me to remove jobs like this from the pipeline for now (until the plugin is updated to support this very important feature…).

Quick Introduction To BDD Testing With Behat

2013 June 20
by Richard Knop

Behat is a PHP framework for BDD testing. It is very similar to Cucumber (Ruby) or Behave (Python); it uses the Gherkin language to define features and scenarios. These are then executed by Behat, which uses one or several drivers. In this short example I will show you how to use two drivers:

  • Goutte (simple HTTP based headless browser)
  • Selenium 2 (full browser emulator with JS support)

First, let’s use Composer to install Behat and the drivers:

{
    "name": "foo-bar",
    "description": "Foo Bar",
    "minimum-stability": "dev",
    "require": {
        "php": ">=5.3"
    },
    "require-dev": {
        "behat/behat": "dev-develop#1f1bead31e96da5e30fd5d499d5cf66d29b68cf6",
        "behat/mink": "v1.4.3",
        "behat/mink-extension": "dev-master#ef2c8639ebc254f0ff6e555b7834700caf5db9c4",
        "behat/mink-goutte-driver": "dev-master#v1.0.8",
        "behat/mink-selenium2-driver": "v1.0.6"
    }
}

Install the dependencies defined in the composer.json file (above):

php composer.phar install --dev

Make sure you have Firefox 21 installed, as that’s what we will be using for the Selenium 2 tests. Also important: make sure you have the Selenium 2 server installed and running. I have written a post explaining how to install Selenium 2 on Mac OS as a service.

This is an example behat.yml file I’m using in one of the projects I work on:

default:
  paths:
    features: tests/functional/features
    bootstrap: tests/functional/features/bootstrap
  extensions:
    Behat\MinkExtension\Extension:
      base_url: http://virtualhost.local
      goutte: ~
      default_session: goutte 
      javascript_session: selenium2 
      selenium2: 
        browser: firefox
        capabilities: { "browserName": "firefox", "browser": "firefox", "version": "21"}
        wd_host: http://127.0.0.1:4443/wd/hub

Then you can run the tests like this:

vendor/bin/behat --ansi

All tests with @javascript tag will use Selenium 2 driver (so a new Firefox window will pop up and your tests will run inside it), tests without @javascript tag will run using the headless Goutte driver.

You can exclude the Selenium 2 tests like this (for example, you might not have an external Selenium 2 server for Jenkins, so you run the Selenium tests only locally and run only the tests without the @javascript tag on Jenkins as part of the build process):

vendor/bin/behat --ansi --tags=~@javascript

Installing Selenium 2 On Mac OS As Service

2013 June 20
by Richard Knop

Selenium 2 is a powerful browser emulator which enables you to automate frontend testing (especially of JavaScript heavy pages). It can also be used with Behat for BDD testing.

I prefer using the headless Goutte driver to test the basic functionality and flow of the application, but sometimes there are critical parts of the app that rely heavily on client side technologies like JavaScript. The only way to test those is to use a browser emulator (Selenium 2, Sahi etc). Enough talking; here is how I installed the Selenium 2 server on Mac OS as a service:

wget https://selenium.googlecode.com/files/selenium-server-standalone-2.33.0.jar

sudo mkdir /usr/lib/selenium

sudo mv selenium-server-standalone-2.33.0.jar /usr/lib/selenium

sudo mkdir /var/log/selenium

sudo chmod -R a+w /var/log/selenium

Then you need to create a new file:

sudo nano ~/Library/LaunchAgents/org.nhabit.Selenium.plist

With contents:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
        <key>Label</key>
        <string>org.nhabit.Selenium</string>
        <key>OnDemand</key>
        <true/>
        <key>ProgramArguments</key>
        <array>
                <string>/usr/bin/java</string>
                <string>-jar</string>
                <string>/usr/lib/selenium/selenium-server-standalone-2.33.0.jar</string>
                <string>-port</string>
                <string>4443</string>
        </array>
        <key>ServiceDescription</key>
        <string>Selenium Server</string>
        <key>StandardErrorPath</key>
        <string>/var/log/selenium/selenium-error.log</string>
        <key>StandardOutPath</key>
        <string>/var/log/selenium/selenium-output.log</string>
</dict>
</plist>

Load the service:

launchctl load ~/Library/LaunchAgents/org.nhabit.Selenium.plist
launchctl start org.nhabit.Selenium

You can also check that it’s running:

ps aux | grep selenium

richardknop    19880   0.0  0.5  2859020  76648   ??  S     2:09pm   0:07.05 /usr/bin/java -jar /usr/lib/selenium/selenium-server-standalone-2.33.0.jar -port 4443
richardknop    20852   0.0  0.0  2432768    596 s001  R+    3:17pm   0:00.00 grep selenium

And error logs, just in case you need them:

tail -f /var/log/selenium/selenium-error.log