-
-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathauth.py
More file actions
52 lines (41 loc) · 1.66 KB
/
auth.py
File metadata and controls
52 lines (41 loc) · 1.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pvsite_datamodel.sqlmodels import ClientSQL
from sqlalchemy.orm import Session
from .session import get_session
# Module-level bearer scheme instance; `Auth.__call__` depends on it to receive
# the request's `HTTPAuthorizationCredentials`.
token_auth_scheme = HTTPBearer()
class Auth:
    """FastAPI dependency that validates a JWT bearer token.

    The signing key for the token is fetched from the JWKS document published at
    ``https://{domain}/.well-known/jwks.json``; the token is then verified against
    the configured algorithm, audience and issuer (``https://{domain}/``).
    """

    def __init__(self, domain: str, api_audience: str, algorithm: str):
        """Store the validation settings and create the JWKS client.

        Args:
            domain: Host name of the token issuer (no scheme, no trailing slash).
            api_audience: Audience value the token must carry.
            algorithm: Name of the signing algorithm that is accepted.
        """
        self._domain = domain
        self._api_audience = api_audience
        self._algorithm = algorithm
        self._jwks_client = jwt.PyJWKClient(f"https://{domain}/.well-known/jwks.json")

    def __call__(
        self,
        auth_credentials: HTTPAuthorizationCredentials = Depends(token_auth_scheme),
        session: Session = Depends(get_session),
    ):
        """Validate the bearer token and return the client for the request.

        Args:
            auth_credentials: Bearer credentials supplied by ``token_auth_scheme``.
            session: Database session, or ``None`` when no database is available.

        Returns:
            ``None`` if ``session`` is ``None``; otherwise the first ``ClientSQL``
            row found in the database.

        Raises:
            HTTPException: 401 if the signing key cannot be resolved or the token
                fails validation; 500 if the token is valid but the database
                holds no client.
        """
        token = auth_credentials.credentials

        try:
            signing_key = self._jwks_client.get_signing_key_from_jwt(token).key
        except (jwt.exceptions.PyJWKClientError, jwt.exceptions.DecodeError) as e:
            raise HTTPException(status_code=401, detail=str(e)) from e

        # PyJWT expects a *list* of allowed algorithm names. Passing a bare string
        # turns its membership test into a substring test, so normalise here while
        # still accepting a caller that already supplied a sequence.
        if isinstance(self._algorithm, str):
            allowed_algorithms = [self._algorithm]
        else:
            allowed_algorithms = list(self._algorithm)

        try:
            jwt.decode(
                token,
                signing_key,
                algorithms=allowed_algorithms,
                audience=self._api_audience,
                issuer=f"https://{self._domain}/",
            )
        except Exception as e:
            # Deliberately broad: any failure while validating the token must
            # reject the request rather than let it through.
            raise HTTPException(status_code=401, detail=str(e)) from e

        if session is None:
            return None

        # @TODO: get client corresponding to auth
        # See: https://github.com/openclimatefix/pv-site-api/issues/90
        client = session.query(ClientSQL).first()
        if client is None:
            # Explicit raise instead of `assert`, which is stripped under `python -O`
            # and would let `None` leak to the route handlers.
            raise HTTPException(status_code=500, detail="No client found in the database")

        return client