Files
devbox-web/devbox-back/database_manager.py
2025-06-09 15:18:16 +02:00

541 lines
21 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
DevBox Email Database Management Utilities
This script provides database management utilities for the DevBox email collection system.
Offers tools for database maintenance, backups, and data analysis.
Usage:
python database_manager.py --help
python database_manager.py --export
python database_manager.py --backup
python database_manager.py --stats
python database_manager.py --cleanup-duplicates
python database_manager.py --validate-emails
"""
import sqlite3
import csv
import argparse
import json
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import shutil
import os
import re
# Logging setup: route INFO-and-above records through the root logger using a
# "<timestamp> - <level> - <message>" layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger used by the manager class and the CLI entry point.
logger = logging.getLogger(__name__)
class DevBoxEmailManager:
    """
    Database manager for DevBox email subscriptions.

    Wraps one SQLite file holding the ``email_signups`` table and provides
    export, statistics, de-duplication, backup, growth-analysis and
    validation helpers. Each public method opens its own connection and
    closes it before returning.
    """

    # Columns the helpers below read; compared against PRAGMA table_info.
    _EXPECTED_COLUMNS = frozenset(
        {'id', 'email', 'source', 'created_at', 'user_agent', 'ip_address'}
    )

    # Columns that can be retro-fitted onto an older table with ALTER TABLE.
    # SQLite rejects ADD COLUMN with a CURRENT_TIMESTAMP default, so
    # created_at is added without one; id/email (PRIMARY KEY / UNIQUE) cannot
    # be added this way at all.
    _ADDABLE_COLUMNS = (
        ('source', "TEXT DEFAULT 'devbox-landing'"),
        ('created_at', "TIMESTAMP"),
        ('user_agent', "TEXT"),
        ('ip_address', "TEXT"),
    )

    def __init__(self, db_file: str = "devbox_emails.db"):
        """Remember the database path and make sure the schema is usable.

        Args:
            db_file: Path of the SQLite database file.
        """
        self.db_file = db_file
        self.ensure_database_exists()

    def ensure_database_exists(self):
        """Ensure database exists and is properly initialized."""
        if not os.path.exists(self.db_file):
            logger.warning(f"Database {self.db_file} not found. Creating new database...")
            self.init_database()
        else:
            logger.info(f"Using existing database: {self.db_file}")
            # Verify table structure
            self._verify_schema()

    def get_connection(self):
        """Get database connection with error handling.

        Returns:
            A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects.

        Raises:
            Exception: Whatever ``sqlite3.connect`` raised, after logging it.
        """
        try:
            conn = sqlite3.connect(self.db_file)
            conn.row_factory = sqlite3.Row
            return conn
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    @staticmethod
    def _existing_columns(conn):
        """Return the set of column names currently on ``email_signups``."""
        cursor = conn.execute("PRAGMA table_info(email_signups)")
        return {row[1] for row in cursor.fetchall()}

    def _verify_schema(self):
        """Verify database schema is correct, repairing it where possible.

        An existing file without the ``email_signups`` table is initialised
        from scratch; a table with missing columns is upgraded in place.
        """
        table_missing = False
        conn = self.get_connection()
        try:
            columns = self._existing_columns(conn)
            if not columns:
                # PRAGMA table_info yields no rows when the table is absent.
                table_missing = True
            elif not self._EXPECTED_COLUMNS.issubset(columns):
                logger.warning("Database schema incomplete. Updating...")
                self._update_schema(conn)
        finally:
            conn.close()

        if table_missing:
            logger.warning("Table email_signups not found. Creating it...")
            self.init_database()

    def _update_schema(self, conn):
        """Update database schema if needed.

        Adds every addable column that is missing. This is best-effort: a
        failure is logged and does not propagate.

        Args:
            conn: Open connection to the database being upgraded.
        """
        try:
            present = self._existing_columns(conn)
            for name, definition in self._ADDABLE_COLUMNS:
                if name in present:
                    continue
                try:
                    conn.execute(f"ALTER TABLE email_signups ADD COLUMN {name} {definition}")
                    logger.info(f"Added {name} column")
                    present.add(name)
                except sqlite3.OperationalError as e:
                    # Keep going so one bad column does not block the others.
                    logger.warning(f"Could not add {name} column: {e}")
            conn.commit()

            still_missing = self._EXPECTED_COLUMNS - present
            if still_missing:
                logger.error(
                    f"Schema still lacks columns: {', '.join(sorted(still_missing))}"
                )
        except Exception as e:
            logger.error(f"Failed to update schema: {e}")

    def init_database(self):
        """Initialize database with required tables.

        Raises:
            Exception: Any error raised while creating the table or indexes.
        """
        conn = self.get_connection()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS email_signups (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    email TEXT UNIQUE NOT NULL,
                    source TEXT DEFAULT 'devbox-landing',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_agent TEXT,
                    ip_address TEXT
                )
            """)
            # Create indexes for better performance
            conn.execute("CREATE INDEX IF NOT EXISTS idx_email ON email_signups(email)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON email_signups(created_at)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON email_signups(source)")
            conn.commit()
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise
        finally:
            conn.close()

    def export_to_csv(self, filename: Optional[str] = None, source_filter: Optional[str] = None) -> str:
        """Export emails to CSV file.

        Args:
            filename: Destination path. When omitted, a timestamped name is
                generated in the current directory.
            source_filter: When given, only rows whose ``source`` equals this
                value are exported.

        Returns:
            The path of the CSV file that was written.

        Raises:
            Exception: Any database or file error, after logging it.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            source_suffix = ""
            if source_filter:
                # The filter is free text; keep path separators and other
                # unsafe characters out of the generated filename.
                safe_source = re.sub(r'[^A-Za-z0-9._-]+', '_', source_filter)
                source_suffix = f"_{safe_source}"
            filename = f"devbox_emails{source_suffix}_{timestamp}.csv"

        conn = self.get_connection()
        try:
            query = """
                SELECT id, email, source, created_at, user_agent, ip_address
                FROM email_signups
            """
            params = []
            if source_filter:
                query += " WHERE source = ?"
                params.append(source_filter)
            query += " ORDER BY created_at DESC"
            emails = conn.execute(query, params).fetchall()

            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['ID', 'Email', 'Source', 'Created At', 'User Agent', 'IP Address'])
                for email in emails:
                    writer.writerow([
                        email['id'],
                        email['email'],
                        email['source'] or 'devbox-landing',
                        email['created_at'],
                        email['user_agent'] or '',
                        email['ip_address'] or '',
                    ])

            logger.info(f"Exported {len(emails)} emails to {filename}")
            print(f"✅ Exported {len(emails)} emails to {filename}")
            return filename
        except Exception as e:
            logger.error(f"Export failed: {e}")
            raise
        finally:
            conn.close()

    def get_detailed_stats(self) -> Dict:
        """Get comprehensive statistics.

        Returns:
            A dict with ``basic_stats``, ``sources``, ``daily_signups`` and
            ``top_domains``. When the table is empty the same keys are
            returned with zero/empty values plus an explanatory ``message``.

        Raises:
            Exception: Any database error, after logging it.
        """
        conn = self.get_connection()
        try:
            # Basic stats
            basic_stats = conn.execute("""
                SELECT
                    COUNT(*) as total_signups,
                    COUNT(CASE WHEN DATE(created_at) = DATE('now') THEN 1 END) as today_signups,
                    COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-7 days') THEN 1 END) as week_signups,
                    COUNT(CASE WHEN DATE(created_at) >= DATE('now', '-30 days') THEN 1 END) as month_signups,
                    MIN(created_at) as first_signup,
                    MAX(created_at) as latest_signup
                FROM email_signups
            """).fetchone()

            if not basic_stats or basic_stats['total_signups'] == 0:
                return {
                    'message': 'No email signups found in database',
                    'basic_stats': {
                        'total_signups': 0,
                        'today_signups': 0,
                        'week_signups': 0,
                        'month_signups': 0,
                        'first_signup': None,
                        'latest_signup': None,
                    },
                    'sources': {},
                    'daily_signups': [],
                    'top_domains': [],
                }

            # Source breakdown
            sources = conn.execute("""
                SELECT COALESCE(source, 'devbox-landing') as source, COUNT(*) as count
                FROM email_signups
                GROUP BY COALESCE(source, 'devbox-landing')
                ORDER BY count DESC
            """).fetchall()

            # Daily signups for last 30 days
            daily_signups = conn.execute("""
                SELECT
                    DATE(created_at) as signup_date,
                    COUNT(*) as daily_count
                FROM email_signups
                WHERE DATE(created_at) >= DATE('now', '-30 days')
                GROUP BY DATE(created_at)
                ORDER BY signup_date DESC
            """).fetchall()

            # Top domains. Lower-cased so "Gmail.com" and "gmail.com" are one
            # bucket, matching the case-insensitive de-duplication below.
            top_domains = conn.execute("""
                SELECT
                    CASE
                        WHEN INSTR(email, '@') > 0
                        THEN LOWER(SUBSTR(email, INSTR(email, '@') + 1))
                        ELSE 'invalid'
                    END as domain,
                    COUNT(*) as count
                FROM email_signups
                GROUP BY domain
                ORDER BY count DESC
                LIMIT 10
            """).fetchall()

            return {
                'basic_stats': dict(basic_stats),
                'sources': {row['source']: row['count'] for row in sources},
                'daily_signups': [dict(row) for row in daily_signups],
                'top_domains': [dict(row) for row in top_domains],
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            raise
        finally:
            conn.close()

    def cleanup_duplicates(self) -> int:
        """Remove duplicate emails (keep the earliest).

        Emails are compared case-insensitively; within each group the row
        with the smallest ``id`` survives.

        Returns:
            Number of rows deleted.

        Raises:
            Exception: Any database error, after logging it. Nothing is
                committed in that case.
        """
        conn = self.get_connection()
        try:
            # Find duplicates
            duplicates = conn.execute("""
                SELECT email, COUNT(*) as count, MIN(id) as keep_id
                FROM email_signups
                GROUP BY LOWER(email)
                HAVING COUNT(*) > 1
            """).fetchall()

            if not duplicates:
                print("✅ No duplicates found")
                return 0

            removed_count = 0
            for dup in duplicates:
                # Remove all but the earliest (smallest ID)
                result = conn.execute("""
                    DELETE FROM email_signups
                    WHERE LOWER(email) = LOWER(?) AND id != ?
                """, (dup['email'], dup['keep_id']))
                removed_count += result.rowcount
                logger.info(f"Removed {result.rowcount} duplicate(s) for {dup['email']}")
                print(f"🧹 Removed {result.rowcount} duplicate(s) for {dup['email']}")

            conn.commit()
            logger.info(f"Cleanup complete. Removed {removed_count} duplicate records.")
            print(f"✅ Cleanup complete. Removed {removed_count} duplicate records.")
            return removed_count
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
            raise
        finally:
            conn.close()

    def backup_database(self, backup_file: Optional[str] = None) -> str:
        """Create a backup of the database.

        Uses SQLite's online backup API rather than a raw file copy, so the
        snapshot is consistent even if another process is writing or the
        database is in WAL mode. The snapshot is written to a temporary file
        next to the destination and renamed into place once complete.

        Args:
            backup_file: Destination path. A directory is accepted, in which
                case the backup keeps the source file's name. When omitted, a
                timestamped name is generated in the current directory.

        Returns:
            The path of the backup file that was written.

        Raises:
            FileNotFoundError: The source database does not exist.
            shutil.SameFileError: The destination is the source database.
            Exception: Any other database or file error, after logging it.
        """
        if backup_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = f"devbox_emails_backup_{timestamp}.db"

        partial = None
        try:
            if not os.path.exists(self.db_file):
                raise FileNotFoundError(f"Source database {self.db_file} does not exist")

            if os.path.isdir(backup_file):
                backup_file = os.path.join(backup_file, os.path.basename(self.db_file))
            if os.path.exists(backup_file) and os.path.samefile(self.db_file, backup_file):
                raise shutil.SameFileError(
                    f"{self.db_file!r} and {backup_file!r} are the same file"
                )

            partial = f"{backup_file}.partial"
            if os.path.exists(partial):
                # Left over from an interrupted run; start from a clean file.
                os.remove(partial)

            source = self.get_connection()
            try:
                target = sqlite3.connect(partial)
                try:
                    source.backup(target)
                finally:
                    target.close()
            finally:
                source.close()

            # Replace any previous backup only once the new one is complete.
            os.replace(partial, backup_file)

            file_size = os.path.getsize(backup_file)
            logger.info(f"Database backed up to {backup_file} ({file_size:,} bytes)")
            print(f"✅ Database backed up to {backup_file} ({file_size:,} bytes)")
            return backup_file
        except Exception as e:
            logger.error(f"Backup failed: {e}")
            raise
        finally:
            # After a successful rename the partial file no longer exists.
            if partial is not None and os.path.exists(partial):
                try:
                    os.remove(partial)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove {partial}: {cleanup_error}")

    def analyze_growth(self, days: int = 30) -> Dict:
        """Analyze signup growth trends.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with ``period_days``, ``total_signups_in_period``,
            ``growth_rate_percent`` (signups in the window relative to the
            number of signups that existed before it; 0 when there were
            none), ``daily_data`` (per-day counts with a running total inside
            the window) and ``average_daily_signups``. When the window holds
            no signups, a ``message`` key is added and the figures are zero.

        Raises:
            Exception: Any database error, after logging it.
        """
        conn = self.get_connection()
        try:
            growth_data = conn.execute("""
                SELECT
                    DATE(created_at) as date,
                    COUNT(*) as signups,
                    SUM(COUNT(*)) OVER (ORDER BY DATE(created_at)) as cumulative
                FROM email_signups
                WHERE DATE(created_at) >= DATE('now', '-' || ? || ' days')
                GROUP BY DATE(created_at)
                ORDER BY date
            """, (days,)).fetchall()

            if not growth_data:
                return {
                    'message': f'No data available for the last {days} days',
                    'period_days': days,
                    'total_signups_in_period': 0,
                    'growth_rate_percent': 0,
                    'daily_data': [],
                    'average_daily_signups': 0,
                }

            # The running total above only sees rows inside the window, so
            # the pre-window baseline has to be counted separately.
            signups_before_period = conn.execute("""
                SELECT COUNT(*) as prior
                FROM email_signups
                WHERE DATE(created_at) < DATE('now', '-' || ? || ' days')
            """, (days,)).fetchone()['prior']

            signups_in_period = growth_data[-1]['cumulative']

            growth_rate = 0
            if signups_before_period > 0:
                growth_rate = (signups_in_period / signups_before_period) * 100

            return {
                'period_days': days,
                'total_signups_in_period': signups_in_period,
                'growth_rate_percent': round(growth_rate, 2),
                'daily_data': [dict(row) for row in growth_data],
                'average_daily_signups': round(signups_in_period / days, 2) if days > 0 else 0,
            }
        except Exception as e:
            logger.error(f"Growth analysis failed: {e}")
            raise
        finally:
            conn.close()

    def validate_emails(self) -> Dict:
        """Validate email formats in the database.

        Each stored address is stripped of surrounding whitespace and matched
        against a simple ``local@domain.tld`` pattern.

        Returns:
            A dict with ``total_emails``, ``valid_emails``, ``invalid_emails``
            (counts) and ``invalid_list`` (``{'id', 'email'}`` dicts).

        Raises:
            Exception: Any database error, after logging it.
        """
        email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        conn = self.get_connection()
        try:
            all_emails = conn.execute("SELECT id, email FROM email_signups").fetchall()
            if not all_emails:
                return {
                    'total_emails': 0,
                    'valid_emails': 0,
                    'invalid_emails': 0,
                    'invalid_list': [],
                }

            valid_emails = []
            invalid_emails = []
            for row in all_emails:
                email = row['email'].strip() if row['email'] else ''
                if email and email_pattern.match(email):
                    valid_emails.append(email)
                else:
                    invalid_emails.append({'id': row['id'], 'email': email})

            return {
                'total_emails': len(all_emails),
                'valid_emails': len(valid_emails),
                'invalid_emails': len(invalid_emails),
                'invalid_list': invalid_emails,
            }
        except Exception as e:
            logger.error(f"Email validation failed: {e}")
            raise
        finally:
            conn.close()

    def get_email_count(self) -> int:
        """Get total number of subscribed emails.

        Returns:
            The row count of ``email_signups``, or 0 if the query fails (the
            failure is logged rather than raised).
        """
        conn = self.get_connection()
        try:
            result = conn.execute("SELECT COUNT(*) as count FROM email_signups").fetchone()
            return result['count'] if result else 0
        except Exception as e:
            logger.error(f"Failed to get email count: {e}")
            return 0
        finally:
            conn.close()
def print_stats_table(stats: Dict):
    """Print statistics in a nice table format.

    Args:
        stats: Mapping with optional ``basic_stats`` (dict of counters and
            first/latest timestamps), ``sources`` (name -> count) and
            ``top_domains`` (list of ``{'domain', 'count'}`` dicts). Missing
            or ``None`` values are shown as 0 / ``N/A``.
    """
    print("\n📊 DevBox Email Statistics")
    print("=" * 50)

    basic = stats.get('basic_stats', {})
    # ``or`` rather than a .get() default: the key may be present but None
    # (e.g. SQL MIN/MAX over NULLs), which would otherwise print "None" or
    # break the thousands-separator format.
    print(f"Total Signups: {basic.get('total_signups') or 0:,}")
    print(f"Today: {basic.get('today_signups') or 0:,}")
    print(f"This Week: {basic.get('week_signups') or 0:,}")
    print(f"This Month: {basic.get('month_signups') or 0:,}")
    print(f"First Signup: {basic.get('first_signup') or 'N/A'}")
    print(f"Latest Signup: {basic.get('latest_signup') or 'N/A'}")

    sources = stats.get('sources', {})
    if sources:
        print("\n📈 Sources:")
        for source, count in sources.items():
            print(f" {source}: {count:,}")

    domains = stats.get('top_domains', [])
    if domains:
        print("\n🌐 Top Domains:")
        # Only the five most frequent domains are shown.
        for domain in domains[:5]:
            print(f" {domain['domain']}: {domain['count']:,}")
def main(argv: Optional[List[str]] = None) -> int:
    """Command line interface for database management.

    Args:
        argv: Argument list to parse, excluding the program name. Defaults to
            ``sys.argv[1:]`` so the script behaves the same when run directly.

    Returns:
        0 on success (or when only help was printed), 1 when the operation
        failed or was interrupted.
    """
    parser = argparse.ArgumentParser(
        description='DevBox Email Database Manager',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python database_manager.py --stats
python database_manager.py --export
python database_manager.py --export --source-filter devbox-landing
python database_manager.py --backup
python database_manager.py --cleanup-duplicates
python database_manager.py --validate-emails
python database_manager.py --analyze-growth 14
"""
    )
    parser.add_argument('--export', action='store_true',
                        help='Export emails to CSV')
    parser.add_argument('--backup', action='store_true',
                        help='Create database backup')
    parser.add_argument('--stats', action='store_true',
                        help='Show detailed statistics')
    parser.add_argument('--cleanup-duplicates', action='store_true',
                        help='Remove duplicate emails')
    parser.add_argument('--analyze-growth', type=int, metavar='DAYS',
                        help='Analyze growth over N days')
    parser.add_argument('--validate-emails', action='store_true',
                        help='Validate email formats')
    parser.add_argument('--source-filter', type=str,
                        help='Filter by source (for export)')
    parser.add_argument('--db-file', type=str, default='devbox_emails.db',
                        help='Database file path')
    parser.add_argument('--count', action='store_true',
                        help='Show total email count')

    cli_args = sys.argv[1:] if argv is None else list(argv)
    args = parser.parse_args(cli_args)

    # If no arguments provided, show help
    if not cli_args:
        parser.print_help()
        return 0

    try:
        manager = DevBoxEmailManager(args.db_file)

        if args.count:
            count = manager.get_email_count()
            print(f"📧 Total emails in database: {count:,}")

        if args.export:
            # The method prints the resulting filename itself.
            manager.export_to_csv(source_filter=args.source_filter)

        if args.backup:
            # The method prints the backup path itself.
            manager.backup_database()

        if args.stats:
            stats = manager.get_detailed_stats()
            if 'message' in stats:
                print(f" {stats['message']}")
            else:
                print_stats_table(stats)

        if args.cleanup_duplicates:
            # Results are printed by the method.
            manager.cleanup_duplicates()

        # Compare against None: "--analyze-growth 0" is a valid request and
        # must not be skipped just because 0 is falsy.
        if args.analyze_growth is not None:
            growth = manager.analyze_growth(args.analyze_growth)
            print(f"\n📈 Growth Analysis ({args.analyze_growth} days):")
            print("=" * 40)
            if 'message' in growth:
                print(f" {growth['message']}")
            else:
                print(f"Total signups in period: {growth['total_signups_in_period']:,}")
                print(f"Growth rate: {growth['growth_rate_percent']}%")
                print(f"Average daily signups: {growth['average_daily_signups']}")

        if args.validate_emails:
            validation = manager.validate_emails()
            print(f"\n✉️ Email Validation Results:")
            print("=" * 35)
            print(f"Total emails: {validation['total_emails']:,}")
            print(f"Valid emails: {validation['valid_emails']:,}")
            print(f"Invalid emails: {validation['invalid_emails']:,}")
            if validation['invalid_emails'] > 0:
                print(f"\n⚠️ Invalid emails found:")
                for invalid in validation['invalid_list'][:10]:  # Show first 10
                    print(f" ID {invalid['id']}: {invalid['email']}")
                if len(validation['invalid_list']) > 10:
                    print(f" ... and {len(validation['invalid_list']) - 10} more")

        return 0
    except KeyboardInterrupt:
        print("\n🛑 Operation cancelled by user")
        return 1
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        logger.error(f"Operation failed: {e}")
        print(f"❌ Error: {e}")
        return 1
if __name__ == "__main__":
    # sys.exit rather than the bare ``exit`` helper: the latter is injected by
    # the ``site`` module for interactive sessions and is absent under -S.
    sys.exit(main())