From 3c2a38d1a1d3808885cbe64eb6a1bece9041029d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A5l-Kristian=20Hamre?=
Date: Mon, 28 Jul 2025 12:24:14 +0200
Subject: [PATCH] format doc.

---
 rstat_tool/main.py | 212 +++++++++++++++++++++++++++++++--------------
 1 file changed, 148 insertions(+), 64 deletions(-)

diff --git a/rstat_tool/main.py b/rstat_tool/main.py
index 84d9e5a..44e839a 100644
--- a/rstat_tool/main.py
+++ b/rstat_tool/main.py
@@ -18,18 +18,20 @@ from .ticker_extractor import extract_tickers
 from .sentiment_analyzer import get_sentiment_score
 from .logger_setup import setup_logging, logger as log

+
 def load_subreddits(filepath):
     """Loads a list of subreddits from a JSON file."""
     try:
-        with open(filepath, 'r') as f:
+        with open(filepath, "r") as f:
             return json.load(f).get("subreddits", [])
     except (FileNotFoundError, json.JSONDecodeError) as e:
         log.error(f"Error loading config file '{filepath}': {e}")
         return None

+
 def get_reddit_instance():
     """Initializes and returns a PRAW Reddit instance."""
-    env_path = Path(__file__).parent.parent / '.env'
+    env_path = Path(__file__).parent.parent / ".env"
     load_dotenv(dotenv_path=env_path)
     client_id = os.getenv("REDDIT_CLIENT_ID")
     client_secret = os.getenv("REDDIT_CLIENT_SECRET")
@@ -37,7 +39,10 @@ def get_reddit_instance():
     if not all([client_id, client_secret, user_agent]):
         log.error("Error: Reddit API credentials not found in .env file.")
         return None
-    return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
+    return praw.Reddit(
+        client_id=client_id, client_secret=client_secret, user_agent=user_agent
+    )
+

 def fetch_financial_data(ticker_symbol):
     """
@@ -46,17 +51,18 @@
     """
     try:
         ticker = yf.Ticker(ticker_symbol)
-        market_cap = ticker.info.get('marketCap')
+        market_cap = ticker.info.get("marketCap")
         data = ticker.history(period="2d", auto_adjust=False)
         closing_price = None
         if not data.empty:
-            last_close_raw = data['Close'].iloc[-1]
+            last_close_raw = data["Close"].iloc[-1]
             if pd.notna(last_close_raw):
                 closing_price = float(last_close_raw)
         return ticker_symbol, {"market_cap": market_cap, "closing_price": closing_price}
     except Exception:
         return ticker_symbol, None

