This commit is contained in:
Julian Freeman
2024-08-11 17:01:46 -04:00
parent 362db77ff7
commit 030e5eec55
10 changed files with 164 additions and 110 deletions

View File

@@ -1,3 +1,17 @@
# KpsUnifier # KpsUnifier
一个可以同时处理多个 keepass 文件的工具 一个可以同时处理多个 keepass 文件的工具
## 版本日志
### v1.2.0
- 修复添加的 keepass 文件为空时静默报错的问题
- 增加记录上次打开目录的位置
- 转移条目时直接转移整个条目而不是逐项复制(可以转移历史)
- 查询页增加一个输入框显示当前查询条数
### v1.1.1
- 修复无法保存配置文件的问题
- 修复切换数据库时 kps 文件加载状态更新错误的问题

View File

@@ -7,11 +7,12 @@ import time
from dataclasses import dataclass from dataclasses import dataclass
from enum import StrEnum from enum import StrEnum
from os import PathLike from os import PathLike
from types import NoneType
from cryptography.fernet import Fernet, InvalidToken from cryptography.fernet import Fernet, InvalidToken
__version__ = "2.2.0" __version__ = "2.2.2"
__version_info__ = tuple(map(int, __version__.split("."))) __version_info__ = tuple(map(int, __version__.split(".")))
@@ -32,13 +33,18 @@ class NullType(object):
class BlobType(object): class BlobType(object):
def __init__(self, data: bytes = b""): def __init__(self, data: bytes = b""):
self.data = data self._data = data
def __str__(self): def __str__(self):
return f"X'{self.data.hex()}'" return f"X'{self._data.hex()}'"
def encrypt(self, fernet: _NotRandomFernet) -> BlobType:
if fernet is None:
raise ValueError("Key is not set")
return BlobType(fernet.encrypt(self._data))
class NotRandomFernet(Fernet): class _NotRandomFernet(Fernet):
"""固定下来每次相同的 key 的加密结果相同,方便条件查询""" """固定下来每次相同的 key 的加密结果相同,方便条件查询"""
def __init__(self, key: bytes | str, fix_time: int, fix_iv: bytes, backend=None): def __init__(self, key: bytes | str, fix_time: int, fix_iv: bytes, backend=None):
@@ -50,44 +56,53 @@ class NotRandomFernet(Fernet):
return self._encrypt_from_parts(data, self._fix_time, self._fix_iv) return self._encrypt_from_parts(data, self._fix_time, self._fix_iv)
def _encrypt_blob(blob: BlobType, fernet: NotRandomFernet) -> BlobType: VALUE_TYPES = None | NullType | int | float | str | bytes | BlobType
if fernet is None:
raise ValueError("Key is not set")
return BlobType(fernet.encrypt(blob.data))
def _get_type(data_type: DataType) -> type: def _check_data_type(data_type: DataType, allow_null: bool, value) -> bool:
value_type = type(value)
allow_types = []
if data_type == DataType.NULL: if data_type == DataType.NULL:
return NullType pass
if data_type == DataType.INTEGER: elif data_type == DataType.INTEGER:
return int allow_types.extend([int, ])
if data_type == DataType.REAL: elif data_type == DataType.REAL:
return float allow_types.extend([int, float])
if data_type == DataType.TEXT: elif data_type == DataType.TEXT:
return str allow_types.extend([str, ])
elif data_type == DataType.BLOB:
allow_types.extend([str, bytes, BlobType])
if allow_null:
allow_types.extend([NoneType, NullType])
return value_type in allow_types
def _implicitly_convert(data_type: DataType, value):
if data_type == DataType.REAL and type(value) is int:
return float(value)
if data_type == DataType.BLOB: if data_type == DataType.BLOB:
return BlobType if type(value) is str:
raise TypeError(f"Data type {data_type} is not supported") return BlobType(value.encode("utf-8"))
if type(value) is bytes:
return BlobType(value)
return value
def _get_data_type(type_: type) -> DataType: def _is_null(value) -> bool:
if type_ is NullType: return type(value) in (NoneType, NullType)
return DataType.NULL
if type_ is int:
return DataType.INTEGER
if type_ is float:
return DataType.REAL
if type_ is str:
return DataType.TEXT
if type_ is BlobType:
return DataType.BLOB
raise TypeError(f"Data type {type_} is not supported")
def _to_string(value): def _to_string(value):
# 如果传入的类型不是 text 会直接返回原值 if value is None:
if type(value) is str: value = NullType()
if not (value.startswith("'") or value.endswith("'")): elif type(value) is str:
# 只要开头或者结尾任意一个字符不是单引号
if not (value.startswith("'") and value.endswith("'")):
# 把单引号换为两个单引号转义
value = value.replace("'", "''")
value = f"'{value}'" value = f"'{value}'"
return str(value) return str(value)
@@ -171,7 +186,7 @@ class Operand(object):
fix_time = self._fix_time if self._fix_time is not None else int(time.time()) fix_time = self._fix_time if self._fix_time is not None else int(time.time())
fix_iv = self._fix_iv if self._fix_iv is not None else os.urandom(16) fix_iv = self._fix_iv if self._fix_iv is not None else os.urandom(16)
try: try:
value = _encrypt_blob(value, NotRandomFernet(self._key, fix_time, fix_iv)) value = value.encrypt(_NotRandomFernet(self._key, fix_time, fix_iv))
except ValueError: except ValueError:
pass pass
op = "!=" if not_ else "=" op = "!=" if not_ else "="
@@ -264,7 +279,7 @@ class Sqlite3Worker(object):
fix_time = fix_time if fix_time is not None else int(time.time()) fix_time = fix_time if fix_time is not None else int(time.time())
fix_iv = fix_iv if fix_iv is not None else os.urandom(16) fix_iv = fix_iv if fix_iv is not None else os.urandom(16)
try: try:
self._fernet = NotRandomFernet(key, fix_time, fix_iv) self._fernet = _NotRandomFernet(key, fix_time, fix_iv)
except ValueError: except ValueError:
pass pass
@@ -284,6 +299,12 @@ class Sqlite3Worker(object):
def commit(self): def commit(self):
self._conn.commit() self._conn.commit()
def _execute(self, statement: str):
try:
self._cursor.execute(statement)
except sqlite3.Error as e:
raise sqlite3.Error(f"Error name: {e.sqlite_errorname};\nError statement: {statement}")
def create_table(self, table_name: str, columns: list[Column], def create_table(self, table_name: str, columns: list[Column],
if_not_exists: bool = False, schema_name: str = "", if_not_exists: bool = False, schema_name: str = "",
*, execute: bool = True) -> str: *, execute: bool = True) -> str:
@@ -301,7 +322,7 @@ class Sqlite3Worker(object):
statement = f"{head} {name} ({columns_str});" statement = f"{head} {name} ({columns_str});"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
return statement return statement
def drop_table(self, table_name: str, if_exists: bool = False, def drop_table(self, table_name: str, if_exists: bool = False,
@@ -316,14 +337,14 @@ class Sqlite3Worker(object):
statement = f"{head} {name};" statement = f"{head} {name};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
return statement return statement
def rename_table(self, table_name: str, new_name: str, *, execute: bool = True) -> str: def rename_table(self, table_name: str, new_name: str, *, execute: bool = True) -> str:
head = "ALTER TABLE" head = "ALTER TABLE"
statement = f"{head} {table_name} RENAME TO {new_name};" statement = f"{head} {table_name} RENAME TO {new_name};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
return statement return statement
def add_column(self, table_name: str, column: Column, *, execute: bool = True) -> str: def add_column(self, table_name: str, column: Column, *, execute: bool = True) -> str:
@@ -338,7 +359,7 @@ class Sqlite3Worker(object):
head = "ALTER TABLE" head = "ALTER TABLE"
statement = f"{head} {table_name} ADD COLUMN {str(column)};" statement = f"{head} {table_name} ADD COLUMN {str(column)};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
return statement return statement
def rename_column(self, table_name: str, column_name: str, def rename_column(self, table_name: str, column_name: str,
@@ -349,7 +370,7 @@ class Sqlite3Worker(object):
head = "ALTER TABLE" head = "ALTER TABLE"
statement = f"{head} {table_name} RENAME COLUMN {column_name} TO {new_name};" statement = f"{head} {table_name} RENAME COLUMN {column_name} TO {new_name};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
return statement return statement
def show_tables(self) -> list[str]: def show_tables(self) -> list[str]:
@@ -370,7 +391,7 @@ class Sqlite3Worker(object):
return ", ".join(columns_str_ls) return ", ".join(columns_str_ls)
def insert_into(self, table_name: str, columns: list[Column | str], def insert_into(self, table_name: str, columns: list[Column | str],
values: list[list[NullType | str | int | float | BlobType]], values: list[list[VALUE_TYPES]],
*, execute: bool = True, commit: bool = True) -> str: *, execute: bool = True, commit: bool = True) -> str:
col_count = len(columns) col_count = len(columns)
columns_str = self._columns_to_string(columns) columns_str = self._columns_to_string(columns)
@@ -381,26 +402,15 @@ class Sqlite3Worker(object):
raise ValueError(f"Length of values must be {col_count}") raise ValueError(f"Length of values must be {col_count}")
value_row_str_ls = [] value_row_str_ls = []
for i in range(col_count): for column, value in zip(columns, value_row):
column = columns[i]
value = value_row[i]
if isinstance(column, Column): if isinstance(column, Column):
col_type = _get_type(column.data_type) if not _check_data_type(column.data_type, column.nullable, value):
val_type = type(value) raise ValueError(f"Type of {column.name} must be {column.data_type}, found {type(value)}")
# 支持将 int 隐式转为 float # 这一步一定在加密之前
if val_type is int and col_type is float: value = _implicitly_convert(column.data_type, value)
pass
# 支持将 NULL 值插入任意类型的列,除了 NOT NULL 限制的
elif val_type is NullType and column.nullable is True:
pass
# 其他类型不匹配
elif val_type is not col_type:
raise ValueError(f"The {i + 1}(th) type of value must be {col_type},"
f" because the column type is {column.data_type},"
f" found {val_type}")
# 如果加密 # 如果加密
if column.secure and val_type is not NullType: if column.secure and not _is_null(value):
value = _to_string(_encrypt_blob(value, self._fernet)) value = value.encrypt(self._fernet)
value_row_str_ls.append(_to_string(value)) value_row_str_ls.append(_to_string(value))
@@ -411,7 +421,7 @@ class Sqlite3Worker(object):
head = "INSERT INTO" head = "INSERT INTO"
statement = f"{head} {table_name} ({columns_str}) VALUES {values_str};" statement = f"{head} {table_name} ({columns_str}) VALUES {values_str};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
if commit: if commit:
self._conn.commit() self._conn.commit()
return statement return statement
@@ -450,7 +460,7 @@ class Sqlite3Worker(object):
statement = f"{body};" statement = f"{body};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
rows = self._cursor.fetchall() rows = self._cursor.fetchall()
rows = [list(row) for row in rows] # 将每行转成列表,方便替换解密数据 rows = [list(row) for row in rows] # 将每行转成列表,方便替换解密数据
# 下面的整个循环都是为了找到需要解密的数据尝试解密 # 下面的整个循环都是为了找到需要解密的数据尝试解密
@@ -481,25 +491,24 @@ class Sqlite3Worker(object):
statement = f"{body};" statement = f"{body};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
if commit: if commit:
self._conn.commit() self._conn.commit()
return statement return statement
def update(self, table_name: str, new_values: list[tuple[Column | str, NullType | int | float | str | BlobType]], def update(self, table_name: str, new_values: list[tuple[Column | str, VALUE_TYPES]],
where: Expression = None, where: Expression = None,
*, execute: bool = True, commit: bool = True) -> str: *, execute: bool = True, commit: bool = True) -> str:
new_values_str_ls = [] new_values_str_ls = []
for column, value in new_values: for column, value in new_values:
if isinstance(column, Column): if isinstance(column, Column):
# 支持将 NULL 值填入任意类型的列,除了 NOT NULL 限制的 if not _check_data_type(column.data_type, column.nullable, value):
if type(value) is NullType and column.nullable is True:
pass
elif _get_data_type(type(value)) != column.data_type:
raise ValueError(f"Type of {column.name} must be {column.data_type}, found {type(value)}") raise ValueError(f"Type of {column.name} must be {column.data_type}, found {type(value)}")
# 这一步一定在加密之前
if column.secure and type(value) is not NullType: value = _implicitly_convert(column.data_type, value)
value = _encrypt_blob(value, self._fernet) # 如果加密
if column.secure and not _is_null(value):
value = value.encrypt(self._fernet)
name = column.name name = column.name
else: else:
@@ -514,7 +523,7 @@ class Sqlite3Worker(object):
statement = f"{body};" statement = f"{body};"
if execute: if execute:
self._cursor.execute(statement) self._execute(statement)
if commit: if commit:
self._conn.commit() self._conn.commit()
return statement return statement

View File

@@ -49,17 +49,22 @@ def get_config_path(org_name: str, app_name: str) -> Path:
def read_config(org_name: str, app_name: str) -> dict: def read_config(org_name: str, app_name: str) -> dict:
config_path = get_config_path(org_name, app_name)
if not config_path.exists():
config = { config = {
"button_min_width": 120, "button_min_width": 120,
"last_db_path": "", "last_db_path": "",
"last_open_path": "../",
"loaded_memory": {} "loaded_memory": {}
} }
config_path = get_config_path(org_name, app_name)
if not config_path.exists():
config_path.write_text(json.dumps(config, ensure_ascii=False, indent=4), encoding="utf-8") config_path.write_text(json.dumps(config, ensure_ascii=False, indent=4), encoding="utf-8")
return config return config
else: else:
return json.loads(config_path.read_text(encoding="utf-8")) exist_config = json.loads(config_path.read_text(encoding="utf-8"))
for key, value in config.items():
if key not in exist_config:
exist_config[key] = value
return exist_config
def write_config(config: dict, org_name: str, app_name: str): def write_config(config: dict, org_name: str, app_name: str):

View File

@@ -46,5 +46,8 @@ def read_kps_to_db(kps_file: str | PathLike[str], password: str,
blob_fy("::".join(entry.path[:-1])), blob_fy("::".join(entry.path[:-1])),
]) ])
if len(values) == 0:
raise ValueError("Keepass 文件为空")
sqh.insert_into(table_name, insert_columns, values) sqh.insert_into(table_name, insert_columns, values)
return kp return kp

View File

@@ -12,7 +12,7 @@ from lib.config_utils import (
from src.mw_kps_unifier import KpsUnifier from src.mw_kps_unifier import KpsUnifier
import src.rc_kps_unifier import src.rc_kps_unifier
__version__ = '1.1.1' __version__ = '1.2.0'
__version_info__ = tuple(map(int, __version__.split('.'))) __version_info__ = tuple(map(int, __version__.split('.')))
ORG_NAME = "JnPrograms" ORG_NAME = "JnPrograms"

View File

@@ -1,4 +1,6 @@
# coding: utf8 # coding: utf8
from pathlib import Path
from PySide6 import QtWidgets, QtCore, QtGui from PySide6 import QtWidgets, QtCore, QtGui
from pykeepass import PyKeePass from pykeepass import PyKeePass
from pykeepass.exceptions import HeaderChecksumError, CredentialsError from pykeepass.exceptions import HeaderChecksumError, CredentialsError
@@ -45,10 +47,11 @@ class UiDaTargetLogin(object):
class DaTargetLogin(QtWidgets.QDialog): class DaTargetLogin(QtWidgets.QDialog):
def __init__(self, parent=None): def __init__(self, config: dict, parent=None):
super().__init__(parent) super().__init__(parent)
self.ui = UiDaTargetLogin(self) self.ui = UiDaTargetLogin(self)
self.tar_kp: PyKeePass | None = None self.tar_kp: PyKeePass | None = None
self.config = config
self.ui.pbn_browse.clicked.connect(self.on_pbn_browse_clicked) self.ui.pbn_browse.clicked.connect(self.on_pbn_browse_clicked)
self.ui.pbn_eye.clicked.connect(self.on_pbn_eye_clicked) self.ui.pbn_eye.clicked.connect(self.on_pbn_eye_clicked)
@@ -56,11 +59,12 @@ class DaTargetLogin(QtWidgets.QDialog):
self.ui.pbn_cancel.clicked.connect(self.on_pbn_cancel_clicked) self.ui.pbn_cancel.clicked.connect(self.on_pbn_cancel_clicked)
def on_pbn_browse_clicked(self): def on_pbn_browse_clicked(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "选择", "../", filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "选择", self.config["last_open_path"],
filter="KeePass 2 数据库 (*.kdbx);;所有文件 (*)") filter="KeePass 2 数据库 (*.kdbx);;所有文件 (*)")
if len(filename) == 0: if len(filename) == 0:
return return
self.ui.lne_path.setText(filename) self.ui.lne_path.setText(filename)
self.config["last_open_path"] = str(Path(filename).parent)
def on_pbn_eye_clicked(self): def on_pbn_eye_clicked(self):
if self.ui.lne_password.echoMode() == QtWidgets.QLineEdit.EchoMode.Password: if self.ui.lne_password.echoMode() == QtWidgets.QLineEdit.EchoMode.Password:

View File

@@ -97,6 +97,9 @@ class GbxKpsLogin(QtWidgets.QGroupBox):
QtWidgets.QMessageBox.critical(self, "密码错误", QtWidgets.QMessageBox.critical(self, "密码错误",
f"{self.lne_path.text()}\n密码错误") f"{self.lne_path.text()}\n密码错误")
return return
except ValueError as e:
QtWidgets.QMessageBox.critical(self, "错误", str(e))
return
self.file_kp[self.lne_path.text()] = kp self.file_kp[self.lne_path.text()] = kp
self.sec_sqh.insert_into("secrets", insert_sec_columns, [ self.sec_sqh.insert_into("secrets", insert_sec_columns, [

View File

@@ -1,4 +1,6 @@
# coding: utf8 # coding: utf8
from pathlib import Path
from PySide6 import QtWidgets, QtCore, QtGui from PySide6 import QtWidgets, QtCore, QtGui
from pykeepass import PyKeePass from pykeepass import PyKeePass
@@ -131,18 +133,20 @@ class KpsUnifier(QtWidgets.QMainWindow):
self.ui.lne_db_path.setText(filename) self.ui.lne_db_path.setText(filename)
def on_act_new_triggered(self): def on_act_new_triggered(self):
filename, _ = QtWidgets.QFileDialog.getSaveFileName(self, "新建", "../", filename, _ = QtWidgets.QFileDialog.getSaveFileName(self, "新建", self.config["last_open_path"],
filter="数据库 (*.db);;所有文件 (*)") filter="数据库 (*.db);;所有文件 (*)")
if len(filename) == 0: if len(filename) == 0:
return return
self.update_db(filename) self.update_db(filename)
self.config["last_open_path"] = str(Path(filename).parent)
def on_act_open_triggered(self): def on_act_open_triggered(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../", filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", self.config["last_open_path"],
filter="数据库 (*.db);;所有文件 (*)") filter="数据库 (*.db);;所有文件 (*)")
if len(filename) == 0: if len(filename) == 0:
return return
self.update_db(filename) self.update_db(filename)
self.config["last_open_path"] = str(Path(filename).parent)
def on_act_load_triggered(self): def on_act_load_triggered(self):
self.ui.sw_m.setCurrentIndex(0) self.ui.sw_m.setCurrentIndex(0)

View File

@@ -109,15 +109,17 @@ class PageLoad(QtWidgets.QWidget):
self.pbn_clear_loaded_mem.clicked.connect(self.on_pbn_clear_loaded_mem_clicked) self.pbn_clear_loaded_mem.clicked.connect(self.on_pbn_clear_loaded_mem_clicked)
def update_sqh(self, sqh: Sqlite3Worker): def update_sqh(self, sqh: Sqlite3Worker):
self.sqh = sqh
self.wg_sa.update_sqh(sqh) self.wg_sa.update_sqh(sqh)
def on_pbn_add_kps_clicked(self): def on_pbn_add_kps_clicked(self):
filenames, _ = QtWidgets.QFileDialog.getOpenFileNames(self, "选择", "../", filenames, _ = QtWidgets.QFileDialog.getOpenFileNames(self, "选择", self.config["last_open_path"],
filter="KeePass 2 数据库 (*.kdbx);;所有文件 (*)") filter="KeePass 2 数据库 (*.kdbx);;所有文件 (*)")
if len(filenames) == 0: if len(filenames) == 0:
return return
for filename in filenames: for filename in filenames:
self.wg_sa.add_kps(filename) self.wg_sa.add_kps(filename)
self.config["last_open_path"] = str(Path(filenames[0]).parent)
def on_pbn_clear_db_clicked(self): def on_pbn_clear_db_clicked(self):
if accept_warning(self, True, "警告", "你确定要清空当前数据库吗?"): if accept_warning(self, True, "警告", "你确定要清空当前数据库吗?"):
@@ -141,10 +143,14 @@ class PageLoad(QtWidgets.QWidget):
self.wg_sa.update_load_status(wg) self.wg_sa.update_load_status(wg)
def on_pbn_clear_loaded_mem_clicked(self): def on_pbn_clear_loaded_mem_clicked(self):
if accept_warning(self, True, "警告", "你确定要清空所有加载记忆吗?"): if accept_warning(self, True, "警告", "你确定要清空当前加载记忆吗?"):
return return
loaded_mem: dict = self.config["loaded_memory"] filename = str(Path(self.sqh.db_name).name)
loaded_mem: list = self.config["loaded_memory"].get(filename, None)
if loaded_mem is None:
QtWidgets.QMessageBox.warning(self, "警告", f"没有找到 {filename} 的加载记忆")
else:
loaded_mem.clear() loaded_mem.clear()
QtWidgets.QMessageBox.information(self, "提示", "已清空加载记忆") QtWidgets.QMessageBox.information(self, "提示", "已清空加载记忆")

View File

@@ -1,5 +1,6 @@
# coding: utf8 # coding: utf8
import json import json
from pathlib import Path
from uuid import UUID from uuid import UUID
from PySide6 import QtWidgets, QtCore, QtGui from PySide6 import QtWidgets, QtCore, QtGui
from pykeepass import PyKeePass from pykeepass import PyKeePass
@@ -117,7 +118,13 @@ class UiPageQuery(object):
self.vly_sa_wg.addStretch(1) self.vly_sa_wg.addStretch(1)
self.pbn_read_filters = QtWidgets.QPushButton("更多过滤", window) self.lne_entries_count = QtWidgets.QLineEdit(self.sa_wg)
self.lne_entries_count.setDisabled(True)
self.lne_entries_count.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.lne_entries_count.setMinimumWidth(config["button_min_width"])
self.vly_sa_wg.addWidget(self.lne_entries_count)
self.pbn_read_filters = QtWidgets.QPushButton("更多过滤", self.sa_wg)
self.pbn_read_filters.setMinimumWidth(config["button_min_width"]) self.pbn_read_filters.setMinimumWidth(config["button_min_width"])
self.vly_sa_wg.addWidget(self.pbn_read_filters) self.vly_sa_wg.addWidget(self.pbn_read_filters)
@@ -181,11 +188,11 @@ class PageQuery(QtWidgets.QWidget):
def set_filter_button(self, fil: dict): def set_filter_button(self, fil: dict):
pbn_fil = PushButtonWithData(fil, self.ui.sa_wg, fil["name"]) pbn_fil = PushButtonWithData(fil, self.ui.sa_wg, fil["name"])
pbn_fil.setMinimumWidth(self.config["button_min_width"]) pbn_fil.setMinimumWidth(self.config["button_min_width"])
self.ui.vly_sa_wg.insertWidget(self.ui.vly_sa_wg.count() - 2, pbn_fil) self.ui.vly_sa_wg.insertWidget(self.ui.vly_sa_wg.count() - 3, pbn_fil)
pbn_fil.clicked_with_data.connect(self.on_custom_filters_clicked_with_data) pbn_fil.clicked_with_data.connect(self.on_custom_filters_clicked_with_data)
def on_pbn_read_filters_clicked(self): def on_pbn_read_filters_clicked(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", "../", filename, _ = QtWidgets.QFileDialog.getOpenFileName(self, "打开", self.config["last_open_path"],
filter="JSON 文件 (*.json);;所有文件 (*)") filter="JSON 文件 (*.json);;所有文件 (*)")
if len(filename) == 0: if len(filename) == 0:
return return
@@ -194,12 +201,14 @@ class PageQuery(QtWidgets.QWidget):
for fil in filter_ls: for fil in filter_ls:
self.set_filter_button(fil) self.set_filter_button(fil)
self.config["last_open_path"] = str(Path(filename).parent)
def on_custom_filters_clicked_with_data(self, data: dict): def on_custom_filters_clicked_with_data(self, data: dict):
_, results = self.sqh.select("entries", query_columns, _, results = self.sqh.select("entries", query_columns,
where=Expression(data["where"]).and_(Operand(deleted_col).equal_to(0))) where=Expression(data["where"]).and_(Operand(deleted_col).equal_to(0)))
model = QueryTableModel(results, self) model = QueryTableModel(results, self)
self.ui.trv_m.setModel(model) self.ui.trv_m.setModel(model)
self.ui.lne_entries_count.setText(str(model.rowCount()))
def update_sqh(self, sqh: Sqlite3Worker): def update_sqh(self, sqh: Sqlite3Worker):
self.sqh = sqh self.sqh = sqh
@@ -209,6 +218,7 @@ class PageQuery(QtWidgets.QWidget):
where=Operand(deleted_col).equal_to(0)) where=Operand(deleted_col).equal_to(0))
model = QueryTableModel(results, self) model = QueryTableModel(results, self)
self.ui.trv_m.setModel(model) self.ui.trv_m.setModel(model)
self.ui.lne_entries_count.setText(str(model.rowCount()))
def on_pbn_deleted_clicked(self): def on_pbn_deleted_clicked(self):
_, results = self.sqh.select("entries", query_columns, _, results = self.sqh.select("entries", query_columns,
@@ -247,13 +257,10 @@ class PageQuery(QtWidgets.QWidget):
kp = self.file_kp[filepath] kp = self.file_kp[filepath]
return kp return kp
def delete_the_delete_and_transfer(self, transfer: bool = False): def delete_the_delete(self):
cond = Operand(status_col).equal_to("delete") _, results = self.sqh.select("entries", sim_columns,
if transfer is True: where=Operand(status_col).equal_to("delete")
cond = cond.or_(Operand(status_col).equal_to("transfer"), high_priority=True) .and_(Operand(deleted_col).equal_to(0)))
cond = cond.and_(Operand(deleted_col).equal_to(0))
_, results = self.sqh.select("entries", sim_columns, where=cond)
file_uuids = get_filepath_uuids_map(results) file_uuids = get_filepath_uuids_map(results)
total, success, invalid = 0, 0, 0 total, success, invalid = 0, 0, 0
@@ -272,10 +279,11 @@ class PageQuery(QtWidgets.QWidget):
continue continue
kp.delete_entry(entry) kp.delete_entry(entry)
success += 1
self.sqh.update("entries", [(deleted_col, 1)], self.sqh.update("entries", [(deleted_col, 1)],
where=Operand(uuid_col).equal_to(u).and_( where=Operand(uuid_col).equal_to(u).and_(
Operand(filepath_col).equal_to(blob_fy(file)))) Operand(filepath_col).equal_to(blob_fy(file))))
success += 1
kp.save() kp.save()
@@ -309,17 +317,15 @@ class PageQuery(QtWidgets.QWidget):
invalid += 1 invalid += 1
continue continue
self.tar_kp.add_entry( kp.move_entry(entry, dest_group)
dest_group,
entry.title or "",
entry.username or "",
entry.password or "",
entry.url,
entry.notes,
otp=entry.otp,
force_creation=True
)
success += 1 success += 1
self.sqh.update("entries", [(deleted_col, 1)],
where=Operand(uuid_col).equal_to(u).and_(
Operand(filepath_col).equal_to(blob_fy(file))))
kp.save()
self.tar_kp.save() self.tar_kp.save()
QtWidgets.QMessageBox.information(self, "提示", QtWidgets.QMessageBox.information(self, "提示",
f"{total} 条转移条目,成功 {success} 条,失败 {invalid} 条。") f"{total} 条转移条目,成功 {success} 条,失败 {invalid} 条。")
@@ -336,10 +342,10 @@ class PageQuery(QtWidgets.QWidget):
if transfer: if transfer:
self.transfer_the_transfer() self.transfer_the_transfer()
self.delete_the_delete_and_transfer(transfer) self.delete_the_delete()
def on_pbn_set_target_clicked(self): def on_pbn_set_target_clicked(self):
da_target_login = DaTargetLogin(self) da_target_login = DaTargetLogin(self.config, self)
da_target_login.exec() da_target_login.exec()
self.tar_kp = da_target_login.tar_kp self.tar_kp = da_target_login.tar_kp