import requests
from PySide6.QtCore import QObject, Signal

from common.utils import APIException, BASE_URL


# --- API worker ---
class ApiWorker(QObject):
    """Perform HTTP requests against the backend API and report via signals.

    The worker is meant to run in a separate thread so that network requests
    do not freeze the GUI. Each ``do_*`` method issues one request and emits
    either its ``*Finished`` signal with the decoded JSON payload or
    ``error`` with a human-readable message; none of them raise.
    """

    # Result signals: one per operation, carrying the decoded JSON body.
    queryAllFinished = Signal(list)
    addOneFinished = Signal(dict)
    updateOneFinished = Signal(dict)
    deleteOneFinished = Signal(dict)
    # Emitted with the error message whenever any operation fails.
    error = Signal(str)

    def __init__(self, timeout: float | tuple[float, float] | None = 30.0):
        """Create the worker and its reusable HTTP session.

        Args:
            timeout: Timeout in seconds passed to every request (a single
                number, or a ``(connect, read)`` tuple). ``None`` disables
                the timeout, which lets a stalled server block the worker
                indefinitely.
        """
        super().__init__()
        self.session = requests.Session()
        self.timeout = timeout

    @staticmethod
    def _handle_response(response):
        """Validate an HTTP response and return its decoded JSON body.

        Args:
            response: The ``requests`` response object to inspect.

        Returns:
            The decoded JSON payload of a successful response.

        Raises:
            APIException: If the status is not OK, or if a successful
                response does not contain valid JSON.
        """
        if not response.ok:
            # Prefer the "detail" field of a JSON object body; fall back to
            # the raw text when the body is not JSON or not an object.
            detail = response.text
            try:
                body = response.json()
            except ValueError:
                # requests' JSONDecodeError is a ValueError subclass.
                pass
            else:
                if isinstance(body, dict):
                    detail = body.get("detail", detail)
            raise APIException(f"API 错误 (状态 {response.status_code}): {detail}")

        try:
            return response.json()
        except ValueError as err:
            # Report a meaningful message instead of the raw decoder error.
            raise APIException(
                f"API 响应不是有效的 JSON (状态 {response.status_code})"
            ) from err

    def _request(self, method: str, path: str, finished, **kwargs) -> None:
        """Send one request and emit ``finished`` with the result, or ``error``.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: Path appended to ``BASE_URL``.
            finished: Signal to emit with the decoded JSON on success.
            **kwargs: Extra keyword arguments forwarded to
                ``Session.request`` (e.g. ``json=``).
        """
        try:
            response = self.session.request(
                method, f"{BASE_URL}{path}", timeout=self.timeout, **kwargs
            )
            finished.emit(self._handle_response(response))
        except Exception as e:
            # Worker boundary: every failure must reach the GUI as a signal
            # rather than escape into the thread's event loop.
            self.error.emit(str(e) or type(e).__name__)

    def do_query_all(self):
        """Fetch all items and emit ``queryAllFinished`` with the result."""
        self._request("GET", "/query_all", self.queryAllFinished)

    def do_add_one(self, extension: dict[str, str | int]):
        """Create one item from ``extension`` and emit ``addOneFinished``."""
        self._request("POST", "/add_one", self.addOneFinished, json=extension)

    def do_update_one(self, item_id: str, payload: dict[str, str | int]):
        """Update item ``item_id`` with ``payload`` and emit ``updateOneFinished``."""
        self._request(
            "PUT", f"/update_one/{item_id}", self.updateOneFinished, json=payload
        )

    def do_delete_one(self, item_id: str):
        """Delete item ``item_id`` and emit ``deleteOneFinished``."""
        self._request("DELETE", f"/delete_one/{item_id}", self.deleteOneFinished)