"""Scrape Finviz headlines for a ticker and run sentiment / NER pipelines on text."""

import logging

import requests
from bs4 import BeautifulSoup
from transformers import (
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    pipeline,
)

_logger = logging.getLogger(__name__)

_SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"
_NER_MODEL_ID = "dslim/bert-base-NER"

_FINVIZ_QUOTE_URL = "https://finviz.com/quote.ashx"
_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'text/html',
    'Accept-Language': 'en-US,en;q=0.5',
    'Referer': 'https://finviz.com/',
    'Connection': 'keep-alive',
}
# Upper bound (seconds) on the HTTP request; without it requests.get can
# block forever on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 10
_MAX_NEWS_ROWS = 30
_MAX_ORG_ENTITIES = 5

# ----------- Lazy Initialization of Pipelines -----------
_sentiment_pipeline = None
_ner_pipeline = None


def get_sentiment_pipeline():
    """Return the shared sentiment-analysis pipeline, building it on first use.

    The tokenizer and model are loaded from ``_SENTIMENT_MODEL_ID`` only once;
    later calls return the cached module-level instance.
    """
    global _sentiment_pipeline
    if _sentiment_pipeline is not None:
        return _sentiment_pipeline

    tokenizer = AutoTokenizer.from_pretrained(_SENTIMENT_MODEL_ID)
    model = AutoModelForSequenceClassification.from_pretrained(_SENTIMENT_MODEL_ID)
    _sentiment_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
    return _sentiment_pipeline


def get_ner_pipeline():
    """Return the shared NER pipeline, building it on first use.

    The tokenizer and model are loaded from ``_NER_MODEL_ID`` only once;
    later calls return the cached module-level instance.
    """
    global _ner_pipeline
    if _ner_pipeline is not None:
        return _ner_pipeline

    tokenizer = AutoTokenizer.from_pretrained(_NER_MODEL_ID)
    model = AutoModelForTokenClassification.from_pretrained(_NER_MODEL_ID)
    # NOTE(review): newer transformers releases document
    # `aggregation_strategy="simple"` as the replacement for
    # `grouped_entities=True` -- confirm against the pinned version before
    # switching.
    _ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, grouped_entities=True)
    return _ner_pipeline


# ----------- Core Functions -----------
def fetch_news(ticker):
    """Fetch up to ``_MAX_NEWS_ROWS`` news rows for *ticker* from Finviz.

    Args:
        ticker: Stock symbol used as the ``t`` query parameter. It is matched
            case-insensitively against the page title.

    Returns:
        A list of ``{'title': ..., 'link': ...}`` dicts, one per news-table
        row that contains an anchor with an ``href``. An empty list is
        returned when the request fails or times out, the status is not 200,
        the page title does not mention the ticker, the news table is
        missing, or any unexpected error occurs (best-effort: never raises).
    """
    try:
        # Passing the ticker through `params` lets requests URL-encode it
        # instead of splicing raw caller input into the URL.
        response = requests.get(
            _FINVIZ_QUOTE_URL,
            params={"t": ticker},
            headers=_REQUEST_HEADERS,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        page_title = soup.title.text if soup.title else ""
        # Case-insensitive so a lowercase ticker is not rejected outright.
        if ticker.upper() not in page_title.upper():
            return []

        news_table = soup.find(id='news-table')
        if news_table is None:
            return []

        headlines = []
        for row in news_table.find_all('tr')[:_MAX_NEWS_ROWS]:
            anchor = row.find('a')
            if anchor is None:
                continue
            # An anchor without href used to raise KeyError and throw away
            # every headline collected so far; skip just that row instead.
            link = anchor.get('href')
            if not link:
                continue
            headlines.append({'title': anchor.get_text(), 'link': link})
        return headlines
    except Exception:
        _logger.warning("Could not fetch news for ticker %r", ticker, exc_info=True)
        return []


def analyze_sentiment(text, sentiment_pipeline):
    """Classify *text* as ``"Positive"`` or ``"Negative"``.

    Args:
        text: Text passed to the pipeline.
        sentiment_pipeline: Callable returning a sequence whose first item is
            a mapping with a ``'label'`` key.

    Returns:
        ``"Positive"`` when the first result's label equals ``POSITIVE``
        (compared case-insensitively), ``"Negative"`` for any other label,
        and ``"Unknown"`` if the pipeline call or result access fails.
    """
    try:
        first_result = sentiment_pipeline(text)[0]
        # NOTE(review): the label set of the configured model is not visible
        # here; every label other than POSITIVE maps to "Negative" -- confirm.
        label = str(first_result['label']).upper()
    except Exception:
        _logger.warning("Sentiment analysis failed", exc_info=True)
        return "Unknown"
    return "Positive" if label == 'POSITIVE' else "Negative"


def extract_org_entities(text, ner_pipeline):
    """Return up to ``_MAX_ORG_ENTITIES`` distinct organisation names in *text*.

    Args:
        text: Text passed to the pipeline.
        ner_pipeline: Callable returning an iterable of mappings with
            ``'entity_group'`` and ``'word'`` keys.

    Returns:
        Upper-cased ``ORG`` entity names in first-seen order, with ``##``
        markers removed, surrounding whitespace stripped, and duplicates and
        empty names dropped. An empty list is returned if the pipeline call
        or result access fails.
    """
    try:
        org_names = []
        for entity in ner_pipeline(text):
            if entity["entity_group"] != "ORG":
                continue
            name = entity["word"].replace("##", "").strip().upper()
            # Skip names that are empty after cleaning, and repeats.
            if not name or name in org_names:
                continue
            org_names.append(name)
            if len(org_names) >= _MAX_ORG_ENTITIES:
                break
        return org_names
    except Exception:
        _logger.warning("Organisation entity extraction failed", exc_info=True)
        return []