| from flask import Flask, request | |
| import requests | |
| import base64 | |
| import json | |
| import re | |
| app = Flask(__name__) | |
| def parse_node_input(node_input): | |
| # 使用正则表达式提取 proxies 部分 | |
| proxies_match = re.search(r'proxies:\s*\n([\s\S]*?)(?:\n\w+:|$)', node_input) | |
| if not proxies_match: | |
| return [] | |
| proxies_text = proxies_match.group(1) | |
| nodes = re.findall(r'^\s*-\s*({[^}]+})', proxies_text, re.MULTILINE) | |
| parsed_nodes = [] | |
| for node in nodes: | |
| node_dict = {} | |
| items = re.findall(r'(\w+):\s*([^,}\n]+)', node) | |
| for key, value in items: | |
| if key in ['port', 'alterId']: | |
| node_dict[key] = int(value) | |
| elif key in ['udp', 'tls']: | |
| node_dict[key] = value.lower() == 'true' | |
| else: | |
| node_dict[key] = value.strip("'\"") | |
| # 处理 ws-opts | |
| ws_opts_match = re.search(r'ws-opts:\s*({[^}]+})', node) | |
| if ws_opts_match: | |
| ws_opts = ws_opts_match.group(1) | |
| path_match = re.search(r'path:\s*([^,}]+)', ws_opts) | |
| if path_match: | |
| node_dict['ws-opts.path'] = path_match.group(1).strip("'\"") | |
| headers_match = re.search(r'headers:\s*({[^}]+})', ws_opts) | |
| if headers_match: | |
| headers = headers_match.group(1) | |
| host_match = re.search(r'Host:\s*([^,}]+)', headers) | |
| if host_match: | |
| node_dict['ws-opts.headers.Host'] = host_match.group(1).strip("'\"") | |
| parsed_nodes.append(node_dict) | |
| return parsed_nodes | |
| def convert_to_vmess(node): | |
| try: | |
| path = node.get('ws-opts.path', '') | |
| vmess_node = { | |
| "v": "2", | |
| "ps": node['name'], | |
| "add": node['server'], | |
| "port": str(node['port']), | |
| "id": node['uuid'], | |
| "aid": str(node.get('alterId', 0)), | |
| "scy": node.get('cipher', 'auto'), | |
| "net": node.get('network', 'tcp'), | |
| "type": "none", | |
| "host": node.get('ws-opts.headers.Host', node.get('servername', "")), | |
| "path": path, | |
| "tls": "tls" if node.get('tls', False) else "" | |
| } | |
| json_str = json.dumps(vmess_node, separators=(',', ':')) | |
| vmess_base64 = base64.b64encode(json_str.encode()).decode() | |
| return f"vmess://{vmess_base64}" | |
| except KeyError as e: | |
| return f"Error in VMess conversion: Missing {e} key" | |
| def convert_to_vless(node): | |
| try: | |
| params = [] | |
| if node.get('tls', False): | |
| params.append("security=tls") | |
| if node.get('flow', ''): | |
| params.append(f"flow={node['flow']}") | |
| if node.get('servername', ''): | |
| params.append(f"sni={node['servername']}") | |
| if node.get('ws-opts.path', ''): | |
| params.append(f"path={node['ws-opts.path']}") | |
| if node.get('ws-opts.headers.Host', ''): | |
| params.append(f"host={node['ws-opts.headers.Host']}") | |
| if node.get('client-fingerprint', ''): | |
| params.append(f"fp={node['client-fingerprint']}") | |
| param_str = "&".join(params) | |
| vless_link = f"vless://{node['uuid']}@{node['server']}:{node['port']}?{param_str}#{node['name']}" | |
| return vless_link | |
| except KeyError as e: | |
| return f"Error in VLess conversion: Missing {e} key" | |
| def convert_to_trojan(node): | |
| try: | |
| trojan_link = f"trojan://{node['password']}@{node['server']}:{node['port']}?sni={node.get('sni', '')}#{node['name']}" | |
| return trojan_link | |
| except KeyError as e: | |
| return f"Error in Trojan conversion: Missing {e} key" | |
| def convert_to_ss(node): | |
| try: | |
| ss_link = f"ss://{base64.b64encode(f'{node['cipher']}:{node['password']}'.encode()).decode()}@{node['server']}:{node['port']}#{node['name']}" | |
| return ss_link | |
| except KeyError as e: | |
| return f"Error in Shadowsocks conversion: Missing {e} key" | |
| def convert_to_hysteria2(node): | |
| try: | |
| hysteria2_link = ( | |
| f"hysteria2://{node['password']}@{node['server']}:{node['port']}" | |
| f"?auth={node.get('auth', '')}&skip-cert-verify={str(node.get('skip-cert-verify', False)).lower()}" | |
| f"&udp={str(node.get('udp', False)).lower()}#{node['name']}" | |
| ) | |
| return hysteria2_link | |
| except KeyError as e: | |
| return f"Error in Hysteria2 conversion: Missing {e} key" | |
| @app.route('/', methods=['GET']) | |
| def convert_nodes(): | |
| url = request.args.get('url') | |
| if not url: | |
| return "Please provide a URL", 400 | |
| try: | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| node_input = response.text | |
| except requests.RequestException as e: | |
| return f"Error fetching URL: {str(e)}", 500 | |
| nodes = parse_node_input(node_input) | |
| converted_nodes = [] | |
| for node in nodes: | |
| if node['type'] == 'ss': | |
| converted_nodes.append(convert_to_ss(node)) | |
| elif node['type'] == 'vmess': | |
| converted_nodes.append(convert_to_vmess(node)) | |
| elif node['type'] == 'vless': | |
| converted_nodes.append(convert_to_vless(node)) | |
| elif node['type'] == 'trojan': | |
| converted_nodes.append(convert_to_trojan(node)) | |
| elif node['type'] == 'hysteria2': | |
| converted_nodes.append(convert_to_hysteria2(node)) | |
| else: | |
| converted_nodes.append(f"Unknown node type: {node['type']}") | |
| result = "\n".join(converted_nodes) | |
| return result, 200, {'Content-Type': 'text/plain; charset=utf-8'} | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=8080) | |