Functionality to fetch financial data in paralell. Improves speed a lot.

This commit is contained in:
2025-07-28 12:15:46 +02:00
parent 5319bc554a
commit 55ea5d187f

View File

@@ -5,249 +5,168 @@ import json
import os
import time
import sys
import subprocess
from dotenv import load_dotenv
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import praw
import yfinance as yf
import pandas as pd
from . import database
from .ticker_extractor import extract_tickers
from .sentiment_analyzer import get_sentiment_score
from .logger_setup import setup_logging, logger as log
def load_subreddits(filepath):
"""Loads a list of subreddits from a JSON file."""
try:
with open(filepath, "r") as f:
with open(filepath, 'r') as f:
return json.load(f).get("subreddits", [])
except (FileNotFoundError, json.JSONDecodeError) as e:
log.error(f"Error loading config file '{filepath}': {e}")
return None
def get_reddit_instance():
"""Initializes and returns a PRAW Reddit instance."""
env_path = Path(__file__).parent.parent / ".env"
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)
client_id = os.getenv("REDDIT_CLIENT_ID")
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
user_agent = os.getenv("REDDIT_USER_AGENT")
if not all([client_id, client_secret, user_agent]):
log.error("Error: Reddit API credentials not found in .env file.")
return None
return praw.Reddit(
client_id=client_id, client_secret=client_secret, user_agent=user_agent
)
return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
def get_financial_data_via_fetcher(ticker_symbol):
def fetch_financial_data(ticker_symbol):
"""
Calls two separate, isolated fetcher scripts to get market cap and closing price,
bypassing the internal library conflict.
Fetches market cap and the most recent closing price for a single ticker.
This function is designed to be thread-safe and robust.
"""
financials = {"market_cap": None, "closing_price": None}
project_root = Path(__file__).parent.parent
# --- Call 1: Get Market Cap ---
try:
mc_script_path = project_root / "fetch_market_cap.py"
command_mc = [sys.executable, str(mc_script_path), ticker_symbol]
result_mc = subprocess.run(
command_mc, capture_output=True, text=True, check=True, timeout=30
)
financials.update(json.loads(result_mc.stdout))
except Exception as e:
log.warning(f"Market cap fetcher failed for {ticker_symbol}: {e}")
ticker = yf.Ticker(ticker_symbol)
market_cap = ticker.info.get('marketCap')
data = ticker.history(period="2d", auto_adjust=False)
closing_price = None
if not data.empty:
last_close_raw = data['Close'].iloc[-1]
if pd.notna(last_close_raw):
closing_price = float(last_close_raw)
return ticker_symbol, {"market_cap": market_cap, "closing_price": closing_price}
except Exception:
return ticker_symbol, None
# --- Call 2: Get Closing Price ---
try:
cp_script_path = project_root / "fetch_close_price.py"
command_cp = [sys.executable, str(cp_script_path), ticker_symbol]
result_cp = subprocess.run(
command_cp, capture_output=True, text=True, check=True, timeout=30
)
financials.update(json.loads(result_cp.stdout))
except Exception as e:
log.warning(f"Closing price fetcher failed for {ticker_symbol}: {e}")
return financials
# --- HELPER FUNCTION: Contains all the optimized logic for one post ---
def _process_submission(
submission, subreddit_id, conn, comment_limit, fetch_financials
):
def _process_submission(submission, subreddit_id, conn, comment_limit):
"""
Processes a single Reddit submission with optimized logic.
- Uses a single loop over comments.
- Caches ticker IDs to reduce DB lookups.
Processes a single Reddit submission to find and save mentions.
Crucially, it returns a set of all unique ticker symbols found.
FINANCIAL DATA IS NOT FETCHED HERE.
"""
current_time = time.time()
# 1. Initialize data collectors for this post
tickers_in_title = set(extract_tickers(submission.title))
all_tickers_found_in_post = set(tickers_in_title)
all_comment_sentiments = []
ticker_id_cache = {} # In-memory cache for ticker IDs for this post
ticker_id_cache = {}
submission.comments.replace_more(limit=0)
all_comments = submission.comments.list()[:comment_limit]
# 2. --- SINGLE LOOP OVER COMMENTS ---
# We gather all necessary information in one pass.
for comment in all_comments:
comment_sentiment = get_sentiment_score(comment.body)
all_comment_sentiments.append(comment_sentiment) # For the deep dive
tickers_in_comment = set(extract_tickers(comment.body))
if not tickers_in_comment:
continue
all_tickers_found_in_post.update(tickers_in_comment)
# Apply the hybrid logic
# Process title mentions first
if tickers_in_title:
# If the title has tickers, every comment is a mention for them
for ticker_symbol in tickers_in_title:
if ticker_symbol not in ticker_id_cache:
ticker_id_cache[ticker_symbol] = database.get_or_create_entity(
conn, "tickers", "symbol", ticker_symbol
)
ticker_id = ticker_id_cache[ticker_symbol]
database.add_mention(
conn,
ticker_id,
subreddit_id,
submission.id,
"comment",
int(comment.created_utc),
comment_sentiment,
)
else:
# If no title tickers, only direct mentions in comments count
for ticker_symbol in tickers_in_comment:
if ticker_symbol not in ticker_id_cache:
ticker_id_cache[ticker_symbol] = database.get_or_create_entity(
conn, "tickers", "symbol", ticker_symbol
)
ticker_id = ticker_id_cache[ticker_symbol]
database.add_mention(
conn,
ticker_id,
subreddit_id,
submission.id,
"comment",
int(comment.created_utc),
comment_sentiment,
)
# 3. Process title mentions (if any)
if tickers_in_title:
log.info(
f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments."
)
log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
post_sentiment = get_sentiment_score(submission.title)
for ticker_symbol in tickers_in_title:
if ticker_symbol not in ticker_id_cache:
ticker_id_cache[ticker_symbol] = database.get_or_create_entity(
conn, "tickers", "symbol", ticker_symbol
)
ticker_id = ticker_id_cache[ticker_symbol]
database.add_mention(
conn,
ticker_id,
subreddit_id,
submission.id,
"post",
int(submission.created_utc),
post_sentiment,
)
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
ticker_id_cache[ticker_symbol] = ticker_id
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'post', int(submission.created_utc), post_sentiment)
# 4. Fetch financial data if enabled
if fetch_financials:
for ticker_symbol in all_tickers_found_in_post:
# Process comments
for comment in all_comments:
comment_sentiment = get_sentiment_score(comment.body)
if tickers_in_title:
# If title has tickers, every comment is a mention for them
for ticker_symbol in tickers_in_title:
ticker_id = ticker_id_cache[ticker_symbol] # Guaranteed to be in cache
ticker_info = database.get_ticker_info(conn, ticker_id)
if not ticker_info["last_updated"] or (
current_time - ticker_info["last_updated"]
> database.MARKET_CAP_REFRESH_INTERVAL
):
log.info(f" -> Fetching financial data for {ticker_symbol}...")
financials = get_financial_data_via_fetcher(ticker_symbol)
database.update_ticker_financials(
conn,
ticker_id,
financials.get("market_cap"),
financials.get("closing_price"),
)
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
else:
# Otherwise, only direct mentions in comments count
tickers_in_comment = set(extract_tickers(comment.body))
if tickers_in_comment:
all_tickers_found_in_post.update(tickers_in_comment)
for ticker_symbol in tickers_in_comment:
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
# 5. Save deep dive analysis
avg_sentiment = (
sum(all_comment_sentiments) / len(all_comment_sentiments)
if all_comment_sentiments
else 0
)
# Save deep dive analysis (this is separate from mention counting)
all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments]
avg_sentiment = sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0
post_analysis_data = {
"post_id": submission.id,
"title": submission.title,
"post_url": f"https://reddit.com{submission.permalink}",
"subreddit_id": subreddit_id,
"post_timestamp": int(submission.created_utc),
"comment_count": len(all_comments),
"avg_comment_sentiment": avg_sentiment,
"post_id": submission.id, "title": submission.title,
"post_url": f"https://reddit.com{submission.permalink}", "subreddit_id": subreddit_id,
"post_timestamp": int(submission.created_utc), "comment_count": len(all_comments),
"avg_comment_sentiment": avg_sentiment
}
database.add_or_update_post_analysis(conn, post_analysis_data)
return all_tickers_found_in_post
def scan_subreddits(
reddit,
subreddits_list,
post_limit=100,
comment_limit=100,
days_to_scan=1,
fetch_financials=True,
):
def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1, fetch_financials=True):
"""
Scans subreddits to discover mentions, then performs a single batch update for financials if enabled.
"""
conn = database.get_db_connection()
post_age_limit = days_to_scan * 86400
current_time = time.time()
all_tickers_to_update = set()
log.info(
f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts in the last {days_to_scan} day(s)..."
)
log.info(f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts...")
if not fetch_financials:
log.warning("NOTE: Financial data fetching is disabled for this run.")
for subreddit_name in subreddits_list:
try:
normalized_sub_name = subreddit_name.lower()
subreddit_id = database.get_or_create_entity(
conn, "subreddits", "name", normalized_sub_name
)
subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
subreddit = reddit.subreddit(normalized_sub_name)
log.info(f"Scanning r/{normalized_sub_name}...")
for submission in subreddit.new(limit=post_limit):
if (current_time - submission.created_utc) > post_age_limit:
log.info(
f" -> Reached posts older than the {days_to_scan}-day limit."
)
log.info(f" -> Reached posts older than the {days_to_scan}-day limit.")
break
# Call the new helper function for each post
_process_submission(
submission, subreddit_id, conn, comment_limit, fetch_financials
)
tickers_found = _process_submission(submission, subreddit_id, conn, comment_limit)
if tickers_found:
all_tickers_to_update.update(tickers_found)
except Exception as e:
log.error(
f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True
)
log.error(f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True)
conn.close()
log.critical("\n--- Scan Complete ---")
log.critical("\n--- Reddit Scan Complete ---")
if fetch_financials and all_tickers_to_update:
log.critical(f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---")
tickers_from_db = {t['symbol']: t['id'] for t in database.get_all_tickers()}
tickers_needing_update_symbols = [symbol for symbol in all_tickers_to_update if symbol in tickers_from_db]
financial_data_batch = {}
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_financial_data, tickers_needing_update_symbols)
for symbol, data in results:
if data:
financial_data_batch[symbol] = data
if financial_data_batch:
conn = database.get_db_connection()
for symbol, financials in financial_data_batch.items():
database.update_ticker_financials(
conn, tickers_from_db[symbol],
financials.get('market_cap'),
financials.get('closing_price')
)
conn.close()
log.critical("--- Batch Financial Update Complete ---")
def main():
@@ -322,100 +241,111 @@ def main():
database.initialize_db()
if args.update_top_tickers:
# --- Mode 1: Update Top Tickers ---
log.critical("--- Starting Financial Data Update for Top Tickers ---")
top_daily = database.get_top_daily_ticker_symbols()
top_weekly = database.get_top_weekly_ticker_symbols()
all_sub_names = database.get_all_scanned_subreddits()
for sub_name in all_sub_names:
top_daily.extend(database.get_top_daily_ticker_symbols_for_subreddit(sub_name))
top_weekly.extend(database.get_top_weekly_ticker_symbols_for_subreddit(sub_name))
tickers_to_update = sorted(list(set(top_daily + top_weekly)))
# 1. Start with an empty set to hold all unique tickers
tickers_to_update = set()
# 2. Get the overall top tickers
log.info("-> Checking overall top daily and weekly tickers...")
top_daily_overall = database.get_top_daily_ticker_symbols()
top_weekly_overall = database.get_top_weekly_ticker_symbols()
tickers_to_update.update(top_daily_overall)
tickers_to_update.update(top_weekly_overall)
# 3. Get all subreddits and loop through them
all_subreddits = database.get_all_scanned_subreddits()
log.info(
f"-> Checking top tickers for {len(all_subreddits)} individual subreddit(s)..."
)
for sub_name in all_subreddits:
log.debug(f" -> Checking r/{sub_name}...")
top_daily_sub = database.get_top_daily_ticker_symbols_for_subreddit(
sub_name
)
top_weekly_sub = database.get_top_weekly_ticker_symbols_for_subreddit(
sub_name
)
tickers_to_update.update(top_daily_sub)
tickers_to_update.update(top_weekly_sub)
unique_top_tickers = sorted(list(tickers_to_update))
if not unique_top_tickers:
if not tickers_to_update:
log.info("No top tickers found in the last week. Nothing to update.")
else:
log.info(
f"Found {len(unique_top_tickers)} unique top tickers to update: {', '.join(unique_top_tickers)}"
)
log.info(f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel...")
financial_data_batch = {}
successful_updates = 0
failed_updates = 0
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_financial_data, tickers_to_update)
for symbol, data in results:
# A successful fetch is one where data is returned and has a closing price
if data and data.get('closing_price') is not None:
log.info(f" -> SUCCESS: Fetched data for {symbol}")
financial_data_batch[symbol] = data
successful_updates += 1
else:
log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
failed_updates += 1
if not financial_data_batch:
log.error("Failed to fetch any batch financial data. Aborting update.")
else:
conn = database.get_db_connection()
for ticker_symbol in unique_top_tickers:
# 4. Find the ticker's ID to perform the update
ticker_info = database.get_ticker_by_symbol(ticker_symbol)
if ticker_info:
log.info(f" -> Updating financials for {ticker_info['symbol']}...")
financials = get_financial_data_via_fetcher(ticker_info["symbol"])
all_tickers_from_db = database.get_all_tickers()
ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
for symbol, financials in financial_data_batch.items():
if symbol in ticker_map:
database.update_ticker_financials(
conn,
ticker_info["id"],
financials.get("market_cap"),
financials.get("closing_price"),
conn, ticker_map[symbol],
financials.get('market_cap'),
financials.get('closing_price')
)
conn.close()
log.critical("--- Top Ticker Financial Data Update Complete ---")
log.critical(f" Successful updates: {successful_updates}")
log.critical(f" Failed updates: {failed_updates}")
elif args.update_financials_only:
# --- Mode 2: Update All or a Single Ticker ---
update_mode = args.update_financials_only
tickers_to_update = []
if update_mode == "ALL_TICKERS":
log.critical("--- Starting Financial Data Update for ALL tickers ---")
all_tickers = database.get_all_tickers()
log.info(f"Found {len(all_tickers)} tickers in the database to update.")
conn = database.get_db_connection()
for ticker in all_tickers:
symbol = ticker["symbol"]
log.info(f" -> Updating financials for {symbol}...")
financials = get_financial_data_via_fetcher(symbol)
database.update_ticker_financials(
conn,
ticker["id"],
financials.get("market_cap"),
financials.get("closing_price"),
)
conn.close()
all_tickers_from_db = database.get_all_tickers()
tickers_to_update = [t['symbol'] for t in all_tickers_from_db]
else:
ticker_symbol_to_update = update_mode
log.critical(
f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---"
)
ticker_info = database.get_ticker_by_symbol(ticker_symbol_to_update)
if ticker_info:
log.critical(f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---")
if database.get_ticker_by_symbol(ticker_symbol_to_update):
tickers_to_update = [ticker_symbol_to_update]
else:
log.error(f"Ticker '{ticker_symbol_to_update}' not found in the database.")
if tickers_to_update:
log.info(f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel...")
financial_data_batch = {}
successful_updates = 0
failed_updates = 0
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_financial_data, tickers_to_update)
for symbol, data in results:
# A successful fetch is one where data is returned and has a closing price
if data and data.get('closing_price') is not None:
log.info(f" -> SUCCESS: Fetched data for {symbol}")
financial_data_batch[symbol] = data
successful_updates += 1
else:
log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
failed_updates += 1
if not financial_data_batch:
log.error("Failed to fetch any batch financial data. Aborting update.")
else:
conn = database.get_db_connection()
log.info(f" -> Updating financials for {ticker_info['symbol']}...")
financials = get_financial_data_via_fetcher(ticker_info["symbol"])
all_tickers_from_db = database.get_all_tickers()
ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
for symbol, financials in financial_data_batch.items():
if symbol in ticker_map:
database.update_ticker_financials(
conn,
ticker_info["id"],
financials.get("market_cap"),
financials.get("closing_price"),
conn, ticker_map[symbol],
financials.get('market_cap'),
financials.get('closing_price')
)
conn.close()
else:
log.error(
f"Ticker '{ticker_symbol_to_update}' not found in the database."
)
log.critical("--- Financial Data Update Complete ---")
log.critical(f" Successful updates: {successful_updates}")
log.critical(f" Failed updates: {failed_updates}")
else:
# --- Mode 3: Default Reddit Scan ---
@@ -432,8 +362,7 @@ def main():
return
reddit = get_reddit_instance()
if not reddit:
return
if not reddit: return
scan_subreddits(
reddit,
@@ -441,9 +370,8 @@ def main():
post_limit=args.posts,
comment_limit=args.comments,
days_to_scan=args.days,
fetch_financials=(not args.no_financials),
fetch_financials=(not args.no_financials)
)
if __name__ == "__main__":
main()