# Files
# reddit_stock_analyzer/rstat_tool/database.py
#
# 422 lines
# 16 KiB
# Python

# rstat_tool/database.py
import sqlite3
import time
from .ticker_extractor import COMMON_WORDS_BLACKLIST
from datetime import datetime, timedelta
# SQLite database file opened by get_db_connection(). The path is relative,
# so it resolves against the process's current working directory.
DB_FILE = "reddit_stocks.db"
def get_db_connection():
    """Open the SQLite database at DB_FILE and hand back the connection.

    Rows fetched through the returned connection are ``sqlite3.Row`` objects,
    so columns can be read by name (``row['symbol']``) as well as by index.
    The connection is not closed here; that is left to the caller.
    """
    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    return connection
def initialize_db():
    """
    Initializes the database and creates the necessary tables if they don't exist.

    Creates the ``tickers``, ``subreddits``, ``mentions`` and ``posts`` tables
    with ``CREATE TABLE IF NOT EXISTS``, so running it against an existing
    database leaves tables that are already present untouched. Prints a
    confirmation message once the schema has been committed.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # --- Create tickers table ---
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tickers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL UNIQUE,
                market_cap INTEGER,
                closing_price REAL,
                last_updated INTEGER
            )
        """)

        # --- Create subreddits table ---
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS subreddits (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE
            )
        """)

        # --- Create mentions table ---
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS mentions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker_id INTEGER,
                subreddit_id INTEGER,
                post_id TEXT NOT NULL,
                mention_type TEXT NOT NULL,
                mention_sentiment REAL, -- Renamed from sentiment_score for clarity
                post_avg_sentiment REAL, -- NEW: Stores the avg sentiment of the whole post
                mention_timestamp INTEGER NOT NULL,
                FOREIGN KEY (ticker_id) REFERENCES tickers (id),
                FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
            )
        """)

        # --- Create posts table ---
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                post_id TEXT NOT NULL UNIQUE,
                title TEXT NOT NULL,
                post_url TEXT,
                subreddit_id INTEGER,
                post_timestamp INTEGER,
                comment_count INTEGER,
                avg_comment_sentiment REAL,
                FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
            )
        """)

        conn.commit()
    finally:
        # Release the database handle even when a statement or the commit fails.
        conn.close()
    print("Database initialized successfully.")
def clean_stale_tickers():
    """
    Removes tickers and their associated mentions from the database
    if the ticker symbol exists in the COMMON_WORDS_BLACKLIST.

    Prints one line per removed ticker and a final count. The count is the
    connection's ``total_changes``, i.e. rows deleted from ``mentions`` and
    ``tickers`` combined.
    """
    print("\n--- Cleaning Stale Tickers from Database ---")
    blacklist = set(COMMON_WORDS_BLACKLIST)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Match against the blacklist in Python instead of binding one SQL
        # parameter per blacklisted word: a long blacklist would otherwise
        # run into SQLite's limit on bound variables in a single statement.
        cursor.execute("SELECT id, symbol FROM tickers")
        stale_tickers = [
            row for row in cursor.fetchall() if row['symbol'] in blacklist
        ]

        if not stale_tickers:
            print("No stale tickers to clean.")
            return

        for ticker in stale_tickers:
            ticker_id = ticker['id']
            ticker_symbol = ticker['symbol']
            print(f"Removing stale ticker '{ticker_symbol}' (ID: {ticker_id})...")
            # Delete the dependent mentions first, then the ticker row itself.
            cursor.execute("DELETE FROM mentions WHERE ticker_id = ?", (ticker_id,))
            cursor.execute("DELETE FROM tickers WHERE id = ?", (ticker_id,))

        deleted_count = conn.total_changes
        conn.commit()
    finally:
        # Runs on the early return and on errors as well as on success.
        conn.close()
    print(f"Cleanup complete. Removed {deleted_count} records.")
def clean_stale_subreddits(active_subreddits):
    """
    Removes all data associated with subreddits that are NOT in the active list.

    Args:
        active_subreddits: Iterable of subreddit name strings to keep.

    For every other subreddit stored in the database, its rows in
    ``mentions`` and ``posts`` and the ``subreddits`` row itself are deleted.
    Names are compared case-insensitively, matching the ``LOWER(...)``
    comparisons used by the query functions in this module, so a difference
    in capitalisation alone never causes a subreddit's data to be deleted.
    """
    print("\n--- Cleaning Stale Subreddits from Database ---")
    active_names = {name.lower() for name in active_subreddits}
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name FROM subreddits")

        stale_sub_ids = []
        for sub in cursor.fetchall():
            if sub['name'].lower() not in active_names:
                print(f"Found stale subreddit to remove: r/{sub['name']}")
                stale_sub_ids.append(sub['id'])

        if not stale_sub_ids:
            print("No stale subreddits to clean.")
            return

        for sub_id in stale_sub_ids:
            print(f"  -> Deleting associated data for subreddit ID: {sub_id}")
            # Dependent rows go first, the subreddit row last.
            cursor.execute("DELETE FROM mentions WHERE subreddit_id = ?", (sub_id,))
            cursor.execute("DELETE FROM posts WHERE subreddit_id = ?", (sub_id,))
            cursor.execute("DELETE FROM subreddits WHERE id = ?", (sub_id,))

        conn.commit()
    finally:
        # Runs on the early return and on errors as well as on success.
        conn.close()
    print("Stale subreddit cleanup complete.")
# NOTE(review): get_db_connection is already defined earlier in this module.
# This later definition is the one bound to the name once the module has
# finished importing; the two should be merged.
def get_db_connection():
    """Return a new connection to DB_FILE whose rows are ``sqlite3.Row`` objects.

    The connection is not closed here; that is left to the caller.
    """
    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    return connection
# NOTE(review): initialize_db is already defined earlier in this module with
# the same schema. This later definition is the one bound to the name once
# the module has finished importing; the two should be merged.
def initialize_db():
    """Create the ``tickers``, ``subreddits``, ``mentions`` and ``posts`` tables.

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so tables that already
    exist are left untouched. Prints a confirmation message once the schema
    has been committed.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tickers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL UNIQUE,
                market_cap INTEGER,
                closing_price REAL,
                last_updated INTEGER
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS subreddits (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS mentions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker_id INTEGER,
                subreddit_id INTEGER,
                post_id TEXT NOT NULL,
                mention_type TEXT NOT NULL,
                mention_sentiment REAL,
                post_avg_sentiment REAL,
                mention_timestamp INTEGER NOT NULL,
                FOREIGN KEY (ticker_id) REFERENCES tickers (id),
                FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                post_id TEXT NOT NULL UNIQUE,
                title TEXT NOT NULL,
                post_url TEXT,
                subreddit_id INTEGER,
                post_timestamp INTEGER,
                comment_count INTEGER,
                avg_comment_sentiment REAL,
                FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
            )
        """)

        conn.commit()
    finally:
        # Release the database handle even when a statement or the commit fails.
        conn.close()
    print("Database initialized successfully.")
def add_mention(conn, ticker_id, subreddit_id, post_id, mention_type, timestamp, mention_sentiment, post_avg_sentiment=None):
    """Insert one row into the ``mentions`` table and commit it.

    Args:
        conn: Open ``sqlite3`` connection to write through.
        ticker_id: Value for ``mentions.ticker_id``.
        subreddit_id: Value for ``mentions.subreddit_id``.
        post_id: Value for ``mentions.post_id``.
        mention_type: Value for ``mentions.mention_type``; the summary queries
            in this module count the values ``'post'`` and ``'comment'``.
        timestamp: Value for ``mentions.mention_timestamp``.
        mention_sentiment: Value for ``mentions.mention_sentiment``.
        post_avg_sentiment: Optional value for ``mentions.post_avg_sentiment``.

    A row rejected by a database constraint (``sqlite3.IntegrityError``) is
    skipped rather than raised, so one bad mention does not abort the caller.
    The rejection is printed so that it does not pass unnoticed.
    """
    try:
        conn.execute(
            """
            INSERT INTO mentions (ticker_id, subreddit_id, post_id, mention_type, mention_timestamp, mention_sentiment, post_avg_sentiment)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (ticker_id, subreddit_id, post_id, mention_type, timestamp, mention_sentiment, post_avg_sentiment)
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        # Best-effort insert: report the rejected row and carry on.
        print(f"Warning: skipped mention for post '{post_id}' ({mention_type}): {exc}")
