Files
fastapi-cache/fastapi_cache/backends/dynamodb.py

94 lines
3.1 KiB
Python
Raw Normal View History

2021-09-29 16:14:28 +02:00
import datetime
2022-11-07 16:39:17 +08:00
from typing import Optional, Tuple
2021-10-06 10:10:22 +02:00
2022-10-22 20:59:37 +04:00
from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session, AioSession
2021-09-29 16:14:28 +02:00
from fastapi_cache.backends import Backend
class DynamoBackend(Backend):
"""
Amazon DynamoDB backend provider
This backend requires an existing table within your AWS environment to be passed during
backend init. If ttl is going to be used, this needs to be manually enabled on the table
using the `ttl` key. Dynamo will take care of deleting outdated objects, but this is not
instant so don't be alarmed when they linger around for a bit.
As with all AWS clients, credentials will be taken from the environment. Check the AWS SDK
for more information.
2021-09-29 16:22:04 +02:00
Usage:
>> dynamodb = DynamoBackend(table_name="your-cache", region="eu-west-1")
>> await dynamodb.init()
>> FastAPICache.init(dynamodb)
2021-09-29 16:14:28 +02:00
"""
2022-10-22 20:59:37 +04:00
def __init__(self, table_name: str, region: Optional[str] = None) -> None:
self.session: AioSession = get_session()
2022-10-22 20:59:37 +04:00
self.client: Optional[AioBaseClient] = None # Needs async init
2021-09-29 16:14:28 +02:00
self.table_name = table_name
self.region = region
2022-10-22 20:59:37 +04:00
async def init(self) -> None:
2021-10-06 10:10:22 +02:00
self.client = await self.session.create_client(
"dynamodb", region_name=self.region
).__aenter__()
2021-09-29 16:14:28 +02:00
2022-10-22 20:59:37 +04:00
async def close(self) -> None:
2021-09-29 16:14:28 +02:00
self.client = await self.client.__aexit__(None, None, None)
async def get_with_ttl(self, key: str) -> Tuple[int, str]:
2021-10-06 10:10:22 +02:00
response = await self.client.get_item(TableName=self.table_name, Key={"key": {"S": key}})
2021-09-29 16:14:28 +02:00
if "Item" in response:
value = response["Item"].get("value", {}).get("S")
ttl = response["Item"].get("ttl", {}).get("N")
if not ttl:
return -1, value
# It's only eventually consistent so we need to check ourselves
expire = int(ttl) - int(datetime.datetime.now().timestamp())
if expire > 0:
return expire, value
return 0, None
2022-10-22 20:59:37 +04:00
async def get(self, key: str) -> str:
2021-10-06 10:10:22 +02:00
response = await self.client.get_item(TableName=self.table_name, Key={"key": {"S": key}})
2021-09-29 16:14:28 +02:00
if "Item" in response:
return response["Item"].get("value", {}).get("S")
2022-10-22 20:59:37 +04:00
async def set(self, key: str, value: str, expire: Optional[int] = None) -> None:
2021-10-06 10:10:22 +02:00
ttl = (
{
"ttl": {
"N": str(
int(
(
datetime.datetime.now() + datetime.timedelta(seconds=expire)
).timestamp()
)
)
}
}
if expire
else {}
)
2021-09-29 16:14:28 +02:00
2021-10-07 17:34:06 +02:00
await self.client.put_item(
2021-09-29 16:14:28 +02:00
TableName=self.table_name,
2021-10-06 10:10:22 +02:00
Item={
**{
"key": {"S": key},
"value": {"S": value},
2021-09-29 16:14:28 +02:00
},
2021-10-06 10:10:22 +02:00
**ttl,
},
2021-09-29 16:14:28 +02:00
)
2022-10-22 20:59:37 +04:00
async def clear(self, namespace: Optional[str] = None, key: Optional[str] = None) -> int:
2021-09-29 16:14:28 +02:00
raise NotImplementedError