import gradio as gr
from huggingface_hub import HfApi, create_repo
import os
import re
import json
import torch
import random
from typing import List, Dict, Union, Tuple
from gliner import GLiNER
from datasets import load_dataset as hf_load_dataset  # aliased: a local load_dataset() for the viewer is defined below
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()
HF_TOKEN = os.getenv("HUGGINGFACE_ACCESS_TOKEN")
# Available models for annotation
AVAILABLE_MODELS = [
    "BookingCare/gliner-multi-healthcare",
    "knowledgator/gliner-multitask-large-v0.5",
    "knowledgator/gliner-multitask-base-v0.5",
]
# Dataset Viewer Classes and Functions
class DynamicDataset:
    def __init__(
        self, data: List[Dict[str, Union[List[Union[int, str]], bool]]]
    ) -> None:
        self.data = data
        self.data_len = len(self.data)
        self.current = -1
        for example in self.data:
            if "validated" not in example:
                example["validated"] = False

    def next_example(self):
        self.current += 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def previous_example(self):
        self.current -= 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def example_by_id(self, id):
        self.current = id
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def validate(self):
        self.data[self.current]["validated"] = True

    def load_current_example(self):
        return self.data[self.current]
def tokenize_text(text):
    """Tokenize the input text into a list of tokens."""
    return re.findall(r'\w+(?:[-_]\w+)*|\S', text)
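# Illustrative example (not executed): hyphen/underscore-joined words stay together,
# while punctuation becomes a separate token, e.g.
#   tokenize_text("COVID-19 vaccine: 2 doses, right?")
#   -> ['COVID-19', 'vaccine', ':', '2', 'doses', ',', 'right', '?']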
def join_tokens(tokens):
    # Joining tokens with space, but handling special characters correctly
    text = ""
    for token in tokens:
        if token in {",", ".", "!", "?", ":", ";", "..."}:
            text = text.rstrip() + token
        else:
            text += " " + token
    return text.strip()
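# Illustrative example (not executed): punctuation is re-attached to the preceding token,
#   join_tokens(['Hello', ',', 'world', '!'])  ->  "Hello, world!"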
def prepare_for_highlight(data):
    """Convert an example with "tokenized_text" and "ner" spans into (text, label) pairs for gr.HighlightedText."""
    tokens = data["tokenized_text"]
    ner = data["ner"]

    highlighted_text = []
    current_entity = None
    entity_tokens = []
    normal_tokens = []

    for idx, token in enumerate(tokens):
        # Check if the current token is the start of a new entity
        if current_entity is None or idx > current_entity[1]:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            current_entity = next((entity for entity in ner if entity[0] == idx), None)

        # If the current token is part of an entity
        if current_entity and current_entity[0] <= idx <= current_entity[1]:
            if normal_tokens:
                highlighted_text.append((" ".join(normal_tokens), None))
                normal_tokens = []
            entity_tokens.append(token)
        else:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            normal_tokens.append(token)

    # Append any remaining tokens
    if entity_tokens:
        highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
    if normal_tokens:
        highlighted_text.append((" ".join(normal_tokens), None))

    # Clean up spaces before punctuation
    cleaned_highlighted_text = []
    for text, label in highlighted_text:
        cleaned_text = re.sub(r'\s(?=[,\.!?…:;])', '', text)
        cleaned_highlighted_text.append((cleaned_text, label))

    return cleaned_highlighted_text
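# Illustrative example (not executed), assuming spans are [start, end, label] over token indices:
#   prepare_for_highlight({"tokenized_text": ["John", "has", "a", "fever", "."],
#                          "ner": [[0, 0, "PERSON"], [3, 3, "SYMPTOM"]]})
#   -> [("John", "PERSON"), ("has a", None), ("fever", "SYMPTOM"), (".", None)]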
def extract_tokens_and_labels(data: List[Dict[str, Union[str, None]]]) -> Tuple[List[str], List[Tuple[int, int, str]]]:
    """Convert gr.HighlightedText output (token/label pairs) back into tokens and NER spans."""
    tokens = []
    ner = []
    token_start_idx = 0

    for entry in data:
        chunk = entry['token']
        label = entry['class_or_confidence']

        # Tokenize the current text chunk
        token_list = tokenize_text(chunk)

        # Append tokens to the main tokens list
        tokens.extend(token_list)

        if label:
            token_end_idx = token_start_idx + len(token_list) - 1
            ner.append((token_start_idx, token_end_idx, label))

        token_start_idx += len(token_list)

    return tokens, ner
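# Illustrative example (not executed), assuming the HighlightedText value format of
# {"token": ..., "class_or_confidence": ...} dicts:
#   extract_tokens_and_labels([{"token": "John", "class_or_confidence": "PERSON"},
#                              {"token": "has a fever.", "class_or_confidence": None}])
#   -> (['John', 'has', 'a', 'fever', '.'], [(0, 0, 'PERSON')])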
# Global variables for dataset viewer
dynamic_dataset = None


def load_dataset():
    """Load the locally saved annotated data into the viewer."""
    global dynamic_dataset
    try:
        with open("data/annotated_data.json", "rt", encoding="utf-8") as dataset:
            ANNOTATED_DATA = json.load(dataset)
        dynamic_dataset = DynamicDataset(ANNOTATED_DATA)
        dynamic_dataset.example_by_id(0)  # start at the first example instead of the default -1
        max_value = len(dynamic_dataset.data) - 1 if dynamic_dataset.data else 0
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=0, maximum=max_value)
    except Exception as e:
        return [("Error loading dataset: " + str(e), None)], gr.update(value=0, maximum=1)
def example_by_id(id):
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        id = int(id)  # Ensure id is an integer
        dynamic_dataset.example_by_id(id)
        current = dynamic_dataset.current
        max_value = len(dynamic_dataset.data) - 1
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=current, maximum=max_value)
    except Exception as e:
        return [("Error navigating to example: " + str(e), None)], gr.update(value=0, maximum=1)
def next_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        dynamic_dataset.next_example()
        current = dynamic_dataset.current
        max_value = len(dynamic_dataset.data) - 1
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=current, maximum=max_value)
    except Exception as e:
        return [("Error navigating to next example: " + str(e), None)], gr.update(value=0, maximum=1)
def previous_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        dynamic_dataset.previous_example()
        current = dynamic_dataset.current
        max_value = len(dynamic_dataset.data) - 1
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=current, maximum=max_value)
    except Exception as e:
        return [("Error navigating to previous example: " + str(e), None)], gr.update(value=0, maximum=1)
def update_example(data):
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    tokens, ner = extract_tokens_and_labels(data)
    dynamic_dataset.data[dynamic_dataset.current]["tokenized_text"] = tokens
    dynamic_dataset.data[dynamic_dataset.current]["ner"] = ner
    return prepare_for_highlight(dynamic_dataset.load_current_example())
def validate_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    dynamic_dataset.data[dynamic_dataset.current]["validated"] = True
    return [("The example was validated!", None)]
def save_dataset(inp):
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    with open("data/annotated_data.json", "wt", encoding="utf-8") as file:
        json.dump(dynamic_dataset.data, file, ensure_ascii=False)
    return [("The validated dataset was saved as data/annotated_data.json", None)]
# Original annotation functions
def transform_data(data):
    tokens = tokenize_text(data['text'])
    spans = []
    for entity in data['entities']:
        entity_tokens = tokenize_text(entity['word'])
        entity_length = len(entity_tokens)
        # Find the start and end indices of each entity in the tokenized text
        for i in range(len(tokens) - entity_length + 1):
            if tokens[i:i + entity_length] == entity_tokens:
                spans.append([i, i + entity_length - 1, entity['entity']])
                break
    return {"tokenized_text": tokens, "ner": spans, "validated": False}
def merge_entities(entities):
    if not entities:
        return []
    merged = []
    current = entities[0]
    for next_entity in entities[1:]:
        if next_entity['entity'] == current['entity'] and (next_entity['start'] == current['end'] + 1 or next_entity['start'] == current['end']):
            current['word'] += ' ' + next_entity['word']
            current['end'] = next_entity['end']
        else:
            merged.append(current)
            current = next_entity
    merged.append(current)
    return merged
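# Illustrative example (not executed): adjacent predictions with the same label are merged into one span,
#   merge_entities([{'entity': 'ORG', 'word': 'Hugging', 'start': 0, 'end': 7, 'score': 0},
#                   {'entity': 'ORG', 'word': 'Face', 'start': 8, 'end': 12, 'score': 0}])
#   -> [{'entity': 'ORG', 'word': 'Hugging Face', 'start': 0, 'end': 12, 'score': 0}]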
def annotate_text(
    model, text, labels: List[str], threshold: float, nested_ner: bool
) -> Dict:
    labels = [label.strip() for label in labels]
    r = {
        "text": text,
        "entities": [
            {
                "entity": entity["label"],
                "word": entity["text"],
                "start": entity["start"],
                "end": entity["end"],
                "score": 0,
            }
            for entity in model.predict_entities(
                text, labels, flat_ner=not nested_ner, threshold=threshold
            )
        ],
    }
    r["entities"] = merge_entities(r["entities"])
    return transform_data(r)
def batch_annotate_text(model: GLiNER, texts: List[str], labels: List[str], threshold: float, nested_ner: bool) -> List[Dict]:
    """Annotate multiple texts in batch"""
    labels = [label.strip() for label in labels]
    batch_entities = model.batch_predict_entities(texts, labels, flat_ner=not nested_ner, threshold=threshold)

    results = []
    for text, entities in zip(texts, batch_entities):
        r = {
            "text": text,
            "entities": [
                {
                    "entity": entity["label"],
                    "word": entity["text"],
                    "start": entity["start"],
                    "end": entity["end"],
                    "score": 0,
                }
                for entity in entities
            ],
        }
        r["entities"] = merge_entities(r["entities"])
        results.append(transform_data(r))
    return results
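# Illustrative usage sketch (not executed here; loading the model and running inference is expensive):
#   model = GLiNER.from_pretrained("BookingCare/gliner-multi-healthcare")
#   records = batch_annotate_text(model, ["The patient has a high fever."], ["symptom"],
#                                 threshold=0.3, nested_ner=False)
#   # -> a list of {"tokenized_text": ..., "ner": ..., "validated": False} records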
class AutoAnnotator:
    def __init__(
        self, model: str = "BookingCare/gliner-multi-healthcare",
        # device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
        device = torch.device('cpu')
    ) -> None:
        # Set PyTorch memory management settings
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

        self.model = GLiNER.from_pretrained(model).to(device)
        self.annotated_data = []
        self.stat = {
            "total": None,
            "current": -1
        }

    def auto_annotate(
        self, data: List[str], labels: List[str],
        prompt: Union[str, List[str]] = None, threshold: float = 0.5, nested_ner: bool = False
    ) -> List[Dict]:
        self.stat["total"] = len(data)
        self.stat["current"] = -1  # Reset current progress

        # Process texts in batches
        processed_data = []
        batch_size = 8  # Reduced batch size to prevent OOM errors

        for i in range(0, len(data), batch_size):
            batch_texts = data[i:i + batch_size]
            batch_with_prompts = []

            # Add prompts to batch texts
            for text in batch_texts:
                if isinstance(prompt, list):
                    prompt_text = random.choice(prompt)
                else:
                    prompt_text = prompt
                text_with_prompt = f"{prompt_text}\n{text}" if prompt_text else text
                batch_with_prompts.append(text_with_prompt)

            # Process batch
            batch_results = batch_annotate_text(self.model, batch_with_prompts, labels, threshold, nested_ner)
            processed_data.extend(batch_results)

            # Clear CUDA cache after each batch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            # Update progress
            self.stat["current"] = min(i + batch_size, len(data))

        self.annotated_data = processed_data
        return self.annotated_data
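# Illustrative usage sketch (not executed; downloads the model and runs inference; label names are examples):
#   annotator = AutoAnnotator("BookingCare/gliner-multi-healthcare")
#   records = annotator.auto_annotate(
#       ["The patient reports chest pain and shortness of breath."],
#       labels=["symptom", "disease"], threshold=0.3,
#   )
#   # annotator.stat tracks progress as {"total": ..., "current": ...}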
# Global variables
annotator = None
sentences = []
def process_text_for_gliner(text: str, max_tokens: int = 256, overlap: int = 32) -> List[str]:
    """
    Process text for GLiNER by splitting long texts into overlapping chunks.
    Preserves sentence boundaries and context when possible.

    Args:
        text: The input text to process
        max_tokens: Maximum number of tokens per chunk
        overlap: Number of tokens to overlap between chunks

    Returns:
        List of text chunks suitable for GLiNER
    """
    # First split into sentences to preserve natural boundaries
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        # Tokenize the sentence
        sentence_tokens = tokenize_text(sentence)
        sentence_length = len(sentence_tokens)

        # If a single sentence is too long, split it
        if sentence_length > max_tokens:
            # If we have accumulated tokens, add them as a chunk
            if current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_length = 0

            # Split the long sentence into smaller chunks
            start = 0
            while start < sentence_length:
                end = min(start + max_tokens, sentence_length)
                chunk_tokens = sentence_tokens[start:end]
                chunks.append(" ".join(chunk_tokens))
                start = end - overlap if end < sentence_length else end

        # If adding this sentence would exceed max_tokens, start a new chunk
        elif current_length + sentence_length > max_tokens:
            chunks.append(" ".join(current_chunk))
            current_chunk = sentence_tokens
            current_length = sentence_length
        else:
            current_chunk.extend(sentence_tokens)
            current_length += sentence_length

    # Add any remaining tokens as the final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
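# Illustrative example (not executed): a document whose sentences total ~600 tokens would come back
# as roughly three chunks of at most 256 tokens each, with the 32-token overlap applied only when a
# single over-long sentence has to be split. long_report_text is a placeholder variable:
#   chunks = process_text_for_gliner(long_report_text, max_tokens=256, overlap=32)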
def process_uploaded_file(file_obj):
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Read the uploaded file
        global sentences
        if file_obj.name.endswith('.csv'):
            import pandas as pd
            df = pd.read_csv(file_obj.name)
            # 'Nội dung' is the Vietnamese "Content" column expected in the CSV
            sentences = df['Nội dung'].dropna().tolist()
            # Process each sentence and flatten the list
            processed_sentences = []
            for sentence in sentences:
                processed_sentences.extend(process_text_for_gliner(sentence))
            sentences = processed_sentences
        else:
            # Read the file content via its path (consistent with the CSV branch)
            with open(file_obj.name, 'r', encoding='utf-8') as f:
                content = f.read()
            raw_sentences = [line.strip() for line in content.splitlines() if line.strip()]
            # Process each sentence and flatten the list
            processed_sentences = []
            for sentence in raw_sentences:
                processed_sentences.extend(process_text_for_gliner(sentence))
            sentences = processed_sentences
        return f"Successfully loaded {len(sentences)} sentences from file!"
    except Exception as e:
        return f"Error reading file: {str(e)}"
def is_valid_repo_name(repo_name):
    # Hugging Face repo names used here must not contain slashes or spaces
    return bool(re.match(r'^[A-Za-z0-9_.-]+$', repo_name))
def create_hf_repo(repo_name: str, repo_type: str = "dataset", private: bool = False):
    """Create a new repository on Hugging Face Hub"""
    if not is_valid_repo_name(repo_name):
        raise Exception("Invalid repo name: must not contain slashes, spaces, or special characters except '-', '_', '.'")
    try:
        api = HfApi(token=HF_TOKEN)
        # user = api.whoami()['name']
        # repo_id = f"{user}/{repo_name}"
        create_repo(
            repo_id=repo_name,
            repo_type=repo_type,
            private=private,
            exist_ok=True,
            token=HF_TOKEN
        )
        return repo_name
    except Exception as e:
        raise Exception(f"Error creating repository: {str(e)}")
def annotate(model, labels, threshold, prompt, save_to_hub, repo_name, repo_type, is_private):
    global annotator
    try:
        if not sentences:
            return "Please upload a file with text first!"
        if save_to_hub and not is_valid_repo_name(repo_name):
            return "Error: Invalid repo name. Only use letters, numbers, '-', '_', or '.' (no slashes or spaces)."

        labels = [label.strip() for label in labels.split(",")]
        annotator = AutoAnnotator(model)
        annotated_data = annotator.auto_annotate(sentences, labels, prompt, threshold)

        # Save annotated data locally
        os.makedirs("data", exist_ok=True)
        local_path = "data/annotated_data.json"
        with open(local_path, "wt", encoding="utf-8") as file:
            json.dump(annotated_data, file, ensure_ascii=False)

        status_messages = [f"Successfully annotated and saved locally to {local_path}"]

        # Upload to Hugging Face Hub if requested
        if save_to_hub:
            try:
                repo_id = create_hf_repo(repo_name, repo_type, is_private)
                api = HfApi(token=HF_TOKEN)
                api.upload_file(
                    path_or_fileobj=local_path,
                    path_in_repo="annotated_data.json",
                    repo_id=repo_id,
                    repo_type=repo_type,
                    token=HF_TOKEN
                )
                status_messages.append(f"Successfully uploaded to Hugging Face Hub repository: {repo_id}")
            except Exception as e:
                status_messages.append(f"Error with Hugging Face Hub: {str(e)}")

        return "\n".join(status_messages)
    except Exception as e:
        return f"Error during annotation: {str(e)}"
def convert_hf_dataset_to_ner_format(dataset):
    """Convert Hugging Face dataset to NER format"""
    converted_data = []
    for item in dataset:
        # Assuming the dataset has 'tokens' and 'ner_tags' fields
        # Adjust the field names based on your dataset structure
        if 'tokens' in item and 'ner_tags' in item:
            ner_spans = []
            current_span = None
            for i, (token, tag) in enumerate(zip(item['tokens'], item['ner_tags'])):
                if tag != 'O':  # Not Outside
                    if current_span is None:
                        current_span = [i, i, tag]
                    elif tag == current_span[2]:
                        current_span[1] = i
                    else:
                        ner_spans.append(current_span)
                        current_span = [i, i, tag]
                elif current_span is not None:
                    ner_spans.append(current_span)
                    current_span = None
            if current_span is not None:
                ner_spans.append(current_span)

            converted_data.append({
                "tokenized_text": item['tokens'],
                "ner": ner_spans,
                "validated": False
            })
    return converted_data
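# Illustrative example (not executed). Note this assumes string tags; many Hub datasets
# (e.g. conll2003) store ner_tags as integer class ids, which would first need mapping to
# label strings via the dataset's features.
#   convert_hf_dataset_to_ner_format([{"tokens": ["John", "lives", "in", "Paris"],
#                                      "ner_tags": ["PER", "O", "O", "LOC"]}])
#   -> [{"tokenized_text": ["John", "lives", "in", "Paris"],
#        "ner": [[0, 0, "PER"], [3, 3, "LOC"]], "validated": False}]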
def load_from_huggingface(dataset_name: str, split: str = "all"):
    """Load dataset from Hugging Face Hub"""
    try:
        dataset = hf_load_dataset(dataset_name, split=split)
        converted_data = convert_hf_dataset_to_ner_format(dataset)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt", encoding="utf-8") as file:
            json.dump(converted_data, file, ensure_ascii=False)

        return f"Successfully loaded and converted dataset: {dataset_name}"
    except Exception as e:
        return f"Error loading dataset: {str(e)}"
def load_from_local_file(file_path: str, file_format: str = "json"):
    """Load and convert data from local file in various formats"""
    try:
        if file_format == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, list):
                # If data is already in the correct format
                if all("tokenized_text" in item and "ner" in item for item in data):
                    return data

                # Convert from other JSON formats
                converted_data = []
                for item in data:
                    if "tokens" in item and "ner_tags" in item:
                        ner_spans = []
                        current_span = None
                        for i, (token, tag) in enumerate(zip(item["tokens"], item["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None
                        if current_span is not None:
                            ner_spans.append(current_span)

                        converted_data.append({
                            "tokenized_text": item["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                return converted_data
            else:
                raise ValueError("JSON file must contain a list of examples")

        elif file_format == "conll":
            converted_data = []
            current_example = {"tokens": [], "ner_tags": []}

            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        if line.startswith("#"):
                            continue
                        parts = line.split()
                        if len(parts) >= 2:
                            token, tag = parts[0], parts[-1]
                            current_example["tokens"].append(token)
                            current_example["ner_tags"].append(tag)
                    elif current_example["tokens"]:
                        # Convert current example
                        ner_spans = []
                        current_span = None
                        for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None
                        if current_span is not None:
                            ner_spans.append(current_span)

                        converted_data.append({
                            "tokenized_text": current_example["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                        current_example = {"tokens": [], "ner_tags": []}

            # Handle last example if exists
            if current_example["tokens"]:
                ner_spans = []
                current_span = None
                for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                    if tag != "O":
                        if current_span is None:
                            current_span = [i, i, tag]
                        elif tag == current_span[2]:
                            current_span[1] = i
                        else:
                            ner_spans.append(current_span)
                            current_span = [i, i, tag]
                    elif current_span is not None:
                        ner_spans.append(current_span)
                        current_span = None
                if current_span is not None:
                    ner_spans.append(current_span)

                converted_data.append({
                    "tokenized_text": current_example["tokens"],
                    "ner": ner_spans,
                    "validated": False
                })
            return converted_data

        elif file_format == "txt":
            # Simple text file with one sentence per line
            converted_data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        tokens = tokenize_text(line)
                        converted_data.append({
                            "tokenized_text": tokens,
                            "ner": [],
                            "validated": False
                        })
            return converted_data
        else:
            raise ValueError(f"Unsupported file format: {file_format}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")
def process_local_file(file_obj, file_format):
    """Process uploaded local file"""
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Load and convert the data
        data = load_from_local_file(file_obj.name, file_format)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt", encoding="utf-8") as file:
            json.dump(data, file, ensure_ascii=False)

        return f"Successfully loaded and converted {len(data)} examples from {file_format} file!"
    except Exception as e:
        return f"Error processing file: {str(e)}"
# Add a function to download the annotated data
def download_annotated_data():
    file_path = "data/annotated_data.json"
    if os.path.exists(file_path):
        return file_path
    else:
        return None
def download_to_folder():
    """Download annotated data to a local folder"""
    try:
        source_path = "data/annotated_data.json"
        if not os.path.exists(source_path):
            return "No annotated data found!"

        # Create downloads directory if it doesn't exist
        download_dir = os.path.expanduser("~/Downloads")
        os.makedirs(download_dir, exist_ok=True)

        # Copy file to downloads folder
        import shutil
        dest_path = os.path.join(download_dir, "annotated_data.json")
        shutil.copy2(source_path, dest_path)
        return f"Successfully downloaded to {dest_path}"
    except Exception as e:
        return f"Error downloading file: {str(e)}"
def update_hf_dataset(repo_name: str, repo_type: str = "dataset", is_private: bool = False):
    """Update or create a Hugging Face dataset with the current annotated data"""
    try:
        if not dynamic_dataset or not dynamic_dataset.data:
            return "No data to upload! Please load or annotate data first."

        # Save current data to local file
        os.makedirs("data", exist_ok=True)
        local_path = "data/annotated_data.json"
        with open(local_path, "wt", encoding="utf-8") as file:
            json.dump(dynamic_dataset.data, file, ensure_ascii=False)

        # Create or update repository
        api = HfApi(token=HF_TOKEN)  # created up front so it is available in the except branch below
        try:
            repo_id = create_hf_repo(repo_name, repo_type, is_private)
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo="annotated_data.json",
                repo_id=repo_id,
                repo_type=repo_type,
                token=HF_TOKEN
            )
            return f"Successfully uploaded to Hugging Face Hub repository: {repo_id}"
        except Exception as e:
            if "already exists" in str(e):
                # If the repo already exists, just update the file
                user = api.whoami()['name']
                repo_id = f"{user}/{repo_name}"
                api.upload_file(
                    path_or_fileobj=local_path,
                    path_in_repo="annotated_data.json",
                    repo_id=repo_id,
                    repo_type=repo_type,
                    token=HF_TOKEN
                )
                return f"Successfully updated existing repository: {repo_id}"
            else:
                raise e
    except Exception as e:
        return f"Error updating Hugging Face dataset: {str(e)}"
# Create the main interface with tabs
with gr.Blocks() as demo:
    gr.Markdown("# NER Annotation Tool")

    with gr.Tabs():
        with gr.TabItem("Auto Annotation"):
            with gr.Row():
                with gr.Column():
                    file_uploader = gr.File(label="Upload text file (one sentence per line)")
                    upload_status = gr.Textbox(label="Upload Status")
                    file_uploader.change(fn=process_uploaded_file, inputs=[file_uploader], outputs=[upload_status])
                with gr.Column():
                    model = gr.Dropdown(
                        label="Choose the model for annotation",
                        choices=AVAILABLE_MODELS,
                        value=AVAILABLE_MODELS[0]
                    )
                    labels = gr.Textbox(
                        label="Labels",
                        placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
                        scale=2
                    )
                    threshold = gr.Slider(
                        0, 1,
                        value=0.3,
                        step=0.01,
                        label="Threshold",
                        info="Lower threshold increases entity predictions"
                    )
                    prompt = gr.Textbox(
                        label="Prompt",
                        placeholder="Enter your annotation prompt (optional)",
                        scale=2
                    )
            with gr.Group():
                gr.Markdown("### Save Options")
                save_to_hub = gr.Checkbox(
                    label="Save to Hugging Face Hub",
                    value=False
                )
                with gr.Group(visible=False) as hub_settings:
                    gr.Markdown("#### Hugging Face Hub Settings")
                    repo_name = gr.Textbox(
                        label="Repository Name",
                        placeholder="Enter repository name (e.g., my-ner-dataset)",
                        scale=2
                    )
                    repo_type = gr.Dropdown(
                        choices=["dataset", "model", "space"],
                        value="dataset",
                        label="Repository Type"
                    )
                    is_private = gr.Checkbox(
                        label="Private Repository",
                        value=False
                    )

            annotate_btn = gr.Button("Annotate Data")
            output_info = gr.Textbox(label="Processing Status")

            # Add download buttons for annotated data
            with gr.Row():
                download_btn_annot = gr.Button("Download Annotated Data", visible=False)
                download_file_annot = gr.File(label="Download", interactive=False, visible=False)
                download_status = gr.Textbox(label="Download Status", visible=False)
            def toggle_hub_settings(save_to_hub):
                return {
                    hub_settings: gr.update(visible=save_to_hub)
                }

            save_to_hub.change(
                fn=toggle_hub_settings,
                inputs=[save_to_hub],
                outputs=[hub_settings]
            )

            def show_download_buttons(status):
                # Show download buttons only if annotation was successful
                if status and status.startswith("Successfully annotated and saved locally"):
                    return gr.update(visible=True), gr.update(visible=True)
                return gr.update(visible=False), gr.update(visible=False)

            annotate_btn.click(
                fn=annotate,
                inputs=[
                    model, labels, threshold, prompt,
                    save_to_hub, repo_name, repo_type, is_private
                ],
                outputs=[output_info]
            )

            output_info.change(
                fn=show_download_buttons,
                inputs=[output_info],
                outputs=[download_btn_annot, download_status]
            )

            def handle_download_annot():
                file_path = download_annotated_data()
                if file_path:
                    return gr.update(value=file_path, visible=True)
                else:
                    return gr.update(visible=False)

            download_btn_annot.click(fn=handle_download_annot, inputs=None, outputs=[download_file_annot])
with gr.TabItem("Dataset Viewer"): | |
with gr.Row(): | |
with gr.Column(): | |
with gr.Row(): | |
load_local_btn = gr.Button("Load Local Dataset") | |
load_hf_btn = gr.Button("Load from Hugging Face") | |
local_file = gr.File(label="Upload Local Dataset", visible=False) | |
file_format = gr.Dropdown( | |
choices=["json", "conll", "txt"], | |
value="json", | |
label="File Format", | |
visible=False | |
) | |
local_status = gr.Textbox(label="Local File Status", visible=False) | |
with gr.Group(visible=False) as hf_inputs: | |
with gr.Row(): | |
dataset_name = gr.Textbox( | |
label="Hugging Face Dataset Name", | |
placeholder="Enter dataset name (e.g., conll2003)", | |
scale=3 | |
) | |
dataset_split = gr.Dropdown( | |
choices=["train", "validation", "test"], | |
value="train", | |
label="Dataset Split", | |
scale=2 | |
) | |
load_dataset_btn = gr.Button("Load Dataset", scale=1) | |
hf_status = gr.Textbox(label="Dataset Loading Status") | |
bar = gr.Slider( | |
minimum=0, | |
maximum=1, | |
step=1, | |
label="Progress", | |
interactive=True, | |
info="Use slider to navigate through examples" | |
) | |
with gr.Row(): | |
previous_btn = gr.Button("Previous example") | |
apply_btn = gr.Button("Apply changes") | |
next_btn = gr.Button("Next example") | |
validate_btn = gr.Button("Validate") | |
save_btn = gr.Button("Save validated dataset") | |
                    # Add Hugging Face upload section
                    with gr.Group(visible=False) as hf_upload_group:
                        gr.Markdown("### Upload to Hugging Face")
                        hf_repo_name = gr.Textbox(
                            label="Repository Name",
                            placeholder="Enter repository name (e.g., my-ner-dataset)",
                            scale=2
                        )
                        hf_repo_type = gr.Dropdown(
                            choices=["dataset", "model", "space"],
                            value="dataset",
                            label="Repository Type"
                        )
                        hf_is_private = gr.Checkbox(
                            label="Private Repository",
                            value=False
                        )
                        upload_to_hf_btn = gr.Button("Upload to Hugging Face")
                        hf_upload_status = gr.Textbox(label="Upload Status")

                    with gr.Row():
                        show_hf_upload_btn = gr.Button("Show Upload Options")
                        hide_hf_upload_btn = gr.Button("Hide Upload Options", visible=False)

            def toggle_hf_upload(show: bool):
                return {
                    hf_upload_group: gr.update(visible=show),
                    show_hf_upload_btn: gr.update(visible=not show),
                    hide_hf_upload_btn: gr.update(visible=show)
                }

            show_hf_upload_btn.click(
                fn=lambda: toggle_hf_upload(True),
                inputs=None,
                outputs=[hf_upload_group, show_hf_upload_btn, hide_hf_upload_btn]
            )
            hide_hf_upload_btn.click(
                fn=lambda: toggle_hf_upload(False),
                inputs=None,
                outputs=[hf_upload_group, show_hf_upload_btn, hide_hf_upload_btn]
            )
            inp_box = gr.HighlightedText(value=None, interactive=True)

            def toggle_local_inputs():
                return {
                    local_file: gr.update(visible=True),
                    file_format: gr.update(visible=True),
                    local_status: gr.update(visible=True),
                    hf_inputs: gr.update(visible=False)
                }

            def toggle_hf_inputs():
                return {
                    local_file: gr.update(visible=False),
                    file_format: gr.update(visible=False),
                    local_status: gr.update(visible=False),
                    hf_inputs: gr.update(visible=True)
                }

            load_local_btn.click(
                fn=toggle_local_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, hf_inputs]
            )
            load_hf_btn.click(
                fn=toggle_hf_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, hf_inputs]
            )
            def process_and_load_local(file_obj, format):
                status = process_local_file(file_obj, format)
                if "Successfully" in status:
                    return load_dataset()
                # On failure, show the error message and leave the slider unchanged
                return [(status, None)], gr.update()

            local_file.change(
                fn=process_and_load_local,
                inputs=[local_file, file_format],
                outputs=[inp_box, bar]
            )
            def load_hf_dataset(name, split):
                status = load_from_huggingface(name, split)
                if "Successfully" in status:
                    highlights, slider_update = load_dataset()
                    return highlights, slider_update, status
                # On failure, show the error message and leave the slider unchanged
                return [(status, None)], gr.update(), status

            load_dataset_btn.click(
                fn=load_hf_dataset,
                inputs=[dataset_name, dataset_split],
                outputs=[inp_box, bar, hf_status]
            )
            apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
            save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
            validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
            next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
            previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])
            bar.change(
                fn=example_by_id,
                inputs=[bar],
                outputs=[inp_box, bar],
                api_name="example_by_id"
            )

            # Add Hugging Face upload functionality
            upload_to_hf_btn.click(
                fn=update_hf_dataset,
                inputs=[hf_repo_name, hf_repo_type, hf_is_private],
                outputs=[hf_upload_status]
            )

demo.launch()