# ner-annotation / app.py
# Author: nam pham
# Last commit: "fix: upload to huggingface" (ffa19f8)
# File size: 43.2 kB
import gradio as gr
from huggingface_hub import HfApi, create_repo
import os
import re
import json
import torch
import random
from typing import List, Dict, Union, Tuple
from gliner import GLiNER
from datasets import load_dataset
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
# Hugging Face access token taken from the environment (None when the variable is unset)
HF_TOKEN = os.getenv("HUGGINGFACE_ACCESS_TOKEN")
# Available models for annotation; offered in the model dropdown, first entry is the default
AVAILABLE_MODELS = [
    "BookingCare/gliner-multi-healthcare",
    "knowledgator/gliner-multitask-large-v0.5",
    "knowledgator/gliner-multitask-base-v0.5"
]
# Dataset Viewer Classes and Functions
class DynamicDataset:
    """In-memory list of annotated examples with a movable cursor.

    Each example is a dict; a ``"validated"`` flag is added (as ``False``)
    to every example that does not already carry one.  ``current`` holds the
    cursor position and starts at ``-1`` so that the first ``next_example()``
    call lands on index 0.
    """

    def __init__(
        self, data: List[Dict[str, Union[List[Union[int, str]], bool]]]
    ) -> None:
        self.data = data
        self.data_len = len(self.data)
        self.current = -1
        for example in self.data:
            example.setdefault("validated", False)

    def _clamp(self, index):
        """Return ``index`` limited to the valid range of example positions."""
        last = self.data_len - 1
        if index > last:
            return last
        if index < 0:
            return 0
        return index

    def next_example(self):
        """Move the cursor one example forward, stopping at the last one."""
        self.current = self._clamp(self.current + 1)

    def previous_example(self):
        """Move the cursor one example back, stopping at the first one."""
        self.current = self._clamp(self.current - 1)

    def example_by_id(self, id):
        """Move the cursor to position ``id``, clamped to the valid range."""
        self.current = self._clamp(id)

    def validate(self):
        """Mark the example under the cursor as validated."""
        self.data[self.current]["validated"] = True

    def load_current_example(self):
        """Return the example under the cursor.

        NOTE: while the cursor is still at its initial ``-1`` this indexes
        ``data[-1]``, i.e. the last example.
        """
        return self.data[self.current]
def tokenize_text(text):
    """Split text into tokens.

    A token is either a run of word characters (optionally chained with
    ``-`` or ``_``) or any single non-whitespace character.
    """
    token_pattern = r'\w+(?:[-_]\w+)*|\S'
    return re.findall(token_pattern, text)
def join_tokens(tokens):
    """Join tokens into a string, gluing punctuation onto the preceding text."""
    # Tokens that must not be preceded by a space
    attach_left = {",", ".", "!", "?", ":", ";", "..."}
    joined = ""
    for tok in tokens:
        if tok in attach_left:
            joined = joined.rstrip() + tok
            continue
        joined = joined + " " + tok
    return joined.strip()
def prepare_for_highlight(data):
    """Convert one example into ``(text, label)`` segments for highlighting.

    Args:
        data: dict with ``"tokenized_text"`` (list of tokens) and ``"ner"``
            (sequence of ``(start, end, label)`` spans, token indices,
            end inclusive).

    Returns:
        List of ``(text, label)`` tuples in token order.  ``label`` is
        ``None`` for text outside any entity.  Tokens inside a segment are
        separated by a single space, each segment ends with one space, and
        whitespace directly before ``, . ! ? … : ;`` is removed.
    """
    tokens = data["tokenized_text"]

    # First entity starting at each token index (same "first match wins"
    # rule as scanning the span list, without rescanning it per token).
    entity_by_start = {}
    for entity in data["ner"]:
        entity_by_start.setdefault(entity[0], entity)

    segments = []
    current_entity = None
    entity_tokens = []
    normal_tokens = []

    def flush(buffer, label):
        # Emit the buffered tokens as one segment and reset the buffer.
        if buffer:
            segments.append((" ".join(buffer) + " ", label))
            buffer.clear()

    for idx, token in enumerate(tokens):
        # Past the end of the active entity (or none active): close it and
        # look for an entity that starts exactly here.
        if current_entity is None or idx > current_entity[1]:
            if entity_tokens:
                flush(entity_tokens, current_entity[2])
            current_entity = entity_by_start.get(idx)

        if current_entity is not None and current_entity[0] <= idx <= current_entity[1]:
            flush(normal_tokens, None)
            entity_tokens.append(token)
        else:
            normal_tokens.append(token)

    # Append any remaining tokens
    if entity_tokens:
        flush(entity_tokens, current_entity[2])
    flush(normal_tokens, None)

    # Clean up spaces before punctuation
    return [
        (re.sub(r'\s+(?=[,\.!?…:;])', '', text), label)
        for text, label in segments
    ]
