Authentication using JWT (JSON Web Token)

In Episode 2, we learned how to integrate a database to store the requests sent to the API. Now, let’s explore how to protect the APIs we’ve created.

Here’s the link for Episode 2.

Why should we protect APIs?

Protecting APIs is crucial for several reasons, primarily revolving around security, data integrity, and user privacy. APIs are often used to expose critical functionalities and data of an application. Without proper protection, anyone can access your API, leading to security breaches. Unprotected APIs can become targets for attackers who seek to steal sensitive user information, financial data, or intellectual property.

Authentication is a fundamental part of building secure and reliable applications that can scale safely in a complex, interconnected environment.

Why JWT?

There are several authentication methods besides JWT, each with its own use cases.

Examples: session-based authentication, OAuth, and API key authentication.

We chose JWT because it is stateless: the server does not need to store session data in a database, which makes it scale well and suits APIs and microservices. It is also widely used and easy to implement in FastAPI :).

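JWT stands for JSON Web Token, an open standard (RFC 7519) for securely transmitting information between parties as a JSON object. The token is signed, so the receiver can detect tampering, but the payload is only encoded, not encrypted, so it should not carry secrets.

It has three parts, separated by dots: Header, Payload, and Signature.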
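
To make the three parts concrete, here is a minimal sketch that builds an HS256 token by hand with the standard library and then splits it again. The secret, the claims, and the helper names (`b64url_encode`, `b64url_decode`) are made up for this illustration; the article's own code uses python-jose instead.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then base64url-decode."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


# Hypothetical demo values, not taken from the article.
demo_secret = b"demo-secret"
header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "alice"}

# "header.payload" is the signing input; the signature covers both parts.
signing_input = ".".join(
    b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
    for part in (header, payload)
)
signature = hmac.new(demo_secret, signing_input.encode("ascii"), hashlib.sha256).digest()
token = f"{signing_input}.{b64url_encode(signature)}"

# Splitting on the dots recovers the three parts.
header_b64, payload_b64, signature_b64 = token.split(".")
decoded_header = json.loads(b64url_decode(header_b64))
decoded_payload = json.loads(b64url_decode(payload_b64))
print(decoded_header)
print(decoded_payload)
```

Note that the header and payload decode without the secret. Only the signature depends on it.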

How Does JWT Authentication Work?

  • User Login: The user sends their credentials (username and password) to the server.
  • Generate Token: If the credentials are valid, the server generates a JWT and sends it back to the client.
  • Token Storage: The client stores the JWT (usually in local storage or a cookie).
  • Make Authenticated Requests: The client includes the JWT in the Authorization header of subsequent requests to protected routes.
  • Verify Token: The server verifies the token’s validity and checks its expiration before granting access to protected resources.

How to Integrate with FastAPI?

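The first step is to install the dependencies. python-multipart is needed because the login endpoint reads form data, and the bcrypt extra provides the hashing backend that passlib uses.

pip install fastapi uvicorn sqlalchemy pydantic "passlib[bcrypt]" "python-jose[cryptography]" python-multipart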

Creating and validating tokens requires user details, so we need a user model:

class UserDB(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)        

Define the JWT settings, such as the secret key, the signing algorithm, and the expiry limit:

SECRET_KEY = "myfastapisecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")        
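
A hard-coded secret key is fine for a tutorial, but anyone who can read the source can then forge tokens. One possible approach, sketched below, is to read the key from an environment variable. The variable name `JWT_SECRET_KEY` and the helper `load_secret_key` are assumptions made for this example, not something the article's code defines.

```python
import os
import secrets


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Return the signing key from the environment, or a random one for local runs."""
    key = os.environ.get(env_var)
    if key:
        return key
    # Fallback for local experiments only: a new key is generated on every start,
    # so tokens issued before a restart stop validating.
    return secrets.token_hex(32)


SECRET_KEY = load_secret_key()
```

With this in place, the key could be set in the shell before starting the server, for example with `export JWT_SECRET_KEY=...` on Linux or macOS.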

Functions to verify the password, create the access token, and authenticate the user:

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def authenticate_user(username: str, password: str, db: Session):
    user = db.query(UserDB).filter(UserDB.username == username).first()
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user        
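
`pwd_context.hash` and `pwd_context.verify` hide the salting and comparison steps. As a rough illustration of the idea, here is a standard-library sketch that uses PBKDF2 rather than bcrypt, so it is not what the article's code runs. The function names, the storage format, and the iteration count are assumptions chosen for the example.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Hash with a fresh random salt and store the salt next to the digest."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because the salt is random, hashing the same password twice gives two different stored values, yet both verify. This is why the login code compares through a verify function instead of hashing the input and checking for string equality.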

The complete code looks like this:

from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# SQLite Database URL
DATABASE_URL = "sqlite:///./test.db"

# Create Engine & Session
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base Class for ORM Models
Base = declarative_base()

# Database Model
class ItemDB(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    age = Column(Integer)
    education = Column(String, nullable=True)

# User Model and Database (add a simple user model)
class UserDB(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)


# Create Tables
Base.metadata.create_all(bind=engine)

# Password Hashing and JWT Settings
SECRET_KEY = "myfastapisecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Pydantic Models
class ItemCreate(BaseModel):
    name: str
    age: int
    education: str | None = None  # optional field, so None must be an allowed type

class ItemResponse(ItemCreate):
    id: int

class User(BaseModel):
    username: str

class Token(BaseModel):
    access_token: str
    token_type: str

# Dependency for DB Session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Utility Functions for JWT Authentication
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def authenticate_user(username: str, password: str, db: Session):
    user = db.query(UserDB).filter(UserDB.username == username).first()
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user


# FastAPI App
app = FastAPI()

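# Create User (For registering a new user)
# Note: credentials arrive as query parameters to keep the demo simple;
# prefer a request body in real applications.
@app.post("/users/")
def create_user(username: str, password: str, db: Session = Depends(get_db)):
    # Reject duplicates up front; the unique constraint would otherwise cause a 500
    if db.query(UserDB).filter(UserDB.username == username).first():
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = pwd_context.hash(password)
    db_user = UserDB(username=username, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return {"message": "User created successfully"}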

# Login to get JWT Token
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    access_token = create_access_token(
        {"sub": user.username}, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    return {"access_token": access_token, "token_type": "bearer"}

# Protect Routes with JWT
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if not username:
            raise HTTPException(status_code=401, detail="Invalid token")
        user = db.query(UserDB).filter(UserDB.username == username).first()
        if not user:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# CRUD Operations for Items (Protected with JWT)
@app.post("/items/", response_model=ItemResponse)
def create_item(item: ItemCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    db_item = ItemDB(**item.dict())
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

@app.get("/items/", response_model=List[ItemResponse])
def get_items(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    return db.query(ItemDB).all()

@app.get("/items/{item_id}", response_model=ItemResponse)
def get_item(item_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item

@app.put("/items/{item_id}", response_model=ItemResponse)
def update_item(item_id: int, item: ItemCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    db_item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")

    for key, value in item.dict().items():
        setattr(db_item, key, value)

    db.commit()
    db.refresh(db_item)
    return db_item

@app.delete("/items/{item_id}", response_model=ItemResponse)
def delete_item(item_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    db_item = db.query(ItemDB).filter(ItemDB.id == item_id).first()
    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")

    db.delete(db_item)
    db.commit()
    return db_item        

How will you test this?

  • Open Postman and create a user by sending a POST request to "http://127.0.0.1:8000/users/?username=test&password=pass123".
  • Create a token by sending a POST request to "http://127.0.0.1:8000/token" with the created user's username and password as form data (x-www-form-urlencoded).
  • Test the remaining APIs by passing the generated token value as a Bearer token.
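
The same checks can be scripted instead of clicked through. The sketch below uses only the standard library and assumes the API is running on uvicorn's default address and that the user "test" already exists. The helper names (`build_token_request`, `build_items_request`, `fetch_json`, `demo`) are invented for this example.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumes uvicorn's default host and port


def build_token_request(username: str, password: str) -> urllib.request.Request:
    """POST /token with form-encoded credentials, as OAuth2PasswordRequestForm expects."""
    body = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        f"{BASE_URL}/token",
        data=body.encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_items_request(token: str) -> urllib.request.Request:
    """GET /items/ with the JWT in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}/items/",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def fetch_json(request: urllib.request.Request):
    """Send the request and parse the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def demo() -> None:
    """Log in, then list items with the returned token (needs the server running)."""
    token = fetch_json(build_token_request("test", "pass123"))["access_token"]
    print(fetch_json(build_items_request(token)))
```

Call `demo()` while the server is running. Sending the items request without the Authorization header should return a 401 response, which `urllib` raises as an `HTTPError`.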

Happy coding!        
