"""Scraper for Treasury bond auction data published under ``/TBonds``.

The module contains three pure parsing helpers and ``BondDataScraper``,
which downloads the auction listing page, requests the per-auction summary
JSON and converts each result into a ``BondCreate`` schema object.
"""

import asyncio
import re
from datetime import date
from datetime import datetime as dt
from decimal import Decimal, InvalidOperation
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from bs4 import BeautifulSoup
from curl_cffi.requests import AsyncSession, RequestsError

from App.routers.bonds.schemas import BondCreate  # Adjust import path

_DEFAULT_INSTRUMENT_TYPE = "TREASURY BOND"

# Patterns used by parse_bond_title_details, compiled once at import time.
_COUPON_RE = re.compile(r'(\d+\.?\d*)%')
_MATURITY_RE = re.compile(r'(\d+)-YEAR', re.IGNORECASE)
_NUMBER_SUFFIX_RE = re.compile(r'\s+NUMBER\s+\d+', re.IGNORECASE)
_ISSUE_SUFFIX_RE = re.compile(r'\s+ISSUE\s+\d+', re.IGNORECASE)
_ISSUE_RE = re.compile(r'ISSUE\s+(\d+)', re.IGNORECASE)

# strptime formats tried in order by parse_date_flexible.
_DATE_FORMATS = ("%d %b %Y", "%B %d, %Y", "%d/%m/%Y")


def parse_bond_title_details(
    title_str: str,
) -> Tuple[Optional[float], Optional[str], str, Optional[int]]:
    """Split a bond title into its component fields.

    Args:
        title_str: Title text; the parser looks for ``<rate>%``,
            ``<n>-YEAR``, an instrument name and ``ISSUE <n>``.

    Returns:
        A tuple ``(coupon_rate, maturity_years, instrument_type,
        issue_number)``. ``coupon_rate`` is the number before the first
        ``%``; ``maturity_years`` is normalised to ``"<n>-YEAR"``;
        ``instrument_type`` falls back to ``"TREASURY BOND"`` when nothing
        usable remains in the title; ``issue_number`` is the integer after
        ``ISSUE``. Fields that are not found are ``None``.
    """
    if not title_str:
        return None, None, _DEFAULT_INSTRUMENT_TYPE, None

    # 1. Coupon rate: everything after the first "<number>%" is parsed further.
    coupon_rate = None
    remainder = title_str
    coupon_match = _COUPON_RE.search(title_str)
    if coupon_match:
        coupon_rate = float(coupon_match.group(1))
        remainder = title_str[coupon_match.end():].strip()

    # 2. Maturity years, searched only in the text after the coupon.
    maturity_years = None
    maturity_match = _MATURITY_RE.search(remainder)
    if maturity_match:
        maturity_years = f"{maturity_match.group(1)}-YEAR"
        remainder = remainder[maturity_match.end():].strip()

    # 3. Instrument type: drop trailing "NUMBER n" / "ISSUE n" parts.
    base_type = _NUMBER_SUFFIX_RE.split(remainder)[0]
    base_type = _ISSUE_SUFFIX_RE.split(base_type)[0].strip()
    if base_type:
        instrument_type = base_type
        if maturity_years and maturity_years not in instrument_type:
            # Prepend the year if it is not already part of the type.
            instrument_type = f"{maturity_years} {instrument_type}"
    elif maturity_years:
        instrument_type = f"{maturity_years} {_DEFAULT_INSTRUMENT_TYPE}"
    else:
        instrument_type = _DEFAULT_INSTRUMENT_TYPE

    # 4. Issue number is searched in the original, unmodified title.
    issue_match = _ISSUE_RE.search(title_str)
    issue_number = int(issue_match.group(1)) if issue_match else None

    return coupon_rate, maturity_years, instrument_type.strip(), issue_number


