format doc.
@@ -18,18 +18,20 @@ from .ticker_extractor import extract_tickers
 from .sentiment_analyzer import get_sentiment_score
 from .logger_setup import setup_logging, logger as log
 
+
 def load_subreddits(filepath):
     """Loads a list of subreddits from a JSON file."""
     try:
-        with open(filepath, 'r') as f:
+        with open(filepath, "r") as f:
             return json.load(f).get("subreddits", [])
     except (FileNotFoundError, json.JSONDecodeError) as e:
         log.error(f"Error loading config file '{filepath}': {e}")
         return None
 
+
 def get_reddit_instance():
     """Initializes and returns a PRAW Reddit instance."""
-    env_path = Path(__file__).parent.parent / '.env'
+    env_path = Path(__file__).parent.parent / ".env"
     load_dotenv(dotenv_path=env_path)
     client_id = os.getenv("REDDIT_CLIENT_ID")
     client_secret = os.getenv("REDDIT_CLIENT_SECRET")
@@ -37,7 +39,10 @@ def get_reddit_instance():
     if not all([client_id, client_secret, user_agent]):
         log.error("Error: Reddit API credentials not found in .env file.")
         return None
-    return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
+    return praw.Reddit(
+        client_id=client_id, client_secret=client_secret, user_agent=user_agent
+    )
 
+
 def fetch_financial_data(ticker_symbol):
     """
@@ -46,17 +51,18 @@ def fetch_financial_data(ticker_symbol):
     """
     try:
         ticker = yf.Ticker(ticker_symbol)
-        market_cap = ticker.info.get('marketCap')
+        market_cap = ticker.info.get("marketCap")
         data = ticker.history(period="2d", auto_adjust=False)
         closing_price = None
         if not data.empty:
-            last_close_raw = data['Close'].iloc[-1]
+            last_close_raw = data["Close"].iloc[-1]
             if pd.notna(last_close_raw):
                 closing_price = float(last_close_raw)
         return ticker_symbol, {"market_cap": market_cap, "closing_price": closing_price}
     except Exception:
         return ticker_symbol, None
 
