Improve fetching of financial data.
This commit is contained in:
@@ -6,13 +6,17 @@ import glob
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import praw
|
import praw
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
# --- CONFIGURATION ---
|
# --- CONFIGURATION ---
|
||||||
IMAGE_DIR = "images"
|
IMAGE_DIR = "images"
|
||||||
|
|
||||||
def get_reddit_instance():
|
def get_reddit_instance():
|
||||||
"""Initializes and returns a PRAW Reddit instance from .env credentials."""
|
"""Initializes and returns a PRAW Reddit instance using OAuth2 refresh token."""
|
||||||
load_dotenv()
|
|
||||||
|
env_path = Path(__file__).parent / '.env'
|
||||||
|
load_dotenv(dotenv_path=env_path)
|
||||||
|
|
||||||
client_id = os.getenv("REDDIT_CLIENT_ID")
|
client_id = os.getenv("REDDIT_CLIENT_ID")
|
||||||
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
|
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
|
||||||
user_agent = os.getenv("REDDIT_USER_AGENT")
|
user_agent = os.getenv("REDDIT_USER_AGENT")
|
||||||
|
@@ -7,6 +7,7 @@ from .logger_setup import logger as log
|
|||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
DB_FILE = "reddit_stocks.db"
|
DB_FILE = "reddit_stocks.db"
|
||||||
|
MARKET_CAP_REFRESH_INTERVAL = 86400
|
||||||
|
|
||||||
def get_db_connection():
|
def get_db_connection():
|
||||||
"""Establishes a connection to the SQLite database."""
|
"""Establishes a connection to the SQLite database."""
|
||||||
|
@@ -6,20 +6,18 @@ import os
|
|||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
import praw
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import praw
|
||||||
|
|
||||||
from . import database
|
from . import database
|
||||||
from .ticker_extractor import extract_tickers
|
from .ticker_extractor import extract_tickers
|
||||||
from .sentiment_analyzer import get_sentiment_score
|
from .sentiment_analyzer import get_sentiment_score
|
||||||
from .logger_setup import setup_logging, logger as log
|
from .logger_setup import setup_logging, logger as log
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
MARKET_CAP_REFRESH_INTERVAL = 86400
|
|
||||||
POST_AGE_LIMIT = 86400
|
|
||||||
|
|
||||||
|
|
||||||
def load_subreddits(filepath):
|
def load_subreddits(filepath):
|
||||||
|
"""Loads a list of subreddits from a JSON file."""
|
||||||
try:
|
try:
|
||||||
with open(filepath, 'r') as f:
|
with open(filepath, 'r') as f:
|
||||||
return json.load(f).get("subreddits", [])
|
return json.load(f).get("subreddits", [])
|
||||||
@@ -28,30 +26,42 @@ def load_subreddits(filepath):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def get_reddit_instance():
|
def get_reddit_instance():
|
||||||
|
"""Initializes and returns a PRAW Reddit instance."""
|
||||||
|
env_path = Path(__file__).parent.parent / '.env'
|
||||||
|
load_dotenv(dotenv_path=env_path)
|
||||||
|
|
||||||
client_id = os.getenv("REDDIT_CLIENT_ID")
|
client_id = os.getenv("REDDIT_CLIENT_ID")
|
||||||
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
|
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
|
||||||
user_agent = os.getenv("REDDIT_USER_AGENT")
|
user_agent = os.getenv("REDDIT_USER_AGENT")
|
||||||
if not all([client_id, client_secret, user_agent]):
|
if not all([client_id, client_secret, user_agent]):
|
||||||
print("Error: Reddit API credentials not found in .env file.")
|
log.error("Error: Reddit API credentials not found in .env file.")
|
||||||
return None
|
return None
|
||||||
return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
|
return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
|
||||||
|
|
||||||
|
def get_financial_data_via_fetcher(ticker_symbol):
|
||||||
|
"""
|
||||||
|
Calls the external fetcher.py script in an isolated process to get financial data.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
command = [sys.executable, "-m", "rstat_tool.fetcher", ticker_symbol]
|
||||||
|
result = subprocess.run(
|
||||||
|
command, capture_output=True, text=True, check=True, timeout=30
|
||||||
|
)
|
||||||
|
return json.loads(result.stdout)
|
||||||
|
except Exception as e:
|
||||||
|
log.warning(f"Fetcher script failed for {ticker_symbol}: {e}")
|
||||||
|
return {"market_cap": None, "closing_price": None}
|
||||||
|
|
||||||
def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1):
|
def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100, days_to_scan=1):
|
||||||
"""
|
""" Scans subreddits and uses the fetcher to get financial data. """
|
||||||
Scans subreddits with a hybrid mention counting logic.
|
|
||||||
- If a ticker is in the title, it gets credit for all comments.
|
|
||||||
- If not, tickers only get credit for direct mentions in comments.
|
|
||||||
"""
|
|
||||||
conn = database.get_db_connection()
|
conn = database.get_db_connection()
|
||||||
post_age_limit = days_to_scan * 86400
|
post_age_limit = days_to_scan * 86400
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
|
|
||||||
log.info(f"\nScanning {len(subreddits_list)} subreddit(s) for NEW posts in the last {days_to_scan} day(s)...")
|
log.info(f"Scanning {len(subreddits_list)} subreddit(s) for NEW posts in the last {days_to_scan} day(s)...")
|
||||||
for subreddit_name in subreddits_list:
|
for subreddit_name in subreddits_list:
|
||||||
try:
|
try:
|
||||||
# Always use the lowercase version of the name for consistency.
|
|
||||||
normalized_sub_name = subreddit_name.lower()
|
normalized_sub_name = subreddit_name.lower()
|
||||||
|
|
||||||
subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
|
subreddit_id = database.get_or_create_entity(conn, 'subreddits', 'name', normalized_sub_name)
|
||||||
subreddit = reddit.subreddit(normalized_sub_name)
|
subreddit = reddit.subreddit(normalized_sub_name)
|
||||||
log.info(f"Scanning r/{normalized_sub_name}...")
|
log.info(f"Scanning r/{normalized_sub_name}...")
|
||||||
@@ -62,54 +72,45 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
|
|||||||
break
|
break
|
||||||
|
|
||||||
tickers_in_title = set(extract_tickers(submission.title))
|
tickers_in_title = set(extract_tickers(submission.title))
|
||||||
all_tickers_found_in_post = set(tickers_in_title) # Start a set to track all tickers for financials
|
all_tickers_found_in_post = set(tickers_in_title)
|
||||||
|
|
||||||
submission.comments.replace_more(limit=0)
|
submission.comments.replace_more(limit=0)
|
||||||
all_comments = submission.comments.list()[:comment_limit]
|
all_comments = submission.comments.list()[:comment_limit]
|
||||||
|
|
||||||
# --- CASE A: Tickers were found in the title ---
|
|
||||||
if tickers_in_title:
|
if tickers_in_title:
|
||||||
log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
|
log.info(f" -> Title Mention(s): {', '.join(tickers_in_title)}. Attributing all comments.")
|
||||||
post_sentiment = get_sentiment_score(submission.title)
|
post_sentiment = get_sentiment_score(submission.title)
|
||||||
|
|
||||||
# Add one 'post' mention for each title ticker
|
|
||||||
for ticker_symbol in tickers_in_title:
|
for ticker_symbol in tickers_in_title:
|
||||||
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
||||||
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'post', int(submission.created_utc), post_sentiment)
|
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'post', int(submission.created_utc), post_sentiment)
|
||||||
|
|
||||||
# Add one 'comment' mention for EACH comment FOR EACH title ticker
|
|
||||||
for comment in all_comments:
|
for comment in all_comments:
|
||||||
comment_sentiment = get_sentiment_score(comment.body)
|
comment_sentiment = get_sentiment_score(comment.body)
|
||||||
for ticker_symbol in tickers_in_title:
|
for ticker_symbol in tickers_in_title:
|
||||||
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
||||||
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
|
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
|
||||||
|
|
||||||
# --- CASE B: No tickers in the title, scan comments individually ---
|
|
||||||
else:
|
else:
|
||||||
for comment in all_comments:
|
for comment in all_comments:
|
||||||
tickers_in_comment = set(extract_tickers(comment.body))
|
tickers_in_comment = set(extract_tickers(comment.body))
|
||||||
if tickers_in_comment:
|
if tickers_in_comment:
|
||||||
all_tickers_found_in_post.update(tickers_in_comment) # Add to our set for financials
|
all_tickers_found_in_post.update(tickers_in_comment)
|
||||||
comment_sentiment = get_sentiment_score(comment.body)
|
comment_sentiment = get_sentiment_score(comment.body)
|
||||||
for ticker_symbol in tickers_in_comment:
|
for ticker_symbol in tickers_in_comment:
|
||||||
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
||||||
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
|
database.add_mention(conn, ticker_id, subreddit_id, submission.id, 'comment', int(comment.created_utc), comment_sentiment)
|
||||||
|
|
||||||
# --- EFFICIENT FINANCIALS UPDATE ---
|
# --- THIS IS THE CRITICAL LOGIC THAT WAS MISSING ---
|
||||||
# Now, update market cap once for every unique ticker found in the whole post
|
|
||||||
for ticker_symbol in all_tickers_found_in_post:
|
for ticker_symbol in all_tickers_found_in_post:
|
||||||
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
ticker_id = database.get_or_create_entity(conn, 'tickers', 'symbol', ticker_symbol)
|
||||||
ticker_info = database.get_ticker_info(conn, ticker_id)
|
ticker_info = database.get_ticker_info(conn, ticker_id)
|
||||||
if not ticker_info['last_updated'] or (current_time - ticker_info['last_updated'] > MARKET_CAP_REFRESH_INTERVAL):
|
if not ticker_info['last_updated'] or (current_time - ticker_info['last_updated'] > database.MARKET_CAP_REFRESH_INTERVAL):
|
||||||
log.info(f" -> Fetching financial data for {ticker_symbol}...")
|
log.info(f" -> Fetching financial data for {ticker_symbol}...")
|
||||||
financials = get_financial_data(ticker_symbol)
|
financials = get_financial_data_via_fetcher(ticker_symbol)
|
||||||
database.update_ticker_financials(
|
database.update_ticker_financials(
|
||||||
conn, ticker_id,
|
conn, ticker_id,
|
||||||
financials['market_cap'] or ticker_info['market_cap'],
|
financials.get('market_cap'),
|
||||||
financials['closing_price'] or ticker_info['closing_price']
|
financials.get('closing_price')
|
||||||
)
|
)
|
||||||
|
|
||||||
# --- DEEP DIVE SAVE (Still valuable) ---
|
|
||||||
all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments]
|
all_comment_sentiments = [get_sentiment_score(c.body) for c in all_comments]
|
||||||
avg_sentiment = sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0
|
avg_sentiment = sum(all_comment_sentiments) / len(all_comment_sentiments) if all_comment_sentiments else 0
|
||||||
post_analysis_data = {
|
post_analysis_data = {
|
||||||
@@ -121,92 +122,46 @@ def scan_subreddits(reddit, subreddits_list, post_limit=100, comment_limit=100,
|
|||||||
database.add_or_update_post_analysis(conn, post_analysis_data)
|
database.add_or_update_post_analysis(conn, post_analysis_data)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"Could not scan r/{subreddit_name}. Error: {e}")
|
log.error(f"Could not scan r/{normalized_sub_name}. Error: {e}", exc_info=True)
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
log.critical("\n--- Scan Complete ---")
|
log.critical("\n--- Scan Complete ---")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main function to run the Reddit stock analysis tool."""
|
"""Main function to run the Reddit stock analysis tool."""
|
||||||
parser = argparse.ArgumentParser(description="Analyze stock ticker mentions on Reddit.", formatter_class=argparse.RawTextHelpFormatter)
|
parser = argparse.ArgumentParser(description="Analyze stock ticker mentions on Reddit.", formatter_class=argparse.RawTextHelpFormatter)
|
||||||
|
|
||||||
parser.add_argument("-f", "--config", default="subreddits.json", help="Path to the JSON file containing subreddits.\n(Default: subreddits.json)")
|
parser.add_argument("--update-financials-only", action="store_true", help="Skip Reddit scan and only update financial data for all existing tickers.")
|
||||||
parser.add_argument("-s", "--subreddit", help="Scan a single subreddit, ignoring the config file.")
|
parser.add_argument("--config", default="subreddits.json", help="Path to the JSON file for scanning.")
|
||||||
parser.add_argument("-d", "--days", type=int, default=1, help="Number of past days to scan for new posts.\n(Default: 1 for last 24 hours)")
|
parser.add_argument("--subreddit", help="Scan a single subreddit, ignoring the config file.")
|
||||||
parser.add_argument("-p", "--posts", type=int, default=200, help="Max posts to check per subreddit.\n(Default: 200)")
|
parser.add_argument("--days", type=int, default=1, help="Number of past days to scan for new posts.")
|
||||||
parser.add_argument("-c", "--comments", type=int, default=100, help="Number of comments to scan per post.\n(Default: 100)")
|
parser.add_argument("-p", "--posts", type=int, default=200, help="Max posts to check per subreddit.")
|
||||||
parser.add_argument("-u", "--update-financials-only", action="store_true", help="Skip Reddit scan and only update financial data for all existing tickers.")
|
parser.add_argument("-c", "--comments", type=int, default=100, help="Number of comments to scan per post.")
|
||||||
parser.add_argument("--stdout", action="store_true", help="Print all log messages to the console.")
|
parser.add_argument("--stdout", action="store_true", help="Print all log messages to the console.")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
setup_logging(console_verbose=args.stdout)
|
setup_logging(console_verbose=args.stdout)
|
||||||
|
|
||||||
if args.subreddit:
|
|
||||||
# If --subreddit is used, create a list with just that one.
|
|
||||||
subreddits_to_scan = [args.subreddit]
|
|
||||||
log.critical(f"Targeted Scan Mode: Focusing on r/{args.subreddit}")
|
|
||||||
else:
|
|
||||||
# Otherwise, load from the config file.
|
|
||||||
log.critical(f"Config Scan Mode: Loading subreddits from {args.config}")
|
|
||||||
# Use the correct argument name: args.config
|
|
||||||
subreddits_to_scan = load_subreddits(args.config)
|
|
||||||
|
|
||||||
if not subreddits_to_scan:
|
|
||||||
log.error("Error: No subreddits to scan. Please check your config file or --subreddit argument.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# --- Initialize and Run ---
|
|
||||||
database.initialize_db()
|
database.initialize_db()
|
||||||
|
|
||||||
if args.update_financials_only:
|
if args.update_financials_only:
|
||||||
log.critical("--- Starting Financial Data Update Only Mode (using isolated fetcher) ---")
|
log.critical("--- Starting Financial Data Update Only Mode (using isolated fetcher) ---")
|
||||||
all_tickers = database.get_all_tickers() # No longer need to manage 'conn' here
|
all_tickers = database.get_all_tickers()
|
||||||
log.info(f"Found {len(all_tickers)} tickers in the database to update.")
|
log.info(f"Found {len(all_tickers)} tickers in the database to update.")
|
||||||
|
|
||||||
conn = database.get_db_connection()
|
conn = database.get_db_connection()
|
||||||
for ticker in all_tickers:
|
for ticker in all_tickers:
|
||||||
symbol = ticker['symbol']
|
symbol = ticker['symbol']
|
||||||
log.info(f" -> Fetching financials for {symbol}...")
|
log.info(f" -> Updating financials for {symbol}...")
|
||||||
|
financials = get_financial_data_via_fetcher(symbol)
|
||||||
try:
|
database.update_ticker_financials(
|
||||||
# --- THIS IS THE NEW LOGIC ---
|
conn, ticker['id'],
|
||||||
# Construct the command to run our fetcher script in a new process
|
financials.get('market_cap'),
|
||||||
command = [sys.executable, "-m", "rstat_tool.fetcher", symbol]
|
financials.get('closing_price')
|
||||||
|
)
|
||||||
# Run the command, capture the output, and set a timeout
|
|
||||||
result = subprocess.run(
|
|
||||||
command,
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=True, # This will raise an exception if the script returns a non-zero exit code
|
|
||||||
timeout=30 # Timeout after 30 seconds
|
|
||||||
)
|
|
||||||
|
|
||||||
# The output from the script is a JSON string
|
|
||||||
financials = json.loads(result.stdout)
|
|
||||||
|
|
||||||
database.update_ticker_financials(
|
|
||||||
conn, ticker['id'],
|
|
||||||
financials.get('market_cap'),
|
|
||||||
financials.get('closing_price')
|
|
||||||
)
|
|
||||||
# --- END OF NEW LOGIC ---
|
|
||||||
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
log.error(f"Fetcher script failed for {symbol}: {e.stderr}")
|
|
||||||
except subprocess.TimeoutExpired:
|
|
||||||
log.error(f"Fetcher script timed out for {symbol}.")
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
log.error(f"Could not parse JSON from fetcher script for {symbol}.")
|
|
||||||
except Exception as e:
|
|
||||||
log.error(f"An unexpected error occurred for {symbol}: {e}")
|
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
log.critical("--- Financial Data Update Complete ---")
|
log.critical("--- Financial Data Update Complete ---")
|
||||||
else:
|
else:
|
||||||
# This is the normal Reddit scanning logic
|
|
||||||
log.critical("--- Starting Reddit Scan Mode ---")
|
log.critical("--- Starting Reddit Scan Mode ---")
|
||||||
if args.subreddit:
|
if args.subreddit:
|
||||||
subreddits_to_scan = [args.subreddit]
|
subreddits_to_scan = [args.subreddit]
|
||||||
|
Reference in New Issue
Block a user