def parse_date_flexible(date_str: str, default=None) -> Optional[date]:
    """Parse a date string written in one of several known layouts.

    Accepted layouts are ``27-DEC-2012`` (dashes are turned into spaces and
    the text is title-cased), ``27 Dec 2012``, ``December 27, 2012`` and
    ``27/12/2012``. Surrounding whitespace is ignored.

    Args:
        date_str: Text to parse; may be empty or ``None``.
        default: Value returned when the input is empty or unparseable.

    Returns:
        A ``datetime.date`` on success, otherwise ``default``. A warning is
        printed when a non-empty string cannot be parsed.
    """
    if not date_str:
        return default

    candidate = date_str.strip()
    # Handle cases like "27-DEC-2012".
    if len(candidate.split('-')) == 3:
        candidate = candidate.replace("-", " ").title()

    for fmt in _DATE_FORMATS:
        try:
            return dt.strptime(candidate, fmt).date()
        except ValueError:
            continue

    print(f"Warning: Could not parse date string: {date_str}")
    return default


def get_summary_item_value(
    summary_list: List[Dict[str, Any]],
    item_desc_key: str,
    default=None,
) -> Optional[str]:
    """Return the stripped ``itemValue`` of the entry matching a description.

    Entries are matched on ``itemDesc``, ignoring case and surrounding
    whitespace. A matching entry whose value is missing or blank is skipped
    and the search continues with the remaining entries.

    Args:
        summary_list: Dicts carrying ``itemDesc`` / ``itemValue`` keys; a
            ``None`` or empty list yields ``default``.
        item_desc_key: Description to look for.
        default: Value returned when no non-empty match exists.

    Returns:
        The matching value as a stripped string, or ``default``.
    """
    wanted = item_desc_key.upper()
    for item in summary_list or []:
        # ``or ""`` guards against an explicit null description.
        description = str(item.get("itemDesc") or "").strip().upper()
        if description != wanted:
            continue
        raw_value = item.get("itemValue")
        if raw_value is None:
            continue
        # str() keeps numeric JSON values from crashing on .strip().
        text = str(raw_value).strip()
        if text:
            return text
    return default