def extract_tokens_and_labels(
    data: List[Dict[str, Union[str, None]]]
) -> Tuple[List[str], List[Tuple[int, int, str]]]:
    """Rebuild tokens and entity spans from highlighted-text segments.

    Args:
        data: list of dicts, each with a ``'token'`` text chunk and a
            ``'class_or_confidence'`` label (falsy for unlabeled text).
            Presumably the value emitted by the interactive highlighted-text
            component - verify against the caller.

    Returns:
        ``(tokens, ner)`` where ``tokens`` is the flat token list and ``ner``
        holds one ``(start, end, label)`` tuple per labeled chunk, with
        inclusive token indices.
    """
    tokens = []
    ner = []
    token_start_idx = 0
    for entry in data:
        chunk_tokens = tokenize_text(entry['token'])
        label = entry['class_or_confidence']
        tokens.extend(chunk_tokens)
        if label:
            # The whole chunk forms one entity span
            token_end_idx = token_start_idx + len(chunk_tokens) - 1
            ner.append((token_start_idx, token_end_idx, label))
        token_start_idx += len(chunk_tokens)
    return tokens, ner
# Global variables for dataset viewer
# DynamicDataset currently shown in the viewer; stays None until load_dataset() succeeds
dynamic_dataset = None
def load_dataset():
    """Load ``data/annotated_data.json`` into the viewer and show example 0.

    NOTE: this module-level name shadows ``datasets.load_dataset`` imported
    at the top of the file.

    Returns:
        ``(segments, slider_update)``: the highlight segments for the first
        example and a slider update with ``value=0`` and the last valid index
        as ``maximum``.  On failure (including an empty dataset) the segments
        carry an error message and the slider is reset.
    """
    global dynamic_dataset
    try:
        with open("data/annotated_data.json", 'rt') as dataset:
            annotated_data = json.load(dataset)
        dynamic_dataset = DynamicDataset(annotated_data)
        if not dynamic_dataset.data:
            return [("Error loading dataset: the dataset is empty", None)], gr.update(value=0, maximum=1)
        # The cursor starts at -1, which would display the LAST example while
        # the slider shows 0; move it to the first example explicitly.
        dynamic_dataset.example_by_id(0)
        max_value = len(dynamic_dataset.data) - 1
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=0, maximum=max_value)
    except Exception as e:
        return [("Error loading dataset: " + str(e), None)], gr.update(value=0, maximum=1)
def example_by_id(id):
    """Jump the viewer to the example at position ``id``.

    Returns a ``(segments, slider_update)`` pair; on error the segments carry
    the error message and the slider is reset.
    """
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        # The slider may deliver a float; the dataset expects an integer index
        dynamic_dataset.example_by_id(int(id))
        position = dynamic_dataset.current
        last_index = len(dynamic_dataset.data) - 1
        segments = prepare_for_highlight(dynamic_dataset.load_current_example())
        return segments, gr.update(value=position, maximum=last_index)
    except Exception as e:
        return [(f"Error navigating to example: {e}", None)], gr.update(value=0, maximum=1)
def next_example():
    """Advance the viewer to the following example.

    Returns a ``(segments, slider_update)`` pair; on error the segments carry
    the error message and the slider is reset.
    """
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        dynamic_dataset.next_example()
        position = dynamic_dataset.current
        last_index = len(dynamic_dataset.data) - 1
        segments = prepare_for_highlight(dynamic_dataset.load_current_example())
        return segments, gr.update(value=position, maximum=last_index)
    except Exception as e:
        return [(f"Error navigating to next example: {e}", None)], gr.update(value=0, maximum=1)
def previous_example():
    """Step the viewer back to the preceding example.

    Returns a ``(segments, slider_update)`` pair; on error the segments carry
    the error message and the slider is reset.
    """
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)], gr.update(value=0, maximum=1)
    try:
        dynamic_dataset.previous_example()
        position = dynamic_dataset.current
        last_index = len(dynamic_dataset.data) - 1
        segments = prepare_for_highlight(dynamic_dataset.load_current_example())
        return segments, gr.update(value=position, maximum=last_index)
    except Exception as e:
        return [(f"Error navigating to previous example: {e}", None)], gr.update(value=0, maximum=1)
def update_example(data):
    """Store the edited highlight segments back into the current example.

    Returns the re-rendered segments of the updated example, or a prompt to
    load a dataset when none is loaded.
    """
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    new_tokens, new_spans = extract_tokens_and_labels(data)
    target = dynamic_dataset.data[dynamic_dataset.current]
    target["tokenized_text"] = new_tokens
    target["ner"] = new_spans
    return prepare_for_highlight(dynamic_dataset.load_current_example())
def validate_example():
    """Flag the example currently shown in the viewer as validated."""
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    current_example = dynamic_dataset.data[dynamic_dataset.current]
    current_example["validated"] = True
    return [("The example was validated!", None)]
def save_dataset(inp):
    """Write the loaded dataset to ``data/annotated_data.json``.

    ``inp`` is accepted because the button wiring passes the highlight box
    value; it is not used.
    """
    if dynamic_dataset is None:
        return [("Please load a dataset first", None)]
    target_path = "data/annotated_data.json"
    with open(target_path, "wt") as out_file:
        json.dump(dynamic_dataset.data, out_file)
    return [("The validated dataset was saved as data/annotated_data.json", None)]
