jisujang's picture
first
a005c19
import os
from oauth2client.service_account import ServiceAccountCredentials
import gspread
from dotenv import load_dotenv
from enviroments.convert import get_json_from_env_var
from typing import Optional, List
load_dotenv(override=True)
class SheetManager:
def __init__(self, spreadsheet_url: Optional[str] = None,
worksheet_name: str = "flag",
column_name: str = "huggingface_id"):
"""
Initialize SheetManager with Google Sheets credentials and connection.
Args:
spreadsheet_url (str, optional): URL of the Google Spreadsheet.
If None, takes from environment variable.
worksheet_name (str): Name of the worksheet to operate on.
Defaults to "flag".
column_name (str): Name of the column to operate on.
Defaults to "huggingface_id".
"""
self.spreadsheet_url = spreadsheet_url or os.getenv("SPREADSHEET_URL")
if not self.spreadsheet_url:
raise ValueError("Spreadsheet URL not provided and not found in environment variables")
self.worksheet_name = worksheet_name
self.column_name = column_name
# Initialize credentials and client
self._init_google_client()
# Initialize sheet connection
self.doc = None
self.sheet = None
self.col_index = None
self._connect_to_sheet(validate_column=True)
def _init_google_client(self):
"""Initialize Google Sheets client with credentials."""
scope = ['https://spreadsheets.google.com/feeds',
'https://www.googleapis.com/auth/drive']
json_key_dict = get_json_from_env_var("GOOGLE_CREDENTIALS")
credentials = ServiceAccountCredentials.from_json_keyfile_dict(json_key_dict, scope)
self.client = gspread.authorize(credentials)
def _connect_to_sheet(self, validate_column: bool = True):
"""
Connect to the specified Google Sheet and initialize necessary attributes.
Args:
validate_column (bool): Whether to validate the column name exists
"""
try:
self.doc = self.client.open_by_url(self.spreadsheet_url)
# Try to get the worksheet
try:
self.sheet = self.doc.worksheet(self.worksheet_name)
except gspread.exceptions.WorksheetNotFound:
raise ValueError(f"Worksheet '{self.worksheet_name}' not found in spreadsheet")
# Get headers
self.headers = self.sheet.row_values(1)
# Validate column only if requested
if validate_column:
try:
self.col_index = self.headers.index(self.column_name) + 1
except ValueError:
# If column not found, use first available column
if self.headers:
self.column_name = self.headers[0]
self.col_index = 1
print(f"Column '{self.column_name}' not found. Using first available column: '{self.headers[0]}'")
else:
raise ValueError("No columns found in worksheet")
except Exception as e:
if isinstance(e, ValueError):
raise e
raise ConnectionError(f"Failed to connect to sheet: {str(e)}")
def change_worksheet(self, worksheet_name: str, column_name: Optional[str] = None):
"""
Change the current worksheet and optionally the column.
Args:
worksheet_name (str): Name of the worksheet to switch to
column_name (str, optional): Name of the column to switch to
"""
old_worksheet = self.worksheet_name
old_column = self.column_name
try:
self.worksheet_name = worksheet_name
if column_name:
self.column_name = column_name
# First connect without column validation
self._connect_to_sheet(validate_column=False)
# Then validate the column if specified
if column_name:
self.change_column(column_name)
else:
# Validate existing column in new worksheet
try:
self.col_index = self.headers.index(self.column_name) + 1
except ValueError:
# If column not found, use first available column
if self.headers:
self.column_name = self.headers[0]
self.col_index = 1
print(f"Column '{old_column}' not found in new worksheet. Using first available column: '{self.headers[0]}'")
else:
raise ValueError("No columns found in worksheet")
print(f"Successfully switched to worksheet: {worksheet_name}, using column: {self.column_name}")
except Exception as e:
# Restore previous state on error
self.worksheet_name = old_worksheet
self.column_name = old_column
self._connect_to_sheet()
raise e
def change_column(self, column_name: str):
"""
Change the target column.
Args:
column_name (str): Name of the column to switch to
"""
if not self.headers:
self.headers = self.sheet.row_values(1)
try:
self.col_index = self.headers.index(column_name) + 1
self.column_name = column_name
print(f"Successfully switched to column: {column_name}")
except ValueError:
raise ValueError(f"Column '{column_name}' not found in worksheet. Available columns: {', '.join(self.headers)}")
def get_available_worksheets(self) -> List[str]:
"""Get list of all available worksheets in the spreadsheet."""
return [worksheet.title for worksheet in self.doc.worksheets()]
def get_available_columns(self) -> List[str]:
"""Get list of all available columns in the current worksheet."""
return self.headers if self.headers else self.sheet.row_values(1)
def _reconnect_if_needed(self):
"""Reconnect to the sheet if the connection is lost."""
try:
self.sheet.row_values(1)
except (gspread.exceptions.APIError, AttributeError):
self._init_google_client()
self._connect_to_sheet()
def _fetch_column_data(self) -> List[str]:
"""Fetch all data from the huggingface_id column."""
values = self.sheet.col_values(self.col_index)
return values[1:] # Exclude header
def _update_sheet(self, data: List[str]):
"""Update the entire column with new data."""
try:
# Prepare the range for update (excluding header)
start_cell = gspread.utils.rowcol_to_a1(2, self.col_index) # Start from row 2
end_cell = gspread.utils.rowcol_to_a1(len(data) + 2, self.col_index)
range_name = f"{start_cell}:{end_cell}"
# Convert data to 2D array format required by gspread
cells = [[value] for value in data]
# Update the range
self.sheet.update(range_name, cells)
except Exception as e:
print(f"Error updating sheet: {str(e)}")
raise
def push(self, text: str) -> int:
"""
Push a text value to the next empty cell in the huggingface_id column.
Args:
text (str): Text to push to the sheet
Returns:
int: The row number where the text was pushed
"""
try:
self._reconnect_if_needed()
# Get all values in the huggingface_id column
column_values = self.sheet.col_values(self.col_index)
# Find the next empty row
next_row = None
for i in range(1, len(column_values)):
if not column_values[i].strip():
next_row = i + 1
break
# If no empty row found, append to the end
if next_row is None:
next_row = len(column_values) + 1
# Update the cell
self.sheet.update_cell(next_row, self.col_index, text)
print(f"Successfully pushed value: {text} to row {next_row}")
return next_row
except Exception as e:
print(f"Error pushing to sheet: {str(e)}")
raise
def pop(self) -> Optional[str]:
"""Remove and return the most recent value."""
try:
self._reconnect_if_needed()
data = self._fetch_column_data()
if not data or not data[0].strip():
return None
value = data.pop(0) # Remove first value
data.append("") # Add empty string at the end to maintain sheet size
self._update_sheet(data)
print(f"Successfully popped value: {value}")
return value
except Exception as e:
print(f"Error popping from sheet: {str(e)}")
raise
def delete(self, value: str) -> List[int]:
"""Delete all occurrences of a value."""
try:
self._reconnect_if_needed()
data = self._fetch_column_data()
# Find all indices before deletion
indices = [i + 1 for i, v in enumerate(data) if v.strip() == value.strip()]
if not indices:
print(f"Value '{value}' not found in sheet")
return []
# Remove matching values and add empty strings at the end
data = [v for v in data if v.strip() != value.strip()]
data.extend([""] * len(indices)) # Add empty strings to maintain sheet size
self._update_sheet(data)
print(f"Successfully deleted value '{value}' from rows: {indices}")
return indices
except Exception as e:
print(f"Error deleting from sheet: {str(e)}")
raise
def update_cell_by_condition(self, condition_column: str, condition_value: str, target_column: str, target_value: str) -> Optional[int]:
"""
Update the value of a cell based on a condition in another column.
Args:
condition_column (str): The column to check the condition on.
condition_value (str): The value to match in the condition column.
target_column (str): The column where the value should be updated.
target_value (str): The new value to set in the target column.
Returns:
Optional[int]: The row number where the value was updated, or None if no matching row was found.
"""
try:
self._reconnect_if_needed()
# Get all column headers
headers = self.sheet.row_values(1)
# Find the indices for the condition and target columns
try:
condition_col_index = headers.index(condition_column) + 1
except ValueError:
raise ValueError(f"์กฐ๊ฑด ์นผ๋Ÿผ '{condition_column}'์ด(๊ฐ€) ์—†์Šต๋‹ˆ๋‹ค.")
try:
target_col_index = headers.index(target_column) + 1
except ValueError:
raise ValueError(f"๋ชฉํ‘œ ์นผ๋Ÿผ '{target_column}'์ด(๊ฐ€) ์—†์Šต๋‹ˆ๋‹ค.")
# Get all rows of data
data = self.sheet.get_all_records()
# Find the row that matches the condition
for i, row in enumerate(data):
if row.get(condition_column) == condition_value:
# Update the target column in the matching row
row_number = i + 2 # Row index starts at 2 (1 is header)
self.sheet.update_cell(row_number, target_col_index, target_value)
print(f"Updated row {row_number}: Set {target_column} to '{target_value}' where {condition_column} is '{condition_value}'")
return row_number
print(f"์กฐ๊ฑด์— ๋งž๋Š” ํ–‰์„ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {condition_column} = '{condition_value}'")
return None
except Exception as e:
print(f"Error updating cell by condition: {str(e)}")
raise
def get_all_values(self) -> List[str]:
"""Get all values from the huggingface_id column."""
self._reconnect_if_needed()
return [v for v in self._fetch_column_data() if v.strip()]
# Example usage
if __name__ == "__main__":
# Initialize sheet manager
sheet_manager = SheetManager()
# # Push some test values
# sheet_manager.push("test-model-1")
# sheet_manager.push("test-model-2")
# sheet_manager.push("test-model-3")
# print("Initial values:", sheet_manager.get_all_values())
# # Pop the most recent value
# popped = sheet_manager.pop()
# print(f"Popped value: {popped}")
# print("After pop:", sheet_manager.get_all_values())
# # Delete a specific value
# deleted_rows = sheet_manager.delete("test-model-2")
# print(f"Deleted from rows: {deleted_rows}")
# print("After delete:", sheet_manager.get_all_values())
row_updated = sheet_manager.update_cell_by_condition(
condition_column="model",
condition_value="msr",
target_column="pia",
target_value="new_value"
)