66 lines
2.3 KiB
Python
66 lines
2.3 KiB
Python
import requests
|
|
from PySide6.QtCore import QObject, Signal
|
|
from common.utils import APIException, BASE_URL
|
|
|
|
|
|
# --- API worker thread ---
|
|
class ApiWorker(QObject):
    """Perform the HTTP API calls away from the GUI so the interface does not freeze.

    Each ``do_*`` method sends one request through a shared
    ``requests.Session``, decodes the JSON reply and emits the matching
    ``*Finished`` signal. Any exception raised along the way is reported
    through ``error`` as ``str(exception)`` instead of propagating.

    NOTE(review): nothing in this class starts a thread; presumably the
    caller moves the instance to a ``QThread`` -- confirm at the call site.
    """

    # Signals. Each ``*Finished`` signal carries the decoded JSON body of the
    # corresponding successful request; ``error`` carries the failure text.
    queryAllFinished = Signal(list)
    addOneFinished = Signal(dict)
    updateOneFinished = Signal(dict)
    deleteOneFinished = Signal(dict)
    error = Signal(str)

    def __init__(self, timeout: float | tuple[float, float] | None = None):
        """Create the worker and its HTTP session.

        Args:
            timeout: Forwarded as ``timeout=`` to every request. ``None``
                (the default) keeps the previous behaviour of waiting for
                the server indefinitely.
        """
        super().__init__()
        self.session = requests.Session()
        self._timeout = timeout

    @staticmethod
    def _handle_response(response):
        """Return the decoded JSON body of *response*, or raise on HTTP failure.

        Raises:
            APIException: ``response.ok`` is false. The message contains the
                status code and the ``detail`` field of a JSON-object body,
                falling back to the raw response text.
            requests.JSONDecodeError: A successful response whose body is not
                valid JSON.
        """
        if response.ok:
            return response.json()

        try:
            body = response.json()
        except requests.JSONDecodeError:
            detail = response.text
        else:
            # Only a JSON object has ``.get``; a list/str/number error body
            # would otherwise raise AttributeError and hide the real failure.
            if isinstance(body, dict):
                detail = body.get("detail", response.text)
            else:
                detail = response.text
        raise APIException(f"API 错误 (状态 {response.status_code}): {detail}")

    def _run(self, finished, send, url: str, **kwargs) -> None:
        """Call ``send(url, ...)`` and emit the decoded body on *finished*.

        Args:
            finished: Signal emitted with the decoded JSON body on success.
            send: Bound session method (``get``/``post``/``put``/``delete``).
            url: Absolute URL of the endpoint.
            **kwargs: Extra keyword arguments forwarded to *send*.
        """
        try:
            response = send(url, timeout=self._timeout, **kwargs)
            # The emit stays inside the try block so that an exception raised
            # while emitting is reported through ``error`` as well.
            finished.emit(self._handle_response(response))
        except Exception as exc:
            # Boundary of the worker: report every failure as text rather
            # than letting it escape into the caller.
            self.error.emit(str(exc))

    def do_query_all(self):
        """GET ``/query_all`` and emit ``queryAllFinished`` with the result."""
        self._run(self.queryAllFinished, self.session.get, f"{BASE_URL}/query_all")

    def do_add_one(self, extension: dict[str, str | int]):
        """POST *extension* as JSON to ``/add_one`` and emit ``addOneFinished``."""
        self._run(
            self.addOneFinished,
            self.session.post,
            f"{BASE_URL}/add_one",
            json=extension,
        )

    def do_update_one(self, item_id: str, payload: dict[str, str | int]):
        """PUT *payload* as JSON to ``/update_one/{item_id}`` and emit ``updateOneFinished``."""
        self._run(
            self.updateOneFinished,
            self.session.put,
            f"{BASE_URL}/update_one/{item_id}",
            json=payload,
        )

    def do_delete_one(self, item_id: str):
        """DELETE ``/delete_one/{item_id}`` and emit ``deleteOneFinished``."""
        self._run(
            self.deleteOneFinished,
            self.session.delete,
            f"{BASE_URL}/delete_one/{item_id}",
        )
|