dev: 初步支持加载kps

This commit is contained in:
Julian Freeman
2024-07-19 07:58:30 -04:00
parent 287f950d41
commit aca88dd952
6 changed files with 171 additions and 13 deletions

23
lib/db_columns_def.py Normal file
View File

@@ -0,0 +1,23 @@
# coding: utf8
from .Sqlite3Helper import Column, DataType
columns_d = {
"entry_id": Column("entry_id", DataType.INTEGER, primary_key=True),
"title": Column("title", DataType.BLOB),
"username": Column("username", DataType.BLOB),
"password": Column("password", DataType.BLOB),
"opt": Column("opt", DataType.TEXT),
"url": Column("url", DataType.BLOB),
"notes": Column("notes", DataType.BLOB),
"path": Column("path", DataType.BLOB, nullable=False),
}
all_columns = [
columns_d["entry_id"],
columns_d["title"],
columns_d["username"],
columns_d["password"],
columns_d["opt"],
columns_d["url"],
columns_d["notes"],
columns_d["path"],
]

47
lib/kps_operations.py Normal file
View File

@@ -0,0 +1,47 @@
# coding: utf8
from os import PathLike
from pykeepass import PyKeePass
from .Sqlite3Helper import Sqlite3Worker, BlobType, Column
def trim_str(value):
if value is None:
return ""
if isinstance(value, str):
return value.strip()
return value
def extract_otp(otp: str) -> str:
if otp is None:
return ""
params = otp.split("?", 1)[1]
secret = params.split("&")[0]
return secret.split("=")[1]
def blob_fy(value: str) -> BlobType:
if value is None:
return BlobType()
return BlobType(value.encode("utf-8"))
def read_kps_to_db(kps_file: str | PathLike[str], password: str,
columns: list[Column], sqh: Sqlite3Worker) -> PyKeePass:
kp = PyKeePass(kps_file, password=password)
values = []
for group in kp.groups:
for entry in group.entries:
values.append([
blob_fy(trim_str(entry.title)),
blob_fy(trim_str(entry.username)),
blob_fy(entry.password),
extract_otp(entry.otp),
blob_fy(trim_str(entry.url)),
blob_fy(entry.notes),
blob_fy("::".join(entry.path[:-1])),
])
sqh.insert_into("entries", columns, values)
return kp

View File

@@ -6,9 +6,15 @@ from PySide6.QtWidgets import QApplication
from src.mw_kps_unifier import KpsUnifier
import src.rc_kps_unifier
ORG_NAME = "JnPrograms"
APP_NAME = "KpsUnifier"
def main():
app = QApplication(sys.argv)
app.setOrganizationName(ORG_NAME)
app.setApplicationName(APP_NAME)
win = KpsUnifier()
win.show()
return app.exec()

View File

@@ -57,6 +57,11 @@ class GbxKpsLogin(QtWidgets.QGroupBox):
self.lne_password.setEchoMode(QtWidgets.QLineEdit.EchoMode.Password)
self.pbn_eye.setIcon(self.icon_eye_off)
def set_loaded(self, loaded: bool):
self.is_loaded = loaded
self.lb_loaded.setVisible(loaded)
self.pbn_load.setDisabled(loaded)
class PushButtonWithItem(QtWidgets.QPushButton):

View File

