# rstat_tool/database.py

import sqlite3
import time
from datetime import datetime, timedelta, timezone

from .logger_setup import logger as log
from .ticker_extractor import COMMON_WORDS_BLACKLIST

DB_FILE = "reddit_stocks.db"
MARKET_CAP_REFRESH_INTERVAL = 86400  # 24 hours, in seconds

def clean_stale_tickers():
    """
    Removes tickers and their associated mentions from the database
    if the ticker symbol exists in the COMMON_WORDS_BLACKLIST.
    """
    log.info("\n--- Cleaning Stale Tickers from Database ---")
    conn = get_db_connection()
    cursor = conn.cursor()

    placeholders = ",".join("?" for _ in COMMON_WORDS_BLACKLIST)
    query = f"SELECT id, symbol FROM tickers WHERE symbol IN ({placeholders})"
    cursor.execute(query, tuple(COMMON_WORDS_BLACKLIST))
    stale_tickers = cursor.fetchall()

    if not stale_tickers:
        log.info("No stale tickers to clean.")
        conn.close()
        return

    for ticker in stale_tickers:
        ticker_id = ticker["id"]
        ticker_symbol = ticker["symbol"]
        log.info(f"Removing stale ticker '{ticker_symbol}' (ID: {ticker_id})...")
        cursor.execute("DELETE FROM mentions WHERE ticker_id = ?", (ticker_id,))
        cursor.execute("DELETE FROM tickers WHERE id = ?", (ticker_id,))

    # total_changes counts every row modified, inserted, or deleted on this
    # connection since it was opened, so reading it here reports the combined
    # number of rows removed by the loop above.
    deleted_count = conn.total_changes
    conn.commit()
    conn.close()
    log.info(f"Cleanup complete. Removed {deleted_count} records.")

def clean_stale_subreddits(active_subreddits):
    """
    Removes all data associated with subreddits that are NOT in the active list.
    """
    log.info("\n--- Cleaning Stale Subreddits from Database ---")
    conn = get_db_connection()
    cursor = conn.cursor()

    # Convert the list of active subreddits from the config file to a lowercase
    # set for fast, case-insensitive lookups.
    active_subreddits_lower = {sub.lower() for sub in active_subreddits}

    cursor.execute("SELECT id, name FROM subreddits")
    db_subreddits = cursor.fetchall()
    stale_sub_ids = []
    for sub in db_subreddits:
        # Lowercase the stored name as well, so the comparison really is
        # case-insensitive regardless of how the subreddit was saved.
        if sub["name"].lower() not in active_subreddits_lower:
            log.info(f"Found stale subreddit to remove: r/{sub['name']}")
            stale_sub_ids.append(sub["id"])

    if not stale_sub_ids:
        log.info("No stale subreddits to clean.")
        conn.close()
        return

    for sub_id in stale_sub_ids:
        log.info(f" -> Deleting associated data for subreddit ID: {sub_id}")
        cursor.execute("DELETE FROM mentions WHERE subreddit_id = ?", (sub_id,))
        cursor.execute("DELETE FROM posts WHERE subreddit_id = ?", (sub_id,))
        cursor.execute("DELETE FROM subreddits WHERE id = ?", (sub_id,))

    conn.commit()
    conn.close()
    log.info("Stale subreddit cleanup complete.")

def get_db_connection():
    """Opens a connection to the SQLite database with dict-style row access."""
    conn = sqlite3.connect(DB_FILE)
    # sqlite3.Row lets callers read columns by name (e.g. row["symbol"]),
    # which the summary helpers below rely on.
    conn.row_factory = sqlite3.Row
    return conn

def initialize_db():
    """Creates the tickers, subreddits, mentions, and posts tables if they do not exist."""
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS tickers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL UNIQUE,
            market_cap INTEGER,
            closing_price REAL,
            last_updated INTEGER
        )
        """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS subreddits (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE
        )
        """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS mentions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ticker_id INTEGER,
            subreddit_id INTEGER,
            post_id TEXT NOT NULL,
            mention_type TEXT NOT NULL,
            mention_sentiment REAL,
            post_avg_sentiment REAL,
            mention_timestamp INTEGER NOT NULL,
            FOREIGN KEY (ticker_id) REFERENCES tickers (id),
            FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
        )
        """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            post_id TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL,
            post_url TEXT,
            subreddit_id INTEGER,
            post_timestamp INTEGER,
            comment_count INTEGER,
            avg_comment_sentiment REAL,
            FOREIGN KEY (subreddit_id) REFERENCES subreddits (id)
        )
        """
    )
    conn.commit()
    conn.close()
    log.info("Database initialized successfully.")

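
# Editor's note (an assumption, not part of the original schema): the summary
# queries below filter on mentions.mention_timestamp and join on ticker_id and
# subreddit_id, so on a large database indexes along these lines would be a
# natural addition:
#   CREATE INDEX IF NOT EXISTS idx_mentions_timestamp ON mentions (mention_timestamp);
#   CREATE INDEX IF NOT EXISTS idx_mentions_ticker ON mentions (ticker_id);
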
def add_mention(
    conn,
    ticker_id,
    subreddit_id,
    post_id,
    mention_type,
    timestamp,
    mention_sentiment,
    post_avg_sentiment=None,
):
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            INSERT INTO mentions (ticker_id, subreddit_id, post_id, mention_type, mention_timestamp, mention_sentiment, post_avg_sentiment)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                ticker_id,
                subreddit_id,
                post_id,
                mention_type,
                timestamp,
                mention_sentiment,
                post_avg_sentiment,
            ),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # Silently skip duplicate mentions. Note that the mentions table as
        # created in initialize_db() has no UNIQUE constraint, so this only
        # fires if such a constraint is added elsewhere.
        pass

def get_or_create_entity(conn, table_name, column_name, value):
    """Generic function to get or create an entity and return its ID."""
    cursor = conn.cursor()
    # table_name and column_name are interpolated directly into the SQL, so
    # this helper must only be called with trusted, hard-coded identifiers.
    cursor.execute(f"SELECT id FROM {table_name} WHERE {column_name} = ?", (value,))
    result = cursor.fetchone()
    if result:
        return result["id"]
    else:
        cursor.execute(f"INSERT INTO {table_name} ({column_name}) VALUES (?)", (value,))
        conn.commit()
        return cursor.lastrowid

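
# Illustrative usage (hypothetical values; the scraping code that drives these
# helpers lives outside this module):
#
#   conn = get_db_connection()
#   ticker_id = get_or_create_entity(conn, "tickers", "symbol", "AAPL")
#   sub_id = get_or_create_entity(conn, "subreddits", "name", "stocks")
#   add_mention(conn, ticker_id, sub_id, "t3_abc123", "post",
#               int(time.time()), mention_sentiment=0.42)
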
def update_ticker_financials(conn, ticker_id, market_cap, closing_price):
    """Updates the financials and timestamp for a specific ticker."""
    cursor = conn.cursor()
    current_timestamp = int(time.time())
    cursor.execute(
        "UPDATE tickers SET market_cap = ?, closing_price = ?, last_updated = ? WHERE id = ?",
        (market_cap, closing_price, current_timestamp, ticker_id),
    )
    conn.commit()

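
# MARKET_CAP_REFRESH_INTERVAL is defined at the top of this module but never
# consulted here; presumably callers gate refreshes with it, along these lines
# (a sketch, not code from the original):
#
#   info = get_ticker_info(conn, ticker_id)
#   if not info["last_updated"] or time.time() - info["last_updated"] > MARKET_CAP_REFRESH_INTERVAL:
#       update_ticker_financials(conn, ticker_id, new_cap, new_price)
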
def get_ticker_info(conn, ticker_id):
    """Retrieves all info for a specific ticker by its ID."""
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM tickers WHERE id = ?", (ticker_id,))
    return cursor.fetchone()

def add_or_update_post_analysis(conn, post_data):
    """
    Inserts a new post analysis record or updates an existing one.
    This prevents duplicate entries for the same post.
    """
    cursor = conn.cursor()
    # Use the UNIQUE post_id to replace old data with new on conflict.
    cursor.execute(
        """
        INSERT INTO posts (post_id, title, post_url, subreddit_id, post_timestamp, comment_count, avg_comment_sentiment)
        VALUES (:post_id, :title, :post_url, :subreddit_id, :post_timestamp, :comment_count, :avg_comment_sentiment)
        ON CONFLICT(post_id) DO UPDATE SET
            comment_count = excluded.comment_count,
            avg_comment_sentiment = excluded.avg_comment_sentiment;
        """,
        post_data,
    )
    conn.commit()

def get_week_start_end(for_date):
    """Calculates the start (Monday) and end (Sunday) of the week containing for_date."""
    start_of_week = for_date - timedelta(days=for_date.weekday())
    end_of_week = start_of_week + timedelta(days=6)
    start_of_week = start_of_week.replace(hour=0, minute=0, second=0, microsecond=0)
    end_of_week = end_of_week.replace(hour=23, minute=59, second=59, microsecond=999999)
    return start_of_week, end_of_week

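
# Example: for Wednesday 2024-01-10 this returns Monday 2024-01-08 00:00:00
# through Sunday 2024-01-14 23:59:59.999999 (in the datetime's own timezone).
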
def get_overall_daily_summary():
    """Gets the top tickers across all subreddits from the LAST 24 HOURS."""
    conn = get_db_connection()
    one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
    one_day_ago_timestamp = int(one_day_ago.timestamp())
    query = """
        SELECT t.symbol, t.market_cap, t.closing_price, COUNT(m.id) as total_mentions,
               SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
               SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        WHERE m.mention_timestamp >= ?
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY total_mentions DESC LIMIT 10;
    """
    results = conn.execute(query, (one_day_ago_timestamp,)).fetchall()
    conn.close()
    return results

def get_overall_weekly_summary():
    """Gets the top tickers across all subreddits for LAST WEEK (Mon-Sun)."""
    conn = get_db_connection()
    today = datetime.now(timezone.utc)
    target_date_for_last_week = today - timedelta(days=7)
    start_of_week, end_of_week = get_week_start_end(target_date_for_last_week)
    start_timestamp = int(start_of_week.timestamp())
    end_timestamp = int(end_of_week.timestamp())
    query = """
        SELECT t.symbol, t.market_cap, t.closing_price, COUNT(m.id) as total_mentions,
               SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
               SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        WHERE m.mention_timestamp BETWEEN ? AND ?
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY total_mentions DESC LIMIT 10;
    """
    results = conn.execute(query, (start_timestamp, end_timestamp)).fetchall()
    conn.close()
    return results, start_of_week, end_of_week

def get_daily_summary_for_subreddit(subreddit_name):
    """Gets a summary for a subreddit's DAILY view (last 24 hours)."""
    conn = get_db_connection()
    one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
    one_day_ago_timestamp = int(one_day_ago.timestamp())
    query = """
        SELECT t.symbol, t.market_cap, t.closing_price, COUNT(m.id) as total_mentions,
               SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
               SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?) AND m.mention_timestamp >= ?
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY total_mentions DESC LIMIT 10;
    """
    results = conn.execute(query, (subreddit_name, one_day_ago_timestamp)).fetchall()
    conn.close()
    return results

def get_weekly_summary_for_subreddit(subreddit_name, for_date):
    """Gets a summary for a subreddit's WEEKLY view (for a specific week)."""
    conn = get_db_connection()
    start_of_week, end_of_week = get_week_start_end(for_date)
    start_timestamp = int(start_of_week.timestamp())
    end_timestamp = int(end_of_week.timestamp())
    query = """
        SELECT t.symbol, t.market_cap, t.closing_price, COUNT(m.id) as total_mentions,
               SUM(CASE WHEN m.mention_sentiment > 0.1 THEN 1 ELSE 0 END) as bullish_mentions,
               SUM(CASE WHEN m.mention_sentiment < -0.1 THEN 1 ELSE 0 END) as bearish_mentions
        FROM mentions m JOIN tickers t ON m.ticker_id = t.id JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?) AND m.mention_timestamp BETWEEN ? AND ?
        GROUP BY t.symbol, t.market_cap, t.closing_price
        ORDER BY total_mentions DESC LIMIT 10;
    """
    results = conn.execute(query, (subreddit_name, start_timestamp, end_timestamp)).fetchall()
    conn.close()
    return results, start_of_week, end_of_week

def get_deep_dive_details(ticker_symbol):
    """Gets all analyzed posts that mention a specific ticker."""
    conn = get_db_connection()
    query = """
        SELECT DISTINCT p.*, s.name as subreddit_name FROM posts p
        JOIN mentions m ON p.post_id = m.post_id JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON p.subreddit_id = s.id
        WHERE LOWER(t.symbol) = LOWER(?) ORDER BY p.post_timestamp DESC;
    """
    results = conn.execute(query, (ticker_symbol,)).fetchall()
    conn.close()
    return results

def get_all_scanned_subreddits():
    """Gets a unique list of all subreddits we have data for."""
    conn = get_db_connection()
    results = conn.execute(
        "SELECT DISTINCT name FROM subreddits ORDER BY name ASC;"
    ).fetchall()
    conn.close()
    return [row["name"] for row in results]

def get_all_tickers():
    """Retrieves the ID and symbol of every ticker in the database."""
    conn = get_db_connection()
    results = conn.execute("SELECT id, symbol FROM tickers;").fetchall()
    conn.close()
    return results

def get_ticker_by_symbol(symbol):
    """
    Retrieves a single ticker's ID and symbol from the database.
    The search is case-insensitive. Returns a Row object or None if not found.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, symbol FROM tickers WHERE LOWER(symbol) = LOWER(?)", (symbol,)
    )
    result = cursor.fetchone()
    conn.close()
    return result

def get_top_daily_ticker_symbols():
    """Gets a simple list of the Top 10 ticker symbols from the last 24 hours."""
    conn = get_db_connection()
    one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
    one_day_ago_timestamp = int(one_day_ago.timestamp())
    query = """
        SELECT t.symbol FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        WHERE m.mention_timestamp >= ?
        GROUP BY t.symbol ORDER BY COUNT(m.id) DESC LIMIT 10;
    """
    results = conn.execute(query, (one_day_ago_timestamp,)).fetchall()
    conn.close()
    return [row["symbol"] for row in results]  # Return a simple list of strings

def get_top_weekly_ticker_symbols():
    """Gets a simple list of the Top 10 ticker symbols from the last 7 days."""
    conn = get_db_connection()
    seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
    seven_days_ago_timestamp = int(seven_days_ago.timestamp())
    query = """
        SELECT t.symbol FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        WHERE m.mention_timestamp >= ?
        GROUP BY t.symbol ORDER BY COUNT(m.id) DESC LIMIT 10;
    """
    results = conn.execute(query, (seven_days_ago_timestamp,)).fetchall()
    conn.close()
    return [row["symbol"] for row in results]  # Return a simple list of strings

def get_top_daily_ticker_symbols_for_subreddit(subreddit_name):
    """Gets a list of the Top 10 daily ticker symbols for a specific subreddit."""
    conn = get_db_connection()
    one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
    one_day_ago_timestamp = int(one_day_ago.timestamp())
    query = """
        SELECT t.symbol FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?) AND m.mention_timestamp >= ?
        GROUP BY t.symbol ORDER BY COUNT(m.id) DESC LIMIT 10;
    """
    results = conn.execute(query, (subreddit_name, one_day_ago_timestamp)).fetchall()
    conn.close()
    return [row["symbol"] for row in results]

def get_top_weekly_ticker_symbols_for_subreddit(subreddit_name):
    """Gets a list of the Top 10 ticker symbols (last 7 days) for a specific subreddit."""
    conn = get_db_connection()
    seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
    seven_days_ago_timestamp = int(seven_days_ago.timestamp())
    query = """
        SELECT t.symbol FROM mentions m JOIN tickers t ON m.ticker_id = t.id
        JOIN subreddits s ON m.subreddit_id = s.id
        WHERE LOWER(s.name) = LOWER(?) AND m.mention_timestamp >= ?
        GROUP BY t.symbol ORDER BY COUNT(m.id) DESC LIMIT 10;
    """
    results = conn.execute(query, (subreddit_name, seven_days_ago_timestamp)).fetchall()
    conn.close()
    return [row["symbol"] for row in results]

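
if __name__ == "__main__":
    # Minimal smoke test (an illustrative addition, not part of the original
    # module): create the schema, then run the blacklist cleanup pass against
    # the resulting database. Because this module uses relative imports, run
    # it as a package module, e.g.:  python -m rstat_tool.database
    initialize_db()
    clean_stale_tickers()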