|
import asyncio
import re
from datetime import date
from datetime import datetime as dt
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from bs4 import BeautifulSoup
from curl_cffi.requests import AsyncSession, RequestsError

from App.routers.bonds.schemas import BondCreate
|
|
|
def parse_bond_title_details(title_str: str) -> Tuple[Optional[float], Optional[str], str, Optional[int]]:
    """Split a bond title into coupon, maturity, instrument type and issue number.

    The title is consumed left to right: the first ``<number>%`` token is the
    coupon, the first ``<n>-YEAR`` token *after* the coupon is the maturity,
    and whatever text follows (minus a trailing issue/number suffix) is the
    instrument description.

    Args:
        title_str: Raw title, e.g. ``"10.25% 15-YEAR TREASURY BOND ISSUE 5"``.
            ``None`` or an empty string is accepted.

    Returns:
        A tuple ``(coupon_rate, maturity_years, instrument_type, issue_number)``:

        * ``coupon_rate`` - the coupon as a float, or ``None`` if no ``%`` token.
        * ``maturity_years`` - normalised as ``"<n>-YEAR"``, or ``None``.
        * ``instrument_type`` - never empty; falls back to ``"TREASURY BOND"``
          and is prefixed with the maturity when one was found.
        * ``issue_number`` - integer following ``ISSUE``, ``ISSUE NUMBER`` or
          ``ISSUE NO.``, or ``None``.
    """
    default_instrument = "TREASURY BOND"
    if not title_str:
        return None, None, default_instrument, None

    # Everything up to and including the coupon token is discarded; the rest
    # of the parsing only looks at what follows it.
    coupon_rate_val = None
    remainder = title_str
    coupon_match = re.search(r'(\d+\.?\d*)%', title_str)
    if coupon_match:
        coupon_rate_val = float(coupon_match.group(1))
        remainder = title_str[coupon_match.end():].strip()

    maturity_years_val = None
    maturity_match = re.search(r'(\d+)-YEAR', remainder, re.IGNORECASE)
    if maturity_match:
        # Normalise the casing so callers always see "<n>-YEAR".
        maturity_years_val = f"{maturity_match.group(1)}-YEAR"
        remainder = remainder[maturity_match.end():].strip()

    # Cut the description at the first issue/number suffix. "ISSUE NUMBER n"
    # and "ISSUE NO. n" are handled as a single suffix so that no dangling
    # "ISSUE" word is left behind on the instrument type.
    suffix_pattern = r'\s+(?:ISSUE\s+(?:(?:NUMBER|NO\.?)\s*)?|NUMBER\s+)\d+'
    instrument_text = re.split(suffix_pattern, remainder, maxsplit=1, flags=re.IGNORECASE)[0].strip()

    if instrument_text:
        instrument_type_val = instrument_text
        if maturity_years_val and maturity_years_val not in instrument_type_val:
            instrument_type_val = f"{maturity_years_val} {instrument_type_val}"
    elif maturity_years_val:
        instrument_type_val = f"{maturity_years_val} {default_instrument}"
    else:
        instrument_type_val = default_instrument

    # The issue number is looked up in the full title, not just the remainder.
    issue_number_val = None
    issue_match = re.search(r'ISSUE\s+(?:(?:NUMBER|NO\.?)\s*)?(\d+)', title_str, re.IGNORECASE)
    if issue_match:
        issue_number_val = int(issue_match.group(1))

    return coupon_rate_val, maturity_years_val, instrument_type_val, issue_number_val
