Spaces:
Sleeping
Sleeping
from typing import Any | |
from mcp.server.fastmcp import FastMCP | |
import http.client | |
import json | |
import requests | |
mindverse = FastMCP("mindverse_public") | |
url = "app.secondme.io" | |
messages =[] | |
async def get_response(query:str, instance_id:str) -> str | None: | |
""" | |
Received a response based on public secondme model. | |
Args: | |
query (str): Questions raised by users regarding the secondme model. | |
instance_id (str): ID used to identify the secondme model, or url used to identify the secondme model. | |
""" | |
id = instance_id.split('/')[-1] | |
path = f"/api/chat/{id}" | |
headers = {"Content-Type": "application/json"} | |
messages.append({"role": "user", "content": query}) | |
data = { | |
"messages": messages, | |
"metadata": { | |
"enable_l0_retrieval": False, | |
"role_id": "default_role" | |
}, | |
"temperature": 0.7, | |
"max_tokens": 2000, | |
"stream": True | |
} | |
conn = http.client.HTTPSConnection(url) | |
# Send the POST request | |
conn.request("POST", path, body=json.dumps(data), headers=headers) | |
# Get the response | |
response = conn.getresponse() | |
full_content = "" | |
for line in response: | |
if line: | |
decoded_line = line.decode('utf-8').strip() | |
if decoded_line == 'data: [DONE]': | |
break | |
if decoded_line.startswith('data: '): | |
try: | |
json_str = decoded_line[6:] | |
chunk = json.loads(json_str) | |
content = chunk['choices'][0]['delta'].get('content', '') | |
if content: | |
full_content += content | |
except json.JSONDecodeError: | |
pass | |
conn.close() | |
if full_content: | |
messages.append({"role": "system", "content": full_content}) | |
return full_content | |
else: | |
return None | |
async def get_online_instances(): | |
""" | |
Check which secondme models are available for chatting online. | |
""" | |
url = "https://app.secondme.io/api/upload/list?page_size=100" | |
response = requests.get(url) | |
if response.status_code == 200: | |
data = response.json() | |
items = data.get("data", {}).get("items", []) | |
online_items = [ | |
{ | |
"upload_name": item["upload_name"], | |
"instance_id": item["instance_id"], | |
"description": item["description"] | |
} | |
for item in items if item.get("status") == "online" | |
] | |
return json.dumps(online_items, ensure_ascii=False, indent=2) | |
else: | |
raise Exception(f"Request failed with status code: {response.status_code}") | |
if __name__ == "__main__": | |
mindverse.run(transport='stdio') | |