|
import os |
|
from oauth2client.service_account import ServiceAccountCredentials |
|
import gspread |
|
from dotenv import load_dotenv |
|
from enviroments.convert import get_json_from_env_var |
|
from typing import Optional, List |
|
|
|
load_dotenv(override=True) |
|
|
|
class SheetManager: |
|
def __init__(self, spreadsheet_url: Optional[str] = None, |
|
worksheet_name: str = "flag", |
|
column_name: str = "huggingface_id"): |
|
""" |
|
Initialize SheetManager with Google Sheets credentials and connection. |
|
|
|
Args: |
|
spreadsheet_url (str, optional): URL of the Google Spreadsheet. |
|
If None, takes from environment variable. |
|
worksheet_name (str): Name of the worksheet to operate on. |
|
Defaults to "flag". |
|
column_name (str): Name of the column to operate on. |
|
Defaults to "huggingface_id". |
|
""" |
|
self.spreadsheet_url = spreadsheet_url or os.getenv("SPREADSHEET_URL") |
|
if not self.spreadsheet_url: |
|
raise ValueError("Spreadsheet URL not provided and not found in environment variables") |
|
|
|
self.worksheet_name = worksheet_name |
|
self.column_name = column_name |
|
|
|
|
|
self._init_google_client() |
|
|
|
|
|
self.doc = None |
|
self.sheet = None |
|
self.col_index = None |
|
self._connect_to_sheet(validate_column=True) |
|
|
|
def _init_google_client(self): |
|
"""Initialize Google Sheets client with credentials.""" |
|
scope = ['https://spreadsheets.google.com/feeds', |
|
'https://www.googleapis.com/auth/drive'] |
|
json_key_dict = get_json_from_env_var("GOOGLE_CREDENTIALS") |
|
credentials = ServiceAccountCredentials.from_json_keyfile_dict(json_key_dict, scope) |
|
self.client = gspread.authorize(credentials) |
|
|
|
def _connect_to_sheet(self, validate_column: bool = True): |
|
""" |
|
Connect to the specified Google Sheet and initialize necessary attributes. |
|
|
|
Args: |
|
validate_column (bool): Whether to validate the column name exists |
|
""" |
|
try: |
|
self.doc = self.client.open_by_url(self.spreadsheet_url) |
|
|
|
|
|
try: |
|
self.sheet = self.doc.worksheet(self.worksheet_name) |
|
except gspread.exceptions.WorksheetNotFound: |
|
raise ValueError(f"Worksheet '{self.worksheet_name}' not found in spreadsheet") |
|
|
|
|
|
self.headers = self.sheet.row_values(1) |
|
|
|
|
|
if validate_column: |
|
try: |
|
self.col_index = self.headers.index(self.column_name) + 1 |
|
except ValueError: |
|
|
|
if self.headers: |
|
self.column_name = self.headers[0] |
|
self.col_index = 1 |
|
print(f"Column '{self.column_name}' not found. Using first available column: '{self.headers[0]}'") |
|
else: |
|
raise ValueError("No columns found in worksheet") |
|
|
|
except Exception as e: |
|
if isinstance(e, ValueError): |
|
raise e |
|
raise ConnectionError(f"Failed to connect to sheet: {str(e)}") |
|
|
|
def change_worksheet(self, worksheet_name: str, column_name: Optional[str] = None): |
|
""" |
|
Change the current worksheet and optionally the column. |
|
|
|
Args: |
|
worksheet_name (str): Name of the worksheet to switch to |
|
column_name (str, optional): Name of the column to switch to |
|
""" |
|
old_worksheet = self.worksheet_name |
|
old_column = self.column_name |
|
|
|
try: |
|
self.worksheet_name = worksheet_name |
|
if column_name: |
|
self.column_name = column_name |
|
|
|
|
|
self._connect_to_sheet(validate_column=False) |
|
|
|
|
|
if column_name: |
|
self.change_column(column_name) |
|
else: |
|
|
|
try: |
|
self.col_index = self.headers.index(self.column_name) + 1 |
|
except ValueError: |
|
|
|
if self.headers: |
|
self.column_name = self.headers[0] |
|
self.col_index = 1 |
|
print(f"Column '{old_column}' not found in new worksheet. Using first available column: '{self.headers[0]}'") |
|
else: |
|
raise ValueError("No columns found in worksheet") |
|
|
|
print(f"Successfully switched to worksheet: {worksheet_name}, using column: {self.column_name}") |
|
|
|
except Exception as e: |
|
|
|
self.worksheet_name = old_worksheet |
|
self.column_name = old_column |
|
self._connect_to_sheet() |
|
raise e |
|
|
|
def change_column(self, column_name: str): |
|
""" |
|
Change the target column. |
|
|
|
Args: |
|
column_name (str): Name of the column to switch to |
|
""" |
|
if not self.headers: |
|
self.headers = self.sheet.row_values(1) |
|
|
|
try: |
|
self.col_index = self.headers.index(column_name) + 1 |
|
self.column_name = column_name |
|
print(f"Successfully switched to column: {column_name}") |
|
except ValueError: |
|
raise ValueError(f"Column '{column_name}' not found in worksheet. Available columns: {', '.join(self.headers)}") |
|
|
|
def get_available_worksheets(self) -> List[str]: |
|
"""Get list of all available worksheets in the spreadsheet.""" |
|
return [worksheet.title for worksheet in self.doc.worksheets()] |
|
|
|
def get_available_columns(self) -> List[str]: |
|
"""Get list of all available columns in the current worksheet.""" |
|
return self.headers if self.headers else self.sheet.row_values(1) |
|
|
|
def _reconnect_if_needed(self): |
|
"""Reconnect to the sheet if the connection is lost.""" |
|
try: |
|
self.sheet.row_values(1) |
|
except (gspread.exceptions.APIError, AttributeError): |
|
self._init_google_client() |
|
self._connect_to_sheet() |
|
|
|
def _fetch_column_data(self) -> List[str]: |
|
"""Fetch all data from the huggingface_id column.""" |
|
values = self.sheet.col_values(self.col_index) |
|
return values[1:] |
|
|
|
def _update_sheet(self, data: List[str]): |
|
"""Update the entire column with new data.""" |
|
try: |
|
|
|
start_cell = gspread.utils.rowcol_to_a1(2, self.col_index) |
|
end_cell = gspread.utils.rowcol_to_a1(len(data) + 2, self.col_index) |
|
range_name = f"{start_cell}:{end_cell}" |
|
|
|
|
|
cells = [[value] for value in data] |
|
|
|
|
|
self.sheet.update(range_name, cells) |
|
except Exception as e: |
|
print(f"Error updating sheet: {str(e)}") |
|
raise |
|
|
|
def push(self, text: str) -> int: |
|
""" |
|
Push a text value to the next empty cell in the huggingface_id column. |
|
|
|
Args: |
|
text (str): Text to push to the sheet |
|
|
|
Returns: |
|
int: The row number where the text was pushed |
|
""" |
|
try: |
|
self._reconnect_if_needed() |
|
|
|
|
|
column_values = self.sheet.col_values(self.col_index) |
|
|
|
|
|
next_row = None |
|
for i in range(1, len(column_values)): |
|
if not column_values[i].strip(): |
|
next_row = i + 1 |
|
break |
|
|
|
|
|
if next_row is None: |
|
next_row = len(column_values) + 1 |
|
|
|
|
|
self.sheet.update_cell(next_row, self.col_index, text) |
|
print(f"Successfully pushed value: {text} to row {next_row}") |
|
return next_row |
|
|
|
except Exception as e: |
|
print(f"Error pushing to sheet: {str(e)}") |
|
raise |
|
|
|
def pop(self) -> Optional[str]: |
|
"""Remove and return the most recent value.""" |
|
try: |
|
self._reconnect_if_needed() |
|
data = self._fetch_column_data() |
|
|
|
if not data or not data[0].strip(): |
|
return None |
|
|
|
value = data.pop(0) |
|
data.append("") |
|
|
|
self._update_sheet(data) |
|
print(f"Successfully popped value: {value}") |
|
return value |
|
|
|
except Exception as e: |
|
print(f"Error popping from sheet: {str(e)}") |
|
raise |
|
|
|
def delete(self, value: str) -> List[int]: |
|
"""Delete all occurrences of a value.""" |
|
try: |
|
self._reconnect_if_needed() |
|
data = self._fetch_column_data() |
|
|
|
|
|
indices = [i + 1 for i, v in enumerate(data) if v.strip() == value.strip()] |
|
if not indices: |
|
print(f"Value '{value}' not found in sheet") |
|
return [] |
|
|
|
|
|
data = [v for v in data if v.strip() != value.strip()] |
|
data.extend([""] * len(indices)) |
|
|
|
self._update_sheet(data) |
|
print(f"Successfully deleted value '{value}' from rows: {indices}") |
|
return indices |
|
|
|
except Exception as e: |
|
print(f"Error deleting from sheet: {str(e)}") |
|
raise |
|
|
|
def update_cell_by_condition(self, condition_column: str, condition_value: str, target_column: str, target_value: str) -> Optional[int]: |
|
""" |
|
Update the value of a cell based on a condition in another column. |
|
|
|
Args: |
|
condition_column (str): The column to check the condition on. |
|
condition_value (str): The value to match in the condition column. |
|
target_column (str): The column where the value should be updated. |
|
target_value (str): The new value to set in the target column. |
|
|
|
Returns: |
|
Optional[int]: The row number where the value was updated, or None if no matching row was found. |
|
""" |
|
try: |
|
self._reconnect_if_needed() |
|
|
|
|
|
headers = self.sheet.row_values(1) |
|
|
|
|
|
try: |
|
condition_col_index = headers.index(condition_column) + 1 |
|
except ValueError: |
|
raise ValueError(f"์กฐ๊ฑด ์นผ๋ผ '{condition_column}'์ด(๊ฐ) ์์ต๋๋ค.") |
|
|
|
try: |
|
target_col_index = headers.index(target_column) + 1 |
|
except ValueError: |
|
raise ValueError(f"๋ชฉํ ์นผ๋ผ '{target_column}'์ด(๊ฐ) ์์ต๋๋ค.") |
|
|
|
|
|
data = self.sheet.get_all_records() |
|
|
|
|
|
for i, row in enumerate(data): |
|
if row.get(condition_column) == condition_value: |
|
|
|
row_number = i + 2 |
|
self.sheet.update_cell(row_number, target_col_index, target_value) |
|
print(f"Updated row {row_number}: Set {target_column} to '{target_value}' where {condition_column} is '{condition_value}'") |
|
return row_number |
|
|
|
print(f"์กฐ๊ฑด์ ๋ง๋ ํ์ ์ฐพ์ ์ ์์ต๋๋ค: {condition_column} = '{condition_value}'") |
|
return None |
|
|
|
except Exception as e: |
|
print(f"Error updating cell by condition: {str(e)}") |
|
raise |
|
|
|
def get_all_values(self) -> List[str]: |
|
"""Get all values from the huggingface_id column.""" |
|
self._reconnect_if_needed() |
|
return [v for v in self._fetch_column_data() if v.strip()] |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
sheet_manager = SheetManager() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
row_updated = sheet_manager.update_cell_by_condition( |
|
condition_column="model", |
|
condition_value="msr", |
|
target_column="pia", |
|
target_value="new_value" |
|
) |
|
|
|
|