dt / app /translate /db.py
gitdeem's picture
Upload 96 files
4e9efe9 verified
import sqlite3
from urllib.parse import urlparse
import pymysql
import os
from dotenv import load_dotenv, find_dotenv
from threading import Lock
# Load variables from a local .env file (located with find_dotenv) when this
# module is imported; the connection helpers in this file read
# PROD_DATABASE_URL from os.environ. The return value is bound to `_` and
# not used anywhere in the visible code.
_ = load_dotenv(find_dotenv())
def get_conn61():
    """Open a SQLite connection to the database named by PROD_DATABASE_URL.

    The environment value may carry a leading ``sqlite:///`` prefix, which is
    stripped; any other value is handed to ``sqlite3.connect`` unchanged.

    Returns:
        The ``sqlite3.Connection`` returned by ``sqlite3.connect``.

    Raises:
        ValueError: If PROD_DATABASE_URL is unset or empty.
        Exception: Any error raised while connecting. Every failure is
            printed and then re-raised unchanged.
    """
    prefix = 'sqlite:///'
    try:
        url = os.environ.get('PROD_DATABASE_URL')
        if not url:
            raise ValueError("Database URL not found in environment variables.")
        # Drop the URL-style prefix so only the filesystem path remains.
        path = url[len(prefix):] if url.startswith(prefix) else url
        return sqlite3.connect(path)
    except Exception as err:
        print(f"Error connecting to database: {err}")
        raise
def get_conn():
    """Open a database connection described by PROD_DATABASE_URL.

    Two URL forms are recognised:

    * ``sqlite:///<path>`` - returns a ``sqlite3`` connection to ``<path>``.
    * ``mysql+pymysql://user:password@host[:port]/database`` - returns a
      PyMySQL connection using the ``utf8mb4`` charset and ``DictCursor`` as
      the default cursor class. The port defaults to 3306.

    Returns:
        A ``sqlite3.Connection`` or a PyMySQL connection, depending on the
        URL scheme.

    Raises:
        ValueError: If PROD_DATABASE_URL is unset/empty or uses an
            unsupported scheme.
        Exception: Any error raised by the driver while connecting. Every
            failure is printed and then re-raised unchanged.
    """
    # Function-scope import: the file's import block only brings in urlparse.
    from urllib.parse import unquote

    try:
        db_url = os.environ.get('PROD_DATABASE_URL')
        if not db_url:
            raise ValueError("Database URL not found in environment variables.")

        if db_url.startswith('sqlite:///'):
            # Everything after the prefix is the database file path.
            return sqlite3.connect(db_url[len('sqlite:///'):])

        if db_url.startswith('mysql+pymysql://'):
            parsed_url = urlparse(db_url)
            mysql_user = parsed_url.username
            mysql_password = parsed_url.password
            # urlparse does not expand %-escapes, but credentials containing
            # characters such as '@', ':' or '/' must be percent-encoded to
            # be embedded in a URL. Decode them before handing them to the
            # driver, otherwise authentication fails with the encoded text.
            # Missing components stay None, exactly as before.
            if mysql_user is not None:
                mysql_user = unquote(mysql_user)
            if mysql_password is not None:
                mysql_password = unquote(mysql_password)
            return pymysql.connect(
                host=parsed_url.hostname,
                port=parsed_url.port or 3306,  # default MySQL port
                user=mysql_user,
                password=mysql_password,
                db=unquote(parsed_url.path.lstrip('/')),
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
            )

        # NOTE(review): this message echoes the whole URL, which may include
        # credentials if the scheme is merely misspelled - consider redacting.
        raise ValueError(f"Unsupported database URL: {db_url}")
    except Exception as e:
        print(f"Error connecting to database: {e}")
        raise
def execute(sql, *params):
    """Run a single write statement on a fresh connection and commit it.

    Args:
        sql: The SQL statement. The placeholder style is whatever the active
            driver expects (``?`` for sqlite3, ``%s`` for PyMySQL).
        *params: Positional values bound to the statement's placeholders.

    Returns:
        None. A failure while executing or committing is reported on stdout
        and the transaction is rolled back; it is not raised to the caller,
        which matches the previous best-effort behaviour.

    Raises:
        Exception: Whatever ``get_conn`` raises when no connection can be
            opened.
    """
    # The previous version created a new Lock on every call; a lock that is
    # not shared between callers cannot serialize anything, so it is gone.
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            conn.commit()
        except Exception as e:
            # Catch Exception rather than everything so KeyboardInterrupt and
            # SystemExit still propagate.
            print(f"Error executing statement: {e}")
            conn.rollback()
        finally:
            cursor.close()
    finally:
        # Always release the connection, including on the failure path,
        # which previously leaked both the cursor and the connection.
        conn.close()
def get(sql, *params):
    """Run a query on a fresh connection and return its first row.

    Args:
        sql: The SQL query. The placeholder style is whatever the active
            driver expects (``?`` for sqlite3, ``%s`` for PyMySQL).
        *params: Positional values bound to the query's placeholders.

    Returns:
        The first row as a dict keyed by column name, ``None`` when the query
        yields no rows, or ``[]`` when the query fails (kept for backward
        compatibility with callers that test the result for truthiness).

    Raises:
        Exception: Whatever ``get_conn`` raises when no connection can be
            opened.
    """
    # The previous version created a new Lock on every call; a lock that is
    # not shared between callers cannot serialize anything, so it is gone.
    conn = get_conn()
    try:
        if isinstance(conn, sqlite3.Connection):
            # sqlite3's cursor() has no ``cursor=`` keyword, so the old code
            # always failed here and returned []. Use sqlite3.Row so the row
            # can be converted to the same dict shape DictCursor produces.
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
        else:
            cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, params)
            row = cursor.fetchone()
        finally:
            cursor.close()
        if isinstance(row, sqlite3.Row):
            return dict(row)
        return row
    except Exception as e:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate.
        print(f"Error executing query: {e}")
        return []
    finally:
        # Always release the connection; the failure path used to leak it.
        conn.close()