mirror of
https://github.com/long2ice/fastapi-cache.git
synced 2026-03-24 20:47:54 +00:00
This ensures that any syntax issues are caught early (by type checkers and tests). Backends that are missing dependencies are skipped. By importing, this exposed an issue where the redis type annotations raised an exception, which has been fixed by using forward annotations. To help avoid import dependency hell, the Backend ABC has been moved to `fastapi_cache.types`. In the process, it has been made an actual ABC.
104 lines
3.3 KiB
Python
104 lines
3.3 KiB
Python
import datetime
|
|
from typing import TYPE_CHECKING, Optional, Tuple
|
|
|
|
from aiobotocore.client import AioBaseClient
|
|
from aiobotocore.session import AioSession, get_session
|
|
|
|
from fastapi_cache.types import Backend
|
|
|
|
if TYPE_CHECKING:
|
|
from types_aiobotocore_dynamodb import DynamoDBClient
|
|
else:
|
|
DynamoDBClient = AioBaseClient
|
|
|
|
|
|
class DynamoBackend(Backend):
|
|
"""
|
|
Amazon DynamoDB backend provider
|
|
|
|
This backend requires an existing table within your AWS environment to be passed during
|
|
backend init. If ttl is going to be used, this needs to be manually enabled on the table
|
|
using the `ttl` key. Dynamo will take care of deleting outdated objects, but this is not
|
|
instant so don't be alarmed when they linger around for a bit.
|
|
|
|
As with all AWS clients, credentials will be taken from the environment. Check the AWS SDK
|
|
for more information.
|
|
|
|
Usage:
|
|
>> dynamodb = DynamoBackend(table_name="your-cache", region="eu-west-1")
|
|
>> await dynamodb.init()
|
|
>> FastAPICache.init(dynamodb)
|
|
"""
|
|
|
|
client: DynamoDBClient
|
|
session: AioSession
|
|
table_name: str
|
|
region: Optional[str]
|
|
|
|
def __init__(self, table_name: str, region: Optional[str] = None) -> None:
|
|
self.session: AioSession = get_session()
|
|
self.table_name = table_name
|
|
self.region = region
|
|
|
|
async def init(self) -> None:
|
|
self.client = await self.session.create_client( # pyright: ignore[reportUnknownMemberType]
|
|
"dynamodb", region_name=self.region
|
|
).__aenter__()
|
|
|
|
async def close(self) -> None:
|
|
self.client = await self.client.__aexit__(None, None, None)
|
|
|
|
async def get_with_ttl(self, key: str) -> Tuple[int, Optional[bytes]]:
|
|
response = await self.client.get_item(TableName=self.table_name, Key={"key": {"S": key}})
|
|
|
|
if "Item" in response:
|
|
value = response["Item"].get("value", {}).get("B")
|
|
ttl = response["Item"].get("ttl", {}).get("N")
|
|
|
|
if not ttl:
|
|
return -1, value
|
|
|
|
# It's only eventually consistent so we need to check ourselves
|
|
expire = int(ttl) - int(datetime.datetime.now().timestamp())
|
|
if expire > 0:
|
|
return expire, value
|
|
|
|
return 0, None
|
|
|
|
async def get(self, key: str) -> Optional[bytes]:
|
|
response = await self.client.get_item(TableName=self.table_name, Key={"key": {"S": key}})
|
|
if "Item" in response:
|
|
return response["Item"].get("value", {}).get("B")
|
|
return None
|
|
|
|
async def set(self, key: str, value: bytes, expire: Optional[int] = None) -> None:
|
|
ttl = (
|
|
{
|
|
"ttl": {
|
|
"N": str(
|
|
int(
|
|
(
|
|
datetime.datetime.now() + datetime.timedelta(seconds=expire)
|
|
).timestamp()
|
|
)
|
|
)
|
|
}
|
|
}
|
|
if expire
|
|
else {}
|
|
)
|
|
|
|
await self.client.put_item(
|
|
TableName=self.table_name,
|
|
Item={
|
|
**{
|
|
"key": {"S": key},
|
|
"value": {"B": value},
|
|
},
|
|
**ttl,
|
|
},
|
|
)
|
|
|
|
async def clear(self, namespace: Optional[str] = None, key: Optional[str] = None) -> int:
|
|
raise NotImplementedError
|