+
 def _process_submission(submission, subreddit_id, conn, comment_limit):
     """
     Processes a single Reddit submission to find and save mentions.
@@ -72,12 +78,24 @@ def _process_submission(submission, subreddit_id, conn, comment_limit):
 
     # Process title mentions first
     if tickers_in_title:
-        log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
+        log.info(
+            f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments."
+        )
         post_sentiment = get_sentiment_score(submission.title)
         for ticker_symbol in tickers_in_title:
-            ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
+            ticker_id = database.get_or_create_entity(
+                conn, "tickers", "symbol", ticker_symbol
+            )
             ticker_id_cache[ticker_symbol] = ticker_id
-            database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'post', int(submission.created_utc), post_sentiment)
+            database.add_mention(
+                conn,
+                ticker_id,
+                subreddit_id,
+                submission.id,
+                "post",
+                int(submission.created_utc),
+                post_sentiment,
+            )
 
     # Process comments
     for comment in all_comments:
@@ -86,30 +104,63 @@ def _process_submission(submission, subreddit_id, conn, comment_limit):
             # If title has tickers, every comment is a mention for them
             for ticker_symbol in tickers_in_title:
                 ticker_id = ticker_id_cache[ticker_symbol]  # Guaranteed to be in cache
-                database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
+                database.add_mention(
+                    conn,
+                    ticker_id,
+                    subreddit_id,
+                    submission.id,
+                    "comment",
+                    int(comment.created_utc),
+                    comment_sentiment,
+                )
         else:
             # Otherwise, only direct mentions in comments count
             tickers_in_comment = set(extract_tickers(comment.body))
             if tickers_in_comment:
                 all_tickers_found_in_post.update(tickers_in_comment)
                 for ticker_symbol in tickers_in_comment:
-                    ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
-                    database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
+                    ticker_id = database.get_or_create_entity(
+                        conn, "tickers", "symbol", ticker_symbol
+                    )
+                    database.add_mention(
+                        conn,
+                        ticker_id,
+                        subreddit_id,
+                        submission.id,
+                        "comment",
+                        int(comment.created_utc),
+                        comment_sentiment,
+                    )
 
     # Save deep dive analysis (this is separate from mention counting)
    all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments]
-    avg_sentiment = sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0
+    avg_sentiment = (
+        sum(all_comment_sentiments) / len(all_comment_sentiments)
+        if all_comment_sentiments
+        else 0
+    )
     post_analysis_data = {
-        "post_id": submission.id, "title": submission.title,
-        "post_url": f"https://reddit.com{submission.permalink}", "subreddit_id": subreddit_id,
-        "post_timestamp": int(submission.created_utc), "comment_count": len(all_comments),
-        "avg_comment_sentiment": avg_sentiment
+        "post_id": submission.id,
+        "title": submission.title,
+        "post_url": f"https://reddit.com{submission.permalink}",
+        "subreddit_id": subreddit_id,
+        "post_timestamp": int(submission.created_utc),
+        "comment_count": len(all_comments),
+        "avg_comment_sentiment": avg_sentiment,
     }
     database.add_or_update_post_analysis(conn, post_analysis_data)
 
     return all_tickers_found_in_post
 
-def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1, fetch_financials=True):
+
+def scan_subreddits(
+    reddit,
+    subreddits_list,
+    post_limit=100,
+    comment_limit=100,
+    days_to_scan=1,
+    fetch_financials=True,
+):
     """
     Scans subreddits to discover mentions, then performs a single batch update for financials if enabled.
     """
@@ -125,30 +176,42 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
     for subreddit_name in subreddits_list:
         try:
             normalized_sub_name = subreddit_name.lower()
-            subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
+            subreddit_id = database.get_or_create_entity(
+                conn, "subreddits", "name", normalized_sub_name
+            )
             subreddit = reddit.subreddit(normalized_sub_name)
             log.info(f"Scanning r/{normalized_sub_name}...")
 
             for submission in subreddit.new(limit=post_limit):
                 if (current_time - submission.created_utc) > post_age_limit:
-                    log.info(f" -> Reached posts older than the {days_to_scan}-day limit.")
+                    log.info(
+                        f" -> Reached posts older than the {days_to_scan}-day limit."
+                    )
                     break
 
-                tickers_found = _process_submission(submission, subreddit_id, conn, comment_limit)
+                tickers_found = _process_submission(
+                    submission, subreddit_id, conn, comment_limit
+                )
                 if tickers_found:
                     all_tickers_to_update.update(tickers_found)
 
         except Exception as e:
-            log.error(f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True)
+            log.error(
+                f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True
+            )
 
     conn.close()
     log.critical("\n--- Reddit Scan Complete ---")
 
     if fetch_financials and all_tickers_to_update:
-        log.critical(f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---")
+        log.critical(
+            f"\n--- Starting Batch Financial Update for {len(all_tickers_to_update)} Discovered Tickers ---"
+        )
 
-        tickers_from_db = {t['symbol']: t['id'] for t in database.get_all_tickers()}
-        tickers_needing_update_symbols = [symbol for symbol in all_tickers_to_update if symbol in tickers_from_db]
+        tickers_from_db = {t["symbol"]: t["id"] for t in database.get_all_tickers()}
+        tickers_needing_update_symbols = [
+            symbol for symbol in all_tickers_to_update if symbol in tickers_from_db
+        ]
 
         financial_data_batch = {}
         with ThreadPoolExecutor(max_workers=10) as executor:
@@ -161,9 +224,10 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
         conn = database.get_db_connection()
         for symbol, financials in financial_data_batch.items():
             database.update_ticker_financials(
-                conn, tickers_from_db[symbol],
-                financials.get('market_cap'),
-                financials.get('closing_price')
+                conn,
+                tickers_from_db[symbol],
+                financials.get("market_cap"),
+                financials.get("closing_price"),
             )
         conn.close()
         log.critical("--- Batch Financial Update Complete ---")
@@ -247,14 +311,20 @@ def main():
         top_weekly = database.get_top_weekly_ticker_symbols()
         all_sub_names = database.get_all_scanned_subreddits()
         for sub_name in all_sub_names:
-            top_daily.extend(database.get_top_daily_ticker_symbols_for_subreddit(sub_name))
-            top_weekly.extend(database.get_top_weekly_ticker_symbols_for_subreddit(sub_name))
+            top_daily.extend(
+                database.get_top_daily_ticker_symbols_for_subreddit(sub_name)
+            )
+            top_weekly.extend(
+                database.get_top_weekly_ticker_symbols_for_subreddit(sub_name)
+            )
         tickers_to_update = sorted(list(set(top_daily + top_weekly)))
 
         if not tickers_to_update:
             log.info("No top tickers found in the last week. Nothing to update.")
         else:
-            log.info(f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel...")
+            log.info(
+                f"Found {len(tickers_to_update)} unique top tickers to update. Fetching in parallel..."
+            )
 
             financial_data_batch = {}
             successful_updates = 0
@@ -264,12 +334,14 @@ def main():
                 results = executor.map(fetch_financial_data, tickers_to_update)
                 for symbol, data in results:
                     # A successful fetch is one where data is returned and has a closing price
-                    if data and data.get('closing_price') is not None:
+                    if data and data.get("closing_price") is not None:
                         log.info(f" -> SUCCESS: Fetched data for {symbol}")
                         financial_data_batch[symbol] = data
                         successful_updates += 1
                     else:
-                        log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
+                        log.warning(
+                            f" -> FAILED: Could not fetch valid financial data for {symbol}"
+                        )
                         failed_updates += 1
 
             if not financial_data_batch:
@@ -277,14 +349,15 @@ def main():
             else:
                 conn = database.get_db_connection()
                 all_tickers_from_db = database.get_all_tickers()
-                ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
+                ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db}
 
                 for symbol, financials in financial_data_batch.items():
                     if symbol in ticker_map:
                         database.update_ticker_financials(
-                            conn, ticker_map[symbol],
-                            financials.get('market_cap'),
-                            financials.get('closing_price')
+                            conn,
+                            ticker_map[symbol],
+                            financials.get("market_cap"),
+                            financials.get("closing_price"),
                         )
                 conn.close()
 
@@ -299,17 +372,23 @@ def main():
         if update_mode == "ALL_TICKERS":
             log.critical("--- Starting Financial Data Update for ALL tickers ---")
             all_tickers_from_db = database.get_all_tickers()
-            tickers_to_update = [t['symbol'] for t in all_tickers_from_db]
+            tickers_to_update = [t["symbol"] for t in all_tickers_from_db]
         else:
             ticker_symbol_to_update = update_mode
-            log.critical(f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---")
+            log.critical(
+                f"--- Starting Financial Data Update for single ticker: {ticker_symbol_to_update} ---"
+            )
             if database.get_ticker_by_symbol(ticker_symbol_to_update):
                 tickers_to_update = [ticker_symbol_to_update]
             else:
-                log.error(f"Ticker '{ticker_symbol_to_update}' not found in the database.")
+                log.error(
+                    f"Ticker '{ticker_symbol_to_update}' not found in the database."
+                )
 
         if tickers_to_update:
-            log.info(f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel...")
+            log.info(
+                f"Found {len(tickers_to_update)} unique tickers to update. Fetching in parallel..."
+            )
 
             financial_data_batch = {}
             successful_updates = 0
@@ -319,12 +398,14 @@ def main():
                 results = executor.map(fetch_financial_data, tickers_to_update)
                 for symbol, data in results:
                     # A successful fetch is one where data is returned and has a closing price
-                    if data and data.get('closing_price') is not None:
+                    if data and data.get("closing_price") is not None:
                         log.info(f" -> SUCCESS: Fetched data for {symbol}")
                         financial_data_batch[symbol] = data
                         successful_updates += 1
                     else:
-                        log.warning(f" -> FAILED: Could not fetch valid financial data for {symbol}")
+                        log.warning(
+                            f" -> FAILED: Could not fetch valid financial data for {symbol}"
+                        )
                         failed_updates += 1
 
             if not financial_data_batch:
@@ -332,14 +413,15 @@ def main():
             else:
                 conn = database.get_db_connection()
                 all_tickers_from_db = database.get_all_tickers()
-                ticker_map = {t['symbol']: t['id'] for t in all_tickers_from_db}
+                ticker_map = {t["symbol"]: t["id"] for t in all_tickers_from_db}
 
                 for symbol, financials in financial_data_batch.items():
                     if symbol in ticker_map:
                         database.update_ticker_financials(
-                            conn, ticker_map[symbol],
-                            financials.get('market_cap'),
-                            financials.get('closing_price')
+                            conn,
+                            ticker_map[symbol],
+                            financials.get("market_cap"),
+                            financials.get("closing_price"),
                         )
                 conn.close()
 
@@ -362,7 +444,8 @@ def main():
         return
 
     reddit = get_reddit_instance()
-    if not reddit: return
+    if not reddit:
+        return
 
     scan_subreddits(
         reddit,
@@ -370,8 +453,9 @@ def main():
         post_limit=args.posts,
         comment_limit=args.comments,
         days_to_scan=args.days,
-        fetch_financials=(not args.no_financials)
+        fetch_financials=(not args.no_financials),
     )
 
+
 if __name__ == "__main__":
     main()