def get_or_create_entity(conn, table_name, column_name, value):
    """Generic function to get or create an entity and return its ID.

    Looks up the row of ``table_name`` whose ``column_name`` equals ``value``.
    If none exists, a new row with only that column set is inserted and
    committed.

    Args:
        conn: Open ``sqlite3`` connection whose row factory allows access by
            column name (the lookup reads ``row['id']``).
        table_name: Table to search; must be a plain identifier.
        column_name: Column to match ``value`` against; must be a plain
            identifier.
        value: Value to look up, bound as an SQL parameter.

    Returns:
        The ``id`` of the existing or newly inserted row.

    Raises:
        ValueError: If ``table_name`` or ``column_name`` is not a plain
            identifier.
    """
    # Table and column names cannot be bound as SQL parameters and have to be
    # interpolated into the statement text, so reject anything that is not a
    # bare identifier before it reaches the query.
    for identifier in (table_name, column_name):
        if not (isinstance(identifier, str) and identifier.isidentifier()):
            raise ValueError(f"Invalid SQL identifier: {identifier!r}")

    cursor = conn.cursor()
    cursor.execute(f"SELECT id FROM {table_name} WHERE {column_name} = ?", (value,))
    row = cursor.fetchone()
    if row is not None:
        return row['id']

    cursor.execute(f"INSERT INTO {table_name} ({column_name}) VALUES (?)", (value,))
    conn.commit()
    return cursor.lastrowid
