# Source file: reddit_stock_analyzer/rstat_tool/main.py
# (252 lines, 12 KiB, Python)

# rstat_tool/main.py
import argparse
import json
import os
import time
import sys
import subprocess
from dotenv import load_dotenv
from pathlib import Path
import praw
from . import database
from .ticker_extractor import extract_tickers
from .sentiment_analyzer import get_sentiment_score
from .logger_setup import setup_logging, logger as log
def load_subreddits(filepath):
    """Load the list of subreddit names from a JSON config file.

    The file is expected to contain a JSON object whose ``"subreddits"`` key
    holds the list of names.

    Args:
        filepath: Path to the JSON config file.

    Returns:
        The value stored under ``"subreddits"`` (an empty list when the key
        is absent), or ``None`` when the file cannot be read, is not valid
        UTF-8 / JSON, or its top-level value is not a JSON object. Every
        ``None`` case is logged as an error.
    """
    try:
        # Decode as UTF-8 explicitly instead of relying on the platform's
        # locale encoding, so the same config parses the same way everywhere.
        with open(filepath, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
        log.error(f"Error loading config file '{filepath}': {e}")
        return None

    # A bare list/string/number at the top level has no ``.get``; report it
    # as a config error rather than crashing with AttributeError.
    if not isinstance(config, dict):
        log.error(
            f"Error loading config file '{filepath}': "
            f"expected a JSON object, got {type(config).__name__}"
        )
        return None

    return config.get("subreddits", [])
def get_reddit_instance():
    """Build a PRAW ``Reddit`` client from credentials in the project's .env file.

    The ``.env`` file is looked up two directories above this module and
    loaded into the environment before the credentials are read.

    Returns:
        A ``praw.Reddit`` instance, or ``None`` (after logging an error) when
        any of ``REDDIT_CLIENT_ID``, ``REDDIT_CLIENT_SECRET`` or
        ``REDDIT_USER_AGENT`` is missing or empty.
    """
    load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')

    credentials = {
        "client_id": os.getenv("REDDIT_CLIENT_ID"),
        "client_secret": os.getenv("REDDIT_CLIENT_SECRET"),
        "user_agent": os.getenv("REDDIT_USER_AGENT"),
    }

    # All three values are required; an unset or empty one aborts.
    if any(not value for value in credentials.values()):
        log.error("Error: Reddit API credentials not found in .env file.")
        return None

    return praw.Reddit(**credentials)
def _run_fetcher(script_path, ticker_symbol, timeout=30):
    """Run one fetcher script in a child interpreter and return its JSON object.

    Raises:
        subprocess.CalledProcessError: The script exited with a non-zero status.
        subprocess.TimeoutExpired: The script ran longer than ``timeout`` seconds.
        ValueError: The script's stdout was not JSON, or not a JSON object.
    """
    completed = subprocess.run(
        [sys.executable, str(script_path), ticker_symbol],
        capture_output=True,
        text=True,
        check=True,
        timeout=timeout,
    )
    payload = json.loads(completed.stdout)
    if not isinstance(payload, dict):
        raise ValueError(
            f"expected a JSON object on stdout, got {type(payload).__name__}"
        )
    return payload


def get_financial_data_via_fetcher(ticker_symbol):
    """Collect market cap and closing price for a ticker via external scripts.

    Two scripts located two directories above this module
    (``fetch_market_cap.py`` and ``fetch_close_price.py``) are each run in a
    separate Python process, and the JSON object each prints is merged into
    the result. The original author's note says the process isolation works
    around an internal library conflict.

    Each fetcher is best-effort: a failure is logged as a warning and the
    other fetcher still runs.

    Args:
        ticker_symbol: Ticker symbol passed to each script as its only argument.

    Returns:
        A dict that always contains ``"market_cap"`` and ``"closing_price"``
        (``None`` unless a fetcher supplied a value), plus any other keys the
        fetchers emitted.
    """
    financials = {"market_cap": None, "closing_price": None}
    project_root = Path(__file__).parent.parent

    fetchers = (
        ("Market cap", 'fetch_market_cap.py'),
        ("Closing price", 'fetch_close_price.py'),
    )
    for label, script_name in fetchers:
        try:
            financials.update(_run_fetcher(project_root / script_name, ticker_symbol))
        except subprocess.CalledProcessError as e:
            # str(e) only carries the exit status; the child's stderr is
            # where the actual reason is, so surface it in the log.
            detail = (e.stderr or "").strip()
            suffix = f" | stderr: {detail}" if detail else ""
            log.warning(f"{label} fetcher failed for {ticker_symbol}: {e}{suffix}")
        except Exception as e:
            # Deliberately broad: one failed fetcher must never abort a scan.
            log.warning(f"{label} fetcher failed for {ticker_symbol}: {e}")

    return financials
def _refresh_ticker_financials(conn, ticker_symbol, current_time):
    """Fetch and store financials for one ticker when its stored data is missing or stale."""
    log.debug(f" DEBUG: Checking ticker '{ticker_symbol}' for financial update.")
    ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
    ticker_info = database.get_ticker_info(conn, ticker_id)
    last_updated = ticker_info['last_updated']
    log.debug(f" -> Ticker Info from DB: last_updated = {last_updated}")

    if not last_updated:
        log.debug(" -> Condition MET: 'last_updated' is NULL. Needs update.")
    elif current_time - last_updated > database.MARKET_CAP_REFRESH_INTERVAL:
        log.debug(" -> Condition MET: Data is older than 24 hours. Needs update.")
    else:
        log.debug(" -> Condition NOT MET: Data is fresh. Skipping update.")
        return

    log.info(f" -> Fetching financial data for {ticker_symbol}...")
    financials = get_financial_data_via_fetcher(ticker_symbol)
    log.debug(f" -> Fetched data: {financials}")
    database.update_ticker_financials(
        conn, ticker_id,
        financials.get('market_cap'),
        financials.get('closing_price'),
    )


def _scan_submission(conn, submission, subreddit_id, comment_limit, current_time):
    """Record ticker mentions, refresh financials and store the summary for one post."""
    tickers_in_title = set(extract_tickers(submission.title))
    all_tickers_found_in_post = set(tickers_in_title)

    # limit=0 removes the "load more comments" placeholders without fetching
    # them, so only already-loaded comments are examined.
    submission.comments.replace_more(limit=0)
    all_comments = submission.comments.list()[:comment_limit]

    # Score every comment exactly once; the scores are reused for both the
    # per-mention rows and the post-level average below.
    comment_sentiments = [get_sentiment_score(comment.body) for comment in all_comments]

    if tickers_in_title:
        # A ticker in the title claims the whole thread: every comment counts
        # as a mention of each title ticker, whatever the comment says.
        log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
        post_sentiment = get_sentiment_score(submission.title)
        # Resolve ticker ids once instead of once per comment.
        title_ticker_ids = [
            database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
            for ticker_symbol in tickers_in_title
        ]
        for ticker_id in title_ticker_ids:
            database.add_mention(
                conn, ticker_id, subreddit_id, submission.id, 'post',
                int(submission.created_utc), post_sentiment,
            )
        for comment, comment_sentiment in zip(all_comments, comment_sentiments):
            for ticker_id in title_ticker_ids:
                database.add_mention(
                    conn, ticker_id, subreddit_id, submission.id, 'comment',
                    int(comment.created_utc), comment_sentiment,
                )
    else:
        # No title ticker: attribute each comment only to the tickers it names.
        for comment, comment_sentiment in zip(all_comments, comment_sentiments):
            tickers_in_comment = set(extract_tickers(comment.body))
            if not tickers_in_comment:
                continue
            all_tickers_found_in_post.update(tickers_in_comment)
            for ticker_symbol in tickers_in_comment:
                ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
                database.add_mention(
                    conn, ticker_id, subreddit_id, submission.id, 'comment',
                    int(comment.created_utc), comment_sentiment,
                )

    for ticker_symbol in all_tickers_found_in_post:
        _refresh_ticker_financials(conn, ticker_symbol, current_time)

    avg_sentiment = (
        sum(comment_sentiments) / len(comment_sentiments) if comment_sentiments else 0
    )
    post_analysis_data = {
        "post_id": submission.id,
        "title": submission.title,
        "post_url": f"https://reddit.com{submission.permalink}",
        "subreddit_id": subreddit_id,
        "post_timestamp": int(submission.created_utc),
        "comment_count": len(all_comments),
        "avg_comment_sentiment": avg_sentiment,
    }
    database.add_or_update_post_analysis(conn, post_analysis_data)


def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1):
    """Scan subreddits for ticker mentions and persist the results.

    For each subreddit, the newest posts are walked until one older than
    ``days_to_scan`` days is reached. For every post, ticker mentions (with a
    sentiment score) are stored, financial data for each ticker found is
    refreshed when missing or stale, and a per-post summary is saved.

    A failure while scanning one subreddit is logged and the scan moves on to
    the next one. The database connection is closed even if an exception
    escapes the scan.

    Args:
        reddit: Reddit client exposing ``subreddit(name)``.
        subreddits_list: Subreddit names to scan; each is lower-cased first.
        post_limit: Maximum number of new posts requested per subreddit.
        comment_limit: Maximum number of comments examined per post.
        days_to_scan: Age window, in days, for posts to include.
    """
    conn = database.get_db_connection()
    try:
        post_age_limit = days_to_scan * 86400
        # Captured once so every post is compared against the same "now".
        current_time = time.time()

        log.info(f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts in the last {days_to_scan} day(s)...")
        for subreddit_name in subreddits_list:
            # Fallback label for the error message: if normalisation itself
            # fails, the handler must not hit an unbound name or report the
            # previous iteration's subreddit.
            sub_label = subreddit_name
            try:
                normalized_sub_name = subreddit_name.lower()
                sub_label = normalized_sub_name
                subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
                subreddit = reddit.subreddit(normalized_sub_name)
                log.info(f"Scanning r/{normalized_sub_name}...")

                for submission in subreddit.new(limit=post_limit):
                    # Listing is newest-first, so the first too-old post
                    # means the rest are too old as well.
                    if (current_time - submission.created_utc) > post_age_limit:
                        log.info(f" -> Reached posts older than the {days_to_scan}-day limit.")
                        break
                    _scan_submission(conn, submission, subreddit_id, comment_limit, current_time)
            except Exception as e:
                # Best-effort per subreddit: log with traceback and continue.
                log.error(f"Could not scan r/{sub_label}. Error: {e}", exc_info=True)
    finally:
        conn.close()

    log.critical("\n--- Scan Complete ---")
def _build_arg_parser():
    """Build the command-line argument parser for the tool."""
    parser = argparse.ArgumentParser(
        description="Analyze stock ticker mentions on Reddit.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "-u", "--update-financials-only",
        nargs='?',
        const="ALL_TICKERS",  # Sentinel meaning "flag given without a ticker".
        default=None,
        metavar='TICKER',
        help="Update financials. Provide a ticker symbol to update just one,\nor use the flag alone to update all tickers in the database."
    )
    parser.add_argument("-f", "--config", default="subreddits.json", help="Path to the JSON file for scanning. (Default: subreddits.json)")
    parser.add_argument("-s", "--subreddit", help="Scan a single subreddit, ignoring the config file.")
    parser.add_argument("-d", "--days", type=int, default=1, help="Number of past days to scan for new posts. (Default: 1)")
    parser.add_argument("-p", "--posts", type=int, default=200, help="Max posts to check per subreddit. (Default: 200)")
    parser.add_argument("-c", "--comments", type=int, default=100, help="Number of comments to scan per post. (Default: 100)")
    parser.add_argument("--debug", action="store_true", help="Enable detailed debug logging to the console.")
    parser.add_argument("--stdout", action="store_true", help="Print all log messages to the console.")
    return parser


def _fetch_and_store_financials(conn, ticker):
    """Fetch fresh financials for one ticker row (``id``/``symbol``) and store them."""
    symbol = ticker['symbol']
    log.info(f" -> Updating financials for {symbol}...")
    financials = get_financial_data_via_fetcher(symbol)
    database.update_ticker_financials(
        conn, ticker['id'],
        financials.get('market_cap'),
        financials.get('closing_price'),
    )


def main():
    """Run the Reddit stock analysis tool from the command line.

    Two modes are supported:

    * Financial update mode (``-u`` / ``--update-financials-only``): refresh
      stored financials for one named ticker, or for every ticker in the
      database when the flag is given without a value.
    * Scan mode (default): scan either the single subreddit given with
      ``-s`` or the list loaded from the config file.
    """
    args = _build_arg_parser().parse_args()
    setup_logging(console_verbose=args.stdout, debug_mode=args.debug)
    database.initialize_db()

    update_mode = args.update_financials_only

    # Compare against None rather than truthiness: an explicitly passed empty
    # ticker (-u "") is still a request for update mode and must not silently
    # fall through to a full Reddit scan.
    if update_mode is not None:
        if update_mode == "ALL_TICKERS":
            log.critical("--- Starting Financial Data Update for ALL tickers ---")
            all_tickers = database.get_all_tickers()
            log.info(f"Found {len(all_tickers)} tickers in the database to update.")
            conn = database.get_db_connection()
            try:
                for ticker in all_tickers:
                    _fetch_and_store_financials(conn, ticker)
            finally:
                # Release the connection even if an update raises.
                conn.close()
            log.critical("--- Financial Data Update Complete ---")
        else:
            ticker_symbol_to_update = update_mode
            log.critical(f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---")
            ticker_info = database.get_ticker_by_symbol(ticker_symbol_to_update)
            if not ticker_info:
                log.error(f"Ticker '{ticker_symbol_to_update}' not found in the database. Please run a scan first to discover it.")
                return
            conn = database.get_db_connection()
            try:
                _fetch_and_store_financials(conn, ticker_info)
            finally:
                conn.close()
            log.critical("--- Financial Data Update Complete ---")
        return

    log.critical("--- Starting Reddit Scan Mode ---")
    if args.subreddit:
        subreddits_to_scan = [args.subreddit]
        log.info(f"Targeted Scan Mode: Focusing on r/{args.subreddit}")
    else:
        log.info(f"Config Scan Mode: Loading subreddits from {args.config}")
        subreddits_to_scan = load_subreddits(args.config)

    # Covers both a failed config load (None) and an empty list.
    if not subreddits_to_scan:
        log.error("Error: No subreddits to scan.")
        return

    reddit = get_reddit_instance()
    if not reddit:
        return

    scan_subreddits(
        reddit,
        subreddits_to_scan,
        post_limit=args.posts,
        comment_limit=args.comments,
        days_to_scan=args.days,
    )
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()