Stock-Talk / CurrentDB.py
aayush9's picture
Update CurrentDB.py
3e6ef05 verified
import os
from dotenv import load_dotenv
import requests
import csv
from datetime import datetime, timedelta
import time
# Load environment variables from .env file
load_dotenv()
# Define your Aylien credentials
AppID = os.getenv('APP_ID')
APIKey = os.getenv('API_KEY')
PolygonAPIKey = os.getenv('POLYGON_API_KEY')
# Function to get authentication header
def get_auth_header(appid, apikey):
return {
'X-Application-Id': appid,
'X-Application-Key': apikey
}
# Function to fetch stories for specific companies within a date range
def fetch_stories_for_date_range(ticker, headers, start_date, end_date):
all_stories = []
params = {
'entities.stock_tickers': ticker,
'published_at.start': start_date.strftime('%Y-%m-%dT%H:%M:%SZ'),
'published_at.end': end_date.strftime('%Y-%m-%dT%H:%M:%SZ'),
'language': 'en',
'per_page': 2, # Set per_page to maximum allowed value
'sort_by': 'published_at',
'sort_direction': 'desc'
}
while True:
time.sleep(1) # Adding a 1-second delay between API calls
response = requests.get('https://api.aylien.com/news/stories', headers=headers, params=params)
if response.status_code == 200:
data = response.json()
stories = data.get('stories', [])
if not stories:
break
all_stories.extend(stories)
if 'next' in data.get('links', {}):
params['cursor'] = data['links']['next']
else:
break
else:
print(f"Failed to fetch stories for ticker {ticker}: {response.status_code} - {response.text}")
break
return all_stories
# Function to fetch stock data for a given symbol within a date range using Polygon API
def get_stock_data(api_key, symbol, start_date, end_date):
time.sleep(1) # Adding a 1-second delay between API calls
base_url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}?apiKey={api_key}"
response = requests.get(base_url)
if response.status_code == 200:
data = response.json()
results = data['results']
stock_data = {datetime.fromtimestamp(result['t'] / 1000).strftime('%Y-%m-%d'): {'open': result['o'], 'close': result['c']} for result in results}
return stock_data
else:
print(f"Failed to fetch stock data for {symbol} from Polygon API: {response.status_code} - {response.text}")
return None
# Save data to CSV file
def save_data_to_csv(ticker, all_stories, stock_data):
file_name = f"CurrentDatabase/{ticker}_db.csv"
with open(file_name, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow([
'Publication Date','Summary', 'Sentiment Polarity', 'Sentiment Confidence', 'Keywords', 'stock_date', 'stock_price', 'percentage_change'
])
for story in all_stories[:-1]:
article_id = story.get('id', 'N/A')
summary = ' '.join(story.get('summary', {}).get('sentences', []))
keywords = ", ".join(story.get('keywords', []))
sentiment = story.get('sentiment', {}).get('title', {})
sentiment_polarity = sentiment.get('polarity', 'N/A')
sentiment_confidence = story.get('sentiment', {}).get('body', {}).get('score', 'N/A')
print(sentiment_polarity)
publication_date = datetime.strptime(story.get('published_at', 'N/A'), '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')
stock_date = (datetime.strptime(publication_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
if stock_data.get(stock_date) == None:
stock_price = 'N/A'
open_stock_price = 'N/A'
print(stock_price)
else:
stock_price = stock_data.get(stock_date).get('close', 'N/A')
open_stock_price = stock_data.get(stock_date).get('open', 'N/A')
print(stock_price)
if stock_price == 'N/A':
# If current stock price is not available, try to get the previous day's price
previous_date = (datetime.strptime(stock_date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')
if stock_data.get(previous_date) == None:
stock_price = 'N/A'
else:
previous_stock_price = stock_data.get(previous_date).get('close', 'N/A')
stock_price = previous_stock_price
open_stock_price = previous_stock_price
#print("Current stock price not available. Previous day's price:", previous_stock_price)
if stock_price == 'N/A':
# If current stock price is not available, try to get the previous day's price
previous_date = (datetime.strptime(stock_date, '%Y-%m-%d') - timedelta(days=2)).strftime('%Y-%m-%d')
if stock_data.get(previous_date) == None:
stock_price = 'N/A'
else:
previous_stock_price = stock_data.get(previous_date).get('close', 'N/A')
stock_price = previous_stock_price
open_stock_price = previous_stock_price
#print("Current stock price not available. Previous day's price:", previous_stock_price)
if stock_price == 'N/A':
# If current stock price is not available, try to get the previous day's price
previous_date = (datetime.strptime(stock_date, '%Y-%m-%d') - timedelta(days=3)).strftime('%Y-%m-%d')
if stock_data.get(previous_date) == None:
stock_price = 'N/A'
else:
previous_stock_price = stock_data.get(previous_date).get('close', 'N/A')
stock_price = previous_stock_price
open_stock_price = previous_stock_price
#print("Current stock price not available. Previous day's price:", previous_stock_price)
else:
None
else:
None
else:
None
percentage_change = ((float(stock_price) - float(open_stock_price)) / float(open_stock_price)) * 100
writer.writerow([
publication_date, summary, sentiment_polarity, sentiment_confidence, keywords, stock_date, stock_price, percentage_change
])
print(f"Data has been written to {file_name}")
def main():
tickers = ['AAPL','AMZN', 'TSLA', 'MSFT', 'AMD', 'GE', 'SMCI', 'META', 'BA','GOOGL','NVDA'] # Example tickers
headers = get_auth_header(AppID, APIKey)
# Define the date range (last 3 days)
end_date = datetime.now() # Current date
#end_date = datetime.strptime(end_date, '%Y-%m-%d')
start_date = datetime.now() - timedelta(days=7) # 7 days ago
# Fetch all stock data for each ticker in the date range
for ticker in tickers:
stock_data = get_stock_data(PolygonAPIKey, ticker, start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
print(stock_data)
print(end_date)
if stock_data:
all_stories = []
current_date = start_date
while current_date < end_date:
next_day = current_date + timedelta(days=1)
if next_day > end_date:
next_day = end_date
print(f"Fetching stories for {ticker}...")
all_stories.extend(fetch_stories_for_date_range(ticker, headers, current_date, next_day))
current_date = next_day
save_data_to_csv(ticker, all_stories, stock_data)
if __name__ == '__main__':
main()