def update_ticker_financials(conn, ticker_id, market_cap, closing_price):
    """Store new market data for one ticker and stamp the refresh time.

    Writes ``market_cap`` and ``closing_price`` to the ``tickers`` row whose
    ``id`` is ``ticker_id``, sets ``last_updated`` to the current Unix time in
    whole seconds, and commits.
    """
    refreshed_at = int(time.time())
    statement = "UPDATE tickers SET market_cap = ?, closing_price = ?, last_updated = ? WHERE id = ?"
    bindings = (market_cap, closing_price, refreshed_at, ticker_id)
    conn.cursor().execute(statement, bindings)
    conn.commit()
def get_ticker_info(conn, ticker_id):
    """Fetch every column of the ``tickers`` row identified by ``ticker_id``.

    Returns the row in whatever form the connection's row factory produces,
    or ``None`` when no ticker has that id.
    """
    lookup = conn.cursor()
    lookup.execute("SELECT * FROM tickers WHERE id = ?", (ticker_id,))
    match = lookup.fetchone()
    return match
def generate_summary_report(limit=20):
    """Queries the DB to generate a summary for the command-line tool.

    Prints a table of the ``limit`` most-mentioned tickers with their total
    mention count, a bullish / bearish / neutral breakdown, market cap and
    closing price. A mention is bullish when ``mention_sentiment`` is above
    0.1, bearish when below -0.1, and neutral from -0.1 to 0.1 inclusive.

    Args:
        limit: Maximum number of tickers to list.
    """
    print(f"\n--- Top {limit} Tickers by Mention Count ---")
    query = """
        SELECT
            t.symbol, t.market_cap, t.closing_price,
            COUNT(m.id) as mention_count,
            SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
            SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions,
            SUM(CASE WHEN m.mention_sentiment BETWEEN -0.1 AND 0.1 THEN 1 ELSE 0 END) as neutral_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY mention_count DESC
        LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (limit,)).fetchall()
    finally:
        # The rows are fully fetched, so the connection can be released
        # before (and regardless of) the formatting below.
        conn.close()

    header = f"{'Ticker':<8} | {'Mentions':<8} | {'Bullish':<8} | {'Bearish':<8} | {'Neutral':<8} | {'Market Cap':<15} | {'Close Price':<12}"
    print(header)
    print("-" * (len(header) + 2))  # Adjusted separator length

    for row in results:
        # Missing or non-positive market caps are shown as N/A; otherwise
        # scale to trillions, billions or millions.
        mc = row['market_cap']
        if mc and mc > 0:
            if mc >= 1e12:
                market_cap_str = f"${mc/1e12:.2f}T"
            elif mc >= 1e9:
                market_cap_str = f"${mc/1e9:.2f}B"
            else:
                market_cap_str = f"${mc/1e6:.2f}M"
        else:
            market_cap_str = "N/A"

        closing_price_str = f"${row['closing_price']:.2f}" if row['closing_price'] else "N/A"

        print(
            f"{row['symbol']:<8} | "
            f"{row['mention_count']:<8} | "
            f"{row['bullish_mentions']:<8} | "
            f"{row['bearish_mentions']:<8} | "
            f"{row['neutral_mentions']:<8} | "
            f"{market_cap_str:<15} | "
            f"{closing_price_str:<12}"
        )
def get_all_scanned_subreddits():
    """Gets a unique list of all subreddits we have data for.

    Returns:
        The names stored in the ``subreddits`` table, sorted ascending.
    """
    conn = get_db_connection()
    try:
        results = conn.execute("SELECT DISTINCT name FROM subreddits ORDER BY name ASC;").fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return [row['name'] for row in results]
def add_or_update_post_analysis(conn, post_data):
    """Upsert one analysed post into the ``posts`` table and commit.

    ``post_data`` supplies the named SQL parameters ``post_id``, ``title``,
    ``post_url``, ``subreddit_id``, ``post_timestamp``, ``comment_count`` and
    ``avg_comment_sentiment``.

    A post whose ``post_id`` is not stored yet is inserted as a new row. When
    the ``post_id`` already exists, no duplicate is created: only
    ``comment_count`` and ``avg_comment_sentiment`` of the stored row are
    overwritten with the incoming values, and the other columns keep what
    they had.
    """
    upsert_sql = """
        INSERT INTO posts (post_id, title, post_url, subreddit_id, post_timestamp, comment_count, avg_comment_sentiment)
        VALUES (:post_id, :title, :post_url, :subreddit_id, :post_timestamp, :comment_count, :avg_comment_sentiment)
        ON CONFLICT(post_id) DO UPDATE SET
            comment_count = excluded.comment_count,
            avg_comment_sentiment = excluded.avg_comment_sentiment;
        """
    writer = conn.cursor()
    writer.execute(upsert_sql, post_data)
    conn.commit()
def get_deep_dive_details(ticker_symbol):
    """
    Gets all analyzed posts that mention a specific ticker.

    Args:
        ticker_symbol: Ticker to look up; matched case-insensitively.

    Returns:
        Distinct ``posts`` rows, each extended with ``subreddit_name``,
        ordered newest first by ``post_timestamp``.
    """
    query = """
        SELECT DISTINCT p.*, s.name as subreddit_name FROM posts p
        JOIN mentions m ON p.post_id = m.post_id
        JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON p.subreddit_id = s.id
        WHERE LOWER(t.symbol) = LOWER(?)
        ORDER BY p.post_timestamp DESC;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (ticker_symbol,)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results
