Files
fastapi-cache/fastapi_cache/decorator.py

233 lines
8.1 KiB
Python
Raw Normal View History

import logging
2022-10-25 08:52:59 +07:00
import sys
from functools import wraps
from inspect import Parameter, Signature, isawaitable, iscoroutinefunction
from typing import (
Awaitable,
Callable,
List,
Optional,
Type,
TypeVar,
Union,
cast,
)
2022-10-30 11:03:16 +04:00
2022-10-25 08:52:59 +07:00
if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
from fastapi.concurrency import run_in_threadpool
from fastapi.dependencies.utils import (
get_typed_return_annotation,
get_typed_signature,
)
from starlette.requests import Request
from starlette.responses import Response
from starlette.status import HTTP_304_NOT_MODIFIED
2020-08-26 18:04:57 +08:00
from fastapi_cache import FastAPICache
from fastapi_cache.coder import Coder
from fastapi_cache.types import KeyBuilder
2020-08-26 18:04:57 +08:00
# Module-level logger for this package. A NullHandler is attached so that log
# records emitted here do not trigger "no handler" fallbacks when the host
# application has not configured logging.
logger: logging.Logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
2022-10-25 08:52:59 +07:00
# Generic placeholders used in the decorator's type hints: P stands for the
# decorated callable's parameters, R for its return type.
P = ParamSpec("P")
R = TypeVar("R")
def _augment_signature(signature: Signature, *extra: Parameter) -> Signature:
    """Return ``signature`` extended with the ``extra`` parameters.

    The extra parameters are inserted after the existing parameters but ahead
    of any trailing ``**kwargs`` parameter. When no extras are supplied the
    original signature object is returned as is.
    """
    if len(extra) == 0:
        return signature

    existing = list(signature.parameters.values())

    # Walk back from the end to find where the trailing variadic keyword
    # parameter(s) begin; everything from that index on must stay last.
    split_at = len(existing)
    while split_at > 0 and existing[split_at - 1].kind is Parameter.VAR_KEYWORD:
        split_at -= 1

    leading: List[Parameter] = existing[:split_at]
    trailing: List[Parameter] = existing[split_at:][::-1]

    return signature.replace(parameters=leading + list(extra) + trailing)
def _locate_param(
    sig: Signature, dep: Parameter, to_inject: List[Parameter]
) -> Parameter:
    """Find the endpoint parameter that matches the dependency's annotation.

    Returns the first parameter of ``sig`` whose annotation is the very same
    object as ``dep.annotation``. If the endpoint declares no such parameter,
    ``dep`` is appended to ``to_inject`` and returned instead.
    """
    wanted = dep.annotation
    for candidate in sig.parameters.values():
        if candidate.annotation is wanted:
            return candidate

    # Nothing suitable declared by the endpoint: schedule the dependency for
    # injection and use it directly.
    to_inject.append(dep)
    return dep
def _uncacheable(request: Optional[Request]) -> bool:
    """Determine if this request should not be cached

    Returns true if:

    - Caching has been disabled globally
    - This is not a GET request
    - The request's Cache-Control header contains the "no-store" directive

    When no request object is available (``request`` is None) and caching is
    enabled, the call is treated as cacheable.
    """
    if not FastAPICache.get_enable():
        return True
    if request is None:
        return False
    if request.method != "GET":
        return True

    # Cache-Control is a comma-separated list of case-insensitive directives,
    # so "no-store" has to be looked for among them rather than compared with
    # the whole header value (e.g. "no-cache, no-store").
    cache_control = request.headers.get("Cache-Control") or ""
    directives = {part.strip().lower() for part in cache_control.split(",")}
    return "no-store" in directives
