DynamoDB in examples, Example 1.4: DynamoDB and Celery using green threads

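Usually we set the worker concurrency equal to the number of processors.
With eventlet, tasks run as green threads that yield while they wait on I/O (here, DynamoDB requests), so we can set a much larger value, such as 1000.
To enable the eventlet pool, pass -P eventlet to celery worker.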
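To illustrate why I/O-bound tasks benefit from a large concurrency value, here is a minimal sketch that uses only the standard library. It is not eventlet: operating-system threads and `time.sleep` stand in for green threads and network waits, and the names `fake_io_task` and `run_batch` are made up for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(_):
    # Assumption: a 0.1 s sleep stands in for waiting on a network response.
    time.sleep(0.1)


def run_batch(concurrency, tasks=10):
    """Run `tasks` fake I/O tasks with `concurrency` workers; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        list(pool.map(fake_io_task, range(tasks)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("concurrency=2: ", round(run_batch(2), 2), "s")
    print("concurrency=10:", round(run_batch(10), 2), "s")
```

With 2 workers the ten waits run in five rounds; with 10 workers they overlap in a single round, so the batch should finish several times faster.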

$ pip install celery
$ pip install eventlet
$ pip install botocore
# main.py

import datetime
import logging
import random
import uuid

import botocore.session

from celery import Celery
from celery.task import Task
from celery.registry import tasks


logger = logging.getLogger(__name__)


class Config:

    pass


app = Celery()
app.config_from_object(Config)


class UpdateBalance(Task):

    name = 'update_balance'

    def __init__(self, *args, **kwargs):
        super(UpdateBalance, self).__init__(*args, **kwargs)
        self.amazon_session = botocore.session.get_session()

    def run(self, user_id):
        client = self.amazon_session.create_client('dynamodb', region_name='us-west-2')
        balance = random.randint(1, 1000)  # placeholder for an expensive balance calculation
        result = client.put_item(
            TableName='user_wallet',
            Item={
                'user_id': {
                    'S': str(user_id),
                },
                'balance': {
                    'N': str(balance),
                }
            })
        logger.warning(datetime.datetime.now())
        logger.warning(result)


tasks.register(UpdateBalance)


if __name__ == '__main__':
    for i in range(10):
        user_id = str(uuid.uuid4())
        UpdateBalance.delay(user_id=user_id)
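The `Config` class in main.py is left empty, so the worker falls back to Celery's defaults. As a hedged sketch, it could be filled in with Celery 3.1-style uppercase settings like this. The values are assumptions for illustration (the broker URL is Celery's documented default and matches the transport shown in the worker banner), and the pool itself should still be selected with -P on the command line rather than in the configuration.

```python
class Config:
    # Assumption: local RabbitMQ with the default guest account.
    BROKER_URL = 'amqp://guest:guest@localhost:5672//'

    # Assumption: default concurrency when -c is not given on the command line.
    CELERYD_CONCURRENCY = 1000

    # The example never reads task results, so they can be ignored.
    CELERY_IGNORE_RESULT = True
```

A `-c` value passed to `celery worker` still overrides `CELERYD_CONCURRENCY`.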

Results using eventlet:

$ celery worker -A main -P eventlet -c 1000
$ python main.py

```text
 -------------- celery@nanvel-air.local v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Darwin-14.3.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x1023c92d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 1000 (eventlet)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[2015-06-06 21:57:23,698: WARNING/MainProcess] celery@nanvel-air.local ready.
[2015-06-06 21:57:26,511: WARNING/MainProcess] 2015-06-06 21:57:26.511696
[2015-06-06 21:57:26,512: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '1BNOL3VSM33D1EUIP7QRP2QTUNVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,515: WARNING/MainProcess] 2015-06-06 21:57:26.515920
[2015-06-06 21:57:26,516: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HGVHJ9CRS5U3EJ4KG57C44N53BVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,541: WARNING/MainProcess] 2015-06-06 21:57:26.541750
[2015-06-06 21:57:26,542: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'RD0J3H6EL29B6CKUKLJ9AQT4HNVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,550: WARNING/MainProcess] 2015-06-06 21:57:26.550805
[2015-06-06 21:57:26,551: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'SP9M3F114NIEOAUQVQ0UU74JVRVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,590: WARNING/MainProcess] 2015-06-06 21:57:26.589971
[2015-06-06 21:57:26,590: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HTB2TU11O98IA1RA5IANC1T8GBVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,637: WARNING/MainProcess] 2015-06-06 21:57:26.637958
[2015-06-06 21:57:26,638: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'A9B7SON7IO7TBVA8CHCG1HMOSJVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,719: WARNING/MainProcess] 2015-06-06 21:57:26.719082
[2015-06-06 21:57:26,719: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HPKGKLUV80OR1I1I679457NV63VV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,729: WARNING/MainProcess] 2015-06-06 21:57:26.729478
[2015-06-06 21:57:26,729: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'AAIKV3SKH2QQJ1UDG0CBHCKKHNVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,739: WARNING/MainProcess] 2015-06-06 21:57:26.739881
[2015-06-06 21:57:26,740: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'QU4E7KQMC656O1MNB3C7F6A8MFVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:26,766: WARNING/MainProcess] 2015-06-06 21:57:26.766148
[2015-06-06 21:57:26,766: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'LLB6K6385US0V5BRBI8SBVTKBRVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
```

Results using prefork:

$ celery worker -A main -c 2
$ python main.py
 -------------- celery@nanvel-air.local v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Darwin-14.3.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x107883850
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[2015-06-06 21:57:49,982: WARNING/MainProcess] celery@nanvel-air.local ready.
[2015-06-06 21:57:55,286: WARNING/Worker-2] 2015-06-06 21:57:55.286161
[2015-06-06 21:57:55,286: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'S217PD538RR7TL4VUDPGUJLAR7VV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:55,308: WARNING/Worker-1] 2015-06-06 21:57:55.308731
[2015-06-06 21:57:55,309: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '5966EVKRRTJI1GJOCR262H1ISFVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:56,483: WARNING/Worker-2] 2015-06-06 21:57:56.483080
[2015-06-06 21:57:56,483: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'PQT4RTVJ4QHH39Q2I6IPGSAIAJVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:56,485: WARNING/Worker-1] 2015-06-06 21:57:56.485244
[2015-06-06 21:57:56,485: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'JEK3R8F4EN93OTKVJCA6CEEHOVVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:57,646: WARNING/Worker-1] 2015-06-06 21:57:57.646181
[2015-06-06 21:57:57,646: WARNING/Worker-2] 2015-06-06 21:57:57.646156
[2015-06-06 21:57:57,646: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'J0PKP3F8PI30D4657SEJHVF49NVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:57,646: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HM07DDU8OJSSC877FEAONPR9D7VV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:58,808: WARNING/Worker-2] 2015-06-06 21:57:58.808078
[2015-06-06 21:57:58,808: WARNING/Worker-1] 2015-06-06 21:57:58.808186
[2015-06-06 21:57:58,808: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'K6NI9JGQBQ3TEUHPMJHFN9R2BFVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:57:58,808: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '4O86RDPPQ5II43FA02LRM3F9MVVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:58:00,039: WARNING/Worker-1] 2015-06-06 21:58:00.038970
[2015-06-06 21:58:00,039: WARNING/Worker-2] 2015-06-06 21:58:00.039041
[2015-06-06 21:58:00,039: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'OP19GVG098LQQ9D415FDIR15VNVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
[2015-06-06 21:58:00,039: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'APD8V79PV0TGKD6TTKUJNFPVJVVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
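
To compare the two runs, we can measure the time between the first and the last task timestamp in each log. The sketch below copies those four timestamps from the logs above; the helper name `span_seconds` is made up for this illustration. It measures only the spread of task completions, not broker or worker startup time, so treat the numbers as a rough indication from a single run.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"


def span_seconds(first, last):
    """Seconds elapsed between two timestamps logged by the task."""
    delta = datetime.strptime(last, FMT) - datetime.strptime(first, FMT)
    return delta.total_seconds()


# First and last task timestamps from the eventlet run (-P eventlet -c 1000).
eventlet_span = span_seconds("2015-06-06 21:57:26.511696",
                             "2015-06-06 21:57:26.766148")

# First and last task timestamps from the prefork run (-c 2).
prefork_span = span_seconds("2015-06-06 21:57:55.286161",
                            "2015-06-06 21:58:00.039041")

print("eventlet:", eventlet_span, "s")
print("prefork: ", prefork_span, "s")
print("ratio:   ", round(prefork_span / eventlet_span, 1))
```

In this run the ten tasks completed within about 0.25 s under eventlet and about 4.75 s under prefork with two processes.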
Licensed under CC BY-SA 3.0