From aca88dd9527694515f65b302c235ee8c52d184cb Mon Sep 17 00:00:00 2001 From: Julian Freeman Date: Fri, 19 Jul 2024 07:58:30 -0400 Subject: [PATCH] =?UTF-8?q?dev:=20=E5=88=9D=E6=AD=A5=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=8A=A0=E8=BD=BDkps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/db_columns_def.py | 23 ++++++++++++++++ lib/kps_operations.py | 47 +++++++++++++++++++++++++++++++ main.py | 6 ++++ src/gbx_kps_login.py | 5 ++++ src/mw_kps_unifier.py | 64 +++++++++++++++++++++++++++++++++++++++---- src/page_load.py | 39 +++++++++++++++++++++----- 6 files changed, 171 insertions(+), 13 deletions(-) create mode 100644 lib/db_columns_def.py create mode 100644 lib/kps_operations.py diff --git a/lib/db_columns_def.py b/lib/db_columns_def.py new file mode 100644 index 0000000..f612235 --- /dev/null +++ b/lib/db_columns_def.py @@ -0,0 +1,23 @@ +# coding: utf8 +from .Sqlite3Helper import Column, DataType + +columns_d = { + "entry_id": Column("entry_id", DataType.INTEGER, primary_key=True), + "title": Column("title", DataType.BLOB), + "username": Column("username", DataType.BLOB), + "password": Column("password", DataType.BLOB), + "opt": Column("opt", DataType.TEXT), + "url": Column("url", DataType.BLOB), + "notes": Column("notes", DataType.BLOB), + "path": Column("path", DataType.BLOB, nullable=False), +} +all_columns = [ + columns_d["entry_id"], + columns_d["title"], + columns_d["username"], + columns_d["password"], + columns_d["opt"], + columns_d["url"], + columns_d["notes"], + columns_d["path"], +] diff --git a/lib/kps_operations.py b/lib/kps_operations.py new file mode 100644 index 0000000..3f65bbb --- /dev/null +++ b/lib/kps_operations.py @@ -0,0 +1,47 @@ +# coding: utf8 +from os import PathLike +from pykeepass import PyKeePass +from .Sqlite3Helper import Sqlite3Worker, BlobType, Column + + +def trim_str(value): + if value is None: + return "" + if isinstance(value, str): + return value.strip() + return value + + +def extract_otp(otp: str) -> str: + if otp is None: + return "" + params = otp.split("?", 1)[1] + secret = params.split("&")[0] + return secret.split("=")[1] + + +def blob_fy(value: str) -> BlobType: + if value is None: + return BlobType() + return BlobType(value.encode("utf-8")) + + +def read_kps_to_db(kps_file: str | PathLike[str], password: str, + columns: list[Column], sqh: Sqlite3Worker) -> PyKeePass: + kp = PyKeePass(kps_file, password=password) + + values = [] + for group in kp.groups: + for entry in group.entries: + values.append([ + blob_fy(trim_str(entry.title)), + blob_fy(trim_str(entry.username)), + blob_fy(entry.password), + extract_otp(entry.otp), + blob_fy(trim_str(entry.url)), + blob_fy(entry.notes), + blob_fy("::".join(entry.path[:-1])), + ]) + + sqh.insert_into("entries", columns, values) + return kp diff --git a/main.py b/main.py index a501667..3ffc869 100644 --- a/main.py +++ b/main.py @@ -6,9 +6,15 @@ from PySide6.QtWidgets import QApplication from src.mw_kps_unifier import KpsUnifier import src.rc_kps_unifier +ORG_NAME = "JnPrograms" +APP_NAME = "KpsUnifier" + def main(): app = QApplication(sys.argv) + app.setOrganizationName(ORG_NAME) + app.setApplicationName(APP_NAME) + win = KpsUnifier() win.show() return app.exec() diff --git a/src/gbx_kps_login.py b/src/gbx_kps_login.py index 20b4d18..d31f2f3 100644 --- a/src/gbx_kps_login.py +++ b/src/gbx_kps_login.py @@ -57,6 +57,11 @@ class GbxKpsLogin(QtWidgets.QGroupBox): self.lne_password.setEchoMode(QtWidgets.QLineEdit.EchoMode.Password) self.pbn_eye.setIcon(self.icon_eye_off) + def set_loaded(self, loaded: bool): + self.is_loaded = loaded + self.lb_loaded.setVisible(loaded) + self.pbn_load.setDisabled(loaded) + class PushButtonWithItem(QtWidgets.QPushButton): diff --git a/src/mw_kps_unifier.py b/src/mw_kps_unifier.py index 27186a1..51ab7f2 100644 --- a/src/mw_kps_unifier.py +++ b/src/mw_kps_unifier.py @@ -1,11 +1,36 @@ # coding: utf8 +import os +import sys +from datetime import datetime +from pathlib import Path from PySide6 import QtWidgets, QtCore, QtGui + from .page_load import PageLoad from .cmbx_styles import StyleComboBox +from lib.Sqlite3Helper import Sqlite3Worker +from lib.db_columns_def import all_columns + + +def get_default_db_path() -> str: + plat = sys.platform + if plat == "win32": + data_dir = os.path.expandvars("%appdata%") + elif plat == "darwin": + data_dir = os.path.expanduser("~/Library/Application Support") + else: + raise OSError("Unsupported platform") + app_dir = Path(data_dir, + QtWidgets.QApplication.organizationName(), + QtWidgets.QApplication.applicationName()) + if not app_dir.exists(): + app_dir.mkdir(parents=True, exist_ok=True) + + now_s = datetime.now().strftime("%Y%m%d%H%M%S") + return str(app_dir / f"{now_s}.db") class UiKpsUnifier(object): - def __init__(self, window: QtWidgets.QMainWindow): + def __init__(self, default_db_path: str, sqh: Sqlite3Worker, window: QtWidgets.QMainWindow): window.setWindowTitle('KeePassXC 多合一') self.cw = QtWidgets.QWidget(window) self.vly_m = QtWidgets.QVBoxLayout() @@ -28,6 +53,7 @@ class UiKpsUnifier(object): self.lne_db_path = QtWidgets.QLineEdit(self.cw) self.lne_db_path.setEnabled(False) + self.lne_db_path.setPlaceholderText(default_db_path) self.cmbx_styles = StyleComboBox(self.cw) self.hly_top.addWidget(self.lne_db_path) self.hly_top.addWidget(self.cmbx_styles) @@ -35,17 +61,24 @@ class UiKpsUnifier(object): self.sw_m = QtWidgets.QStackedWidget(self.cw) self.vly_m.addWidget(self.sw_m) - self.page_load = PageLoad(self.cw) + self.page_load = PageLoad(sqh, self.cw) self.sw_m.addWidget(self.page_load) self.page_query = QtWidgets.QWidget(self.cw) self.sw_m.addWidget(self.page_query) + def update_sqh(self, sqh: Sqlite3Worker): + self.page_load.update_sqh(sqh) + class KpsUnifier(QtWidgets.QMainWindow): def __init__(self, parent=None): super().__init__(parent) - self.ui = UiKpsUnifier(self) + self.db_path = get_default_db_path() + self.sqh = self.init_db() + self.ui = UiKpsUnifier(self.db_path, self.sqh, self) + + self.ui.act_new.triggered.connect(self.on_act_new_triggered) self.ui.act_open.triggered.connect(self.on_act_open_triggered) self.ui.act_load.triggered.connect(self.on_act_load_triggered) self.ui.act_query.triggered.connect(self.on_act_query_triggered) @@ -53,11 +86,30 @@ class KpsUnifier(QtWidgets.QMainWindow): def sizeHint(self): return QtCore.QSize(860, 640) - def on_act_open_triggered(self): - filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../") + def init_db(self) -> Sqlite3Worker: + sqh = Sqlite3Worker(self.db_path) + sqh.create_table("entries", all_columns, if_not_exists=True) + return sqh + + def update_db(self, filename: str): + self.db_path = filename + self.sqh = self.init_db() + self.ui.update_sqh(self.sqh) + self.ui.lne_db_path.setText(filename) + + def on_act_new_triggered(self): + filename, _ = QtWidgets.QFileDialog.getSaveFileName(self, "新建", "../", + filter="数据库 (*.db);;所有文件 (*)") if len(filename) == 0: return - self.ui.lne_db_path.setText(filename) + self.update_db(filename) + + def on_act_open_triggered(self): + filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../", + filter="数据库 (*.db);;所有文件 (*)") + if len(filename) == 0: + return + self.update_db(filename) def on_act_load_triggered(self): self.ui.sw_m.setCurrentIndex(0) diff --git a/src/page_load.py b/src/page_load.py index 3b2e32e..e05e489 100644 --- a/src/page_load.py +++ b/src/page_load.py @@ -1,11 +1,19 @@ # coding: utf8 from PySide6 import QtWidgets +from pykeepass.exceptions import CredentialsError + from .gbx_kps_login import GbxKpsLogin +from lib.Sqlite3Helper import Sqlite3Worker +from lib.db_columns_def import all_columns +from lib.kps_operations import read_kps_to_db class WgLoadKps(QtWidgets.QWidget): - def __init__(self, parent=None): + def __init__(self, sqh: Sqlite3Worker, parent=None): super().__init__(parent) + self.sqh = sqh + self.kps_wgs: list[GbxKpsLogin] = [] + self.vly_m = QtWidgets.QVBoxLayout() self.setLayout(self.vly_m) self.vly_m.addStretch(1) @@ -16,19 +24,33 @@ class WgLoadKps(QtWidgets.QWidget): wg.pbn_load.clicked_with_item.connect(self.on_item_pbn_load_clicked) # 从倒数第二个位置插入,保证弹簧始终在最后 self.vly_m.insertWidget(self.vly_m.count() - 1, wg) + self.kps_wgs.append(wg) def on_item_pbn_remove_clicked(self, item: GbxKpsLogin): self.vly_m.removeWidget(item) item.deleteLater() def on_item_pbn_load_clicked(self, item: GbxKpsLogin): - item.is_loaded = True - item.lb_loaded.setVisible(True) - item.pbn_load.setDisabled(True) + try: + read_kps_to_db(kps_file=item.lne_path.text(), + password=item.lne_password.text(), + columns=all_columns[1:], + sqh=self.sqh) + except CredentialsError: + QtWidgets.QMessageBox.critical(self, "密码错误", + f"{item.lne_path.text()}\n密码错误") + return + + item.set_loaded(True) + + def update_sqh(self, sqh: Sqlite3Worker): + self.sqh = sqh + for wg in self.kps_wgs: + wg.set_loaded(False) class PageLoad(QtWidgets.QWidget): - def __init__(self, parent=None): + def __init__(self, sqh: Sqlite3Worker, parent=None): super().__init__(parent) self.hly_m = QtWidgets.QHBoxLayout() self.setLayout(self.hly_m) @@ -44,7 +66,7 @@ class PageLoad(QtWidgets.QWidget): self.sa = QtWidgets.QScrollArea(self) self.sa.setWidgetResizable(True) self.hly_m.addWidget(self.sa) - self.wg_sa = WgLoadKps(self.sa) + self.wg_sa = WgLoadKps(sqh, self.sa) self.sa.setWidget(self.wg_sa) self.hly_m.setStretchFactor(self.vly_left, 1) @@ -54,8 +76,11 @@ class PageLoad(QtWidgets.QWidget): def on_pbn_add_clicked(self): filenames, _ = QtWidgets.QFileDialog.getOpenFileNames(self, "选择", "../", - filter="KeePass 2 数据库 (*.kdbx);;所有文件(*)") + filter="KeePass 2 数据库 (*.kdbx);;所有文件 (*)") if len(filenames) == 0: return for filename in filenames: self.wg_sa.add_kps(filename) + + def update_sqh(self, sqh: Sqlite3Worker): + self.wg_sa.update_sqh(sqh)