# rstat_tool/main.py import argparse import json import os import time import sys from dotenv import load_dotenv from pathlib import Path from concurrent.futures import ThreadPoolExecutor import praw import yfinance as yf import pandas as pd from . import database from .ticker_extractor import extract_golden_tickers, extract_potential_tickers from .sentiment_analyzer import get_sentiment_score from .logger_setup import setup_logging, logger as log def load_subreddits(filepath): """Loads a list of subreddits from a JSON file.""" try: with open(filepath, "r") as f: return json.load(f).get("subreddits", []) except (FileNotFoundError, json.JSONDecodeError) as e: log.error(f"Error loading config file '{filepath}': {e}") return None def get_reddit_instance(): """Initializes and returns a PRAW Reddit instance.""" env_path = Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) client_id = os.getenv("REDDIT_CLIENT_ID") client_secret = os.getenv("REDDIT_CLIENT_SECRET") user_agent = os.getenv("REDDIT_USER_AGENT") if not all([client_id, client_secret, user_agent]): log.error("Error: Reddit API credentials not found in .env file.") return None return praw.Reddit( client_id=client_id, client_secret=client_secret, user_agent=user_agent ) def fetch_financial_data(ticker_symbol): """ Fetches market cap and the most recent closing price for a single ticker. This function is designed to be thread-safe and robust. """ try: ticker = yf.Ticker(ticker_symbol) market_cap = ticker.info.get("marketCap") data = ticker.history(period="2d", auto_adjust=False) closing_price = None if not data.empty: last_close_raw = data["Close"].iloc[-1] if pd.notna(last_close_raw): closing_price = float(last_close_raw) return ticker_symbol, {"market_cap": market_cap, "closing_price": closing_price} except Exception: return ticker_symbol, None def _process_submission(submission, subreddit_id, conn, comment_limit): """ Processes a single Reddit submission with a more precise "Golden Ticker" logic. - If a '$' ticker exists anywhere, the entire submission is in "Golden Only" mode. - Falls back to potential tickers only if no '$' tickers are found anywhere. """ # 1. --- Establish Mode: Golden or Potential --- # Scan the entire submission (title + selftext) to determine the mode. post_text_for_discovery = submission.title + " " + submission.selftext golden_tickers_in_post = extract_golden_tickers(post_text_for_discovery) is_golden_mode = bool(golden_tickers_in_post) if is_golden_mode: log.info( f" -> Golden Ticker(s) Found: {', '.join(golden_tickers_in_post)}. Engaging Golden-Only Mode." ) # In Golden Mode, we ONLY care about tickers with a '$'. tickers_in_title = extract_golden_tickers(submission.title) else: log.info(" -> No Golden Tickers. Falling back to potential ticker search.") # In Potential Mode, we look for any valid-looking capitalized word. tickers_in_title = extract_potential_tickers(submission.title) all_tickers_found_in_post = set(tickers_in_title) ticker_id_cache = {} # 2. --- Process Title Mentions --- if tickers_in_title: log.info( f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments." ) post_sentiment = get_sentiment_score(submission.title) for ticker_symbol in tickers_in_title: # All title tickers are saved as 'post' type mentions ticker_id = database.get_or_create_entity( conn, "tickers", "symbol", ticker_symbol ) ticker_id_cache[ticker_symbol] = ticker_id database.add_mention( conn, ticker_id, subreddit_id, submission.id, "post", int(submission.created_utc), post_sentiment, ) # 3. --- Process Comments (Single, Efficient Loop) --- submission.comments.replace_more(limit=0) all_comments = submission.comments.list()[:comment_limit] for comment in all_comments: comment_sentiment = get_sentiment_score(comment.body) if tickers_in_title: # If the title had tickers, every comment is a mention for them. # We don't need to scan the comment text for tickers here. for ticker_symbol in tickers_in_title: ticker_id = ticker_id_cache[ticker_symbol] # Guaranteed to be in cache database.add_mention( conn, ticker_id, subreddit_id, submission.id, "comment", int(comment.created_utc), comment_sentiment, ) else: # If no title tickers, we must scan the comment for direct mentions. # The type of ticker we look for depends on the mode. if is_golden_mode: # This case is rare (no golden in title, but some in comments) but important. tickers_in_comment = extract_golden_tickers(comment.body) else: tickers_in_comment = extract_potential_tickers(comment.body) if tickers_in_comment: all_tickers_found_in_post.update(tickers_in_comment) for ticker_symbol in tickers_in_comment: ticker_id = database.get_or_create_entity( conn, "tickers", "symbol", ticker_symbol ) database.add_mention( conn, ticker_id, subreddit_id, submission.id, "comment", int(comment.created_utc), comment_sentiment, ) # 4. --- Save Deep Dive Analysis --- all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments] avg_sentiment = ( sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0 ) post_analysis_data = { "post_id": submission.id, "title": submission.title, "post_url": f"https://reddit.com{submission.permalink}", "subreddit_id": subreddit_id, "post_timestamp": int(submission.created_utc), "comment_count": len(all_comments), "avg_comment_sentiment": avg_sentiment, } database.add_or_update_post_analysis(conn, post_analysis_data) return all_tickers_found_in_post def scan_subreddits( reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1, fetch_financials=True, ): """ Scans subreddits to discover mentions, then performs a single batch update for financials if enabled. """ conn = database.get_db_connection() post_age_limit = days_to_scan * 86400 current_time = time.time() all_tickers_to_update = set() log.info(f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts...") if not fetch_financials: log.warning("NOTE: Financial data fetching is disabled for this run.") for subreddit_name in subreddits_list: try: normalized_sub_name = subreddit_name.lower() subreddit_id = database.get_or_create_entity( conn, "subreddits", "name", normalized_sub_name ) subreddit = reddit.subreddit(normalized_sub_name) log.info(f"Scanning r/{normalized_sub_name}...") for submission in subreddit.new(limit=post_limit): if (current_time - submission.created_utc) > post_age_limit: log.info( f" -> Reached posts older than the {days_to_scan}-day limit." ) break tickers_found = _process_submission( submission, subreddit_id, conn, comment_limit ) if tickers_found: all_tickers_to_update.update(tickers_found) except Exception as e: log.error( f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True ) conn.close() log.critical("\n--- Reddit Scan Complete ---") if fetch_financials and all_tickers_to_update: log.critical( f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---" ) tickers_from_db = {t["symbol"]: t["id"] for t in database.get_all_tickers()} tickers_needing_update_symbols = [ symbol for symbol in all_tickers_to_update if symbol in tickers_from_db ] financial_data_batch = {} with ThreadPoolExecutor(max_workers=10) as executor: results = executor.map(fetch_financial_data, tickers_needing_update_symbols) for symbol, data in results: if data: financial_data_batch[symbol] = data if financial_data_batch: conn = database.get_db_connection() for symbol, financials in financial_data_batch.items(): database.update_ticker_financials( conn, tickers_from_db[symbol], financials.get("market_cap"), financials.get("closing_price"), ) conn.close() log.critical("--- Batch Financial Update Complete ---") def main(): """Main function to run the Reddit stock analysis tool.""" parser = argparse.ArgumentParser( description="Analyze stock ticker mentions on Reddit.", formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( "-f", "--config", default="subreddits.json", help="Path to the JSON file for scanning. (Default: subreddits.json)", ) parser.add_argument( "-s", "--subreddit", help="Scan a single subreddit, ignoring the config file." ) parser.add_argument( "-d", "--days", type=int, default=1, help="Number of past days to scan for new posts. (Default: 1)", ) parser.add_argument( "-p", "--posts", type=int, default=200, help="Max posts to check per subreddit. (Default: 200)", ) parser.add_argument( "-c", "--comments", type=int, default=100, help="Number of comments to scan per post. (Default: 100)", ) parser.add_argument( "-n", "--no-financials", action="store_true", help="Disable fetching of financial data during the Reddit scan.", ) parser.add_argument( "--update-top-tickers", action="store_true", help="Update financial data only for tickers currently in the Top 10 daily/weekly dashboards.", ) parser.add_argument( "-u", "--update-financials-only", nargs="?", const="ALL_TICKERS", # A special value to signify "update all" default=None, metavar="TICKER", help="Update financials. Provide a ticker symbol to update just one,\nor use the flag alone to update all tickers in the database.", ) parser.add_argument( "--debug", action="store_true", help="Enable detailed debug logging to the console.", ) parser.add_argument( "--stdout", action="store_true", help="Print all log messages to the console." ) args = parser.parse_args() setup_logging(console_verbose=args.stdout, debug_mode=args.debug) database.initialize_db() if args.update_top_tickers: # --- Mode 1: Update Top Tickers --- log.critical("--- Starting Financial Data Update for Top Tickers ---") top_daily = database.get_top_daily_ticker_symbols() top_weekly = database.get_top_weekly_ticker_symbols() all_sub_names = database.get_all_scanned_subreddits() for sub_name in all_sub_names: top_daily.extend( database.get_top_daily_ticker_symbols_for_subreddit(sub_name) ) top_weekly.extend( database.get_top_weekly_ticker_symbols_for_subreddit(sub_name) ) tickers_to_update = sorted(list(set(top_daily + top_weekly))) if not tickers_to_update: log.info("No top tickers found in the last week. Nothing to update.") else: log.info( f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel..." ) financial_data_batch = {} successful_updates = 0 failed_updates = 0 with ThreadPoolExecutor(max_workers=10) as executor: results = executor.map(fetch_financial_data, tickers_to_update) for symbol, data in results: # A successful fetch is one where data is returned and has a closing price if data and data.get("closing_price") is not None: log.info(f" -> SUCCESS: Fetched data for {symbol}") financial_data_batch[symbol] = data successful_updates += 1 else: log.warning( f" -> FAILED: Could not fetch valid financial data for {symbol}" ) failed_updates += 1 if not financial_data_batch: log.error("Failed to fetch any batch financial data. Aborting update.") else: conn = database.get_db_connection() all_tickers_from_db = database.get_all_tickers() ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db} for symbol, financials in financial_data_batch.items(): if symbol in ticker_map: database.update_ticker_financials( conn, ticker_map[symbol], financials.get("market_cap"), financials.get("closing_price"), ) conn.close() log.critical("--- Top Ticker Financial Data Update Complete ---") log.critical(f" Successful updates: {successful_updates}") log.critical(f" Failed updates: {failed_updates}") elif args.update_financials_only: # --- Mode 2: Update All or a Single Ticker --- update_mode = args.update_financials_only tickers_to_update = [] if update_mode == "ALL_TICKERS": log.critical("--- Starting Financial Data Update for ALL tickers ---") all_tickers_from_db = database.get_all_tickers() tickers_to_update = [t["symbol"] for t in all_tickers_from_db] else: ticker_symbol_to_update = update_mode log.critical( f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---" ) if database.get_ticker_by_symbol(ticker_symbol_to_update): tickers_to_update = [ticker_symbol_to_update] else: log.error( f"Ticker '{ticker_symbol_to_update}' not found in the database." ) if tickers_to_update: log.info( f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel..." ) financial_data_batch = {} successful_updates = 0 failed_updates = 0 with ThreadPoolExecutor(max_workers=10) as executor: results = executor.map(fetch_financial_data, tickers_to_update) for symbol, data in results: # A successful fetch is one where data is returned and has a closing price if data and data.get("closing_price") is not None: log.info(f" -> SUCCESS: Fetched data for {symbol}") financial_data_batch[symbol] = data successful_updates += 1 else: log.warning( f" -> FAILED: Could not fetch valid financial data for {symbol}" ) failed_updates += 1 if not financial_data_batch: log.error("Failed to fetch any batch financial data. Aborting update.") else: conn = database.get_db_connection() all_tickers_from_db = database.get_all_tickers() ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db} for symbol, financials in financial_data_batch.items(): if symbol in ticker_map: database.update_ticker_financials( conn, ticker_map[symbol], financials.get("market_cap"), financials.get("closing_price"), ) conn.close() log.critical("--- Financial Data Update Complete ---") log.critical(f" Successful updates: {successful_updates}") log.critical(f" Failed updates: {failed_updates}") else: # --- Mode 3: Default Reddit Scan --- log.critical("--- Starting Reddit Scan Mode ---") if args.subreddit: subreddits_to_scan = [args.subreddit] log.info(f"Targeted Scan Mode: Focusing on r/{args.subreddit}") else: log.info(f"Config Scan Mode: Loading subreddits from {args.config}") subreddits_to_scan = load_subreddits(args.config) if not subreddits_to_scan: log.error("Error: No subreddits to scan.") return reddit = get_reddit_instance() if not reddit: return scan_subreddits( reddit, subreddits_to_scan, post_limit=args.posts, comment_limit=args.comments, days_to_scan=args.days, fetch_financials=(not args.no_financials), ) if __name__ == "__main__": main()