Authentication using JWT (JSON Web Token)
Smita Vatgal
Engineer Golang/Python | Microservices | DevOps | AWS | Kubernetes | CICD | Automation
In Episode 2, we learned how to integrate a database to store the requests sent to the API. Now, let’s explore how to protect the APIs we’ve created.
Here’s the link for Episode 2.
Why should we protect APIs?
Protecting APIs is crucial for several reasons, primarily revolving around security, data integrity, and user privacy. APIs are often used to expose critical functionalities and data of an application. Without proper protection, anyone can access your API, leading to security breaches. Unprotected APIs can become targets for attackers who seek to steal sensitive user information, financial data, or intellectual property.
Authentication is a fundamental part of building secure and reliable applications that can scale safely in a complex, interconnected environment.
Why JWT?
There are several authentication methods besides JWT, each with its own use cases.
Examples: Session-Based Authentication, OAuth, API Key Authentication etc..
We chose JWT because it's stateless and scalable, meaning no need to store session data in a database, making it ideal for APIs and microservices. It's widely used in APIs and it's easy to implement in FastAPI :).
JWT stands for JSON Web Token, which is an open standard for securely transmitting information between parties as a JSON object.
It has 3 parts : Header, Payload and Signature
How Does JWT Authentication Work?
领英推荐
How to Integrate with FastAPI?
First step install dependencies..
pip install fastapi uvicorn sqlalchemy pydantic passlib python-jose
User details are needed to create token and validation so we need user class
class UserDB(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
Derive JWT settings like secret key, expiry limit etc.
SECRET_KEY = "myfastapisecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
Functions to verify password, authenticate user and access token
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def authenticate_user(username: str, password: str, db: Session):
user = db.query(UserDB).filter(UserDB.username == username).first()
if not user or not verify_password(password, user.hashed_password):
return None
return user
Complete code looks like below
from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# SQLite Database URL
DATABASE_URL = "sqlite:///./test.db"
# Create Engine & Session
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base Class for ORM Models
Base = declarative_base()
# Database Model
class ItemDB(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
age = Column(Integer)
education = Column(String, nullable=True)
# User Model and Database (add a simple user model)
class UserDB(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
# Create Tables
Base.metadata.create_all(bind=engine)
# Password Hashing and JWT Settings
SECRET_KEY = "myfastapisecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Pydantic Models
class ItemCreate(BaseModel):
name: str
age: int
education: str = None
class ItemResponse(ItemCreate):
id: int
class User(BaseModel):
username: str
class Token(BaseModel):
access_token: str
token_type: str
# Dependency for DB Session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Utility Functions for JWT Authentication
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def authenticate_user(username: str, password: str, db: Session):
user = db.query(UserDB).filter(UserDB.username == username).first()
if not user or not verify_password(password, user.hashed_password):
return None
return user
# FastAPI App
app = FastAPI()
# Create User (For registering a new user)
@app.post("/users/")
def create_user(username: str, password: str, db: Session = Depends(get_db)):
hashed_password = pwd_context.hash(password)
db_user = UserDB(username=username, hashed_password=hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return {"message": "User created successfully"}
# Login to get JWT Token
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(
{"sub": user.username}, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": access_token, "token_type": "bearer"}
# Protect Routes with JWT
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if not username:
raise HTTPException(status_code=401, detail="Invalid token")
user = db.query(UserDB).filter(UserDB.username == username).first()
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# CRUD Operations for Items (Protected with JWT)
@app.post("/items/", response_model=ItemResponse)
def create_item(item: ItemCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
db_item = ItemDB(**item.dict())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
@app.get("/items/", response_model=List[ItemResponse])
def get_items(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
return db.query(ItemDB).all()
@app.get("/items/{item_id}", response_model=ItemResponse)
def get_item(item_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.put("/items/{item_id}", response_model=ItemResponse)
def update_item(item_id: int, item: ItemCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
db_item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
if not db_item:
raise HTTPException(status_code=404, detail="Item not found")
for key, value in item.dict().items():
setattr(db_item, key, value)
db.commit()
db.refresh(db_item)
return db_item
@app.delete("/items/{item_id}", response_model=ItemResponse)
def delete_item(item_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
db_item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
if not db_item:
raise HTTPException(status_code=404, detail="Item not found")
db.delete(db_item)
db.commit()
return db_item
How will you test this?
Happy coding!