# Original annotation functions
def transform_data(data):
    """Convert model output into the token-span format used by the viewer.

    Args:
        data: dict with ``'text'`` and ``'entities'``; each entity has a
            ``'word'`` (the entity text) and an ``'entity'`` (its label).

    Returns:
        Dict with ``"tokenized_text"``, ``"ner"`` (list of
        ``[start, end, label]`` with inclusive token indices) and
        ``"validated": False``.  Entities whose tokens cannot be located in
        the text are dropped.
    """
    tokens = tokenize_text(data['text'])
    spans = []
    for entity in data['entities']:
        entity_tokens = tokenize_text(entity['word'])
        entity_length = len(entity_tokens)
        if entity_length == 0:
            # Nothing to locate; matching an empty slice would yield [0, -1, label]
            continue
        label = entity['entity']
        # Find the start and end indices of the entity in the tokenized text
        for start in range(len(tokens) - entity_length + 1):
            if tokens[start:start + entity_length] != entity_tokens:
                continue
            span = [start, start + entity_length - 1, label]
            if span in spans:
                # An earlier, identical mention already claimed this
                # occurrence; keep searching for the next one.
                continue
            spans.append(span)
            break
    return {"tokenized_text": tokens, "ner": spans, "validated": False}
def merge_entities(entities):
    """Merge consecutive entities that share a label and touch each other.

    Two neighbours are merged when they have the same ``'entity'`` label and
    the second one's ``'start'`` equals the first one's ``'end'`` or
    ``'end' + 1``.  The merged ``'word'`` is the two words joined by a space
    and ``'end'`` is taken from the later entity.

    Args:
        entities: list of entity dicts, in text order.

    Returns:
        A new list of new dicts; the input dicts are left untouched.
    """
    if not entities:
        return []
    merged = []
    # Work on copies so the caller's dicts are never modified
    current = dict(entities[0])
    for next_entity in entities[1:]:
        same_label = next_entity['entity'] == current['entity']
        adjacent = next_entity['start'] in (current['end'] + 1, current['end'])
        if same_label and adjacent:
            current['word'] += ' ' + next_entity['word']
            current['end'] = next_entity['end']
        else:
            merged.append(current)
            current = dict(next_entity)
    merged.append(current)
    return merged
def annotate_text(
    model, text, labels: List[str], threshold: float, nested_ner: bool
) -> Dict:
    """Run the model on one text and return it in the viewer's span format.

    Labels are stripped of surrounding whitespace, the predictions are
    merged with ``merge_entities`` and converted with ``transform_data``.
    The ``score`` of every entity is stored as 0.
    """
    cleaned_labels = [label.strip() for label in labels]
    predictions = model.predict_entities(
        text, cleaned_labels, flat_ner=not nested_ner, threshold=threshold
    )
    entities = []
    for prediction in predictions:
        entities.append({
            "entity": prediction["label"],
            "word": prediction["text"],
            "start": prediction["start"],
            "end": prediction["end"],
            "score": 0,
        })
    return transform_data({"text": text, "entities": merge_entities(entities)})
def batch_annotate_text(model: GLiNER, texts: List[str], labels: List[str], threshold: float, nested_ner: bool) -> List[Dict]:
    """Annotate several texts with one batched model call.

    Returns one converted example per input text, in input order.
    """
    cleaned_labels = [label.strip() for label in labels]
    predictions_per_text = model.batch_predict_entities(
        texts, cleaned_labels, flat_ner=not nested_ner, threshold=threshold
    )

    def to_example(text, predictions):
        # Same record layout as the single-text path; score is stored as 0
        entities = [
            {
                "entity": prediction["label"],
                "word": prediction["text"],
                "start": prediction["start"],
                "end": prediction["end"],
                "score": 0,
            }
            for prediction in predictions
        ]
        return transform_data({"text": text, "entities": merge_entities(entities)})

    return [
        to_example(text, predictions)
        for text, predictions in zip(texts, predictions_per_text)
    ]
