from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, EmailStr
from contextlib import asynccontextmanager
import sqlite3
import logging
import csv
import io
from datetime import datetime
from typing import List, Optional
import uvicorn
import os

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database configuration
DATABASE_FILE = "db/devbox.sqlite3"


def get_db_connection():
    """Create and return a database connection"""
    conn = sqlite3.connect(DATABASE_FILE)
    conn.row_factory = sqlite3.Row  # This allows dict-like access to rows
    return conn


def init_database():
    """Initialize the database with required tables"""
    # Make sure the directory for the SQLite file exists before connecting
    db_dir = os.path.dirname(DATABASE_FILE)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = get_db_connection()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS email_signups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE NOT NULL,
                source TEXT DEFAULT 'devbox-landing',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                user_agent TEXT,
                ip_address TEXT
            )
        """)
        # Create indexes for better performance
        conn.execute("CREATE INDEX IF NOT EXISTS idx_email ON email_signups(email)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON email_signups(created_at)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON email_signups(source)")
        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        raise
    finally:
        conn.close()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Handle application startup and shutdown events"""
    # Startup
    logger.info("DevBox Email Collection API starting up...")
    init_database()
    logger.info("DevBox Email Collection API started successfully")
    yield
    # Shutdown
    logger.info("DevBox Email Collection API shutting down...")


# Create FastAPI app with lifespan
app = FastAPI(
    title="DevBox Email Collection API",
    description="Backend API for collecting email signups for DevBox landing page",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware to allow requests from your frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify your domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
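
# A minimal sketch of locking the allowed origins down via an environment
# variable (the ALLOWED_ORIGINS name and the example domain are assumptions
# for illustration, not something this project defines):
#
#   allowed = os.getenv("ALLOWED_ORIGINS", "https://devbox.example.com").split(",")
#   app.add_middleware(CORSMiddleware, allow_origins=allowed, ...)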


# Pydantic models
class EmailSignup(BaseModel):
    email: EmailStr
    source: Optional[str] = "devbox-landing"


class EmailSignupResponse(BaseModel):
    success: bool
    message: str
    email: str
    id: Optional[int] = None


class EmailListResponse(BaseModel):
    total_count: int
    emails: List[dict]


class StatsResponse(BaseModel):
    total_signups: int
    today_signups: int
    week_signups: int
    month_signups: int
    first_signup: Optional[str]
    latest_signup: Optional[str]
    sources: dict


class UnsubscribeResponse(BaseModel):
    success: bool
    message: str


# Helper function to get client IP
def get_client_ip(request: Request) -> str:
    """Extract client IP address from request"""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host if request.client else "unknown"


# API endpoints
@app.post("/api/subscribe", response_model=EmailSignupResponse)
async def subscribe_email(signup: EmailSignup, request: Request):
    """
    Subscribe an email address to the DevBox mailing list
    """
    conn = get_db_connection()
    try:
        # Check if email already exists
        existing = conn.execute(
            "SELECT id FROM email_signups WHERE email = ?",
            (signup.email,)
        ).fetchone()
        if existing:
            return EmailSignupResponse(
                success=False,
                message="Email already subscribed",
                email=signup.email
            )
        # Get client information
        user_agent = request.headers.get("User-Agent", "")
        ip_address = get_client_ip(request)
        # Insert new email
        cursor = conn.execute(
            """
            INSERT INTO email_signups (email, source, created_at, user_agent, ip_address)
            VALUES (?, ?, ?, ?, ?)
            """,
            (signup.email, signup.source, datetime.utcnow(), user_agent, ip_address)
        )
        conn.commit()
        logger.info(f"New email subscription: {signup.email} from {ip_address}")
        return EmailSignupResponse(
            success=True,
            message="Successfully subscribed to DevBox updates!",
            email=signup.email,
            id=cursor.lastrowid
        )
    except sqlite3.IntegrityError:
        return EmailSignupResponse(
            success=False,
            message="Email already subscribed",
            email=signup.email
        )
    except Exception as e:
        logger.error(f"Error subscribing email {signup.email}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
    finally:
        conn.close()
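

# Example request against this endpoint (the localhost:8080 address is just
# the port used in the __main__ block below; adjust to your deployment):
#
#   curl -X POST http://localhost:8080/api/subscribe \
#        -H "Content-Type: application/json" \
#        -d '{"email": "user@example.com", "source": "devbox-landing"}'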
@app.get("/api/emails", response_model=EmailListResponse)
async def get_emails(limit: int = 100, offset: int = 0, source: Optional[str] = None):
"""
Get list of subscribed emails (for admin purposes)
Note: In production, add authentication/authorization
"""
conn = get_db_connection()
try:
# Build query with optional source filter
base_query = "SELECT COUNT(*) as count FROM email_signups"
emails_query = """
SELECT id, email, source, created_at, ip_address
FROM email_signups
"""
params = []
if source:
base_query += " WHERE source = ?"
emails_query += " WHERE source = ?"
params.append(source)
# Get total count
total_count = conn.execute(base_query, params).fetchone()["count"]
# Get emails with pagination
emails_query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
emails = conn.execute(emails_query, params).fetchall()
# Convert to list of dicts
email_list = [dict(email) for email in emails]
return EmailListResponse(
total_count=total_count,
emails=email_list
)
except Exception as e:
logger.error(f"Error fetching emails: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
conn.close()
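

# The docstrings on the admin-style endpoints (/api/emails, /api/export) note
# that they should be protected in production. Below is a minimal sketch of an
# API-key check as a FastAPI dependency; it is NOT wired into any route, and
# the ADMIN_API_KEY environment variable and verify_admin_key name are
# assumptions for illustration only.
def verify_admin_key(request: Request) -> None:
    """Reject requests whose X-API-Key header does not match ADMIN_API_KEY."""
    expected = os.getenv("ADMIN_API_KEY")
    provided = request.headers.get("X-API-Key")
    if not expected or provided != expected:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")


# Usage would look like:
#   @app.get("/api/emails", dependencies=[Depends(verify_admin_key)])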
@app.get("/api/stats", response_model=StatsResponse)
async def get_stats():
"""
Get detailed statistics about email signups
"""
conn = get_db_connection()
try:
# Get basic stats
stats = conn.execute("""
SELECT
COUNT(*) as total_signups,
COUNT(CASE WHEN DATE(created_at) = DATE('now') THEN 1 END) as today_signups,
COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-7 days') THEN 1 END) as week_signups,
COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-30 days') THEN 1 END) as month_signups,
MIN(created_at) as first_signup,
MAX(created_at) as latest_signup
FROM email_signups
""").fetchone()
# Get source breakdown
sources = conn.execute("""
SELECT source, COUNT(*) as count
FROM email_signups
GROUP BY source
ORDER BY count DESC
""").fetchall()
sources_dict = {row["source"]: row["count"] for row in sources}
return StatsResponse(
total_signups=stats["total_signups"],
today_signups=stats["today_signups"],
week_signups=stats["week_signups"],
month_signups=stats["month_signups"],
first_signup=stats["first_signup"],
latest_signup=stats["latest_signup"],
sources=sources_dict
)
except Exception as e:
logger.error(f"Error fetching stats: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
conn.close()
@app.delete("/api/unsubscribe/{email}", response_model=UnsubscribeResponse)
async def unsubscribe_email(email: str):
"""
Remove an email from the subscription list
"""
conn = get_db_connection()
try:
cursor = conn.execute("DELETE FROM email_signups WHERE email = ?", (email,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Email not found")
logger.info(f"Email unsubscribed: {email}")
return UnsubscribeResponse(
success=True,
message=f"Email {email} successfully unsubscribed"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error unsubscribing email {email}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
conn.close()
@app.get("/api/export")
async def export_emails(source: Optional[str] = None):
"""
Export emails to CSV format
Note: In production, add authentication/authorization
"""
conn = get_db_connection()
try:
query = """
SELECT email, source, created_at, ip_address
FROM email_signups
"""
params = []
if source:
query += " WHERE source = ?"
params.append(source)
query += " ORDER BY created_at DESC"
emails = conn.execute(query, params).fetchall()
# Convert to CSV format
import io
import csv
from fastapi.responses import StreamingResponse
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['Email', 'Source', 'Created At', 'IP Address'])
# Write data
for email in emails:
writer.writerow([
email['email'],
email['source'],
email['created_at'],
email['ip_address']
])
output.seek(0)
# Create filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"devbox_emails_{timestamp}.csv"
return StreamingResponse(
io.BytesIO(output.getvalue().encode('utf-8')),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
except Exception as e:
logger.error(f"Error exporting emails: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
conn.close()
@app.get("/api/search")
async def search_emails(q: str, limit: int = 50):
"""
Search emails by email address or domain
"""
if len(q) < 2:
raise HTTPException(status_code=400, detail="Search query must be at least 2 characters")
conn = get_db_connection()
try:
emails = conn.execute("""
SELECT id, email, source, created_at
FROM email_signups
WHERE email LIKE ?
ORDER BY created_at DESC
LIMIT ?
""", (f"%{q}%", limit)).fetchall()
return {
"query": q,
"count": len(emails),
"emails": [dict(email) for email in emails]
}
except Exception as e:
logger.error(f"Error searching emails: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
conn.close()
@app.get("/")
async def root():
"""
Root endpoint with API information
"""
return {
"message": "DevBox Email Collection API",
"version": "1.0.0",
"timestamp": datetime.utcnow().isoformat(),
"endpoints": {
"subscribe": "POST /api/subscribe",
"list_emails": "GET /api/emails",
"stats": "GET /api/stats",
"search": "GET /api/search?q=query",
"export": "GET /api/export",
"unsubscribe": "DELETE /api/unsubscribe/{email}",
"health": "GET /health",
"docs": "GET /docs"
}
}
@app.get("/health")
async def health_check():
"""
Health check endpoint
"""
conn = get_db_connection()
try:
# Test database connection
conn.execute("SELECT 1").fetchone()
db_status = "healthy"
except Exception as e:
db_status = f"unhealthy: {str(e)}"
logger.error(f"Database health check failed: {e}")
finally:
conn.close()
return {
"status": "healthy" if db_status == "healthy" else "unhealthy",
"timestamp": datetime.utcnow().isoformat(),
"database": db_status,
"version": "1.0.0"
}


# Custom exception handler
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # Exception handlers must return a Response object, not a plain dict
    return JSONResponse(
        status_code=404,
        content={
            "error": "Not Found",
            "message": "The requested endpoint was not found",
            "available_endpoints": [
                "/",
                "/health",
                "/docs",
                "/api/subscribe",
                "/api/emails",
                "/api/stats",
                "/api/search",
                "/api/export"
            ]
        }
    )


if __name__ == "__main__":
    # Check if running in development mode
    import sys
    dev_mode = "--dev" in sys.argv or os.getenv("DEV_MODE", "false").lower() == "true"
    if dev_mode:
        # Development settings
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=8080,
            reload=True,
            log_level="info"
        )
    else:
        # Production settings
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=8080,
            log_level="info"
        )
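
# The reload branch above assumes this file is saved as main.py (that is what
# the "main:app" import string refers to). Under the same assumption, the
# server can also be started from the command line:
#
#   uvicorn main:app --host 0.0.0.0 --port 8080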