# Page-header residue from the repository web view (not part of the module):
# Mike Jay · tweaking prompt · 7f1778f · raw · history blame · 2.87 kB
"""Module for questions data processing"""
import os
import json
import requests
# Path of the local JSON file used to cache the questions data: it is read
# when it already exists and written after a successful download.
QUESTIONS_DATA_FILE = "data/questions_data.json"
def _get_task_files(files_url: str, questions_data: dict):
for item in questions_data:
task_id = item.get("task_id")
file_name = item.get("file_name")
local_file = "/".join(["data", file_name])
if task_id and file_name:
file_url = "/".join([files_url, task_id])
response = requests.get(file_url, stream=True, timeout=120)
response.raise_for_status()
with open(local_file, "wb") as f:
f.write(response.content)
print(f"File '{file_name}' downloaded successfully.")
def _is_exists_questions_data_file() -> bool:
    """Report (and print) whether the questions data file is present on disk."""
    found = os.path.exists(QUESTIONS_DATA_FILE)
    if not found:
        print(f"The file '{QUESTIONS_DATA_FILE}' does not exist.")
        return False
    print(f"The file '{QUESTIONS_DATA_FILE}' exists.")
    return True
def _write_question_data_to_file(questions_data: dict):
    """Serialize ``questions_data`` as JSON into ``QUESTIONS_DATA_FILE``.

    The output is pretty-printed with a four-space indent and the file is
    written as UTF-8, replacing any previous content.
    """
    with open(QUESTIONS_DATA_FILE, "w", encoding="utf-8") as out_file:
        json.dump(questions_data, out_file, indent=4)
    print(f"Pretty-printed JSON data written to {QUESTIONS_DATA_FILE}")
def _read_question_data_from_file() -> dict:
    """Load and return the JSON content of ``QUESTIONS_DATA_FILE`` (UTF-8)."""
    with open(QUESTIONS_DATA_FILE, "r", encoding="utf-8") as in_file:
        return json.load(in_file)
def _request_question_data_res(questions_url: str) -> requests.Response:
    """GET ``questions_url`` and return the response, or ``None`` on failure.

    HTTP error statuses and any other request error are printed rather than
    propagated; in both cases the function returns ``None``.
    """
    result = None
    try:
        reply = requests.get(questions_url, timeout=60)
        reply.raise_for_status()
        result = reply
    except requests.exceptions.HTTPError as err:
        print("HTTP error occurred:", err)
    except requests.exceptions.RequestException as err:
        print("A request error occurred:", err)
    return result
def _json_question_data_from_res(res: requests.Response) -> dict:
    """Decode the JSON body of ``res``; return ``None`` if it is not valid JSON.

    On a decode failure the error and the first 500 characters of the
    response text are printed.
    """
    try:
        return res.json()
    except requests.exceptions.JSONDecodeError as err:
        print(f"Error decoding JSON response from questions endpoint: {err}")
        print(f"Response text: {res.text[:500]}")
    return None
def get_questions_data(questions_url: str, files_url: str) -> dict:
    """Return the questions data, preferring the local cache file.

    If the cache file exists its content is returned as-is. Otherwise the
    data is requested from ``questions_url``, written to the cache file, the
    task files are fetched from ``files_url``, and the data is returned.
    Returns ``None`` when the request fails or yields no usable data.
    """
    # Fast path: a cached copy is already on disk.
    if _is_exists_questions_data_file():
        return _read_question_data_from_file()

    res = _request_question_data_res(questions_url=questions_url)
    if not res:
        return None

    questions_data = _json_question_data_from_res(res)
    if not questions_data:
        return None

    _write_question_data_to_file(questions_data=questions_data)
    _get_task_files(files_url=files_url, questions_data=questions_data)
    return questions_data