class AutoAnnotator:
    """Loads a GLiNER model and annotates lists of texts in batches.

    Attributes (set in ``__init__``):
        model: the loaded model, moved to ``device``.
        annotated_data: result of the most recent ``auto_annotate`` call.
        stat: progress dict with ``"total"`` and ``"current"`` counters.
    """

    def __init__(
        self, model: str = "BookingCare/gliner-multi-healthcare",
        # Defaults to CPU; pass e.g. torch.device('cuda:0') to use a GPU
        device = torch.device('cpu')
    ) -> None:
        # Set PyTorch memory management settings
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
        self.model = GLiNER.from_pretrained(model).to(device)
        self.annotated_data = []
        self.stat = {
            "total": None,
            "current": -1
        }

    def auto_annotate(
        self, data: List[str], labels: List[str],
        prompt: Union[str, List[str], None] = None, threshold: float = 0.5,
        nested_ner: bool = False, batch_size: int = 8
    ) -> List[Dict]:
        """Annotate ``data`` and return the converted examples.

        Args:
            data: texts to annotate.
            labels: entity labels passed to the model.
            prompt: optional text put on its own line before every input.
                When a list is given, one entry is picked at random per
                text; an empty list or empty string means "no prompt".
            threshold: prediction threshold passed to the model.
            nested_ner: allow nested entities (passed as ``flat_ner=False``).
            batch_size: number of texts per model call; the default of 8 is
                kept small to avoid out-of-memory errors.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        self.stat["total"] = len(data)
        self.stat["current"] = -1  # Reset current progress

        processed_data = []
        for i in range(0, len(data), batch_size):
            batch_with_prompts = []
            for text in data[i:i + batch_size]:
                if isinstance(prompt, list):
                    # random.choice raises on an empty list
                    prompt_text = random.choice(prompt) if prompt else None
                else:
                    prompt_text = prompt
                batch_with_prompts.append(f"{prompt_text}\n{text}" if prompt_text else text)

            processed_data.extend(
                batch_annotate_text(self.model, batch_with_prompts, labels, threshold, nested_ner)
            )

            # Clear CUDA cache after each batch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            # Update progress
            self.stat["current"] = min(i + batch_size, len(data))

        self.annotated_data = processed_data
        return self.annotated_data
# Global variables
# AutoAnnotator created by the most recent annotate() call; None before the first run
annotator = None
# Text chunks stored by process_uploaded_file() and read by annotate()
sentences = []
def process_text_for_gliner(text: str, max_tokens: int = 256, overlap: int = 32) -> List[str]:
    """
    Split text into chunks of at most ``max_tokens`` tokens.

    The text is first split into sentences (after ``.``, ``!`` or ``?``
    followed by whitespace).  Whole sentences are packed into a chunk while
    they fit.  A single sentence longer than ``max_tokens`` is cut into
    windows that overlap by ``overlap`` tokens; no overlap is applied between
    ordinary chunks.  Chunks are returned as tokens joined by single spaces.

    Args:
        text: The input text to process
        max_tokens: Maximum number of tokens per chunk (at least 1)
        overlap: Number of tokens shared by consecutive windows of an
            over-long sentence (``0 <= overlap < max_tokens``)

    Returns:
        List of text chunks suitable for GLiNER

    Raises:
        ValueError: if ``max_tokens`` or ``overlap`` is out of range; such
            values would otherwise keep the window from advancing.
    """
    if max_tokens < 1:
        raise ValueError("max_tokens must be at least 1")
    if not 0 <= overlap < max_tokens:
        raise ValueError("overlap must satisfy 0 <= overlap < max_tokens")

    # First split into sentences to preserve natural boundaries
    sentence_list = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentence_list:
        sentence_tokens = tokenize_text(sentence)
        sentence_length = len(sentence_tokens)

        if sentence_length > max_tokens:
            # Flush what has been accumulated, then window the long sentence
            if current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_length = 0
            start = 0
            while start < sentence_length:
                end = min(start + max_tokens, sentence_length)
                chunks.append(" ".join(sentence_tokens[start:end]))
                # Step back by `overlap` unless this was the final window
                start = end - overlap if end < sentence_length else end
        elif current_length + sentence_length > max_tokens:
            # The sentence does not fit any more: close the chunk, start a new one
            chunks.append(" ".join(current_chunk))
            current_chunk = sentence_tokens
            current_length = sentence_length
        else:
            current_chunk.extend(sentence_tokens)
            current_length += sentence_length

    # Add any remaining tokens as the final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
def process_uploaded_file(file_obj):
    """Read an uploaded file and store its text chunks in ``sentences``.

    ``.csv`` files are read with pandas and the ``'Nội dung'`` column is
    used; any other file is read as UTF-8 text with one entry per non-blank
    line.  Every entry is split with ``process_text_for_gliner``.

    Args:
        file_obj: uploaded file; only its ``name`` attribute (the path on
            disk) is used.  ``None`` means nothing was uploaded.

    Returns:
        A status message.  The global ``sentences`` is replaced only when the
        whole file was processed successfully.
    """
    global sentences
    if file_obj is None:
        return "Please upload a file first!"
    try:
        path = file_obj.name
        if path.endswith('.csv'):
            import pandas as pd
            df = pd.read_csv(path)
            raw_texts = [str(value) for value in df['Nội dung'].dropna().tolist()]
        else:
            # Read from the path, like the CSV branch, instead of relying on
            # the upload object exposing a binary read() method
            with open(path, 'r', encoding='utf-8') as handle:
                content = handle.read()
            raw_texts = [line.strip() for line in content.splitlines() if line.strip()]

        # Process each entry and flatten the list
        processed_sentences = []
        for raw_text in raw_texts:
            processed_sentences.extend(process_text_for_gliner(raw_text))

        sentences = processed_sentences
        return f"Successfully loaded {len(sentences)} sentences from file!"
    except Exception as e:
        return f"Error reading file: {str(e)}"
def is_valid_repo_name(repo_name):
    """Return True if ``repo_name`` looks like a usable Hub repository id.

    Accepted forms are ``name`` and ``namespace/name``, where each part is
    made of letters, digits, ``-``, ``_`` and ``.`` only.  Anything else
    (spaces, more than one slash, empty parts, non-string values, a trailing
    newline) is rejected.
    """
    if not isinstance(repo_name, str):
        return False
    part = r'[A-Za-z0-9_.-]+'
    # fullmatch: unlike `$`, it does not tolerate a trailing newline
    return re.fullmatch(rf'{part}(?:/{part})?', repo_name) is not None
def create_hf_repo(repo_name: str, repo_type: str = "dataset", private: bool = False):
    """Create a repository on Hugging Face Hub (no error if it exists).

    Args:
        repo_name: ``name`` or ``namespace/name``; passed to the Hub as the
            repo id.
        repo_type: Hub repository type.
        private: whether to create the repository as private.

    Returns:
        The repository id reported by the Hub for the created repository
        (falls back to ``repo_name`` if the response carries none), so that
        later uploads address the namespaced repository.

    Raises:
        Exception: if the name is invalid or the Hub call fails.
    """
    if not is_valid_repo_name(repo_name):
        raise Exception(
            "Invalid repo name: use 'name' or 'namespace/name' containing only "
            "letters, numbers, '-', '_' or '.'"
        )
    try:
        repo_url = create_repo(
            repo_id=repo_name,
            repo_type=repo_type,
            private=private,
            exist_ok=True,
            token=HF_TOKEN
        )
        # NOTE(review): relies on the returned object exposing `repo_id`
        # (huggingface_hub RepoUrl) - confirm for the pinned library version
        return getattr(repo_url, "repo_id", None) or repo_name
    except Exception as e:
        raise Exception(f"Error creating repository: {str(e)}") from e
def annotate(model, labels, threshold, prompt, save_to_hub, repo_name, repo_type, is_private):
    """Annotate the uploaded sentences and save the result.

    The result is always written to ``data/annotated_data.json``; when
    ``save_to_hub`` is set it is also uploaded to the given Hub repository.

    Args:
        model: model name passed to ``AutoAnnotator``.
        labels: comma-separated label names.
        threshold: prediction threshold.
        prompt: optional prompt put before every sentence.
        save_to_hub: whether to upload the result to the Hub.
        repo_name, repo_type, is_private: Hub repository settings.

    Returns:
        A status message (one line per step); errors are reported in the
        message rather than raised.
    """
    global annotator
    try:
        if not sentences:
            return "Please upload a file with text first!"
        if save_to_hub and not is_valid_repo_name(repo_name):
            return (
                "Error: Invalid repo name. Use 'name' or 'namespace/name' with only "
                "letters, numbers, '-', '_', or '.' (no spaces)."
            )

        # Drop blank entries such as the one produced by an empty textbox
        label_list = [label.strip() for label in (labels or "").split(",") if label.strip()]
        if not label_list:
            return "Please enter at least one label!"

        annotator = AutoAnnotator(model)
        annotated_data = annotator.auto_annotate(sentences, label_list, prompt, threshold)

        # Save annotated data locally
        os.makedirs("data", exist_ok=True)
        local_path = "data/annotated_data.json"
        with open(local_path, "wt") as file:
            json.dump(annotated_data, file, ensure_ascii=False)
        status_messages = [f"Successfully annotated and saved locally to {local_path}"]

        # Upload to Hugging Face Hub if requested; a failed upload does not
        # invalidate the local save, so it is reported as an extra line
        if save_to_hub:
            try:
                repo_id = create_hf_repo(repo_name, repo_type, is_private)
                api = HfApi(token=HF_TOKEN)
                api.upload_file(
                    path_or_fileobj=local_path,
                    path_in_repo="annotated_data.json",
                    repo_id=repo_id,
                    repo_type=repo_type,
                    token=HF_TOKEN
                )
                status_messages.append(f"Successfully uploaded to Hugging Face Hub repository: {repo_id}")
            except Exception as e:
                status_messages.append(f"Error with Hugging Face Hub: {str(e)}")

        return "\n".join(status_messages)
    except Exception as e:
        return f"Error during annotation: {str(e)}"
def _tags_to_spans(tokens, tags):
    """Group per-token tags into ``[start, end, label]`` spans (end inclusive).

    ``'O'`` marks tokens outside any entity.  ``B-X`` always opens a new span
    labeled ``X``; ``I-X`` and plain ``X`` extend the open span when its
    label is ``X`` and open a new one otherwise.
    """
    spans = []
    current_span = None
    for i, (_, tag) in enumerate(zip(tokens, tags)):
        if tag == 'O':
            if current_span is not None:
                spans.append(current_span)
                current_span = None
            continue
        label, opens_new = tag, False
        if isinstance(tag, str) and len(tag) > 2 and tag[:2] in ('B-', 'I-'):
            opens_new = tag[0] == 'B'
            label = tag[2:]
        if current_span is not None and not opens_new and label == current_span[2]:
            current_span[1] = i
        else:
            if current_span is not None:
                spans.append(current_span)
            current_span = [i, i, label]
    if current_span is not None:
        spans.append(current_span)
    return spans


def convert_hf_dataset_to_ner_format(dataset):
    """Convert a token-classification dataset to the viewer's NER format.

    Items need ``'tokens'`` and ``'ner_tags'`` fields; items without them are
    skipped.  Integer tags are translated to their names when the dataset
    exposes them (assumes ``dataset.features['ner_tags'].feature.names`` as
    in Hugging Face ``ClassLabel`` sequences - TODO confirm for the datasets
    in use); otherwise tags are used as they are.

    Returns:
        List of dicts with ``"tokenized_text"``, ``"ner"`` and
        ``"validated": False``.
    """
    try:
        tag_names = dataset.features['ner_tags'].feature.names
    except (AttributeError, KeyError, TypeError):
        tag_names = None

    def tag_name(tag):
        # Map class ids to names when possible; leave everything else alone
        if tag_names and isinstance(tag, int) and 0 <= tag < len(tag_names):
            return tag_names[tag]
        return tag

    converted_data = []
    for item in dataset:
        if 'tokens' in item and 'ner_tags' in item:
            tags = [tag_name(tag) for tag in item['ner_tags']]
            converted_data.append({
                "tokenized_text": item['tokens'],
                "ner": _tags_to_spans(item['tokens'], tags),
                "validated": False
            })
    return converted_data
def load_from_huggingface(dataset_name: str, split: str = "all"):
    """Load a dataset from Hugging Face Hub and save it in viewer format.

    The converted data is written to ``data/annotated_data.json``.

    Args:
        dataset_name: Hub dataset name.
        split: split passed to ``datasets.load_dataset``.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    try:
        # This module defines its own zero-argument load_dataset() further up,
        # which shadows the top-level `from datasets import load_dataset`.
        # Import the library function under a distinct local name.
        from datasets import load_dataset as hf_load_dataset

        dataset = hf_load_dataset(dataset_name, split=split)
        converted_data = convert_hf_dataset_to_ner_format(dataset)
        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(converted_data, file, ensure_ascii=False)
        return f"Successfully loaded and converted dataset: {dataset_name}"
    except Exception as e:
        return f"Error loading dataset: {str(e)}"
