# initial commit 9d4bd7c
import asyncio
import re
from datetime import date
from datetime import datetime as dt
from typing import List, Dict, Any, Optional, AsyncGenerator
from typing import Tuple, Optional, List, Dict, Any

from curl_cffi.requests import AsyncSession, RequestsError
from bs4 import BeautifulSoup

from App.routers.bonds.schemas import BondCreate  # Adjust import path
# from .bond_utils import parse_bond_title_details, parse_date_flexible, get_summary_item_value # If in separate file
def parse_bond_title_details(title_str: str) -> Tuple[Optional[float], Optional[str], str, Optional[int]]:
    """Break a bond auction title down into its components.

    Returns a 4-tuple ``(coupon_rate, maturity_years, instrument_type,
    issue_number)`` where ``coupon_rate`` is a float percentage,
    ``maturity_years`` a string like ``"20-YEAR"``, ``instrument_type`` a
    cleaned-up description (defaulting to ``"TREASURY BOND"``) and
    ``issue_number`` an int; each is None/default when absent from the title.
    """
    coupon: Optional[float] = None
    tenor: Optional[str] = None
    kind = "TREASURY BOND"
    issue: Optional[int] = None

    if not title_str:
        return coupon, tenor, kind, issue

    # 1. Coupon rate, e.g. "15.49%".
    rate_match = re.search(r'(\d+\.?\d*)%', title_str)
    if rate_match:
        coupon = float(rate_match.group(1))
        tail = title_str.partition(rate_match.group(0))[2].strip()
    else:
        tail = title_str

    # 2. Tenor, e.g. "20-YEAR".
    tenor_match = re.search(r'(\d+)-YEAR', tail, re.IGNORECASE)
    if tenor_match:
        tenor = f"{tenor_match.group(1)}-YEAR"
        tail = tail.partition(tenor_match.group(0))[2].strip()

    # 3. Instrument type: strip trailing "NUMBER n" / "ISSUE n" qualifiers
    #    so only the base description remains.
    base = re.split(r'\s+NUMBER\s+\d+', tail, flags=re.IGNORECASE)[0]
    base = re.split(r'\s+ISSUE\s+\d+', base, flags=re.IGNORECASE)[0].strip()
    if base:
        # Prepend the tenor unless the description already mentions it.
        kind = f"{tenor} {base}" if tenor and tenor not in base else base
    elif tenor:
        kind = f"{tenor} TREASURY BOND"

    # 4. Issue number is searched in the full original title.
    issue_match = re.search(r'ISSUE\s+(\d+)', title_str, re.IGNORECASE)
    if issue_match:
        issue = int(issue_match.group(1))

    # Defensive fallback for a title that reduced to nothing.
    if not kind.strip() and title_str.strip():
        kind = f"{tenor} TREASURY BOND" if tenor else "TREASURY BOND"

    return coupon, tenor, kind.strip(), issue
def parse_date_flexible(date_str: str, default=None) -> Optional[date]:
    """Parse a date string in one of several known formats.

    Normalises "27-DEC-2012"-style strings (hyphens to spaces, title case)
    before trying "%d %b %Y", "%B %d, %Y" and "%d/%m/%Y" in turn.

    Returns a ``datetime.date`` on success, otherwise *default* (a warning
    is printed when a non-empty string fails every format).

    Fix: the original annotated the return as ``Optional[dt]`` (datetime),
    but the function always returns ``.date()`` objects.
    """
    if not date_str:
        return default
    # "27-DEC-2012" -> "27 Dec 2012" so "%d %b %Y" can match.
    if len(date_str.split('-')) == 3:
        date_str = date_str.replace("-", " ").title()
    for fmt in ("%d %b %Y", "%B %d, %Y", "%d/%m/%Y"):
        try:
            return dt.strptime(date_str, fmt).date()
        except ValueError:
            continue
    print(f"Warning: Could not parse date string: {date_str}")
    return default
def get_summary_item_value(summary_list: List[Dict[str, Any]], item_desc_key: str, default=None) -> Optional[str]:
    """Return the stripped ``itemValue`` of the first entry in *summary_list*
    whose ``itemDesc`` equals *item_desc_key* (case-insensitive, whitespace
    ignored) and has a truthy value; *default* when no such entry exists."""
    wanted = item_desc_key.upper()
    for entry in summary_list:
        if entry.get("itemDesc", "").strip().upper() != wanted:
            continue
        value = entry.get("itemValue")
        if value:
            return value.strip()
    return default