2020-08-26 18:04:57 +08:00
def cache(
    expire: Optional[int] = None,
    coder: Optional[Type[Coder]] = None,
    key_builder: Optional[KeyBuilder] = None,
    namespace: str = "",
    injected_dependency_namespace: str = "__fastapi_cache",
) -> Callable[[Union[Callable[P, Awaitable[R]], Callable[P, R]]], Callable[P, Awaitable[Union[R, Response]]]]:
    """Decorator factory that caches the result of a sync or async callable.

    :param expire: value handed to the backend when storing an entry and
        reported as ``max-age`` on a cache miss; when falsy, the value from
        ``FastAPICache.get_expire()`` is used.
    :param coder: coder used to encode results before storing and to decode
        cached payloads; when not given, ``FastAPICache.get_coder()`` is used.
    :param key_builder: callable producing the cache key (it may return an
        awaitable); when not given, ``FastAPICache.get_key_builder()`` is used.
    :param namespace: combined with the global prefix as ``"{prefix}:{namespace}"``
        and passed to the key builder.
    :param injected_dependency_namespace: prefix for the names of the
        keyword-only request/response parameters that are added to the
        endpoint signature when the endpoint does not declare them itself.
    :return: a decorator; the decorated callable is always a coroutine function.
    """
    injected_request = Parameter(
        name=f"{injected_dependency_namespace}_request",
        annotation=Request,
        kind=Parameter.KEYWORD_ONLY,
    )
    injected_response = Parameter(
        name=f"{injected_dependency_namespace}_response",
        annotation=Response,
        kind=Parameter.KEYWORD_ONLY,
    )

    def wrapper(
        func: Union[Callable[P, Awaitable[R]], Callable[P, R]]
    ) -> Callable[P, Awaitable[Union[R, Response]]]:
        # get_typed_signature ensures that any forward references are resolved first
        wrapped_signature = get_typed_signature(func)
        to_inject: List[Parameter] = []
        request_param = _locate_param(wrapped_signature, injected_request, to_inject)
        response_param = _locate_param(wrapped_signature, injected_response, to_inject)
        return_type = get_typed_return_annotation(func)

        async def ensure_async_func(*args: P.args, **kwargs: P.kwargs) -> R:
            """Run cached sync functions in thread pool just like FastAPI."""
            # if the wrapped function does NOT have request or response in
            # its function signature, make sure we don't pass them in as
            # keyword arguments
            kwargs.pop(injected_request.name, None)
            kwargs.pop(injected_response.name, None)

            if iscoroutinefunction(func):
                # async, return as is.
                # unintuitively, we have to await once here, so that caller
                # does not have to await twice. See
                # https://stackoverflow.com/a/59268198/532513
                return await func(*args, **kwargs)  # type: ignore[no-any-return]
            else:
                # sync, wrap in thread and return async
                # see above why we have to await even although caller also awaits.
                return await run_in_threadpool(func, *args, **kwargs)  # type: ignore[arg-type]

        @wraps(func)
        async def inner(*args: P.args, **kwargs: P.kwargs) -> Union[R, Response]:
            # The request/response objects must not take part in the cache
            # key's kwargs, so they are removed from a copy of the kwargs.
            copy_kwargs = kwargs.copy()
            request: Optional[Request] = copy_kwargs.pop(request_param.name, None)  # type: ignore[assignment]
            response: Optional[Response] = copy_kwargs.pop(response_param.name, None)  # type: ignore[assignment]

            if _uncacheable(request):
                return await ensure_async_func(*args, **kwargs)

            # Resolve the effective settings into locals on every call. The
            # decorator arguments are deliberately not rebound, so a change of
            # the global FastAPICache defaults is picked up by later calls
            # instead of being frozen at the first invocation.
            prefix = FastAPICache.get_prefix()
            active_coder = coder or FastAPICache.get_coder()
            active_expire = expire or FastAPICache.get_expire()
            active_key_builder = key_builder or FastAPICache.get_key_builder()
            backend = FastAPICache.get_backend()
            cache_status_header = FastAPICache.get_cache_status_header()

            cache_key = active_key_builder(
                func,
                f"{prefix}:{namespace}",
                request=request,
                response=response,
                args=args,
                kwargs=copy_kwargs,
            )
            if isawaitable(cache_key):
                cache_key = await cache_key
            assert isinstance(cache_key, str)  # noqa: S101  # assertion is a type guard

            # A failing backend read is treated as a cache miss rather than
            # failing the request.
            try:
                ttl, cached = await backend.get_with_ttl(cache_key)
            except Exception:
                logger.warning(
                    f"Error retrieving cache key '{cache_key}' from backend:",
                    exc_info=True,
                )
                ttl, cached = 0, None

            # Cache-Control is a comma-separated, case-insensitive directive
            # list; a "no-cache" directive anywhere in it forces a refresh.
            force_refresh = False
            if request is not None:
                cache_control = request.headers.get("Cache-Control") or ""
                force_refresh = "no-cache" in {
                    part.strip().lower() for part in cache_control.split(",")
                }

            if cached is None or force_refresh:  # cache miss
                result = await ensure_async_func(*args, **kwargs)
                to_cache = active_coder.encode(result)

                # A failing backend write only costs the cache entry; the
                # freshly computed result is still returned.
                try:
                    await backend.set(cache_key, to_cache, active_expire)
                except Exception:
                    logger.warning(
                        f"Error setting cache key '{cache_key}' in backend:",
                        exc_info=True,
                    )

                if response:
                    # NOTE(review): built-in hash() is salted per process for
                    # str/bytes values; if the encoded payload is one of those
                    # the ETag will differ between workers and restarts --
                    # confirm whether a stable digest is wanted here.
                    response.headers.update(
                        {
                            "Cache-Control": f"max-age={active_expire}",
                            "ETag": f"W/{hash(to_cache)}",
                            cache_status_header: "MISS",
                        }
                    )

            else:  # cache hit
                if response:
                    etag = f"W/{hash(cached)}"
                    response.headers.update(
                        {
                            "Cache-Control": f"max-age={ttl}",
                            "ETag": etag,
                            cache_status_header: "HIT",
                        }
                    )

                    # The client already holds this representation: answer
                    # with 304 and skip decoding the cached payload.
                    if_none_match = request and request.headers.get("if-none-match")
                    if if_none_match == etag:
                        response.status_code = HTTP_304_NOT_MODIFIED
                        return response

                result = cast(R, active_coder.decode_as_type(cached, type_=return_type))

            # When the endpoint result is itself a Response, copy the cache
            # headers set on the injected response onto it.
            if response and isinstance(result, Response):
                result.headers.update(response.headers)

            return result

        # Expose the injected request/response parameters in the signature of
        # the returned callable.
        inner.__signature__ = _augment_signature(wrapped_signature, *to_inject)  # type: ignore[attr-defined]

        return inner

    return wrapper