def _spans_from_tag_sequence(tokens, tags):
    """Group per-token tags into ``[start, end, label]`` spans (end inclusive).

    ``"O"`` marks tokens outside any entity.  ``B-X`` always opens a new span
    labeled ``X``; ``I-X`` and plain ``X`` extend the open span when its
    label is ``X`` and open a new one otherwise.
    """
    spans = []
    current_span = None
    for i, (_, tag) in enumerate(zip(tokens, tags)):
        if tag == "O":
            if current_span is not None:
                spans.append(current_span)
                current_span = None
            continue
        label, opens_new = tag, False
        if isinstance(tag, str) and len(tag) > 2 and tag[:2] in ("B-", "I-"):
            opens_new = tag[0] == "B"
            label = tag[2:]
        if current_span is not None and not opens_new and label == current_span[2]:
            current_span[1] = i
        else:
            if current_span is not None:
                spans.append(current_span)
            current_span = [i, i, label]
    if current_span is not None:
        spans.append(current_span)
    return spans


def _example_from_tags(tokens, tags):
    """Build one viewer example from parallel token and tag lists."""
    return {
        "tokenized_text": tokens,
        "ner": _spans_from_tag_sequence(tokens, tags),
        "validated": False
    }


def load_from_local_file(file_path: str, file_format: str = "json"):
    """Load and convert data from local file in various formats.

    Supported formats:
        ``"json"``: a list of examples.  If every item already has
            ``"tokenized_text"`` and ``"ner"`` the list is returned as is;
            otherwise items with ``"tokens"`` and ``"ner_tags"`` are
            converted and other items are skipped.
        ``"conll"``: one ``token ... tag`` line per token (first and last
            column are used), blank lines separate examples, lines starting
            with ``#`` and ``-DOCSTART-`` lines are ignored.
        ``"txt"``: one sentence per line, tokenized, without entities.

    Returns:
        List of example dicts.

    Raises:
        Exception: wrapping any error (unsupported format, unreadable file,
            malformed content) with an ``"Error loading file: "`` prefix.
    """
    try:
        if file_format == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                raise ValueError("JSON file must contain a list of examples")
            # If data is already in the correct format
            if all("tokenized_text" in item and "ner" in item for item in data):
                return data
            # Convert from other JSON formats
            return [
                _example_from_tags(item["tokens"], item["ner_tags"])
                for item in data
                if "tokens" in item and "ner_tags" in item
            ]

        if file_format == "conll":
            converted_data = []
            tokens, tags = [], []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        # A blank line closes the current example
                        if tokens:
                            converted_data.append(_example_from_tags(tokens, tags))
                            tokens, tags = [], []
                        continue
                    if line.startswith("#"):
                        continue
                    parts = line.split()
                    # Document separators carry no sentence content
                    if len(parts) < 2 or parts[0] == "-DOCSTART-":
                        continue
                    tokens.append(parts[0])
                    tags.append(parts[-1])
            # Handle last example if the file does not end with a blank line
            if tokens:
                converted_data.append(_example_from_tags(tokens, tags))
            return converted_data

        if file_format == "txt":
            # Simple text file with one sentence per line
            converted_data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        converted_data.append({
                            "tokenized_text": tokenize_text(line),
                            "ner": [],
                            "validated": False
                        })
            return converted_data

        raise ValueError(f"Unsupported file format: {file_format}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}") from e
