from fastapi import FastAPI, HTTPException, Depends, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, EmailStr from contextlib import asynccontextmanager import sqlite3 import logging from datetime import datetime from typing import List, Optional import uvicorn import os # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Database configuration DATABASE_FILE = "db/devbox.sqlite3" def get_db_connection(): """Create and return a database connection""" conn = sqlite3.connect(DATABASE_FILE) conn.row_factory = sqlite3.Row # This allows dict-like access to rows return conn def init_database(): """Initialize the database with required tables""" conn = get_db_connection() try: conn.execute(""" CREATE TABLE IF NOT EXISTS email_signups ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT UNIQUE NOT NULL, source TEXT DEFAULT 'devbox-landing', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, user_agent TEXT, ip_address TEXT ) """) # Create indexes for better performance conn.execute("CREATE INDEX IF NOT EXISTS idx_email ON email_signups(email)") conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON email_signups(created_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON email_signups(source)") conn.commit() logger.info("Database initialized successfully") except Exception as e: logger.error(f"Error initializing database: {e}") raise finally: conn.close() @asynccontextmanager async def lifespan(app: FastAPI): """Handle application startup and shutdown events""" # Startup logger.info("DevBox Email Collection API starting up...") init_database() logger.info("DevBox Email Collection API started successfully") yield # Shutdown logger.info("DevBox Email Collection API shutting down...") # Create FastAPI app with lifespan app = FastAPI( title="DevBox Email Collection API", description="Backend API for collecting email signups for DevBox landing page", version="1.0.0", lifespan=lifespan ) # CORS middleware to allow requests from your frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your domain allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Pydantic models class EmailSignup(BaseModel): email: EmailStr source: Optional[str] = "devbox-landing" class EmailSignupResponse(BaseModel): success: bool message: str email: str id: Optional[int] = None class EmailListResponse(BaseModel): total_count: int emails: List[dict] class StatsResponse(BaseModel): total_signups: int today_signups: int week_signups: int month_signups: int first_signup: Optional[str] latest_signup: Optional[str] sources: dict class UnsubscribeResponse(BaseModel): success: bool message: str # Helper function to get client IP def get_client_ip(request: Request) -> str: """Extract client IP address from request""" forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else "unknown" # API endpoints @app.post("/api/subscribe", response_model=EmailSignupResponse) async def subscribe_email(signup: EmailSignup, request: Request): """ Subscribe an email address to the DevBox mailing list """ conn = get_db_connection() try: # Check if email already exists existing = conn.execute( "SELECT id FROM email_signups WHERE email = ?", (signup.email,) ).fetchone() if existing: return EmailSignupResponse( success=False, message="Email already subscribed", email=signup.email ) # Get client information user_agent = request.headers.get("User-Agent", "") ip_address = get_client_ip(request) # Insert new email cursor = conn.execute( """ INSERT INTO email_signups (email, source, created_at, user_agent, ip_address) VALUES (?, ?, ?, ?, ?) """, (signup.email, signup.source, datetime.utcnow(), user_agent, ip_address) ) conn.commit() logger.info(f"New email subscription: {signup.email} from {ip_address}") return EmailSignupResponse( success=True, message="Successfully subscribed to DevBox updates!", email=signup.email, id=cursor.lastrowid ) except sqlite3.IntegrityError: return EmailSignupResponse( success=False, message="Email already subscribed", email=signup.email ) except Exception as e: logger.error(f"Error subscribing email {signup.email}: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.get("/api/emails", response_model=EmailListResponse) async def get_emails(limit: int = 100, offset: int = 0, source: Optional[str] = None): """ Get list of subscribed emails (for admin purposes) Note: In production, add authentication/authorization """ conn = get_db_connection() try: # Build query with optional source filter base_query = "SELECT COUNT(*) as count FROM email_signups" emails_query = """ SELECT id, email, source, created_at, ip_address FROM email_signups """ params = [] if source: base_query += " WHERE source = ?" emails_query += " WHERE source = ?" params.append(source) # Get total count total_count = conn.execute(base_query, params).fetchone()["count"] # Get emails with pagination emails_query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) emails = conn.execute(emails_query, params).fetchall() # Convert to list of dicts email_list = [dict(email) for email in emails] return EmailListResponse( total_count=total_count, emails=email_list ) except Exception as e: logger.error(f"Error fetching emails: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.get("/api/stats", response_model=StatsResponse) async def get_stats(): """ Get detailed statistics about email signups """ conn = get_db_connection() try: # Get basic stats stats = conn.execute(""" SELECT COUNT(*) as total_signups, COUNT(CASE WHEN DATE(created_at) = DATE('now') THEN 1 END) as today_signups, COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-7 days') THEN 1 END) as week_signups, COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-30 days') THEN 1 END) as month_signups, MIN(created_at) as first_signup, MAX(created_at) as latest_signup FROM email_signups """).fetchone() # Get source breakdown sources = conn.execute(""" SELECT source, COUNT(*) as count FROM email_signups GROUP BY source ORDER BY count DESC """).fetchall() sources_dict = {row["source"]: row["count"] for row in sources} return StatsResponse( total_signups=stats["total_signups"], today_signups=stats["today_signups"], week_signups=stats["week_signups"], month_signups=stats["month_signups"], first_signup=stats["first_signup"], latest_signup=stats["latest_signup"], sources=sources_dict ) except Exception as e: logger.error(f"Error fetching stats: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.delete("/api/unsubscribe/{email}", response_model=UnsubscribeResponse) async def unsubscribe_email(email: str): """ Remove an email from the subscription list """ conn = get_db_connection() try: cursor = conn.execute("DELETE FROM email_signups WHERE email = ?", (email,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Email not found") logger.info(f"Email unsubscribed: {email}") return UnsubscribeResponse( success=True, message=f"Email {email} successfully unsubscribed" ) except HTTPException: raise except Exception as e: logger.error(f"Error unsubscribing email {email}: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.get("/api/export") async def export_emails(source: Optional[str] = None): """ Export emails to CSV format Note: In production, add authentication/authorization """ conn = get_db_connection() try: query = """ SELECT email, source, created_at, ip_address FROM email_signups """ params = [] if source: query += " WHERE source = ?" params.append(source) query += " ORDER BY created_at DESC" emails = conn.execute(query, params).fetchall() # Convert to CSV format import io import csv from fastapi.responses import StreamingResponse output = io.StringIO() writer = csv.writer(output) # Write header writer.writerow(['Email', 'Source', 'Created At', 'IP Address']) # Write data for email in emails: writer.writerow([ email['email'], email['source'], email['created_at'], email['ip_address'] ]) output.seek(0) # Create filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"devbox_emails_{timestamp}.csv" return StreamingResponse( io.BytesIO(output.getvalue().encode('utf-8')), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename={filename}"} ) except Exception as e: logger.error(f"Error exporting emails: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.get("/api/search") async def search_emails(q: str, limit: int = 50): """ Search emails by email address or domain """ if len(q) < 2: raise HTTPException(status_code=400, detail="Search query must be at least 2 characters") conn = get_db_connection() try: emails = conn.execute(""" SELECT id, email, source, created_at FROM email_signups WHERE email LIKE ? ORDER BY created_at DESC LIMIT ? """, (f"%{q}%", limit)).fetchall() return { "query": q, "count": len(emails), "emails": [dict(email) for email in emails] } except Exception as e: logger.error(f"Error searching emails: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: conn.close() @app.get("/") async def root(): """ Root endpoint with API information """ return { "message": "DevBox Email Collection API", "version": "1.0.0", "timestamp": datetime.utcnow().isoformat(), "endpoints": { "subscribe": "POST /api/subscribe", "list_emails": "GET /api/emails", "stats": "GET /api/stats", "search": "GET /api/search?q=query", "export": "GET /api/export", "unsubscribe": "DELETE /api/unsubscribe/{email}", "health": "GET /health", "docs": "GET /docs" } } @app.get("/health") async def health_check(): """ Health check endpoint """ conn = get_db_connection() try: # Test database connection conn.execute("SELECT 1").fetchone() db_status = "healthy" except Exception as e: db_status = f"unhealthy: {str(e)}" logger.error(f"Database health check failed: {e}") finally: conn.close() return { "status": "healthy" if db_status == "healthy" else "unhealthy", "timestamp": datetime.utcnow().isoformat(), "database": db_status, "version": "1.0.0" } # Custom exception handler @app.exception_handler(404) async def not_found_handler(request: Request, exc): return { "error": "Not Found", "message": "The requested endpoint was not found", "available_endpoints": [ "/", "/health", "/docs", "/api/subscribe", "/api/emails", "/api/stats", "/api/search", "/api/export" ] } if __name__ == "__main__": # Check if running in development mode import sys dev_mode = "--dev" in sys.argv or os.getenv("DEV_MODE", "false").lower() == "true" if dev_mode: # Development settings uvicorn.run( "main:app", host="0.0.0.0", port=8080, reload=True, log_level="info" ) else: # Production settings uvicorn.run( app, host="0.0.0.0", port=8080, log_level="info" )