def get_overall_summary(limit=50):
    """Return the most-mentioned tickers across all subreddits.

    Each row carries ``symbol``, ``market_cap``, ``closing_price``,
    ``mention_count`` and the ``bullish_mentions`` / ``bearish_mentions`` /
    ``neutral_mentions`` breakdown. A mention is bullish when
    ``mention_sentiment`` is above 0.1, bearish when below -0.1, and neutral
    from -0.1 to 0.1 inclusive.

    Args:
        limit: Maximum number of tickers to return.

    Returns:
        Rows ordered by ``mention_count``, highest first.
    """
    query = """
        SELECT
            t.symbol, t.market_cap, t.closing_price,
            COUNT(m.id) as mention_count,
            SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
            SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions,
            SUM(CASE WHEN m.mention_sentiment BETWEEN -0.1 AND 0.1 THEN 1 ELSE 0 END) as neutral_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY mention_count DESC LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (limit,)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results
def get_subreddit_summary(subreddit_name, limit=50):
    """Return the most-mentioned tickers within a single subreddit.

    Each row carries ``symbol``, ``market_cap``, ``closing_price``,
    ``mention_count`` and the ``bullish_mentions`` / ``bearish_mentions`` /
    ``neutral_mentions`` breakdown. A mention is bullish when
    ``mention_sentiment`` is above 0.1, bearish when below -0.1, and neutral
    from -0.1 to 0.1 inclusive.

    Args:
        subreddit_name: Subreddit to report on; matched case-insensitively.
        limit: Maximum number of tickers to return.

    Returns:
        Rows ordered by ``mention_count``, highest first.
    """
    query = """
        SELECT
            t.symbol, t.market_cap, t.closing_price,
            COUNT(m.id) as mention_count,
            SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
            SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions,
            SUM(CASE WHEN m.mention_sentiment BETWEEN -0.1 AND 0.1 THEN 1 ELSE 0 END) as neutral_mentions
        FROM mentions m
        JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?)
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY mention_count DESC LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (subreddit_name, limit)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results
def get_image_view_summary(subreddit_name, limit=10):
    """Return the top tickers of one subreddit for the image view.

    Each row carries ``symbol``, ``post_mentions`` and ``comment_mentions``
    (mentions whose ``mention_type`` is ``'post'`` / ``'comment'``), plus
    ``bullish_mentions`` (sentiment above 0.1) and ``bearish_mentions``
    (sentiment below -0.1).

    Args:
        subreddit_name: Subreddit to report on; matched case-insensitively.
        limit: Maximum number of tickers to return. Defaults to 10, the
            value that used to be fixed in the query.

    Returns:
        Rows ordered by post plus comment mentions, highest first.
    """
    query = """
        SELECT
            t.symbol,
            COUNT(CASE WHEN m.mention_type = 'post' THEN 1 END) as post_mentions,
            COUNT(CASE WHEN m.mention_type = 'comment' THEN 1 END) as comment_mentions,
            COUNT(CASE WHEN m.mention_sentiment > 0.1 THEN 1 END) as bullish_mentions,
            COUNT(CASE WHEN m.mention_sentiment < -0.1 THEN 1 END) as bearish_mentions
        FROM mentions m
        JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?)
        GROUP BY t.symbol
        ORDER BY (post_mentions + comment_mentions) DESC
        LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (subreddit_name, limit)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results
def get_weekly_summary_for_subreddit(subreddit_name, limit=10):
    """Return the top tickers of one subreddit over the last seven days.

    Only mentions whose ``mention_timestamp`` is at or after "now minus seven
    days" are counted. The cutoff is a Unix epoch value in whole seconds, so
    ``mention_timestamp`` is expected to be stored as epoch seconds as well
    (NOTE(review): confirm against the code that writes mentions).

    Each row carries ``symbol``, ``post_mentions`` and ``comment_mentions``
    (mentions whose ``mention_type`` is ``'post'`` / ``'comment'``), plus
    ``bullish_mentions`` (sentiment above 0.1) and ``bearish_mentions``
    (sentiment below -0.1).

    Args:
        subreddit_name: Subreddit to report on; matched case-insensitively.
        limit: Maximum number of tickers to return. Defaults to 10, the
            value that used to be fixed in the query.

    Returns:
        Rows ordered by post plus comment mentions, highest first.
    """
    # time.time() is already seconds since the epoch, independent of the
    # machine's timezone. Building the cutoff from a naive UTC datetime and
    # calling .timestamp() on it would interpret that value as local time and
    # shift the window by the local UTC offset.
    seven_days_ago_timestamp = int(time.time() - timedelta(days=7).total_seconds())

    query = """
        SELECT
            t.symbol,
            COUNT(CASE WHEN m.mention_type = 'post' THEN 1 END) as post_mentions,
            COUNT(CASE WHEN m.mention_type = 'comment' THEN 1 END) as comment_mentions,
            COUNT(CASE WHEN m.mention_sentiment > 0.1 THEN 1 END) as bullish_mentions,
            COUNT(CASE WHEN m.mention_sentiment < -0.1 THEN 1 END) as bearish_mentions
        FROM mentions m
        JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?) AND m.mention_timestamp >= ?
        GROUP BY t.symbol
        ORDER BY (post_mentions + comment_mentions) DESC
        LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(
            query, (subreddit_name, seven_days_ago_timestamp, limit)
        ).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results
