#!/usr/bin/env python3
"""
DevBox Email Database Management Utilities

This script provides database management utilities for the DevBox email
collection system. Offers tools for database maintenance, backups, and
data analysis.

Usage:
    python database_manager.py --help
    python database_manager.py --export
    python database_manager.py --backup
    python database_manager.py --stats
    python database_manager.py --cleanup-duplicates
    python database_manager.py --validate-emails
"""

import sqlite3
import csv
import argparse
import json
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import shutil
import os
import re

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DevBoxEmailManager:
    """Database manager for DevBox email subscriptions"""

    def __init__(self, db_file: str = "db/devbox.sqlite3"):
        self.db_file = db_file
        self.ensure_database_exists()

    def ensure_database_exists(self):
        """Ensure database exists and is properly initialized"""
        if not os.path.exists(self.db_file):
            logger.warning(f"Database {self.db_file} not found. Creating new database...")
            self.init_database()
        else:
            logger.info(f"Using existing database: {self.db_file}")
            # Verify table structure
            self._verify_schema()

    def get_connection(self):
        """Get database connection with error handling"""
        try:
            conn = sqlite3.connect(self.db_file)
            conn.row_factory = sqlite3.Row
            return conn
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    def _verify_schema(self):
        """Verify database schema is correct"""
        conn = self.get_connection()
        try:
            # Check if table exists and has correct columns
            cursor = conn.execute("PRAGMA table_info(email_signups)")
            columns = {row[1] for row in cursor.fetchall()}
            expected_columns = {'id', 'email', 'source', 'created_at', 'user_agent', 'ip_address'}

            if not expected_columns.issubset(columns):
                logger.warning("Database schema incomplete. Updating...")
                self._update_schema(conn)
        finally:
            conn.close()

    def _update_schema(self, conn):
        """Update database schema if needed"""
        try:
            # Add missing columns if they don't exist
            try:
                conn.execute("ALTER TABLE email_signups ADD COLUMN user_agent TEXT")
                logger.info("Added user_agent column")
            except sqlite3.OperationalError:
                pass  # Column already exists

            try:
                conn.execute("ALTER TABLE email_signups ADD COLUMN ip_address TEXT")
                logger.info("Added ip_address column")
            except sqlite3.OperationalError:
                pass  # Column already exists

            conn.commit()
        except Exception as e:
            logger.error(f"Failed to update schema: {e}")

    def init_database(self):
        """Initialize database with required tables"""
        conn = self.get_connection()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS email_signups (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    email TEXT UNIQUE NOT NULL,
                    source TEXT DEFAULT 'devbox-landing',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_agent TEXT,
                    ip_address TEXT
                )
            """)

            # Create indexes for better performance
            conn.execute("CREATE INDEX IF NOT EXISTS idx_email ON email_signups(email)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON email_signups(created_at)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON email_signups(source)")

            conn.commit()
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise
        finally:
            conn.close()

    def export_to_csv(self, filename: Optional[str] = None, source_filter: Optional[str] = None) -> str:
        """Export emails to CSV file"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            source_suffix = f"_{source_filter}" if source_filter else ""
            filename = f"devbox_emails{source_suffix}_{timestamp}.csv"

        conn = self.get_connection()
        try:
            query = """
                SELECT id, email, source, created_at, user_agent, ip_address
                FROM email_signups
            """
            params = []
            if source_filter:
                query += " WHERE source = ?"
                params.append(source_filter)
            query += " ORDER BY created_at DESC"

            emails = conn.execute(query, params).fetchall()

            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['ID', 'Email', 'Source', 'Created At', 'User Agent', 'IP Address'])

                for email in emails:
                    writer.writerow([
                        email['id'],
                        email['email'],
                        email['source'] or 'devbox-landing',
                        email['created_at'],
                        email['user_agent'] or '',
                        email['ip_address'] or ''
                    ])

            logger.info(f"Exported {len(emails)} emails to {filename}")
            print(f"✅ Exported {len(emails)} emails to {filename}")
            return filename
        except Exception as e:
            logger.error(f"Export failed: {e}")
            raise
        finally:
            conn.close()

    def get_detailed_stats(self) -> Dict:
        """Get comprehensive statistics"""
        conn = self.get_connection()
        try:
            # Basic stats
            basic_stats = conn.execute("""
                SELECT
                    COUNT(*) as total_signups,
                    COUNT(CASE WHEN DATE(created_at) = DATE('now') THEN 1 END) as today_signups,
                    COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-7 days') THEN 1 END) as week_signups,
                    COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-30 days') THEN 1 END) as month_signups,
                    MIN(created_at) as first_signup,
                    MAX(created_at) as latest_signup
                FROM email_signups
            """).fetchone()

            if not basic_stats or basic_stats['total_signups'] == 0:
                return {
                    'message': 'No email signups found in database',
                    'basic_stats': {
                        'total_signups': 0,
                        'today_signups': 0,
                        'week_signups': 0,
                        'month_signups': 0,
                        'first_signup': None,
                        'latest_signup': None
                    },
                    'sources': {},
                    'daily_signups': [],
                    'top_domains': []
                }

            # Source breakdown
            sources = conn.execute("""
                SELECT COALESCE(source, 'devbox-landing') as source, COUNT(*) as count
                FROM email_signups
                GROUP BY COALESCE(source, 'devbox-landing')
                ORDER BY count DESC
            """).fetchall()

            # Daily signups for last 30 days
            daily_signups = conn.execute("""
                SELECT DATE(created_at) as signup_date, COUNT(*) as daily_count
                FROM email_signups
                WHERE DATE(created_at) >= DATE('now', '-30 days')
                GROUP BY DATE(created_at)
                ORDER BY signup_date DESC
            """).fetchall()

            # Top domains
            top_domains = conn.execute("""
                SELECT
                    CASE
                        WHEN INSTR(email, '@') > 0
                        THEN SUBSTR(email, INSTR(email, '@') + 1)
                        ELSE 'invalid'
                    END as domain,
                    COUNT(*) as count
                FROM email_signups
                GROUP BY domain
                ORDER BY count DESC
                LIMIT 10
            """).fetchall()

            return {
                'basic_stats': dict(basic_stats),
                'sources': {row['source']: row['count'] for row in sources},
                'daily_signups': [dict(row) for row in daily_signups],
                'top_domains': [dict(row) for row in top_domains]
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            raise
        finally:
            conn.close()

    def cleanup_duplicates(self) -> int:
        """Remove duplicate emails (keep the earliest)"""
        conn = self.get_connection()
        try:
            # Find duplicates
            duplicates = conn.execute("""
                SELECT email, COUNT(*) as count, MIN(id) as keep_id
                FROM email_signups
                GROUP BY LOWER(email)
                HAVING COUNT(*) > 1
            """).fetchall()

            if not duplicates:
                print("✅ No duplicates found")
                return 0

            removed_count = 0
            for dup in duplicates:
                # Remove all but the earliest (smallest ID)
                result = conn.execute("""
                    DELETE FROM email_signups
                    WHERE LOWER(email) = LOWER(?) AND id != ?
                """, (dup['email'], dup['keep_id']))
                removed_count += result.rowcount
                logger.info(f"Removed {result.rowcount} duplicate(s) for {dup['email']}")
                print(f"🧹 Removed {result.rowcount} duplicate(s) for {dup['email']}")

            conn.commit()
            logger.info(f"Cleanup complete. Removed {removed_count} duplicate records.")
            print(f"✅ Cleanup complete. Removed {removed_count} duplicate records.")
            return removed_count
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
            raise
        finally:
            conn.close()

    def backup_database(self, backup_file: Optional[str] = None) -> str:
        """Create a backup of the database"""
        if backup_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = f"devbox_emails_backup_{timestamp}.db"

        try:
            if not os.path.exists(self.db_file):
                raise FileNotFoundError(f"Source database {self.db_file} does not exist")

            shutil.copy2(self.db_file, backup_file)
            file_size = os.path.getsize(backup_file)

            logger.info(f"Database backed up to {backup_file} ({file_size:,} bytes)")
            print(f"✅ Database backed up to {backup_file} ({file_size:,} bytes)")
            return backup_file
        except Exception as e:
            logger.error(f"Backup failed: {e}")
            raise

    def analyze_growth(self, days: int = 30) -> Dict:
        """Analyze signup growth trends"""
        conn = self.get_connection()
        try:
            growth_data = conn.execute("""
                SELECT
                    DATE(created_at) as date,
                    COUNT(*) as signups,
                    SUM(COUNT(*)) OVER (ORDER BY DATE(created_at)) as cumulative
                FROM email_signups
                WHERE DATE(created_at) >= DATE('now', '-' || ? || ' days')
                GROUP BY DATE(created_at)
                ORDER BY date
            """, (days,)).fetchall()

            if not growth_data:
                return {
                    'message': f'No data available for the last {days} days',
                    'period_days': days,
                    'total_signups_in_period': 0,
                    'growth_rate_percent': 0,
                    'daily_data': [],
                    'average_daily_signups': 0
                }

            # Calculate growth rate
            total_signups = growth_data[-1]['cumulative'] if growth_data else 0
            first_day_cumulative = growth_data[0]['cumulative'] - growth_data[0]['signups'] if growth_data else 0
            signups_in_period = total_signups - first_day_cumulative

            growth_rate = 0
            if first_day_cumulative > 0:
                growth_rate = (signups_in_period / first_day_cumulative) * 100

            return {
                'period_days': days,
                'total_signups_in_period': signups_in_period,
                'growth_rate_percent': round(growth_rate, 2),
                'daily_data': [dict(row) for row in growth_data],
                'average_daily_signups': round(signups_in_period / days, 2) if days > 0 else 0
            }
        except Exception as e:
            logger.error(f"Growth analysis failed: {e}")
            raise
        finally:
            conn.close()

    def validate_emails(self) -> Dict:
        """Validate email formats in the database"""
        email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

        conn = self.get_connection()
        try:
            all_emails = conn.execute("SELECT id, email FROM email_signups").fetchall()

            if not all_emails:
                return {
                    'total_emails': 0,
                    'valid_emails': 0,
                    'invalid_emails': 0,
                    'invalid_list': []
                }

            valid_emails = []
            invalid_emails = []

            for row in all_emails:
                email = row['email'].strip() if row['email'] else ''
                if email and email_pattern.match(email):
                    valid_emails.append(email)
                else:
                    invalid_emails.append({'id': row['id'], 'email': email})

            return {
                'total_emails': len(all_emails),
                'valid_emails': len(valid_emails),
                'invalid_emails': len(invalid_emails),
                'invalid_list': invalid_emails
            }
        except Exception as e:
            logger.error(f"Email validation failed: {e}")
            raise
        finally:
            conn.close()

    def get_email_count(self) -> int:
        """Get total number of subscribed emails"""
        conn = self.get_connection()
        try:
            result = conn.execute("SELECT COUNT(*) as count FROM email_signups").fetchone()
            return result['count'] if result else 0
        except Exception as e:
            logger.error(f"Failed to get email count: {e}")
            return 0
        finally:
            conn.close()


def print_stats_table(stats: Dict):
    """Print statistics in a nice table format"""
    print("\n📊 DevBox Email Statistics")
    print("=" * 50)

    basic = stats.get('basic_stats', {})
    print(f"Total Signups: {basic.get('total_signups', 0):,}")
    print(f"Today: {basic.get('today_signups', 0):,}")
    print(f"This Week: {basic.get('week_signups', 0):,}")
    print(f"This Month: {basic.get('month_signups', 0):,}")
    print(f"First Signup: {basic.get('first_signup', 'N/A')}")
    print(f"Latest Signup: {basic.get('latest_signup', 'N/A')}")

    sources = stats.get('sources', {})
    if sources:
        print(f"\n📈 Sources:")
        for source, count in sources.items():
            print(f" {source}: {count:,}")

    domains = stats.get('top_domains', [])
    if domains:
        print(f"\n🌐 Top Domains:")
        for domain in domains[:5]:
            print(f" {domain['domain']}: {domain['count']:,}")


def main():
    """Command line interface for database management"""
    parser = argparse.ArgumentParser(
        description='DevBox Email Database Manager',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python database_manager.py --stats
  python database_manager.py --export
  python database_manager.py --export --source-filter devbox-landing
  python database_manager.py --backup
  python database_manager.py --cleanup-duplicates
  python database_manager.py --validate-emails
  python database_manager.py --analyze-growth 14
        """
    )

    parser.add_argument('--export', action='store_true', help='Export emails to CSV')
    parser.add_argument('--backup', action='store_true', help='Create database backup')
    parser.add_argument('--stats', action='store_true', help='Show detailed statistics')
    parser.add_argument('--cleanup-duplicates', action='store_true', help='Remove duplicate emails')
    parser.add_argument('--analyze-growth', type=int, metavar='DAYS', help='Analyze growth over N days')
    parser.add_argument('--validate-emails', action='store_true', help='Validate email formats')
    parser.add_argument('--source-filter', type=str, help='Filter by source (for export)')
    parser.add_argument('--db-file', type=str, default='devbox_emails.db', help='Database file path')
    parser.add_argument('--count', action='store_true', help='Show total email count')

    args = parser.parse_args()

    # If no arguments provided, show help
    if len(sys.argv) == 1:
        parser.print_help()
        return 0

    try:
        manager = DevBoxEmailManager(args.db_file)

        if args.count:
            count = manager.get_email_count()
            print(f"📧 Total emails in database: {count:,}")

        if args.export:
            filename = manager.export_to_csv(source_filter=args.source_filter)
            # filename is already printed by the method

        if args.backup:
            backup_file = manager.backup_database()
            # backup_file is already printed by the method

        if args.stats:
            stats = manager.get_detailed_stats()
            if 'message' in stats:
                print(f"ℹ️ {stats['message']}")
            else:
                print_stats_table(stats)

        if args.cleanup_duplicates:
            removed = manager.cleanup_duplicates()
            # results are already printed by the method

        if args.analyze_growth:
            growth = manager.analyze_growth(args.analyze_growth)
            print(f"\n📈 Growth Analysis ({args.analyze_growth} days):")
            print("=" * 40)
            if 'message' in growth:
                print(f"ℹ️ {growth['message']}")
            else:
                print(f"Total signups in period: {growth['total_signups_in_period']:,}")
                print(f"Growth rate: {growth['growth_rate_percent']}%")
                print(f"Average daily signups: {growth['average_daily_signups']}")

        if args.validate_emails:
            validation = manager.validate_emails()
            print(f"\n✉️ Email Validation Results:")
            print("=" * 35)
            print(f"Total emails: {validation['total_emails']:,}")
            print(f"Valid emails: {validation['valid_emails']:,}")
            print(f"Invalid emails: {validation['invalid_emails']:,}")

            if validation['invalid_emails'] > 0:
                print(f"\n⚠️ Invalid emails found:")
                for invalid in validation['invalid_list'][:10]:  # Show first 10
                    print(f" ID {invalid['id']}: {invalid['email']}")
                if len(validation['invalid_list']) > 10:
                    print(f" ... and {len(validation['invalid_list']) - 10} more")

        return 0

    except KeyboardInterrupt:
        print("\n🛑 Operation cancelled by user")
        return 1
    except Exception as e:
        logger.error(f"Operation failed: {e}")
        print(f"❌ Error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
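# --- Illustrative programmatic usage (a minimal sketch, kept as comments so the
# script's behavior is unchanged). It assumes this file is saved as
# database_manager.py, as the usage lines in the module docstring suggest, and
# uses "devbox_emails.db", the same default as the --db-file option above.
#
#   from database_manager import DevBoxEmailManager
#
#   manager = DevBoxEmailManager("devbox_emails.db")
#   manager.backup_database()                          # timestamped .db copy in the current directory
#   print(manager.get_email_count())                   # total subscriber rows
#   manager.export_to_csv(source_filter="devbox-landing")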