Zulelee's picture
Upload 62 files
57b8424
raw
history blame
5.07 kB
"""Text processing functions"""
import os
import string
import urllib
import urllib.parse
from typing import Dict, Generator, Optional

from md2pdf.core import md2pdf
from selenium.webdriver.remote.webdriver import WebDriver

from agent.llm_utils import create_chat_completion
from config import Config
# Module-level configuration instance; summarize_text reads its
# fast_llm_model and summary_token_limit attributes.
CFG = Config()
def split_text(text: str, max_length: int = 8192) -> Generator[str, None, None]:
    """Split text into newline-delimited chunks of bounded length.

    The text is split on newline characters and the resulting paragraphs are
    packed greedily into chunks. Each paragraph is budgeted as its length
    plus one (for the joining newline), and a chunk is closed as soon as the
    next paragraph would push that budget past ``max_length``.

    A single paragraph that is longer than ``max_length`` is not broken up;
    it is emitted whole as its own chunk. No exception is raised for it.

    Args:
        text (str): The text to split
        max_length (int, optional): The length budget of each chunk.
            Defaults to 8192.

    Yields:
        str: The next chunk of text
    """
    current_chunk = []
    current_length = 0
    for paragraph in text.split("\n"):
        paragraph_length = len(paragraph) + 1  # +1 budgets the joining newline
        if current_length + paragraph_length <= max_length:
            current_chunk.append(paragraph)
            current_length += paragraph_length
            continue
        # Flush only when something has been accumulated: if the very first
        # paragraph is already over budget there is nothing to emit yet, and
        # yielding here would produce a spurious empty chunk.
        if current_chunk:
            yield "\n".join(current_chunk)
        current_chunk = [paragraph]
        current_length = paragraph_length
    if current_chunk:
        yield "\n".join(current_chunk)
def summarize_text(
    url: str, text: str, question: str, driver: Optional[WebDriver] = None
) -> str:
    """Summarize text chunk by chunk, then summarize the combined result.

    The text is split with split_text; each chunk is sent to
    create_chat_completion together with the question, and the per-chunk
    answers are joined with newlines and sent through the same prompt once
    more to obtain the final summary.

    Args:
        url (str): The url of the text (only used in the progress message)
        text (str): The text to summarize
        question (str): The question to ask the model
        driver (WebDriver): Optional webdriver; when given, the page is
            scrolled proportionally to the chunk being processed

    Returns:
        str: The final summary, or an error string when text is empty
    """
    if not text:
        return "Error: No text to summarize"

    pieces = list(split_text(text))
    total = len(pieces)
    step = 1 / total
    print(f"Summarizing url: {url} with total chunks: {total}")

    def _ask(content):
        # One completion request for a single piece of content.
        prompt = create_message(content, question)
        return create_chat_completion(
            model=CFG.fast_llm_model,
            messages=[prompt],
            max_tokens=CFG.summary_token_limit,
        )

    partials = []
    for index, piece in enumerate(pieces):
        if driver:
            # Keep the visible page roughly in sync with the chunk in progress.
            scroll_to_percentage(driver, step * index)
        partials.append(_ask(piece))

    combined_summary = "\n".join(partials)
    final_summary = _ask(combined_summary)

    # NOTE(review): the label says "Final summary" but the length printed is
    # that of the combined per-chunk summaries -- confirm which was intended.
    print("Final summary length: ", len(combined_summary))
    print(final_summary)
    return final_summary
def scroll_to_percentage(driver: WebDriver, ratio: float) -> None:
    """Scroll the page to a fraction of its total height.

    Args:
        driver (WebDriver): The webdriver to use
        ratio (float): Fraction of document.body.scrollHeight to scroll to,
            between 0 and 1 inclusive

    Raises:
        ValueError: If the ratio is not between 0 and 1 (this includes NaN)
    """
    # The chained comparison is False for NaN, so NaN is rejected here rather
    # than being formatted into the script as the invalid JS token "nan".
    if not 0 <= ratio <= 1:
        raise ValueError("Percentage should be between 0 and 1")
    driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {ratio});")
def create_message(chunk: str, question: str) -> Dict[str, str]:
    """Build the user message that asks the model about one chunk of text.

    Args:
        chunk (str): The chunk of text to summarize
        question (str): The question to answer

    Returns:
        Dict[str, str]: A message with role "user" whose content quotes the
            chunk, poses the question, and asks for a plain summary when the
            question cannot be answered from the text
    """
    prompt = (
        f'"""{chunk}""" '
        "Using the above text, answer in short the following question: "
        f'"{question}" -- '
        "if the question cannot be answered using the text, simply summarize the text. "
        "Include all factual information, numbers, stats etc if available."
    )
    return {"role": "user", "content": prompt}
def write_to_file(filename: str, text: str) -> None:
    """Write text to a file, replacing any existing content.

    The file is always encoded as UTF-8 so that non-ASCII text is written
    the same way regardless of the platform's default encoding.

    Args:
        filename (str): The filename to write to
        text (str): The text to write
    """
    with open(filename, "w", encoding="utf-8") as file:
        file.write(text)
async def write_md_to_pdf(task: str, path: str, text: str) -> str:
    """Save Markdown text to "<path>/<task>.md" and convert it to a PDF.

    The Markdown file is written with write_to_file and converted with
    md_to_pdf into "<path>/<task>.pdf". The target directory is not created
    here.

    Args:
        task (str): Base name (without extension) of the output files
        path (str): Directory in which the files are written
        text (str): The Markdown content

    Returns:
        str: The path of the generated PDF, percent-encoded with
            urllib.parse.quote
    """
    file_path = f"{path}/{task}"
    md_path = f"{file_path}.md"
    pdf_path = f"{file_path}.pdf"
    write_to_file(md_path, text)
    md_to_pdf(md_path, pdf_path)
    print(f"{task} written to {pdf_path}")
    return urllib.parse.quote(pdf_path)
def read_txt_files(directory: str) -> str:
    """Concatenate the contents of every ".txt" file in a directory.

    Files are read as UTF-8 and processed in sorted filename order so the
    result is deterministic (os.listdir returns entries in arbitrary order).
    A newline is appended after each file's content.

    Args:
        directory (str): The directory to scan (not recursive)

    Returns:
        str: The concatenated text, or an empty string when the directory
            holds no ".txt" files
    """
    contents = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".txt"):
            continue
        with open(os.path.join(directory, filename), "r", encoding="utf-8") as file:
            contents.append(file.read() + "\n")
    return "".join(contents)
def md_to_pdf(input_file, output_file):
    """Convert a Markdown file to a PDF by delegating to md2pdf.

    Args:
        input_file: Path of the Markdown file, passed as md_file_path
        output_file: Passed as md2pdf's first positional argument
            (presumably the destination PDF path -- confirm against md2pdf)
    """
    # Only the file-based input is used; inline content, CSS and base URL
    # are explicitly left unset.
    md2pdf(
        output_file,
        md_file_path=input_file,
        md_content=None,
        css_file_path=None,
        base_url=None,
    )