def process_local_file(file_obj, file_format):
    """Convert an uploaded dataset file and store it as the working dataset.

    The converted examples are written to ``data/annotated_data.json``.
    Returns a status message; failures are reported in the message.
    """
    if file_obj is None:
        return "Please upload a file first!"
    try:
        examples = load_from_local_file(file_obj.name, file_format)
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as out_file:
            json.dump(examples, out_file, ensure_ascii=False)
    except Exception as e:
        return f"Error processing file: {str(e)}"
    else:
        return f"Successfully loaded and converted {len(examples)} examples from {file_format} file!"
# Add a function to download the annotated data
def download_annotated_data():
    """Return the path of the saved annotation file, or None if it is missing."""
    file_path = "data/annotated_data.json"
    return file_path if os.path.exists(file_path) else None
def download_to_folder():
    """Copy the saved annotation file into the user's ``~/Downloads`` folder.

    Returns a status message; failures are reported in the message.
    """
    import shutil

    source_path = "data/annotated_data.json"
    try:
        if not os.path.exists(source_path):
            return "No annotated data found!"
        # Make sure the target folder exists before copying
        download_dir = os.path.expanduser("~/Downloads")
        os.makedirs(download_dir, exist_ok=True)
        dest_path = os.path.join(download_dir, "annotated_data.json")
        shutil.copy2(source_path, dest_path)
    except Exception as e:
        return f"Error downloading file: {str(e)}"
    return f"Successfully downloaded to {dest_path}"