|
|
|
|
|
def parse_date_flexible(date_str: str, default=None) -> Optional[date]:
    """Parse a date written in one of several common layouts.

    Surrounding whitespace is ignored. A string made of exactly three
    hyphen-separated parts (``15-Jan-2024``, ``2024-01-15``) has its hyphens
    replaced by spaces before matching, so the formats below are written with
    spaces.

    Args:
        date_str: Text to parse; ``None`` or ``""`` yields ``default``.
        default: Value returned when the input is empty or unparseable.

    Returns:
        A ``datetime.date`` on success, otherwise ``default``. A warning is
        printed when a non-empty string matches none of the formats.
    """
    if not date_str:
        return default

    cleaned = date_str.strip()
    if len(cleaned.split('-')) == 3:
        cleaned = cleaned.replace("-", " ")

    # The first three entries are the historically supported layouts and are
    # tried first; the rest extend coverage without changing earlier results.
    # Numeric layouts are day-first, except the unambiguous year-first ISO form.
    formats_to_try = (
        "%d %b %Y",    # 15 Jan 2024 / 15-Jan-2024
        "%B %d, %Y",   # January 15, 2024
        "%d/%m/%Y",    # 15/01/2024
        "%d %B %Y",    # 15 January 2024
        "%b %d, %Y",   # Jan 15, 2024
        "%Y %m %d",    # 2024-01-15 (after hyphen replacement)
        "%d %m %Y",    # 15-01-2024 (after hyphen replacement)
    )
    for fmt in formats_to_try:
        try:
            return dt.strptime(cleaned, fmt).date()
        except ValueError:
            continue

    print(f"Warning: Could not parse date string: {date_str}")
    return default
|
|
|
def get_summary_item_value(summary_list: List[Dict[str, Any]], item_desc_key: str, default=None) -> Optional[str]:
    """Return the ``itemValue`` of the first summary entry matching a label.

    Entries are dicts carrying ``itemDesc`` (label) and ``itemValue``. Labels
    are compared case-insensitively with surrounding whitespace ignored. An
    entry whose label matches but whose value is empty is skipped, so a later
    entry with the same label can still supply the value.

    Args:
        summary_list: Entries to search; ``None`` is treated as empty.
        item_desc_key: Label to look for.
        default: Value returned when no matching, non-empty entry exists.

    Returns:
        The matching value as a stripped string, or ``default``.
    """
    wanted = (item_desc_key or "").strip().upper()

    for item in summary_list or []:
        if not isinstance(item, dict):
            continue
        # ``or ""`` covers a key that is present but null in the JSON payload.
        label = str(item.get("itemDesc") or "").strip().upper()
        if label != wanted:
            continue
        value = item.get("itemValue")
        if value:
            # Numeric JSON values are stringified so callers always get text.
            return str(value).strip()

    return default