@@ -1,11 +1,36 @@
# coding: utf8
import os
import sys
from datetime import datetime
from pathlib import Path
from PySide6 import QtWidgets, QtCore, QtGui
from .page_load import PageLoad
from .cmbx_styles import StyleComboBox
from lib.Sqlite3Helper import Sqlite3Worker
from lib.db_columns_def import all_columns
def get_default_db_path() -> str:
plat = sys.platform
if plat == "win32":
data_dir = os.path.expandvars("%appdata%")
elif plat == "darwin":
data_dir = os.path.expanduser("~/Library/Application Support")
else:
raise OSError("Unsupported platform")
app_dir = Path(data_dir,
QtWidgets.QApplication.organizationName(),
QtWidgets.QApplication.applicationName())
if not app_dir.exists():
app_dir.mkdir(parents=True, exist_ok=True)
now_s = datetime.now().strftime("%Y%m%d%H%M%S")
return str(app_dir / f"{now_s}.db")
class UiKpsUnifier(object):
def __init__(self, window: QtWidgets.QMainWindow):
def __init__(self, default_db_path: str, sqh: Sqlite3Worker, window: QtWidgets.QMainWindow):
window.setWindowTitle('KeePassXC 多合一')
self.cw = QtWidgets.QWidget(window)
self.vly_m = QtWidgets.QVBoxLayout()
@@ -28,6 +53,7 @@ class UiKpsUnifier(object):
self.lne_db_path = QtWidgets.QLineEdit(self.cw)
self.lne_db_path.setEnabled(False)
self.lne_db_path.setPlaceholderText(default_db_path)
self.cmbx_styles = StyleComboBox(self.cw)
self.hly_top.addWidget(self.lne_db_path)
self.hly_top.addWidget(self.cmbx_styles)
@@ -35,17 +61,24 @@ class UiKpsUnifier(object):
self.sw_m = QtWidgets.QStackedWidget(self.cw)
self.vly_m.addWidget(self.sw_m)
self.page_load = PageLoad(self.cw)
self.page_load = PageLoad(sqh, self.cw)
self.sw_m.addWidget(self.page_load)
self.page_query = QtWidgets.QWidget(self.cw)
self.sw_m.addWidget(self.page_query)
def update_sqh(self, sqh: Sqlite3Worker):
self.page_load.update_sqh(sqh)
class KpsUnifier(QtWidgets.QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.ui = UiKpsUnifier(self)
self.db_path = get_default_db_path()
self.sqh = self.init_db()
self.ui = UiKpsUnifier(self.db_path, self.sqh, self)
self.ui.act_new.triggered.connect(self.on_act_new_triggered)
self.ui.act_open.triggered.connect(self.on_act_open_triggered)
self.ui.act_load.triggered.connect(self.on_act_load_triggered)
self.ui.act_query.triggered.connect(self.on_act_query_triggered)
@@ -53,11 +86,30 @@ class KpsUnifier(QtWidgets.QMainWindow):
def sizeHint(self):
return QtCore.QSize(860, 640)
def on_act_open_triggered(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../")
def init_db(self) -> Sqlite3Worker:
sqh = Sqlite3Worker(self.db_path)
sqh.create_table("entries", all_columns, if_not_exists=True)
return sqh
def update_db(self, filename: str):
self.db_path = filename
self.sqh = self.init_db()
self.ui.update_sqh(self.sqh)
self.ui.lne_db_path.setText(filename)
def on_act_new_triggered(self):
filename, _ = QtWidgets.QFileDialog.getSaveFileName(self, "新建", "../",
filter="数据库 (*.db);;所有文件 (*)")
if len(filename) == 0:
return
self.ui.lne_db_path.setText(filename)
self.update_db(filename)
def on_act_open_triggered(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../",
filter="数据库 (*.db);;所有文件 (*)")
if len(filename) == 0:
return
self.update_db(filename)
def on_act_load_triggered(self):
self.ui.sw_m.setCurrentIndex(0)

View File

@@ -1,11 +1,19 @@
# coding: utf8
from PySide6 import QtWidgets
from pykeepass.exceptions import CredentialsError
from .gbx_kps_login import GbxKpsLogin
from lib.Sqlite3Helper import Sqlite3Worker
from lib.db_columns_def import all_columns
from lib.kps_operations import read_kps_to_db
class WgLoadKps(QtWidgets.QWidget):
def __init__(self, parent=None):
def __init__(self, sqh: Sqlite3Worker, parent=None):
super().__init__(parent)
self.sqh = sqh
self.kps_wgs: list[GbxKpsLogin] = []
self.vly_m = QtWidgets.QVBoxLayout()
self.setLayout(self.vly_m)
self.vly_m.addStretch(1)
@@ -16,19 +24,33 @@ class WgLoadKps(QtWidgets.QWidget):
wg.pbn_load.clicked_with_item.connect(self.on_item_pbn_load_clicked)
# 从倒数第二个位置插入,保证弹簧始终在最后
self.vly_m.insertWidget(self.vly_m.count() - 1, wg)
self.kps_wgs.append(wg)
def on_item_pbn_remove_clicked(self, item: GbxKpsLogin):
self.vly_m.removeWidget(item)
item.deleteLater()
def on_item_pbn_load_clicked(self, item: GbxKpsLogin):
item.is_loaded = True
item.lb_loaded.setVisible(True)
item.pbn_load.setDisabled(True)
try:
read_kps_to_db(kps_file=item.lne_path.text(),
password=item.lne_password.text(),
columns=all_columns[1:],
sqh=self.sqh)
except CredentialsError:
QtWidgets.QMessageBox.critical(self, "密码错误",
f"{item.lne_path.text()}\n密码错误")
return
item.set_loaded(True)
def update_sqh(self, sqh: Sqlite3Worker):
self.sqh = sqh
for wg in self.kps_wgs:
wg.set_loaded(False)
class PageLoad(QtWidgets.QWidget):
def __init__(self, parent=None):
def __init__(self, sqh: Sqlite3Worker, parent=None):
super().__init__(parent)
self.hly_m = QtWidgets.QHBoxLayout()
self.setLayout(self.hly_m)
@@ -44,7 +66,7 @@ class PageLoad(QtWidgets.QWidget):
self.sa = QtWidgets.QScrollArea(self)
self.sa.setWidgetResizable(True)
self.hly_m.addWidget(self.sa)
self.wg_sa = WgLoadKps(self.sa)
self.wg_sa = WgLoadKps(sqh, self.sa)
self.sa.setWidget(self.wg_sa)
self.hly_m.setStretchFactor(self.vly_left, 1)
@@ -59,3 +81,6 @@ class PageLoad(QtWidgets.QWidget):
return
for filename in filenames:
self.wg_sa.add_kps(filename)
def update_sqh(self, sqh: Sqlite3Worker):
self.wg_sa.update_sqh(sqh)