class BondDataScraper:
    """Download and parse bond auction data from the ``/TBonds`` pages.

    The listing page is fetched with a GET request; each auction's summary
    is then requested with a form-encoded POST to ``AUCTION_SUMMARY_URL``.
    """

    BASE_URL = "https://www.bot.go.tz"
    TBONDS_URL = f"{BASE_URL}/TBonds"
    AUCTION_SUMMARY_URL = f"{BASE_URL}/TBonds/AuctionSummaries"
    IMPERSONATE_PROFILE = "chrome110"  # Or another recent Chrome version

    GET_TIMEOUT_SECONDS = 60 * 5
    POST_TIMEOUT_SECONDS = 20
    REQUEST_DELAY_SECONDS = 0.5  # Pause before each detail request.

    def __init__(self):
        """Build the header sets used for page and AJAX requests."""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        self.ajax_headers = {
            **self.headers,
            'Accept': 'application/json, text/javascript, */*; q=0.01',  # Adjusted for AJAX
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
            'Origin': self.BASE_URL,
            'Referer': self.TBONDS_URL,
        }

    async def _fetch_content(
        self,
        session: AsyncSession,
        url: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        is_json: bool = False,
    ) -> Optional[Any]:
        """Fetch ``url`` and return its body, or ``None`` on any failure.

        Args:
            session: Open session used for the request.
            url: Address to request.
            method: ``"POST"`` (case-insensitive) sends a POST; anything
                else sends a GET.
            data: Form payload for POST requests. When present the AJAX
                headers are sent, otherwise the plain page headers.
            is_json: Decode the response as JSON instead of returning text.

        Returns:
            The decoded JSON or response text, or ``None`` when the request
            or decoding failed (the error is printed).
        """
        try:
            if method.upper() == "POST":
                response = await session.post(
                    url,
                    headers=self.ajax_headers if data else self.headers,
                    data=data,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.POST_TIMEOUT_SECONDS,
                )
            else:
                response = await session.get(
                    url,
                    headers=self.headers,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.GET_TIMEOUT_SECONDS,
                )
            response.raise_for_status()
            return response.json() if is_json else response.text
        except RequestsError as e:  # Exception type raised by curl_cffi
            # curl_cffi errors might not have response.status_code directly.
            print(f"HTTP error fetching {url}: {e}")
        except Exception as e:
            # Best-effort boundary: e.g. a body that is not valid JSON.
            print(f"Unexpected error fetching {url}: {e}")
        return None

    async def _parse_main_tbonds_page(self, html_content: str) -> List[Dict[str, Any]]:
        """Extract one dict per auction row from the listing page HTML.

        Rows with fewer than five cells, without a usable
        ``showSummaryDetails`` button, or with a malformed button value are
        skipped (a message is printed for the latter two).

        Args:
            html_content: HTML of the listing page; empty input yields ``[]``.

        Returns:
            Dicts with the keys ``table_auction_number_text``,
            ``table_auction_title``, ``table_auction_date_str``, ``au_no``
            (int) and ``au_days_part`` (str).
        """
        if not html_content:
            return []

        soup = BeautifulSoup(html_content, 'html.parser')
        table = soup.find('table', class_='tbond-table')  # Or id="DataTables_Table_0"
        if not table:
            print("Main T-Bonds table not found.")
            return []

        tbody = table.find('tbody')
        if not tbody:
            print("Tbody not found in main T-Bonds table.")
            return []

        parsed_rows = []
        for row in tbody.find_all('tr'):
            cols = row.find_all(['th', 'td'])  # First col might be th
            if len(cols) < 5:
                continue

            # cols[0] is the serial number and is not used.
            auction_number_text = cols[1].get_text(strip=True)
            auction_title = cols[2].get_text(strip=True)
            auction_date_str = cols[3].get_text(strip=True)

            view_button = cols[4].find('button', id='showSummaryDetails')
            if not view_button or not view_button.get('value'):
                print(f"Skipping row, view button or value not found for auction: {auction_number_text}")
                continue

            try:
                # The button value is "<au_no>_<au_days>"; the second part
                # is usually '1'.
                button_value_parts = view_button['value'].split('_')
                au_days_part = button_value_parts[1]
                au_no = int(button_value_parts[0])
            except (AttributeError, IndexError, TypeError, ValueError) as e:
                print(f"Error parsing main table row: {e}. Row: {[c.get_text(strip=True) for c in cols]}")
                continue

            parsed_rows.append({
                'table_auction_number_text': auction_number_text,  # This is the au_no
                'table_auction_title': auction_title,
                'table_auction_date_str': auction_date_str,
                'au_no': au_no,
                'au_days_part': au_days_part,
            })
        return parsed_rows

    async def _fetch_bond_details(
        self, session: AsyncSession, au_no: int, au_days_part: str
    ) -> Optional[Dict[str, Any]]:
        """POST the auction identifiers and return the summary JSON.

        Returns:
            The decoded JSON, or ``None`` when the request failed.
        """
        payload = {'au_no': str(au_no), 'au_days': au_days_part}
        return await self._fetch_content(
            session, self.AUCTION_SUMMARY_URL, method="POST", data=payload, is_json=True
        )

    def _parse_bond_details_json(
        self, json_data: Dict[str, Any], initial_data: Dict[str, Any]
    ) -> Optional[BondCreate]:
        """Combine the summary JSON with listing-row data into a ``BondCreate``.

        Args:
            json_data: Decoded summary response; it must be a dict whose
                ``message`` equals ``"SUCCESS"``.
            initial_data: Row dict produced by ``_parse_main_tbonds_page``.

        Returns:
            The populated ``BondCreate``, or ``None`` when the JSON is not
            a successful response or the auction date cannot be parsed.
        """
        au_no = initial_data.get('au_no')
        if (
            not json_data
            or not isinstance(json_data, dict)
            or json_data.get("message") != "SUCCESS"
        ):
            print(f"Bond details JSON invalid or not successful for au_no: {au_no}")
            return None

        # ``or`` also covers an explicit JSON null for these keys.
        summary_list = json_data.get("tbondSummary") or []
        coupon_rate, maturity_years, instrument_type, issue_number = parse_bond_title_details(
            json_data.get("bondTitle") or ""
        )

        auction_date_obj = parse_date_flexible(initial_data['table_auction_date_str'])
        if not auction_date_obj:  # Critical, skip if no valid auction date
            print(f"Critical: Could not parse auction_date for au_no: {au_no}")
            return None

        maturity_date_str = get_summary_item_value(summary_list, "REDEMPTION DATE")
        maturity_date_obj = parse_date_flexible(maturity_date_str)

        # Days to maturity, counted from the auction date.
        dtm_val = None
        if maturity_date_obj:
            dtm_val = (maturity_date_obj - auction_date_obj).days

        face_value_str = get_summary_item_value(summary_list, "AMOUNT OFFERED TZS(000,000)")
        face_value_val = None
        if face_value_str:
            try:
                # The figure is quoted in millions. Decimal avoids binary
                # float error that could truncate the result one unit low.
                face_value_val = int(Decimal(face_value_str.replace(",", "")) * 1_000_000)
            except (InvalidOperation, ValueError, OverflowError):
                print(f"Could not parse face_value: {face_value_str} for au_no: {au_no}")

        price_per_100_str = get_summary_item_value(summary_list, "MINIMUM SUCCESSFUL PRICE / 100")  # Or WAP?
        price_per_100_val = None
        if price_per_100_str:
            try:
                # Thousands separators are stripped, matching face_value.
                price_per_100_val = float(price_per_100_str.replace(",", ""))
            except ValueError:
                print(f"Could not parse price_per_100: {price_per_100_str} for au_no: {au_no}")

        # This is the "1" from "AUCTION NUMBER 1 HELD ON...". It is accepted
        # both as a JSON number and as a digit string.
        holding_number_raw = json_data.get("auctionNumber")
        holding_number_val = None
        if isinstance(holding_number_raw, int) and not isinstance(holding_number_raw, bool):
            holding_number_val = holding_number_raw
        elif isinstance(holding_number_raw, str) and holding_number_raw.strip().isdecimal():
            holding_number_val = int(holding_number_raw.strip())

        return BondCreate(
            instrument_type=instrument_type if instrument_type else "TREASURY BOND",
            auction_number=initial_data['au_no'],  # This is the main identifier from table
            auction_date=auction_date_obj,
            maturity_years=maturity_years,
            maturity_date=maturity_date_obj,
            effective_date=auction_date_obj,  # Assuming effective date is auction date
            dtm=dtm_val,
            bond_auction_number=issue_number,  # Issue number from title
            holding_number=holding_number_val,  # From JSON details header auctionNumber
            face_value=face_value_val,
            price_per_100=price_per_100_val,
            coupon_rate=coupon_rate,
            isin=json_data.get("ISIN"),
        )

    async def scrape_all_bond_data(self) -> AsyncGenerator[BondCreate, None]:
        """Yield a ``BondCreate`` for every auction that can be scraped.

        The listing page is requested once as a warm-up (to establish
        session cookies if necessary) and then fetched and parsed. For each
        row the detail JSON is requested after a short delay. Rows whose
        details cannot be fetched or parsed are reported and skipped.
        """
        async with AsyncSession() as session:
            # First GET request to establish session cookies if necessary.
            # A failure here is not fatal: the guarded fetch below decides
            # whether scraping can proceed.
            try:
                await session.get(
                    self.TBONDS_URL,
                    headers=self.headers,
                    impersonate=self.IMPERSONATE_PROFILE,
                    timeout=self.GET_TIMEOUT_SECONDS,
                )
            except RequestsError as e:
                print(f"Warm-up request to {self.TBONDS_URL} failed: {e}")

            main_page_html = await self._fetch_content(session, self.TBONDS_URL, method="GET")
            if not main_page_html:
                print("Failed to fetch main T-Bonds page.")
                return

            initial_bond_rows = await self._parse_main_tbonds_page(main_page_html)
            print(f"Found {len(initial_bond_rows)} initial bond rows from main table.")

            for row_data in initial_bond_rows:
                print(f"Fetching details for au_no: {row_data['au_no']}...")
                await asyncio.sleep(self.REQUEST_DELAY_SECONDS)  # Small delay to be polite

                details_json = await self._fetch_bond_details(
                    session, row_data['au_no'], row_data['au_days_part']
                )
                bond_create_obj = None
                if details_json:
                    bond_create_obj = self._parse_bond_details_json(details_json, row_data)

                if bond_create_obj:
                    yield bond_create_obj
                else:
                    print(f"Failed to fetch or parse details for au_no: {row_data['au_no']}")