|
|
|
|
|
class BondDataScraper:
    """Scrape Treasury bond auction data from the T-Bonds pages of bot.go.tz.

    The scraper reads the auction table on ``TBONDS_URL`` and then, for each
    row, POSTs to ``AUCTION_SUMMARY_URL`` to obtain a JSON summary which is
    converted into a ``BondCreate``. Failures are reported with ``print`` and
    the affected page or row is skipped rather than raised.
    """

    BASE_URL = "https://www.bot.go.tz"
    TBONDS_URL = f"{BASE_URL}/TBonds"
    AUCTION_SUMMARY_URL = f"{BASE_URL}/TBonds/AuctionSummaries"
    IMPERSONATE_PROFILE = "chrome110"

    # Request tuning, in seconds.
    GET_TIMEOUT = 60 * 5
    POST_TIMEOUT = 20
    REQUEST_DELAY = 0.5  # pause before each detail request

    def __init__(self):
        """Build the header sets used for page (GET) and AJAX (POST) requests."""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        # Same base headers, overridden/extended to look like an in-page XHR.
        self.ajax_headers = {
            **self.headers,
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
            'Origin': self.BASE_URL,
            'Referer': self.TBONDS_URL,
        }

    async def _fetch_content(self, session: AsyncSession, url: str, method: str = "GET", data: Optional[Dict] = None, is_json: bool = False) -> Optional[Any]:
        """Fetch ``url`` and return its body, or ``None`` on any failure.

        Args:
            session: Open session used to issue the request.
            url: Absolute URL to request.
            method: ``"POST"`` (case-insensitive) posts ``data``; anything
                else performs a GET.
            data: Form payload for POST. When present the AJAX headers are
                sent, otherwise the plain page headers.
            is_json: Decode the body as JSON instead of returning text.

        Returns:
            Decoded JSON or response text; ``None`` if the request, the
            status check or the decoding failed (the error is printed).
        """
        try:
            if method.upper() == "POST":
                response = await session.post(
                    url,
                    headers=self.ajax_headers if data else self.headers,
                    data=data,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.POST_TIMEOUT,
                )
            else:
                response = await session.get(
                    url,
                    headers=self.headers,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.GET_TIMEOUT,
                )
            response.raise_for_status()
            return response.json() if is_json else response.text
        except RequestsError as e:
            print(f"HTTP error fetching {url}: {e}")
        except Exception as e:
            # Deliberately broad: a body that fails to decode must not abort
            # the scrape either.
            print(f"Unexpected error fetching {url}: {e}")
        return None

    async def _parse_main_tbonds_page(self, html_content: str) -> List[Dict[str, Any]]:
        """Extract one dict per usable row of the main auction table.

        A row is usable when it has at least five cells and its fifth cell
        holds a ``showSummaryDetails`` button whose ``value`` looks like
        ``"<au_no>_<au_days>..."``.

        Args:
            html_content: HTML of the T-Bonds page; empty input yields ``[]``.

        Returns:
            Dicts with keys ``table_auction_number_text``,
            ``table_auction_title``, ``table_auction_date_str``, ``au_no``
            (int) and ``au_days_part`` (str). Rows that cannot be parsed are
            reported and skipped.
        """
        if not html_content:
            return []

        soup = BeautifulSoup(html_content, 'html.parser')
        table = soup.find('table', class_='tbond-table')
        if not table:
            print("Main T-Bonds table not found.")
            return []

        tbody = table.find('tbody')
        if not tbody:
            print("Tbody not found in main T-Bonds table.")
            return []

        parsed_rows = []
        for row in tbody.find_all('tr'):
            cols = row.find_all(['th', 'td'])
            if len(cols) < 5:
                continue

            auction_number_text = cols[1].get_text(strip=True)
            auction_title = cols[2].get_text(strip=True)
            auction_date_str = cols[3].get_text(strip=True)

            view_button = cols[4].find('button', id='showSummaryDetails')
            if not view_button or not view_button.get('value'):
                print(f"Skipping row, view button or value not found for auction: {auction_number_text}")
                continue

            try:
                # Unpacking raises ValueError when there is no "_" separator,
                # and int() raises it for a non-numeric auction number.
                au_no_str, au_days_part = view_button['value'].split('_')[:2]
                au_no = int(au_no_str)
            except ValueError as e:
                print(f"Error parsing main table row: {e}. Row: {[c.get_text(strip=True) for c in cols]}")
                continue

            parsed_rows.append({
                'table_auction_number_text': auction_number_text,
                'table_auction_title': auction_title,
                'table_auction_date_str': auction_date_str,
                'au_no': au_no,
                'au_days_part': au_days_part,
            })
        return parsed_rows

    async def _fetch_bond_details(self, session: AsyncSession, au_no: int, au_days_part: str) -> Optional[Dict[str, Any]]:
        """POST the auction identifiers and return the JSON summary, or ``None``."""
        payload = {'au_no': str(au_no), 'au_days': au_days_part}
        return await self._fetch_content(session, self.AUCTION_SUMMARY_URL, method="POST", data=payload, is_json=True)

    def _parse_bond_details_json(self, json_data: Dict[str, Any], initial_data: Dict[str, Any]) -> Optional[BondCreate]:
        """Combine a table row and its JSON summary into a ``BondCreate``.

        Args:
            json_data: Decoded auction-summary response. It must be a dict
                with ``message == "SUCCESS"``; ``bondTitle``, ``tbondSummary``,
                ``auctionNumber`` and ``ISIN`` are read from it.
            initial_data: Row dict produced by ``_parse_main_tbonds_page``.

        Returns:
            The populated ``BondCreate``, or ``None`` when the payload is not
            successful or the auction date cannot be parsed. Optional numeric
            fields that fail to parse are left as ``None`` and reported.
        """
        au_no = initial_data.get('au_no')

        if not isinstance(json_data, dict) or json_data.get("message") != "SUCCESS":
            print(f"Bond details JSON invalid or not successful for au_no: {au_no}")
            return None

        # ``or []`` covers a key that is present but null.
        summary_list = json_data.get("tbondSummary") or []

        coupon_rate, maturity_years, instrument_type, issue_number = parse_bond_title_details(json_data.get("bondTitle", ""))

        auction_date_obj = parse_date_flexible(initial_data['table_auction_date_str'])
        if not auction_date_obj:
            print(f"Critical: Could not parse auction_date for au_no: {au_no}")
            return None

        maturity_date_str = get_summary_item_value(summary_list, "REDEMPTION DATE")
        maturity_date_obj = parse_date_flexible(maturity_date_str)

        # Days between auction and redemption; unknown without a maturity date.
        dtm_val = (maturity_date_obj - auction_date_obj).days if maturity_date_obj else None

        face_value_str = get_summary_item_value(summary_list, "AMOUNT OFFERED TZS(000,000)")
        face_value_val = None
        if face_value_str:
            try:
                # The label marks the figure as being in millions. Round
                # rather than truncate so binary float error cannot drop a
                # unit; round() rejects NaN/inf with the errors caught below.
                face_value_val = round(float(face_value_str.replace(",", "")) * 1_000_000)
            except (ValueError, OverflowError):
                print(f"Could not parse face_value: {face_value_str} for au_no: {au_no}")

        price_per_100_str = get_summary_item_value(summary_list, "MINIMUM SUCCESSFUL PRICE / 100")
        price_per_100_val = None
        if price_per_100_str:
            try:
                # Thousands separators are stripped, as for the face value.
                price_per_100_val = float(price_per_100_str.replace(",", ""))
            except ValueError:
                print(f"Could not parse price_per_100: {price_per_100_str} for au_no: {au_no}")

        # The value may arrive as a JSON number or a string; accept both.
        raw_holding = json_data.get("auctionNumber")
        holding_text = "" if raw_holding is None else str(raw_holding).strip()
        holding_number_val = int(holding_text) if holding_text.isdigit() else None

        return BondCreate(
            instrument_type=instrument_type or "TREASURY BOND",
            auction_number=initial_data['au_no'],
            auction_date=auction_date_obj,
            maturity_years=maturity_years,
            maturity_date=maturity_date_obj,
            effective_date=auction_date_obj,
            dtm=dtm_val,
            bond_auction_number=issue_number,
            holding_number=holding_number_val,
            face_value=face_value_val,
            price_per_100=price_per_100_val,
            coupon_rate=coupon_rate,
            isin=json_data.get("ISIN"),
        )

    async def scrape_all_bond_data(self) -> AsyncGenerator[BondCreate, None]:
        """Yield a ``BondCreate`` for every auction row that can be resolved.

        Opens one session, loads the main table, then fetches each row's
        summary in turn, pausing ``REQUEST_DELAY`` seconds before each
        request. Rows whose details cannot be fetched or parsed are reported
        and skipped. Nothing is yielded if the main page cannot be fetched.
        """
        async with AsyncSession() as session:
            # Preliminary request kept from the original flow.
            # NOTE(review): presumably primes session cookies before the real
            # fetch - confirm it is still required. Its failure is not fatal.
            try:
                await session.get(
                    self.TBONDS_URL,
                    headers=self.headers,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.GET_TIMEOUT,
                )
            except RequestsError as e:
                print(f"Warm-up request to {self.TBONDS_URL} failed: {e}")

            main_page_html = await self._fetch_content(session, self.TBONDS_URL, method="GET")
            if not main_page_html:
                print("Failed to fetch main T-Bonds page.")
                return

            initial_bond_rows = await self._parse_main_tbonds_page(main_page_html)
            print(f"Found {len(initial_bond_rows)} initial bond rows from main table.")

            for row_data in initial_bond_rows:
                print(f"Fetching details for au_no: {row_data['au_no']}...")
                await asyncio.sleep(self.REQUEST_DELAY)

                details_json = await self._fetch_bond_details(session, row_data['au_no'], row_data['au_days_part'])
                bond_create_obj = self._parse_bond_details_json(details_json, row_data) if details_json else None
                if bond_create_obj is None:
                    print(f"Failed to fetch or parse details for au_no: {row_data['au_no']}")
                    continue
                yield bond_create_obj