# Hugging Face Spaces page banner captured with this file: "Spaces: Running on Zero" (not part of the program).
""" | |
DNA-Diffusion Gradio Application | |
Interactive DNA sequence generation with slot machine visualization and protein analysis | |
""" | |
import gradio as gr | |
import logging | |
import json | |
import os | |
from typing import Dict, Any, Tuple | |
import html | |
import requests | |
import time | |
# Root logging setup: INFO and above, each record stamped with time,
# logger name and severity. The module-level logger is used throughout the file.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Try to import spaces for GPU decoration
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False

    class spaces:
        """No-op stand-in used when the ``spaces`` package is not installed."""

        @staticmethod
        def GPU(duration=60):
            """Return the decorated function unchanged.

            Works both as ``@spaces.GPU(duration=...)`` and as a bare
            ``@spaces.GPU``. In the bare form Python passes the function
            itself as the first positional argument, so a callable
            ``duration`` is treated as the function to decorate.
            """
            if callable(duration):
                # Bare ``@spaces.GPU`` usage: hand the function straight back.
                return duration

            def decorator(func):
                return func

            return decorator
# The model package is optional: when it cannot be imported the app keeps
# running (useful for UI work) and MODEL_AVAILABLE records that fact.
try:
    from dna_diffusion_model import DNADiffusionModel, get_model
except ImportError as e:
    logger.warning(f"DNA-Diffusion model not available: {e}")
    MODEL_AVAILABLE = False
else:
    MODEL_AVAILABLE = True
    logger.info("DNA-Diffusion model module loaded successfully")
# Load the HTML interface
HTML_FILE = "dna-slot-machine.html"
if not os.path.exists(HTML_FILE):
    raise FileNotFoundError(f"HTML interface file '{HTML_FILE}' not found. Please ensure it exists in the same directory as app.py")
# Decode explicitly as UTF-8 so the loaded markup does not depend on the
# platform's default text encoding.
with open(HTML_FILE, "r", encoding="utf-8") as f:
    SLOT_MACHINE_HTML = f.read()