class BondDataScraper:
    """Scrape Treasury-bond auction data from the Bank of Tanzania website.

    Flow: load the main T-Bonds page, parse its auction table, then POST to
    the auction-summary endpoint for each row and convert the JSON reply
    into a BondCreate. All requests impersonate a Chrome browser via
    curl_cffi to get past bot detection.

    Fix: removed a debug leftover that printed the entire fetched HTML page
    to stdout in scrape_all_bond_data.
    """

    BASE_URL = "https://www.bot.go.tz"
    TBONDS_URL = f"{BASE_URL}/TBonds"
    AUCTION_SUMMARY_URL = f"{BASE_URL}/TBonds/AuctionSummaries"
    IMPERSONATE_PROFILE = "chrome110"  # Or another recent Chrome version

    def __init__(self):
        # Browser-like headers for ordinary page loads.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        # Extra headers that mark the request as an AJAX form post.
        self.ajax_headers = {
            **self.headers,
            'Accept': 'application/json, text/javascript, */*; q=0.01',  # Adjusted for AJAX
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
            'Origin': self.BASE_URL,
            'Referer': self.TBONDS_URL,
        }

    async def _fetch_content(self, session: AsyncSession, url: str, method: str = "GET", data: Optional[Dict] = None, is_json: bool = False):
        """GET or POST *url* and return the body: parsed JSON when *is_json*
        is True, otherwise the response text. Returns None on any failure
        (errors are printed, not raised)."""
        try:
            if method.upper() == "POST":
                # AJAX headers only when a form payload is supplied.
                response = await session.post(url, headers=self.ajax_headers if data else self.headers, data=data, impersonate=self.IMPERSONATE_PROFILE, timeout=20)
            else:
                response = await session.get(url, headers=self.headers, impersonate=self.IMPERSONATE_PROFILE, timeout=60*5)
            response.raise_for_status()
            return response.json() if is_json else response.text
        except RequestsError as e:  # curl_cffi transport/HTTP errors
            print(f"HTTP error fetching {url}: {e}")  # curl_cffi errors might not have response.status_code directly
        except Exception as e:
            print(f"Unexpected error fetching {url}: {e}")
        return None

    async def _parse_main_tbonds_page(self, html_content: str) -> List[Dict[str, Any]]:
        """Parse the main T-Bonds auction table.

        Returns one dict per usable row with the on-page auction number,
        title and date text plus the ``au_no``/``au_days_part`` values
        decoded from the "view" button (its value looks like
        "<au_no>_<au_days>"). Returns [] when the table or tbody is missing.
        """
        if not html_content:
            return []
        soup = BeautifulSoup(html_content, 'html.parser')
        table = soup.find('table', class_='tbond-table')  # Or id="DataTables_Table_0"
        if not table:
            print("Main T-Bonds table not found.")
            return []
        parsed_rows = []
        tbody = table.find('tbody')
        if not tbody:
            print("Tbody not found in main T-Bonds table.")
            return []
        for row in tbody.find_all('tr'):
            cols = row.find_all(['th', 'td'])  # First col might be th
            if len(cols) < 5:
                continue
            try:
                # Sn. is cols[0]
                auction_number_text = cols[1].get_text(strip=True)
                auction_title = cols[2].get_text(strip=True)
                auction_date_str = cols[3].get_text(strip=True)
                view_button = cols[4].find('button', id='showSummaryDetails')
                if not view_button or not view_button.get('value'):
                    print(f"Skipping row, view button or value not found for auction: {auction_number_text}")
                    continue
                button_value_parts = view_button['value'].split('_')
                au_no_str = button_value_parts[0]
                au_days_part = button_value_parts[1]  # Usually '1'
                parsed_rows.append({
                    'table_auction_number_text': auction_number_text,  # This is the au_no
                    'table_auction_title': auction_title,
                    'table_auction_date_str': auction_date_str,
                    'au_no': int(au_no_str),
                    'au_days_part': au_days_part,
                })
            except Exception as e:
                # Malformed rows are skipped, not fatal.
                print(f"Error parsing main table row: {e}. Row: {[c.get_text(strip=True) for c in cols]}")
        return parsed_rows

    async def _fetch_bond_details(self, session: AsyncSession, au_no: int, au_days_part: str) -> Optional[Dict[str, Any]]:
        """POST the auction identifiers to the summary endpoint and return
        the details JSON, or None on failure."""
        payload = {'au_no': str(au_no), 'au_days': au_days_part}
        return await self._fetch_content(session, self.AUCTION_SUMMARY_URL, method="POST", data=payload, is_json=True)

    def _parse_bond_details_json(self, json_data: Dict[str, Any], initial_data: Dict[str, Any]) -> Optional[BondCreate]:
        """Combine the auction-details JSON with the main-table row into a
        BondCreate. Returns None when the payload is not marked SUCCESS or
        the auction date cannot be parsed (both are required)."""
        if not json_data or json_data.get("message") != "SUCCESS":
            print(f"Bond details JSON invalid or not successful for au_no: {initial_data.get('au_no')}")
            return None
        summary_list = json_data.get("tbondSummary", [])
        coupon_rate, maturity_years, instrument_type, issue_number = parse_bond_title_details(json_data.get("bondTitle", ""))
        auction_date_obj = parse_date_flexible(initial_data['table_auction_date_str'])
        if not auction_date_obj:  # Critical, skip if no valid auction date
            print(f"Critical: Could not parse auction_date for au_no: {initial_data.get('au_no')}")
            return None
        maturity_date_str = get_summary_item_value(summary_list, "REDEMPTION DATE")
        maturity_date_obj = parse_date_flexible(maturity_date_str)
        # Days to maturity, only when both dates parsed.
        dtm_val = None
        if maturity_date_obj and auction_date_obj:
            dtm_val = (maturity_date_obj - auction_date_obj).days
        face_value_str = get_summary_item_value(summary_list, "AMOUNT OFFERED TZS(000,000)")
        face_value_val = None
        if face_value_str:
            try:
                # Offered amount is quoted in TZS millions.
                face_value_val = int(float(face_value_str.replace(",", "")) * 1_000_000)
            except ValueError:
                print(f"Could not parse face_value: {face_value_str} for au_no: {initial_data.get('au_no')}")
        price_per_100_str = get_summary_item_value(summary_list, "MINIMUM SUCCESSFUL PRICE / 100")  # Or WAP?
        price_per_100_val = None
        if price_per_100_str:
            try:
                price_per_100_val = float(price_per_100_str)
            except ValueError:
                print(f"Could not parse price_per_100: {price_per_100_str} for au_no: {initial_data.get('au_no')}")
        holding_number_str = json_data.get("auctionNumber")  # This is the "1" from "AUCTION NUMBER 1 HELD ON..."
        holding_number_val = int(holding_number_str) if holding_number_str and holding_number_str.isdigit() else None
        return BondCreate(
            instrument_type=instrument_type if instrument_type else "TREASURY BOND",
            auction_number=initial_data['au_no'],  # This is the main identifier from table
            auction_date=auction_date_obj,
            maturity_years=maturity_years,
            maturity_date=maturity_date_obj,
            effective_date=auction_date_obj,  # Assuming effective date is auction date
            dtm=dtm_val,
            bond_auction_number=issue_number,  # Issue number from title
            holding_number=holding_number_val,  # From JSON details header auctionNumber
            face_value=face_value_val,
            price_per_100=price_per_100_val,
            coupon_rate=coupon_rate,
            isin=json_data.get("ISIN")
        )

    async def scrape_all_bond_data(self) -> AsyncGenerator[BondCreate, None]:
        """Yield a BondCreate for every auction row that can be fetched and
        parsed. Reuses one session for all requests; a short sleep between
        detail requests keeps the load on the BoT site polite."""
        async with AsyncSession() as session:
            # First GET request to establish session cookies if necessary
            await session.get(self.TBONDS_URL, headers=self.headers, impersonate=self.IMPERSONATE_PROFILE, timeout=60*5)
            main_page_html = await self._fetch_content(session, self.TBONDS_URL, method="GET")
            if not main_page_html:
                print("Failed to fetch main T-Bonds page.")
                return
            initial_bond_rows = await self._parse_main_tbonds_page(main_page_html)
            print(f"Found {len(initial_bond_rows)} initial bond rows from main table.")
            for row_data in initial_bond_rows:
                print(f"Fetching details for au_no: {row_data['au_no']}...")
                await asyncio.sleep(0.5)  # Small delay to be polite
                details_json = await self._fetch_bond_details(session, row_data['au_no'], row_data['au_days_part'])
                if details_json:
                    bond_create_obj = self._parse_bond_details_json(details_json, row_data)
                    if bond_create_obj:
                        yield bond_create_obj
                else:
                    print(f"Failed to fetch or parse details for au_no: {row_data['au_no']}")