def get_overall_image_view_summary(limit=10):
    """
    Gets a summary of top tickers across ALL subreddits for the image view.

    Each row carries ``symbol``, ``post_mentions`` and ``comment_mentions``
    (mentions whose ``mention_type`` is ``'post'`` / ``'comment'``), plus
    ``bullish_mentions`` (sentiment above 0.1) and ``bearish_mentions``
    (sentiment below -0.1).

    Args:
        limit: Maximum number of tickers to return. Defaults to 10, the
            value that used to be fixed in the query.

    Returns:
        Rows ordered by post plus comment mentions, highest first.
    """
    query = """
        SELECT
            t.symbol,
            COUNT(CASE WHEN m.mention_type = 'post' THEN 1 END) as post_mentions,
            COUNT(CASE WHEN m.mention_type = 'comment' THEN 1 END) as comment_mentions,
            COUNT(CASE WHEN m.mention_sentiment > 0.1 THEN 1 END) as bullish_mentions,
            COUNT(CASE WHEN m.mention_sentiment < -0.1 THEN 1 END) as bearish_mentions
        FROM mentions m
        JOIN tickers t ON m.ticker_id = t.id
        -- No JOIN or WHERE for subreddit, as we want all of them
        GROUP BY t.symbol
        ORDER BY (post_mentions + comment_mentions) DESC
        LIMIT ?;
    """
    conn = get_db_connection()
    try:
        results = conn.execute(query, (limit,)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return results