+
 def _process_submission(submission, subreddit_id, conn, comment_limit):
     """
     Processes a single Reddit submission to find and save mentions.
@@ -69,15 +75,27 @@

     submission.comments.replace_more(limit=0)
     all_comments = submission.comments.list()[:comment_limit]
-
+
     # Process title mentions first
     if tickers_in_title:
-        log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
+        log.info(
+            f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments."
+        )
         post_sentiment = get_sentiment_score(submission.title)
         for ticker_symbol in tickers_in_title:
-            ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
+            ticker_id = database.get_or_create_entity(
+                conn, "tickers", "symbol", ticker_symbol
+            )
             ticker_id_cache[ticker_symbol] = ticker_id
-            database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'post', int(submission.created_utc), post_sentiment)
+            database.add_mention(
+                conn,
+                ticker_id,
+                subreddit_id,
+                submission.id,
+                "post",
+                int(submission.created_utc),
+                post_sentiment,
+            )

     # Process comments
     for comment in all_comments:
@@ -85,31 +103,64 @@
         comment_sentiment = get_sentiment_score(comment.body)
         if tickers_in_title:
             # If title has tickers, every comment is a mention for them
             for ticker_symbol in tickers_in_title:
-                ticker_id = ticker_id_cache[ticker_symbol] # Guaranteed to be in cache
-                database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
+                ticker_id = ticker_id_cache[ticker_symbol]  # Guaranteed to be in cache
+                database.add_mention(
+                    conn,
+                    ticker_id,
+                    subreddit_id,
+                    submission.id,
+                    "comment",
+                    int(comment.created_utc),
+                    comment_sentiment,
+                )
         else:
             # Otherwise, only direct mentions in comments count
             tickers_in_comment = set(extract_tickers(comment.body))
             if tickers_in_comment:
                 all_tickers_found_in_post.update(tickers_in_comment)
                 for ticker_symbol in tickers_in_comment:
-                    ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
-                    database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
+                    ticker_id = database.get_or_create_entity(
+                        conn, "tickers", "symbol", ticker_symbol
+                    )
+                    database.add_mention(
+                        conn,
+                        ticker_id,
+                        subreddit_id,
+                        submission.id,
+                        "comment",
+                        int(comment.created_utc),
+                        comment_sentiment,
+                    )

     # Save deep dive analysis (this is separate from mention counting)
     all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments]
-    avg_sentiment = sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0
+    avg_sentiment = (
+        sum(all_comment_sentiments) / len(all_comment_sentiments)
+        if all_comment_sentiments
+        else 0
+    )
     post_analysis_data = {
-        "post_id": submission.id, "title": submission.title,
-        "post_url": f"https://reddit.com{submission.permalink}", "subreddit_id": subreddit_id,
-        "post_timestamp": int(submission.created_utc), "comment_count": len(all_comments),
-        "avg_comment_sentiment": avg_sentiment
+        "post_id": submission.id,
+        "title": submission.title,
+        "post_url": f"https://reddit.com{submission.permalink}",
+        "subreddit_id": subreddit_id,
+        "post_timestamp": int(submission.created_utc),
+        "comment_count": len(all_comments),
+        "avg_comment_sentiment": avg_sentiment,
     }
     database.add_or_update_post_analysis(conn, post_analysis_data)
-
+
     return all_tickers_found_in_post

-def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1, fetch_financials=True):
+
+def scan_subreddits(
+    reddit,
+    subreddits_list,
+    post_limit=100,
+    comment_limit=100,
+    days_to_scan=1,
+    fetch_financials=True,
+):
     """
     Scans subreddits to discover mentions, then performs a single batch update for financials if enabled.
     """
@@ -117,7 +168,7 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
     post_age_limit = days_to_scan * 86400
     current_time = time.time()
     all_tickers_to_update = set()
-
+
     log.info(f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts...")
     if not fetch_financials:
         log.warning("NOTE: Financial data fetching is disabled for this run.")
@@ -125,30 +176,42 @@
     for subreddit_name in subreddits_list:
         try:
             normalized_sub_name = subreddit_name.lower()
-            subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
+            subreddit_id = database.get_or_create_entity(
+                conn, "subreddits", "name", normalized_sub_name
+            )
             subreddit = reddit.subreddit(normalized_sub_name)
             log.info(f"Scanning r/{normalized_sub_name}...")
-
+
             for submission in subreddit.new(limit=post_limit):
                 if (current_time - submission.created_utc) > post_age_limit:
-                    log.info(f" -> Reached posts older than the {days_to_scan}-day limit.")
+                    log.info(
+                        f" -> Reached posts older than the {days_to_scan}-day limit."
+                    )
                     break
-
-                tickers_found = _process_submission(submission, subreddit_id, conn, comment_limit)
+
+                tickers_found = _process_submission(
+                    submission, subreddit_id, conn, comment_limit
+                )
                 if tickers_found:
                     all_tickers_to_update.update(tickers_found)
-
+
         except Exception as e:
-            log.error(f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True)
-
+            log.error(
+                f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True
+            )
+
     conn.close()

     log.critical("\n--- Reddit Scan Complete ---")

     if fetch_financials and all_tickers_to_update:
-        log.critical(f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---")
-
-        tickers_from_db = {t['symbol']: t['id'] for t in database.get_all_tickers()}
-        tickers_needing_update_symbols = [symbol for symbol in all_tickers_to_update if symbol in tickers_from_db]
+        log.critical(
+            f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---"
+        )
+
+        tickers_from_db = {t["symbol"]: t["id"] for t in database.get_all_tickers()}
+        tickers_needing_update_symbols = [
+            symbol for symbol in all_tickers_to_update if symbol in tickers_from_db
+        ]
         financial_data_batch = {}
         with ThreadPoolExecutor(max_workers=10) as executor:
@@ -156,14 +219,15 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
             for symbol, data in results:
                 if data:
                     financial_data_batch[symbol] = data
-
+
         if financial_data_batch:
             conn = database.get_db_connection()
             for symbol, financials in financial_data_batch.items():
                 database.update_ticker_financials(
-                    conn, tickers_from_db[symbol],
-                    financials.get('market_cap'),
-                    financials.get('closing_price')
+                    conn,
+                    tickers_from_db[symbol],
+                    financials.get("market_cap"),
+                    financials.get("closing_price"),
                 )
             conn.close()
         log.critical("--- Batch Financial Update Complete ---")
@@ -247,14 +311,20 @@ def main():
         top_weekly = database.get_top_weekly_ticker_symbols()
         all_sub_names = database.get_all_scanned_subreddits()
         for sub_name in all_sub_names:
-            top_daily.extend(database.get_top_daily_ticker_symbols_for_subreddit(sub_name))
-            top_weekly.extend(database.get_top_weekly_ticker_symbols_for_subreddit(sub_name))
+            top_daily.extend(
+                database.get_top_daily_ticker_symbols_for_subreddit(sub_name)
+            )
+            top_weekly.extend(
+                database.get_top_weekly_ticker_symbols_for_subreddit(sub_name)
+            )
         tickers_to_update = sorted(list(set(top_daily + top_weekly)))

         if not tickers_to_update:
             log.info("No top tickers found in the last week. Nothing to update.")
         else:
-            log.info(f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel...")
+            log.info(
+                f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel..."
+            )

             financial_data_batch = {}
             successful_updates = 0
@@ -264,30 +334,33 @@
                 results = executor.map(fetch_financial_data, tickers_to_update)

                 for symbol, data in results:
                     # A successful fetch is one where data is returned and has a closing price
-                    if data and data.get('closing_price') is not None:
+                    if data and data.get("closing_price") is not None:
                         log.info(f" -> SUCCESS: Fetched data for {symbol}")
                         financial_data_batch[symbol] = data
                         successful_updates += 1
                     else:
-                        log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
+                        log.warning(
+                            f" -> FAILED: Could not fetch valid financial data for {symbol}"
+                        )
                         failed_updates += 1
-
+
             if not financial_data_batch:
                 log.error("Failed to fetch any batch financial data. Aborting update.")
             else:
                 conn = database.get_db_connection()
                 all_tickers_from_db = database.get_all_tickers()
-                ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
+                ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db}
                 for symbol, financials in financial_data_batch.items():
                     if symbol in ticker_map:
                         database.update_ticker_financials(
-                            conn, ticker_map[symbol],
-                            financials.get('market_cap'),
-                            financials.get('closing_price')
+                            conn,
+                            ticker_map[symbol],
+                            financials.get("market_cap"),
+                            financials.get("closing_price"),
                         )
                 conn.close()
-
+
             log.critical("--- Top Ticker Financial Data Update Complete ---")
             log.critical(f" Successful updates: {successful_updates}")
             log.critical(f" Failed updates: {failed_updates}")
@@ -299,17 +372,23 @@ def main():
         if update_mode == "ALL_TICKERS":
             log.critical("--- Starting Financial Data Update for ALL tickers ---")
             all_tickers_from_db = database.get_all_tickers()
-            tickers_to_update = [t['symbol'] for t in all_tickers_from_db]
+            tickers_to_update = [t["symbol"] for t in all_tickers_from_db]
         else:
             ticker_symbol_to_update = update_mode
-            log.critical(f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---")
+            log.critical(
+                f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---"
+            )
             if database.get_ticker_by_symbol(ticker_symbol_to_update):
-                tickers_to_update = [ticker_symbol_to_update]
+                tickers_to_update = [ticker_symbol_to_update]
             else:
-                log.error(f"Ticker '{ticker_symbol_to_update}' not found in the database.")
-
+                log.error(
+                    f"Ticker '{ticker_symbol_to_update}' not found in the database."
+                )
+
         if tickers_to_update:
-            log.info(f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel...")
+            log.info(
+                f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel..."
+            )

             financial_data_batch = {}
             successful_updates = 0
@@ -319,27 +398,30 @@ def main():
                 results = executor.map(fetch_financial_data, tickers_to_update)

                 for symbol, data in results:
                     # A successful fetch is one where data is returned and has a closing price
-                    if data and data.get('closing_price') is not None:
+                    if data and data.get("closing_price") is not None:
                         log.info(f" -> SUCCESS: Fetched data for {symbol}")
                         financial_data_batch[symbol] = data
                         successful_updates += 1
                     else:
-                        log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
+                        log.warning(
+                            f" -> FAILED: Could not fetch valid financial data for {symbol}"
+                        )
                         failed_updates += 1
-
+
             if not financial_data_batch:
                 log.error("Failed to fetch any batch financial data. Aborting update.")
             else:
                 conn = database.get_db_connection()
                 all_tickers_from_db = database.get_all_tickers()
-                ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
+                ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db}
                 for symbol, financials in financial_data_batch.items():
                     if symbol in ticker_map:
                         database.update_ticker_financials(
-                            conn, ticker_map[symbol],
-                            financials.get('market_cap'),
-                            financials.get('closing_price')
+                            conn,
+                            ticker_map[symbol],
+                            financials.get("market_cap"),
+                            financials.get("closing_price"),
                         )
                 conn.close()

@@ -355,14 +437,15 @@ def main():
         log.info(f"Targeted Scan Mode: Focusing on r/{args.subreddit}")
     else:
         log.info(f"Config Scan Mode: Loading subreddits from {args.config}")
-        subreddits_to_scan = load_subreddits(args.config)
+        subreddits_to_scan = load_subreddits(args.config)

     if not subreddits_to_scan:
         log.error("Error: No subreddits to scan.")
         return

     reddit = get_reddit_instance()
-    if not reddit: return
+    if not reddit:
+        return

     scan_subreddits(
         reddit,
@@ -370,8 +453,9 @@ def main():
         post_limit=args.posts,
         comment_limit=args.comments,
         days_to_scan=args.days,
-        fetch_financials=(not args.no_financials)
+        fetch_financials=(not args.no_financials),
     )

+
 if __name__ == "__main__":
     main()