Integrate with Reddit.

This commit is contained in:
2025-07-21 12:14:45 +02:00
parent e86a2bb69a
commit b617016b61
4 changed files with 120 additions and 19 deletions

5
.gitignore vendored
View File

@@ -1 +1,6 @@
.venv/ .venv/
.env
__pycache__/
*.pyc
*.sqlite3
*.log

92
main.py
View File

@@ -2,7 +2,17 @@
import argparse import argparse
import json import json
import os
from collections import Counter
import praw
import yfinance as yf import yfinance as yf
from dotenv import load_dotenv
from ticker_extractor import extract_tickers
# Load environment variables from .env file
load_dotenv()
def load_subreddits(filepath): def load_subreddits(filepath):
"""Loads a list of subreddits from a JSON file.""" """Loads a list of subreddits from a JSON file."""
@@ -25,41 +35,85 @@ def get_market_cap(ticker_symbol):
if market_cap: if market_cap:
# Formatting for better readability # Formatting for better readability
return f"${market_cap:,}" return f"${market_cap:,}"
return "N/A" return "N/A"
except Exception as e: except Exception as e:
# yfinance can sometimes fail for various reasons (e.g., invalid ticker) # yfinance can sometimes fail for various reasons (e.g., invalid ticker)
return "N/A" return "N/A"
def get_reddit_instance():
"""Initializes and returns a PRAW Reddit instance."""
client_id = os.getenv("REDDIT_CLIENT_ID")
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
user_agent = os.getenv("REDDIT_USER_AGENT")
if not all([client_id, client_secret, user_agent]):
print("Error: Reddit API credentials not found in .env file.")
return None
return praw.Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent=user_agent
)
def scan_subreddits(reddit, subreddits_list, post_limit=25):
"""Scans subreddits for stock tickers and returns a count of each."""
all_tickers = Counter()
print(f"\nScanning {len(subreddits_list)} subreddits for top {post_limit} posts...")
for subreddit_name in subreddits_list:
try:
subreddit = reddit.subreddit(subreddit_name)
print(f"r/{subreddit_name}...")
# Fetch hot posts from the subreddit
for submission in subreddit.hot(limit=post_limit):
# Combine title and selftext for analysis
full_text = submission.title + " " + submission.selftext
# Extract tickers from the combined text
tickers_in_post = extract_tickers(full_text)
all_tickers.update(tickers_in_post)
# Future work: also scan comments
# submission.comments.replace_more(limit=0) # Expand all comment trees
# for comment in submission.comments.list():
# tickers_in_comment = extract_tickers(comment.body)
# all_tickers.update(tickers_in_comment)
except Exception as e:
print(f"Could not scan r/{subreddit_name}. Error: {e}")
return all_tickers
def main(): def main():
"""Main function to run the Reddit stock analysis tool.""" """Main function to run the Reddit stock analysis tool."""
parser = argparse.ArgumentParser(description="Analyze stock ticker mentions on Reddit.") parser = argparse.ArgumentParser(description="Analyze stock ticker mentions on Reddit.")
parser.add_argument( parser.add_argument("config_file", help="Path to the JSON file containing subreddits.")
"config_file",
help="Path to the JSON file containing the list of subreddits."
)
args = parser.parse_args() args = parser.parse_args()
# --- Part 1: Load Configuration --- # --- Part 1: Load Configuration & Initialize Reddit ---
print("Loading configuration...")
subreddits = load_subreddits(args.config_file) subreddits = load_subreddits(args.config_file)
if not subreddits: if not subreddits:
print("No subreddits found in the configuration file. Exiting.")
return return
print(f"Successfully loaded {len(subreddits)} subreddits: {', '.join(subreddits)}") reddit = get_reddit_instance()
print("-" * 30) if not reddit:
return
# --- Part 2: Scan Reddit for Tickers ---
ticker_counts = scan_subreddits(reddit, subreddits)
if not ticker_counts:
print("No tickers found.")
return
# --- Part 2: Test Market Data Fetching (Example) --- print("\n--- Scan Complete ---")
print("Testing market data functionality...") print("Top 15 mentioned tickers:")
example_ticker = "AAPL"
market_cap = get_market_cap(example_ticker)
print(f"Market Cap for {example_ticker}: {market_cap}")
print("-" * 30)
# In the next steps, we will add the Reddit scanning logic here.
print("Next up: Integrating the Reddit API to find tickers...")
# --- Part 3: Display Results ---
# We will enrich this data with market cap and sentiment in the next steps
for ticker, count in ticker_counts.most_common(15):
print(f"{ticker}: {count} mentions")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1 +1,3 @@
yfinance yfinance
praw
python-dotenv

40
ticker_extractor.py Normal file
View File

@@ -0,0 +1,40 @@
# ticker_extractor.py
import re
# A set of common English words and acronyms that look like stock tickers.
# This helps reduce false positives.
COMMON_WORDS_BLACKLIST = {
"A", "I", "DD", "CEO", "CFO", "CTO", "EPS", "IPO", "YOLO", "FOMO",
"TLDR", "EDIT", "THE", "AND", "FOR", "ARE", "BUT", "NOT", "YOU",
"ALL", "ANY", "CAN", "HAS", "NEW", "NOW", "OLD", "SEE", "TWO",
"WAY", "WHO", "WHY", "BIG", "BUY", "SELL", "HOLD", "BE", "GO",
"ON", "AT", "IN", "IS", "IT", "OF", "OR", "TO", "WE", "UP",
"OUT", "SO", "RH", "SEC", "IRS", "USA", "UK", "EU",
"AI", "ML", "AR", "VR", "NFT", "DAO", "WEB3", "ETH", "BTC",
"USD", "EUR", "GBP", "JPY", "CNY", "INR", "AUD", "CAD", "CHF",
"RUB", "ZAR", "BRL", "MXN", "HKD", "SGD", "NZD", "RSD",
"JPY", "KRW", "SEK", "NOK", "DKK", "PLN", "CZK", "HUF", "TRY",
"US", "IRA", "FDA", "SEC", "FBI", "CIA", "NSA", "NATO",
}
def extract_tickers(text):
"""
Extracts potential stock tickers from a given piece of text.
A ticker is identified as a 1-5 character uppercase word, or a word prefixed with $.
"""
# Regex to find potential tickers:
# 1. Words prefixed with $: $AAPL, $TSLA
# 2. All-caps words between 1 and 5 characters: GME, AMC
ticker_regex = r"\$[A-Z]{1,5}\b|\b[A-Z]{1,5}\b"
potential_tickers = re.findall(ticker_regex, text)
# Filter out common words and remove the '$' prefix
tickers = []
for ticker in potential_tickers:
cleaned_ticker = ticker.replace("$", "").upper()
if cleaned_ticker not in COMMON_WORDS_BLACKLIST:
tickers.append(cleaned_ticker)
return tickers