class ProteinAnalyzer:
    """Handles protein translation and analysis using LLM.

    Both public operations are stateless and declared as static methods, so
    they can be called on the class or on an instance.
    """

    # Genetic code table for DNA to amino acid translation ('*' marks a stop codon)
    CODON_TABLE = {
        'TTT': 'F', 'TTC': 'F', 'TTA': 'L', 'TTG': 'L',
        'TCT': 'S', 'TCC': 'S', 'TCA': 'S', 'TCG': 'S',
        'TAT': 'Y', 'TAC': 'Y', 'TAA': '*', 'TAG': '*',
        'TGT': 'C', 'TGC': 'C', 'TGA': '*', 'TGG': 'W',
        'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
        'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
        'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
        'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
        'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
        'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
        'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
        'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
        'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
        'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
        'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
        'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
    }

    @staticmethod
    def dna_to_protein(dna_sequence: str) -> str:
        """Translate a DNA sequence to a one-letter protein sequence.

        The input is upper-cased and every character other than A/T/C/G is
        discarded before translation. Translation starts at the first base,
        proceeds codon by codon, stops at the first stop codon (which is not
        included in the result) and ignores an incomplete trailing codon.

        Args:
            dna_sequence: Raw DNA text; case and stray characters are tolerated.

        Returns:
            The translated amino-acid string (possibly empty).
        """
        bases = ''.join(c for c in dna_sequence.upper() if c in 'ATCG')

        protein = []
        # Stopping two short of the end guarantees every slice is a full codon.
        for i in range(0, len(bases) - 2, 3):
            amino_acid = ProteinAnalyzer.CODON_TABLE.get(bases[i:i + 3], 'X')
            if amino_acid == '*':  # Stop codon
                break
            protein.append(amino_acid)
        return ''.join(protein)

    @staticmethod
    def _build_prompt(protein_sequence: str, cell_type: str, korean: bool) -> str:
        """Build the user prompt for the analysis request in Korean or English."""
        if korean:
            lines = (
                "당신은 생물정보학 전문가입니다. 다음 단백질 서열을 분석하고 잠재적인 구조와 기능에 대한 통찰력을 제공해주세요.",
                f"단백질 서열: {protein_sequence}",
                f"세포 유형: {cell_type}",
                "다음 내용을 포함해주세요:",
                "1. 서열 패턴을 기반으로 예측되는 단백질 패밀리 또는 도메인",
                "2. 잠재적인 구조적 특징 (알파 나선, 베타 시트, 루프)",
                "3. 가능한 생물학적 기능",
                f"4. {cell_type} 세포 유형과의 관련성",
                "5. 주목할 만한 서열 모티프나 특성",
                "과학 애플리케이션에 표시하기에 적합하도록 간결하면서도 유익한 응답을 작성해주세요.",
            )
        else:
            lines = (
                "You are a bioinformatics expert. Analyze the following protein sequence and provide insights about its potential structure and function.",
                f"Protein sequence: {protein_sequence}",
                f"Cell type context: {cell_type}",
                "Please provide:",
                "1. Predicted protein family or domain based on sequence patterns",
                "2. Potential structural features (alpha helices, beta sheets, loops)",
                "3. Possible biological functions",
                f"4. Relevance to the {cell_type} cell type",
                "5. Any notable sequence motifs or characteristics",
                "Keep the response concise but informative, suitable for display in a scientific application.",
            )
        return "\n".join(lines)

    @staticmethod
    def analyze_protein_with_llm(protein_sequence: str, cell_type: str, language: str = "en") -> str:
        """Analyze protein structure and function using the Friendli LLM API.

        Args:
            protein_sequence: One-letter amino-acid sequence to analyse.
            cell_type: Cell type name included in the prompt as context.
            language: ``"ko"`` selects Korean prompts and messages; any other
                value selects English.

        Returns:
            The model's analysis text, or a human-readable message when the
            API token is missing, the sequence is empty, or the request
            fails. This method does not raise; failures are logged.
        """
        # One flag drives every language decision so the system prompt, the
        # user prompt and the fallback messages can never disagree.
        korean = language == "ko"

        # Get API token from environment
        token = os.getenv("FRIENDLI_TOKEN")
        if not token:
            logger.warning("FRIENDLI_TOKEN not found in environment variables")
            if korean:
                return "단백질 분석 불가: API 토큰이 설정되지 않았습니다"
            return "Protein analysis unavailable: API token not configured"

        # Nothing to analyse (e.g. the DNA began with a stop codon): skip the
        # API call instead of asking the model about an empty sequence.
        if not protein_sequence:
            logger.warning("Empty protein sequence; skipping LLM analysis")
            if korean:
                return "단백질 분석 불가: DNA 서열에서 번역된 아미노산이 없습니다"
            return "Protein analysis unavailable: the DNA sequence did not translate to any amino acids"

        try:
            url = "https://api.friendli.ai/dedicated/v1/chat/completions"
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json"
            }
            if korean:
                system_prompt = "당신은 단백질 구조와 기능 예측을 전문으로 하는 지식이 풍부한 생물정보학 어시스턴트입니다."
            else:
                system_prompt = "You are a knowledgeable bioinformatics assistant specializing in protein structure and function prediction."
            payload = {
                "model": "dep89a2fld32mcm",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {
                        "role": "user",
                        "content": ProteinAnalyzer._build_prompt(protein_sequence, cell_type, korean),
                    },
                ],
                "max_tokens": 1000,
                "temperature": 0.7,
                "top_p": 0.8,
                "stream": False  # Disable streaming for simplicity
            }
            response = requests.post(url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result['choices'][0]['message']['content']
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to analyze protein with LLM: {e}")
            if korean:
                return f"단백질 분석 실패: {str(e)}"
            return f"Protein analysis failed: {str(e)}"
        except Exception as e:
            # Covers malformed or unexpected response bodies; the caller
            # still receives a displayable message.
            logger.error(f"Unexpected error during protein analysis: {e}")
            if korean:
                return "오류로 인해 단백질 분석을 사용할 수 없습니다"
            return "Protein analysis unavailable due to an error"
class DNADiffusionApp:
    """Main application class for DNA-Diffusion Gradio interface.

    Holds the (lazily initialised) generation model and turns a generation
    request into a DNA sequence plus JSON metadata that includes the
    translated protein and its LLM analysis.
    """

    def __init__(self):
        self.model = None            # set by initialize_model() on success
        self.model_loading = False   # True while initialize_model() is running
        self.model_error = None      # last initialisation error message, if any
        self.protein_analyzer = ProteinAnalyzer()

    def initialize_model(self):
        """Initialize the DNA-Diffusion model if it is not loaded yet.

        Records a message in ``model_error`` instead of raising when the
        model module is unavailable or loading fails.
        """
        if not MODEL_AVAILABLE:
            self.model_error = "DNA-Diffusion model module not available. Please install dependencies."
            return
        # This method is wired to the Gradio load event, so it can be called
        # repeatedly; do nothing while a load is in progress or once a model
        # is already present. A failed load leaves model=None and is retried.
        if self.model_loading or self.model is not None:
            return
        self.model_loading = True
        try:
            logger.info("Starting model initialization...")
            self.model = get_model()
            logger.info("Model initialized successfully!")
            self.model_error = None
        except Exception as e:
            logger.error(f"Failed to initialize model: {e}")
            self.model_error = str(e)
            self.model = None
        finally:
            self.model_loading = False

    def generate_sequence(self, cell_type: str, guidance_scale: float = 1.0) -> Tuple[str, Dict[str, Any]]:
        """Generate a DNA sequence using the model or mock data.

        Args:
            cell_type: Cell type to condition generation on.
            guidance_scale: Guidance strength forwarded to the model.

        Returns:
            ``(sequence, metadata)``. Without a loaded model the sequence is
            200 random bases and the metadata carries ``'mock': True``.

        Raises:
            Exception: Whatever the real model raises; it is logged and
                re-raised unchanged.
        """
        # Use mock generation if model is not available
        if not MODEL_AVAILABLE or self.model is None:
            logger.warning("Using mock sequence generation")
            import random
            sequence = ''.join(random.choices('ATCG', k=200))
            metadata = {
                'cell_type': cell_type,
                'guidance_scale': guidance_scale,
                'generation_time': 2.0,
                'mock': True
            }
            # Simulate generation time
            time.sleep(2.0)
            return sequence, metadata

        # Use real model
        try:
            result = self.model.generate(cell_type, guidance_scale)
            return result['sequence'], result['metadata']
        except Exception as e:
            logger.error(f"Generation failed: {e}")
            raise

    def handle_generation_request(self, cell_type: str, guidance_scale: float, language: str = "en"):
        """Handle sequence generation request from Gradio.

        Generates a sequence, translates it to protein, requests an LLM
        analysis and returns ``(sequence, metadata_json)``. On any failure
        returns ``("", '{"error": "<message>"}')`` instead of raising.
        """
        try:
            logger.info(f"Generating sequence for cell type: {cell_type}, language: {language}")
            # Generate DNA sequence
            sequence, metadata = self.generate_sequence(cell_type, guidance_scale)

            # Translate to protein. The analyzer helpers are defined without
            # a ``self`` parameter, so they are invoked through the class;
            # calling them as bound methods would pass the instance as the
            # first argument.
            logger.info("Translating DNA to protein sequence...")
            protein_sequence = ProteinAnalyzer.dna_to_protein(sequence)

            # Add protein sequence to metadata
            metadata['protein_sequence'] = protein_sequence
            metadata['protein_length'] = len(protein_sequence)

            # Analyze protein with LLM
            logger.info("Analyzing protein structure and function...")
            metadata['protein_analysis'] = ProteinAnalyzer.analyze_protein_with_llm(
                protein_sequence, cell_type, language
            )

            logger.info("Generation and analysis complete")
            return sequence, json.dumps(metadata)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Generation request failed: {error_msg}")
            return "", json.dumps({"error": error_msg})


# Create single app instance
app = DNADiffusionApp()
def create_demo():
    """Create the Gradio demo interface.

    The visible UI is the slot-machine HTML page embedded in an iframe. A
    set of hidden Gradio controls acts as the backend bridge: the page posts
    a ``generate_request`` message, the injected JavaScript copies the
    requested cell type and language into the hidden radios and clicks the
    hidden button, and the generated sequence/metadata are posted back to
    the iframe.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    # CSS to hide backend controls and prevent scrolling
    css = """
    #hidden-controls { display: none !important; }
    .gradio-container {
        overflow: hidden;
        background-color: #000000 !important;
    }
    #dna-frame { overflow: hidden; position: relative; }
    body {
        background-color: #000000 !important;
    }
    """

    # JavaScript for handling communication between iframe and Gradio
    js = """
    function() {
        console.log('Initializing DNA-Diffusion Gradio interface...');
        // Set up message listener to receive requests from iframe
        window.addEventListener('message', function(event) {
            console.log('Parent received message:', event.data);
            // Messages may carry null or non-object payloads; ignore anything
            // that is not a generation request instead of throwing.
            const data = event.data;
            if (!data || data.type !== 'generate_request') {
                return;
            }
            console.log('Triggering generation for cell type:', data.cellType);
            console.log('Language:', data.language);
            // Update the hidden cell type input
            const radioInputs = document.querySelectorAll('#cell-type-input input[type="radio"]');
            radioInputs.forEach(input => {
                if (input.value === data.cellType) {
                    input.checked = true;
                    // Trigger change event
                    input.dispatchEvent(new Event('change'));
                }
            });
            // Update the language input
            const langInputs = document.querySelectorAll('#language-input input[type="radio"]');
            langInputs.forEach(input => {
                if (input.value === data.language) {
                    input.checked = true;
                    input.dispatchEvent(new Event('change'));
                }
            });
            // Small delay to ensure radio button update is processed
            setTimeout(() => {
                const button = document.querySelector('#generate-btn');
                if (button) {
                    button.click();
                } else {
                    console.error('Could not find generate button');
                }
            }, 100);
        });
        // Function to send sequence to iframe
        window.sendSequenceToIframe = function(sequence, metadata) {
            console.log('Sending sequence to iframe:', sequence);
            const iframe = document.querySelector('#dna-frame iframe');
            if (iframe && iframe.contentWindow) {
                try {
                    const meta = JSON.parse(metadata);
                    if (meta.error) {
                        iframe.contentWindow.postMessage({
                            type: 'generation_error',
                            error: meta.error
                        }, '*');
                    } else {
                        iframe.contentWindow.postMessage({
                            type: 'sequence_generated',
                            sequence: sequence,
                            metadata: meta
                        }, '*');
                    }
                } catch (e) {
                    console.error('Failed to parse metadata:', e);
                    // If parsing fails, still send the sequence
                    iframe.contentWindow.postMessage({
                        type: 'sequence_generated',
                        sequence: sequence,
                        metadata: {}
                    }, '*');
                }
            } else {
                console.error('Could not find iframe');
            }
        };
    }
    """

    with gr.Blocks(css=css, js=js, theme=gr.themes.Base()) as demo:
        # Hidden controls for backend processing
        # NOTE(review): the injected JS locates these controls by elem_id in
        # the page DOM; confirm the installed Gradio version still renders
        # components placed inside a visible=False container.
        with gr.Column(elem_id="hidden-controls", visible=False):
            cell_type_input = gr.Radio(
                ["K562", "GM12878", "HepG2"],
                value="K562",
                label="Cell Type",
                elem_id="cell-type-input"
            )
            language_input = gr.Radio(
                ["en", "ko"],
                value="en",
                label="Language",
                elem_id="language-input"
            )
            guidance_input = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=1.0,
                step=0.5,
                label="Guidance Scale",
                elem_id="guidance-input"
            )
            generate_btn = gr.Button("Generate", elem_id="generate-btn")
            sequence_output = gr.Textbox(label="Sequence", elem_id="sequence-output")
            metadata_output = gr.Textbox(label="Metadata", elem_id="metadata-output")

        # Main interface - the slot machine in an iframe
        # Escape the HTML content for srcdoc
        escaped_html = html.escape(SLOT_MACHINE_HTML, quote=True)
        iframe_html = f'<iframe srcdoc="{escaped_html}" style="width: 100%; height: 800px; border: none; display: block;"></iframe>'
        gr.HTML(
            iframe_html,
            elem_id="dna-frame"
        )

        # Wire up the generation: run the backend, then forward its two
        # outputs to the iframe from the browser side.
        generate_btn.click(
            fn=app.handle_generation_request,
            inputs=[cell_type_input, guidance_input, language_input],
            outputs=[sequence_output, metadata_output]
        ).then(
            fn=None,
            inputs=[sequence_output, metadata_output],
            outputs=None,
            js="(seq, meta) => sendSequenceToIframe(seq, meta)"
        )

        # Initialize model on load
        demo.load(
            fn=app.initialize_model,
            inputs=None,
            outputs=None
        )
    return demo
# Launch the app
if __name__ == "__main__":
    import argparse

    # Parse command line arguments first so that --help and invalid
    # arguments exit before the UI is built.
    parser = argparse.ArgumentParser(description="DNA-Diffusion Gradio App")
    parser.add_argument("--share", action="store_true", help="Create a public shareable link")
    parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to run the app on")
    args = parser.parse_args()

    # For Hugging Face Spaces deployment: when SPACE_ID is set, override the
    # host/port/share settings and do not try to open a browser.
    on_spaces = bool(os.getenv("SPACE_ID"))
    if on_spaces:
        args.host = "0.0.0.0"
        args.port = 7860
        args.share = False
    inbrowser = not on_spaces

    demo = create_demo()
    logger.info(f"Starting DNA-Diffusion Gradio app on {args.host}:{args.port}")
    demo.launch(
        share=args.share,
        server_name=args.host,
        server_port=args.port,
        inbrowser=inbrowser
    )