def update_hf_dataset(repo_name: str, repo_type: str = "dataset", is_private: bool = False):
    """Update or create a Hugging Face dataset with the current annotated data.

    The loaded dataset is first written to ``data/annotated_data.json`` and
    that file is then uploaded as ``annotated_data.json``.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    try:
        if not dynamic_dataset or not dynamic_dataset.data:
            return "No data to upload! Please load or annotate data first."

        # Save current data to local file
        os.makedirs("data", exist_ok=True)
        local_path = "data/annotated_data.json"
        with open(local_path, "wt") as file:
            json.dump(dynamic_dataset.data, file, ensure_ascii=False)

        # Created up front so the fallback below can always use it
        api = HfApi(token=HF_TOKEN)

        try:
            repo_id = create_hf_repo(repo_name, repo_type, is_private)
            success_message = f"Successfully uploaded to Hugging Face Hub repository: {repo_id}"
        except Exception as e:
            if "already exists" not in str(e):
                raise
            # If repo exists, just update the file.  Only prefix the user's
            # namespace when the name does not already carry one.
            if "/" in repo_name:
                repo_id = repo_name
            else:
                user = api.whoami()['name']
                repo_id = f"{user}/{repo_name}"
            success_message = f"Successfully updated existing repository: {repo_id}"

        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo="annotated_data.json",
            repo_id=repo_id,
            repo_type=repo_type,
            token=HF_TOKEN
        )
        return success_message
    except Exception as e:
        return f"Error updating Hugging Face dataset: {str(e)}"
# Create the main interface with tabs.
# NOTE(review): the source carried no indentation, so the nesting of rows,
# columns and groups below is a reconstruction - confirm the layout visually.
with gr.Blocks() as demo:
    gr.Markdown("# NER Annotation Tool")
    with gr.Tabs():
        # ------------------------------------------------------------------
        # Tab 1: upload raw text, run the model, save / upload the result
        # ------------------------------------------------------------------
        with gr.TabItem("Auto Annotation"):
            with gr.Row():
                with gr.Column():
                    file_uploader = gr.File(label="Upload text file (one sentence per line)")
                    upload_status = gr.Textbox(label="Upload Status")
                    file_uploader.change(fn=process_uploaded_file, inputs=[file_uploader], outputs=[upload_status])
                with gr.Column():
                    model = gr.Dropdown(
                        label="Choose the model for annotation",
                        choices=AVAILABLE_MODELS,
                        value=AVAILABLE_MODELS[0]
                    )
                    labels = gr.Textbox(
                        label="Labels",
                        placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
                        scale=2
                    )
                    threshold = gr.Slider(
                        0, 1,
                        value=0.3,
                        step=0.01,
                        label="Threshold",
                        info="Lower threshold increases entity predictions"
                    )
                    prompt = gr.Textbox(
                        label="Prompt",
                        placeholder="Enter your annotation prompt (optional)",
                        scale=2
                    )
                    with gr.Group():
                        gr.Markdown("### Save Options")
                        save_to_hub = gr.Checkbox(
                            label="Save to Hugging Face Hub",
                            value=False
                        )
                        with gr.Group(visible=False) as hub_settings:
                            gr.Markdown("#### Hugging Face Hub Settings")
                            repo_name = gr.Textbox(
                                label="Repository Name",
                                placeholder="Enter repository name (e.g., my-ner-dataset)",
                                scale=2
                            )
                            repo_type = gr.Dropdown(
                                choices=["dataset", "model", "space"],
                                value="dataset",
                                label="Repository Type"
                            )
                            is_private = gr.Checkbox(
                                label="Private Repository",
                                value=False
                            )
                    annotate_btn = gr.Button("Annotate Data")
                    output_info = gr.Textbox(label="Processing Status")
                    # Add download buttons for annotated data
                    with gr.Row():
                        download_btn_annot = gr.Button("Download Annotated Data", visible=False)
                        download_file_annot = gr.File(label="Download", interactive=False, visible=False)
                        download_status = gr.Textbox(label="Download Status", visible=False)

            def toggle_hub_settings(save_to_hub):
                # Show the Hub settings only while the checkbox is ticked
                return {
                    hub_settings: gr.update(visible=save_to_hub)
                }

            save_to_hub.change(
                fn=toggle_hub_settings,
                inputs=[save_to_hub],
                outputs=[hub_settings]
            )

            def show_download_buttons(status):
                # Show download buttons only if annotation was successful
                if status and status.startswith("Successfully annotated and saved locally"):
                    return gr.update(visible=True), gr.update(visible=True)
                return gr.update(visible=False), gr.update(visible=False)

            annotate_btn.click(
                fn=annotate,
                inputs=[
                    model, labels, threshold, prompt,
                    save_to_hub, repo_name, repo_type, is_private
                ],
                outputs=[output_info]
            )
            output_info.change(
                fn=show_download_buttons,
                inputs=[output_info],
                outputs=[download_btn_annot, download_status]
            )

            def handle_download_annot():
                # Reveal the file component only when there is a file to offer
                file_path = download_annotated_data()
                if file_path:
                    return gr.update(value=file_path, visible=True)
                else:
                    return gr.update(visible=False)

            download_btn_annot.click(fn=handle_download_annot, inputs=None, outputs=[download_file_annot])

        # ------------------------------------------------------------------
        # Tab 2: browse, correct, validate and upload an annotated dataset
        # ------------------------------------------------------------------
        with gr.TabItem("Dataset Viewer"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        load_local_btn = gr.Button("Load Local Dataset")
                        load_hf_btn = gr.Button("Load from Hugging Face")
                    local_file = gr.File(label="Upload Local Dataset", visible=False)
                    file_format = gr.Dropdown(
                        choices=["json", "conll", "txt"],
                        value="json",
                        label="File Format",
                        visible=False
                    )
                    local_status = gr.Textbox(label="Local File Status", visible=False)
                    with gr.Group(visible=False) as hf_inputs:
                        with gr.Row():
                            dataset_name = gr.Textbox(
                                label="Hugging Face Dataset Name",
                                placeholder="Enter dataset name (e.g., conll2003)",
                                scale=3
                            )
                            dataset_split = gr.Dropdown(
                                choices=["train", "validation", "test"],
                                value="train",
                                label="Dataset Split",
                                scale=2
                            )
                            load_dataset_btn = gr.Button("Load Dataset", scale=1)
                        hf_status = gr.Textbox(label="Dataset Loading Status")
            bar = gr.Slider(
                minimum=0,
                maximum=1,
                step=1,
                label="Progress",
                interactive=True,
                info="Use slider to navigate through examples"
            )
            with gr.Row():
                previous_btn = gr.Button("Previous example")
                apply_btn = gr.Button("Apply changes")
                next_btn = gr.Button("Next example")
                validate_btn = gr.Button("Validate")
                save_btn = gr.Button("Save validated dataset")
            # Add Hugging Face upload section
            with gr.Group(visible=False) as hf_upload_group:
                gr.Markdown("### Upload to Hugging Face")
                hf_repo_name = gr.Textbox(
                    label="Repository Name",
                    placeholder="Enter repository name (e.g., my-ner-dataset)",
                    scale=2
                )
                hf_repo_type = gr.Dropdown(
                    choices=["dataset", "model", "space"],
                    value="dataset",
                    label="Repository Type"
                )
                hf_is_private = gr.Checkbox(
                    label="Private Repository",
                    value=False
                )
                upload_to_hf_btn = gr.Button("Upload to Hugging Face")
                hf_upload_status = gr.Textbox(label="Upload Status")
            with gr.Row():
                show_hf_upload_btn = gr.Button("Show Upload Options")
                hide_hf_upload_btn = gr.Button("Hide Upload Options", visible=False)

            def toggle_hf_upload(show: bool):
                # Swap the upload section and the show/hide buttons together
                return {
                    hf_upload_group: gr.update(visible=show),
                    show_hf_upload_btn: gr.update(visible=not show),
                    hide_hf_upload_btn: gr.update(visible=show)
                }

            show_hf_upload_btn.click(
                fn=lambda: toggle_hf_upload(True),
                inputs=None,
                outputs=[hf_upload_group, show_hf_upload_btn, hide_hf_upload_btn]
            )
            hide_hf_upload_btn.click(
                fn=lambda: toggle_hf_upload(False),
                inputs=None,
                outputs=[hf_upload_group, show_hf_upload_btn, hide_hf_upload_btn]
            )
            inp_box = gr.HighlightedText(value=None, interactive=True)

            def toggle_local_inputs():
                # Local-file source selected: show its inputs, hide the Hub ones
                return {
                    local_file: gr.update(visible=True),
                    file_format: gr.update(visible=True),
                    local_status: gr.update(visible=True),
                    hf_inputs: gr.update(visible=False)
                }

            def toggle_hf_inputs():
                # Hub source selected: hide the local inputs, show the Hub ones
                return {
                    local_file: gr.update(visible=False),
                    file_format: gr.update(visible=False),
                    local_status: gr.update(visible=False),
                    hf_inputs: gr.update(visible=True)
                }

            load_local_btn.click(
                fn=toggle_local_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, hf_inputs]
            )
            load_hf_btn.click(
                fn=toggle_hf_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, hf_inputs]
            )

            def process_and_load_local(file_obj, fmt):
                # Always returns exactly (segments, slider update, status) to
                # match the three declared outputs.
                status = process_local_file(file_obj, fmt)
                if "Successfully" in status:
                    segments, slider_update = load_dataset()
                    return segments, slider_update, status
                return [(status, None)], gr.update(value=0, maximum=1), status

            local_file.change(
                fn=process_and_load_local,
                inputs=[local_file, file_format],
                outputs=[inp_box, bar, local_status]
            )

            def load_hf_dataset(name, split):
                # Always returns exactly (segments, slider update, status) to
                # match the three declared outputs.
                status = load_from_huggingface(name, split)
                if "Successfully" in status:
                    segments, slider_update = load_dataset()
                    return segments, slider_update, status
                return [(status, None)], gr.update(value=0, maximum=1), status

            load_dataset_btn.click(
                fn=load_hf_dataset,
                inputs=[dataset_name, dataset_split],
                outputs=[inp_box, bar, hf_status]
            )
            apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
            save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
            validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
            next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
            previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])
            bar.change(
                fn=example_by_id,
                inputs=[bar],
                outputs=[inp_box, bar],
                api_name="example_by_id"
            )
            # Add Hugging Face upload functionality
            upload_to_hf_btn.click(
                fn=update_hf_dataset,
                inputs=[hf_repo_name, hf_repo_type, hf_is_private],
                outputs=[hf_upload_status]
            )
demo.launch()