DynamoDB in examples, Example 1.2: Asynchronous api calls (tornado)
The trick is to use tornado AsyncHTTPClient instead of one used in botocore.
tornado-botocore just patch botocore code, it is dirty workaround but I didn't found easier way to make it works.
I use it with botocore version 0.65.0 and It may be incompatible with other versions.
pip freeze botocore==0.65.0 tornado==4.1 tornado-botocore==0.1.6
Updated User Wallet example:
import logging import re import six import uuid from functools import partial from tornado import gen from tornado.ioloop import IOLoop from tornado_botocore import Botocore logger = logging.getLogger() class AmazonException(Exception): def __init__(self, message, code='unknown'): self.message = message self.code = code def __str__(self): return self.message class DDBException(AmazonException): ITEM_ENCODE_ERROR = 'ItemEncodeError' ITEM_DECODE_ERROR = 'ItemDecodeError' class DDBField(object): @classmethod def _validate(cls, value): raise NotImplementedError('Not implemented.') @classmethod def decode(cls, value): try: return cls._validate(value) except (TypeError, ValueError): raise DDBException( message='Invalid value for {cls} decode.'.format(cls=cls.__name__), code=DDBException.ITEM_DECODE_ERROR) @classmethod def encode(cls, value): try: return str(cls._validate(value)) except (TypeError, ValueError): raise DDBException( message='Invalid value for {cls} encode.'.format(cls=cls.__name__), code=DDBException.ITEM_ENCODE_ERROR) class DDBIntField(DDBField): AMAZON_TYPE = 'N' @classmethod def _validate(cls, value): if isinstance(value, int): return value return int(value) class DDBUUIDField(DDBField): AMAZON_TYPE = 'S' _UUID_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}') @classmethod def _validate(cls, value): if not isinstance(value, str): value = str(value) if cls._UUID_REGEXP.match(value) is None: raise ValueError('UUID required.') return value class DDBTable(object): TABLE_NAME = '' REGION_NAME = 'us-west-2' KEY_SCHEMA = [] LOCAL_SECONDARY_INDEXES = [] GLOBAL_SECONDARY_INDEXES = [] PROVISIONED_THROUGHPUT = {} FIELDS = {} _AMAZON_SESSION = None def _get_table_name(self): return self.TABLE_NAME def _get_table_kwargs(self): key_fields = set() for key in self.KEY_SCHEMA: key_fields.add(key['AttributeName']) for index in self.LOCAL_SECONDARY_INDEXES: for key in index['KeySchema']: key_fields.add(key['AttributeName']) for index in self.GLOBAL_SECONDARY_INDEXES: for key in index['KeySchema']: key_fields.add(key['AttributeName']) attribute_definitions = [] for field_name in key_fields: attribute_definitions.append({ 'AttributeName': field_name, 'AttributeType': self.FIELDS[field_name].AMAZON_TYPE }) kwargs = { 'TableName': self._get_table_name(), 'AttributeDefinitions': attribute_definitions, 'KeySchema': self.KEY_SCHEMA, 'ProvisionedThroughput': self.PROVISIONED_THROUGHPUT, } if getattr(self, 'LOCAL_SECONDARY_INDEXES', None): kwargs['LocalSecondaryIndexes'] = self.LOCAL_SECONDARY_INDEXES if getattr(self, 'GLOBAL_SECONDARY_INDEXES', None): kwargs['GlobalSecondaryIndexes'] = self.GLOBAL_SECONDARY_INDEXES return kwargs def _get_endpoint_url(self): return None def _dynamodb(self, operation): if DDBTable._AMAZON_SESSION is None: ddb_operation = Botocore( service='dynamodb', operation=operation, region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url()) DDBTable._AMAZON_SESSION = ddb_operation.session else: ddb_operation = Botocore( service='dynamodb', operation=operation, region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url(), session=DDBTable._AMAZON_SESSION) return ddb_operation def create_table(self): try: message = self._dynamodb(operation='DescribeTable').call( TableName=self._get_table_name()) except AmazonException as e: if e.code != 'ResourceNotFoundException': raise e logger.warning('Creation {table_name} table ...'.format( table_name=self._get_table_name())) message = self._dynamodb(operation='CreateTable').call( **self._get_table_kwargs()) else: logger.warning('{table_name} table already exists.'.format( table_name=self._get_table_name())) def encode_item(self, data, keys=None, update=False): if not data: return {} keys = keys or data.keys() item = {} for key in keys: if key not in data: continue val = self.FIELDS[key].encode(value=data[key]) if update: item[key] = { 'Value': { self.FIELDS[key].AMAZON_TYPE: val }, 'Action': 'PUT' } else: item[key] = { self.FIELDS[key].AMAZON_TYPE: val } return item def decode_item(self, item, keys=None): data = {} for key, val in six.iteritems(item): if key not in self.FIELDS: continue if keys and key not in keys: continue data[key] = self.FIELDS[key].decode( val[self.FIELDS[key].AMAZON_TYPE]) return data class DDBUserWallet(DDBTable): TABLE_NAME = 'user_wallet' KEY_SCHEMA = [{ 'AttributeName': 'user_id', 'KeyType': 'HASH', }] PROVISIONED_THROUGHPUT = { 'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1 } FIELDS = { 'user_id': DDBUUIDField, 'balance': DDBIntField, } @gen.coroutine def update(self, user_id, balance): message = yield gen.Task(self._dynamodb(operation='UpdateItem').call, TableName=self._get_table_name(), Key=self.encode_item(data={'user_id': user_id}), AttributeUpdates=self.encode_item(data={'balance': balance}, update=True)) raise gen.Return(message) @gen.coroutine def get(self, user_id): message = yield gen.Task(self._dynamodb(operation='GetItem').call, TableName=self._get_table_name(), Key=self.encode_item(data={'user_id': user_id})) data = self.decode_item(item=message['Item']) raise gen.Return(data) # not required, just for example def update_(self, user_id, balance): """ Synchronous method """ message = self._dynamodb(operation='UpdateItem').call( TableName=self._get_table_name(), Key=self.encode_item(data={'user_id': user_id}), AttributeUpdates=self.encode_item(data={'balance': balance}, update=True)) return message def get_(self, user_id, callback): """ Example without coroutine """ return self._dynamodb(operation='GetItem').call( TableName=self._get_table_name(), Key=self.encode_item(data={'user_id': user_id}), callback=callback) if __name__ == '__main__': user_id = uuid.uuid4() user_wallet = DDBUserWallet() user_wallet.create_table() # You still can run code synchronous if required user_wallet.update_(user_id=user_id, balance=100) # run asynchronous with callback user_wallet.get_(user_id=user_id, callback=print) # You even can run methods wrapped with @coroutine synchronously ioloop = IOLoop.instance() result = ioloop.run_sync(partial(user_wallet.get, user_id=user_id)) print(result) # output: # WARNING:root:user_wallet table already exists. # {'Item': {'user_id': {'S': 'badff6d6-41d4-46fb-ae74-ba19a2e69cb1'}, 'balance': {'N': '100'}}, 'ResponseMetadata': {'RequestId': '469JL4V0C9GQQJUHCF512VUGVNVV4KQNSO5AEMVJF66Q9ASUAAJG'}} # {'user_id': 'badff6d6-41d4-46fb-ae74-ba19a2e69cb1', 'balance': 100}
Tornado application:
from main import DDBUserWallet from tornado import web, ioloop, gen, options class UserWalletHandler(web.RequestHandler): @gen.coroutine def post(self, user_id): balance = self.get_body_argument('balance') user_wallet = DDBUserWallet() yield user_wallet.update(user_id=user_id, balance=int(balance)) self.write('Updated\n') @gen.coroutine def get(self, user_id): user_wallet = DDBUserWallet() response = yield user_wallet.get(user_id=user_id) self.write('{balance}\n'.format(balance=response['balance'])) application = web.Application([ (r'/wallet/(?P<user_id>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})', UserWalletHandler), ], debug=True) if __name__ == "__main__": options.parse_command_line() DDBUserWallet().create_table() application.listen(5000) ioloop.IOLoop.instance().start() # nanvel-air:example_1_2 nanvel$ curl --data "balance=123" http://localhost:5000/wallet/aa4d10c5-dd78-42ca-a077-3789b52ebbe3 # Updated # nanvel-air:example_1_2 nanvel$ curl http://localhost:5000/wallet/aa4d10c5-dd78-42ca-a077-3789b52ebbe3 # 123
Licensed under CC BY-SA 3.0