Authentication Guide¶
Complete authentication with JWT tokens, refresh tokens, rate limiting, and authorization.
Setup¶
Configure Settings¶
from s3verless.core.settings import S3verlessSettings
settings = S3verlessSettings(
aws_bucket_name="my-bucket",
secret_key="your-super-secret-key-min-32-chars",
algorithm="HS256",
access_token_expire_minutes=30,
refresh_token_expire_days=7,
)
Initialize Auth Service¶
User Management¶
Create User¶
user = await auth.create_user(
s3_client,
username="john",
email="john@example.com",
password="SecurePass123!",
full_name="John Doe",
)
Password Requirements¶
Default requirements: - Minimum 8 characters - At least one uppercase letter - At least one lowercase letter - At least one digit
Find Users¶
# By username
user = await auth.get_user_by_username(s3_client, "john")
# By email
user = await auth.get_user_by_email(s3_client, "john@example.com")
# By ID
user = await auth.get_user_by_id(s3_client, user_id)
Authentication¶
Login¶
user = await auth.authenticate_user(s3_client, "john", "SecurePass123!")
if user:
tokens = await auth.create_token_pair(
s3_client,
user,
device_info="Web Browser",
ip_address=request.client.host,
)
# Returns: {access_token, refresh_token, token_type, expires_in}
Refresh Tokens¶
Logout¶
# Revoke single token
await auth.revoke_refresh_token(s3_client, refresh_token)
# Revoke all user tokens (logout everywhere)
await auth.revoke_all_user_tokens(s3_client, user.id)
FastAPI Integration¶
Dependencies¶
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
s3_client = Depends(get_s3_client),
auth: S3AuthService = Depends(get_auth_service),
):
try:
payload = auth.decode_token(token)
user = await auth.get_user_by_username(s3_client, payload["sub"])
if not user or not user.is_active:
raise HTTPException(401, "Invalid credentials")
return user
except Exception:
raise HTTPException(401, "Invalid credentials")
Protected Routes¶
@app.get("/me")
async def get_profile(user: S3User = Depends(get_current_user)):
return user
@app.put("/me")
async def update_profile(
data: UpdateProfile,
user: S3User = Depends(get_current_user),
s3_client = Depends(get_s3_client),
):
# Update user...
pass
Admin-Only Routes¶
async def require_admin(user: S3User = Depends(get_current_user)):
if not user.is_admin:
raise HTTPException(403, "Admin access required")
return user
@app.get("/admin/users")
async def list_users(admin: S3User = Depends(require_admin)):
# Admin-only logic...
pass
Rate Limiting¶
Setup¶
from s3verless.auth.rate_limit import RateLimiter, RateLimit
limiter = RateLimiter(
limits={
"login": RateLimit(max_requests=5, window_seconds=60),
"api": RateLimit(max_requests=100, window_seconds=60),
},
trusted_proxies=["10.0.0.1"], # Load balancer IPs
trust_x_forwarded_for=True,
)
Apply to Routes¶
@app.post("/auth/login")
async def login(request: Request, credentials: LoginRequest):
is_limited, info = await limiter.is_rate_limited(request, "login")
if is_limited:
raise HTTPException(
429,
"Too many requests",
headers=limiter.get_rate_limit_headers(info),
)
# Login logic...
Middleware¶
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
is_limited, info = await limiter.is_rate_limited(request, "api")
if is_limited:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"},
headers=limiter.get_rate_limit_headers(info),
)
response = await call_next(request)
return response
Token Blacklisting¶
For immediate token revocation:
from s3verless.auth.blacklist import TokenBlacklist
blacklist = TokenBlacklist(bucket_name)
# Add to blacklist
await blacklist.add(s3_client, token_jti, expires_at)
# Check in authentication
async def get_current_user(token: str = Depends(oauth2_scheme)):
payload = auth.decode_token(token)
if await blacklist.is_blacklisted(s3_client, payload.get("jti")):
raise HTTPException(401, "Token revoked")
# Continue authentication...
Session Management¶
View Active Sessions¶
sessions = await auth.get_user_active_sessions(s3_client, user.id)
# Returns: [{id, device_info, ip_address, created_at, expires_at}, ...]
Cleanup Expired Tokens¶
Run periodically:
Complete Auth Router Example¶
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register")
async def register(
username: str,
email: str,
password: str,
s3_client = Depends(get_s3_client),
auth: S3AuthService = Depends(get_auth_service),
):
try:
user = await auth.create_user(s3_client, username, email, password)
return {"message": "User created", "user_id": str(user.id)}
except S3ValidationError as e:
raise HTTPException(400, str(e))
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
s3_client = Depends(get_s3_client),
auth: S3AuthService = Depends(get_auth_service),
):
user = await auth.authenticate_user(
s3_client, form_data.username, form_data.password
)
if not user:
raise HTTPException(401, "Invalid credentials")
return await auth.create_token_pair(s3_client, user)
@router.post("/refresh")
async def refresh(
refresh_token: str,
s3_client = Depends(get_s3_client),
auth: S3AuthService = Depends(get_auth_service),
):
try:
return await auth.refresh_access_token(s3_client, refresh_token)
except S3AuthError:
raise HTTPException(401, "Invalid refresh token")
@router.post("/logout")
async def logout(
refresh_token: str,
s3_client = Depends(get_s3_client),
auth: S3AuthService = Depends(get_auth_service),
):
await auth.revoke_refresh_token(s3_client, refresh_token)
return {"message": "Logged out"}
Security Best Practices¶
- Use strong secret keys - At least 32 characters, randomly generated
- Short access token expiry - 15-30 minutes
- Rotate refresh tokens - New refresh token on each use
- Rate limit auth endpoints - Prevent brute force
- Use HTTPS - Always in production
- Validate passwords - Enforce complexity requirements
- Clean up tokens - Run periodic cleanup jobs