-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
76 lines (66 loc) · 2.74 KB
/
auth.py
File metadata and controls
76 lines (66 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# auth.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from database import get_db
from datetime import timedelta
from jose import JWTError, jwt
from typing import Optional
import models, schemas, utils
from database import get_db
import bcrypt
# Router that the register, login and /me endpoints in this module attach to.
router = APIRouter()
# Bearer-token dependency consumed by get_current_user; tokenUrl names the
# login endpoint as "/auth/login".
# NOTE(review): login in this module takes a schemas.LoginRequest body rather
# than the OAuth2 password form — confirm clients relying on tokenUrl still work.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ---------------------------
# REGISTER
# ---------------------------
@router.post("/register", response_model=schemas.UserOut)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a new user from the submitted email and password.

    Raises:
        HTTPException: 400 when a user row with the same email already exists.

    Returns:
        The newly stored user row, refreshed after the commit.
    """
    # Reject the request up front if the email is already taken.
    email_taken = (
        db.query(models.User)
        .filter(models.User.email == user.email)
        .first()
    )
    if email_taken:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Store the value returned by utils.get_password_hash, not the raw password.
    account = models.User(
        email=user.email,
        hashed_password=utils.get_password_hash(user.password),
    )
    db.add(account)
    db.commit()
    db.refresh(account)  # reload the row after the commit before returning it
    return account
# ---------------------------
# LOGIN
# ---------------------------
# @router.post("/login", response_model=schemas.Token)
# def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
@router.post("/login")
def login(data: schemas.LoginRequest, db: Session = Depends(get_db)):
    """Check the submitted email/password and issue a bearer access token.

    Raises:
        HTTPException: 401 with the same message whether no user has the
            given email or the password does not verify.

    Returns:
        A dict with the keys "access_token" and "token_type".
    """
    account = db.query(models.User).filter(models.User.email == data.email).first()

    # Short-circuits: the password is only verified when a user row was found.
    authenticated = account and utils.verify_password(
        data.password, account.hashed_password
    )
    if not authenticated:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    lifetime = timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
    # The user's email is carried in the token's "sub" claim.
    token = utils.create_access_token(
        data={"sub": account.email},
        expires_delta=lifetime,
    )
    return {"access_token": token, "token_type": "bearer"}
# ---------------------------
# GET CURRENT USER
# ---------------------------
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve a bearer token to the matching user row.

    The token is decoded with utils.SECRET_KEY and utils.ALGORITHM, and its
    "sub" claim is used as the email to look up.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            jwt.decode raises JWTError, when the "sub" claim is missing, or
            when no user has that email.

    Returns:
        The user row whose email equals the token's "sub" claim.
    """
    # One shared exception so every failure path looks the same to the client.
    unauthorized = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, utils.SECRET_KEY, algorithms=[utils.ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = db.query(models.User).filter(models.User.email == subject).first()
    if account is None:
        raise unauthorized
    return account
@router.get("/me", response_model=schemas.UserOut)
def read_current_user(current_user: models.User = Depends(get_current_user)):
    """Return the user supplied by the get_current_user dependency."""
    return current_user