DynamoDB in examples, Example 1.4: DynamoDB and Celery using green threads
Usually we set concurrency equal to number of processors.
But if we using eventlet, we able to set much bigger value, like ~ 1000.
To enable eventlet pool use -P
option to celery worker.
$ pip install celery $ pip install eventlet
# main.py import datetime import logging import random import uuid import botocore.session from celery import Celery from celery.task import Task from celery.registry import tasks logger = logging.getLogger(__name__) class Config: pass app = Celery() app.config_from_object(Config) class UpdateBalance(Task): name = 'update_balance' def __init__(self, *args, **kwargs): super(UpdateBalance, self).__init__(*args, **kwargs) self.amazon_session = botocore.session.get_session() def run(self, user_id): client = self.amazon_session.create_client('dynamodb', region_name='us-west-2') balance = random.randint(1, 1000) # some hard task calculates user balance result = client.put_item( TableName='user_wallet', Item={ 'user_id': { 'S': str(user_id), }, 'balance': { 'N': str(balance), } }) logger.warning(datetime.datetime.now()) logger.warning(result) tasks.register(UpdateBalance) if __name__ == '__main__': for i in range(10): user_id = str(uuid.uuid4()) UpdateBalance.delay(user_id=user_id)
Results using eventlet:
$ celery worker -A main -P eventlet -c 1000 $ python main.py ```text -------------- celery@nanvel-air.local v3.1.18 (Cipater) ---- **** ----- --- * *** * -- Darwin-14.3.0-x86_64-i386-64bit -- * - **** --- - ** ---------- [config] - ** ---------- .> app: __main__:0x1023c92d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: disabled - *** --- * --- .> concurrency: 1000 (eventlet) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [2015-06-06 21:57:23,698: WARNING/MainProcess] celery@nanvel-air.local ready. [2015-06-06 21:57:26,511: WARNING/MainProcess] 2015-06-06 21:57:26.511696 [2015-06-06 21:57:26,512: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '1BNOL3VSM33D1EUIP7QRP2QTUNVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,515: WARNING/MainProcess] 2015-06-06 21:57:26.515920 [2015-06-06 21:57:26,516: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HGVHJ9CRS5U3EJ4KG57C44N53BVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,541: WARNING/MainProcess] 2015-06-06 21:57:26.541750 [2015-06-06 21:57:26,542: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'RD0J3H6EL29B6CKUKLJ9AQT4HNVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,550: WARNING/MainProcess] 2015-06-06 21:57:26.550805 [2015-06-06 21:57:26,551: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'SP9M3F114NIEOAUQVQ0UU74JVRVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,590: WARNING/MainProcess] 2015-06-06 21:57:26.589971 [2015-06-06 21:57:26,590: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HTB2TU11O98IA1RA5IANC1T8GBVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,637: WARNING/MainProcess] 2015-06-06 21:57:26.637958 [2015-06-06 21:57:26,638: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'A9B7SON7IO7TBVA8CHCG1HMOSJVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,719: WARNING/MainProcess] 2015-06-06 21:57:26.719082 [2015-06-06 21:57:26,719: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HPKGKLUV80OR1I1I679457NV63VV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,729: WARNING/MainProcess] 2015-06-06 21:57:26.729478 [2015-06-06 21:57:26,729: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'AAIKV3SKH2QQJ1UDG0CBHCKKHNVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,739: WARNING/MainProcess] 2015-06-06 21:57:26.739881 [2015-06-06 21:57:26,740: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'QU4E7KQMC656O1MNB3C7F6A8MFVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:26,766: WARNING/MainProcess] 2015-06-06 21:57:26.766148 [2015-06-06 21:57:26,766: WARNING/MainProcess] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'LLB6K6385US0V5BRBI8SBVTKBRVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
Results using prefork:
$ celery worker -A main -c 2 $ python main.py
-------------- celery@nanvel-air.local v3.1.18 (Cipater) ---- **** ----- --- * *** * -- Darwin-14.3.0-x86_64-i386-64bit -- * - **** --- - ** ---------- [config] - ** ---------- .> app: __main__:0x107883850 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: disabled - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [2015-06-06 21:57:49,982: WARNING/MainProcess] celery@nanvel-air.local ready. [2015-06-06 21:57:55,286: WARNING/Worker-2] 2015-06-06 21:57:55.286161 [2015-06-06 21:57:55,286: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'S217PD538RR7TL4VUDPGUJLAR7VV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:55,308: WARNING/Worker-1] 2015-06-06 21:57:55.308731 [2015-06-06 21:57:55,309: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '5966EVKRRTJI1GJOCR262H1ISFVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:56,483: WARNING/Worker-2] 2015-06-06 21:57:56.483080 [2015-06-06 21:57:56,483: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'PQT4RTVJ4QHH39Q2I6IPGSAIAJVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:56,485: WARNING/Worker-1] 2015-06-06 21:57:56.485244 [2015-06-06 21:57:56,485: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'JEK3R8F4EN93OTKVJCA6CEEHOVVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:57,646: WARNING/Worker-1] 2015-06-06 21:57:57.646181 [2015-06-06 21:57:57,646: WARNING/Worker-2] 2015-06-06 21:57:57.646156 [2015-06-06 21:57:57,646: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'J0PKP3F8PI30D4657SEJHVF49NVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:57,646: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'HM07DDU8OJSSC877FEAONPR9D7VV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:58,808: WARNING/Worker-2] 2015-06-06 21:57:58.808078 [2015-06-06 21:57:58,808: WARNING/Worker-1] 2015-06-06 21:57:58.808186 [2015-06-06 21:57:58,808: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'K6NI9JGQBQ3TEUHPMJHFN9R2BFVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:57:58,808: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '4O86RDPPQ5II43FA02LRM3F9MVVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:58:00,039: WARNING/Worker-1] 2015-06-06 21:58:00.038970 [2015-06-06 21:58:00,039: WARNING/Worker-2] 2015-06-06 21:58:00.039041 [2015-06-06 21:58:00,039: WARNING/Worker-1] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'OP19GVG098LQQ9D415FDIR15VNVV4KQNSO5AEMVJF66Q9ASUAAJG'}} [2015-06-06 21:58:00,039: WARNING/Worker-2] {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'APD8V79PV0TGKD6TTKUJNFPVJVVV4KQNSO5AEMVJF66Q9ASUAAJG'}}
Licensed under CC BY-SA 3.0