def __init__(
self,
app: ASGIApp,
providers: typing.Dict[str, typing.Dict[str, typing.Any]],
public_paths: typing.Optional[typing.Set[str]] = None,
get_keys: typing.Optional[typing.Callable[[typing.Any], typing.Any]] = None,
key_refresh_minutes: typing.Optional[
typing.Union[int, typing.Dict[str, int]]
] = None,
) -> None:
self._app = app
for provider in providers:
_validate_provider(provider, providers[provider])
self._providers = providers
self._get_keys = get_keys or _get_keys
self._public_paths = public_paths or set()
if key_refresh_minutes is None:
self._timeout = {provider: None for provider in providers}
elif isinstance(key_refresh_minutes, dict):
self._timeout = {
provider: datetime.timedelta(minutes=key_refresh_minutes[provider])
for provider in providers
}
else:
self._timeout = {
provider: datetime.timedelta(minutes=key_refresh_minutes)
for provider in providers
}
# cached attribute and respective timeout
self._last_retrieval: typing.Dict[str, datetime.datetime] = {}
self._keys: typing.Dict[str, typing.Any] = {}