"""Template cache engine (with on-disk persistence).

Flow (timings are the original author's notes):
    1. First load: TemplateModifier modifies the template -> project.read()
       -> the project is saved to the on-disk cache.
    2. Cache hit within the same process: restored via the in-memory
       cache (~1s).
    3. Across processes / after a restart: restored from the on-disk cache
       (first time ~5s, afterwards ~1s).
"""

import hashlib
import json
import os
import shutil
import time
import tempfile
from typing import Optional

from qgis.core import QgsProject, QgsRectangle

from app.config.paths import get_logger

logger = get_logger("qgis.cache")

# Persistent cache directory: four directory levels above this file, then
# app/data/cache/qgis_templates.
_CACHE_DIR = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))),
    "app",
    "data",
    "cache",
    "qgis_templates",
)
_INDEX_FILE = os.path.join(_CACHE_DIR, "index.json")


class TemplateCache:
    """Cache of QGIS project templates, keyed by template path.

    Each entry is a dict with the keys ``file`` (path of the cached project
    file), ``texts`` (layout item id -> text), ``extent`` (``QgsRectangle``
    or ``None``) and ``layout`` (layout name). Entries live in memory and are
    mirrored into a JSON index file so they can be reloaded by a later
    process via :meth:`load_persistent_cache`.
    """

    def __init__(self):
        self._cache: dict[str, dict] = {}

    # ── Public interface ──

    def is_loaded(self, template_path: str) -> bool:
        """Return True if *template_path* has an entry in the in-memory cache."""
        return template_path in self._cache

    def save_current(self, template_path: str, layout_name: str = "A4") -> None:
        """Save the currently loaded project to the memory and disk caches.

        Args:
            template_path: Path of the template the current project was
                loaded from; used as the cache key.
            layout_name: Name of the layout whose label texts and map extent
                are recorded alongside the project file.

        If the project file cannot be written, a warning is logged and no
        cache entry is registered, so :meth:`is_loaded` keeps reporting the
        template as not cached instead of pointing at a missing/stale file.
        """
        project = QgsProject.instance()

        # ── Write the project into the persistent cache directory ──
        os.makedirs(_CACHE_DIR, exist_ok=True)
        key = self._path_key(template_path)
        cache_file = os.path.join(_CACHE_DIR, f"{key}.qgz")
        t_save = time.time()
        # QgsProject.write() reports failure through its boolean result.
        if not project.write(cache_file):
            logger.warning(f" 缓存写入失败: {os.path.basename(cache_file)}")
            return
        logger.info(f" 缓存写入磁盘: {os.path.basename(cache_file)} ({time.time() - t_save:.1f}s)")

        # ── Record the layout state ──
        texts = {}
        extent = None
        layout = project.layoutManager().layoutByName(layout_name)
        if layout:
            for item_id in ["mapTitle", "mapTime", "mapUint", "info"]:
                item = layout.itemById(item_id)
                if item:
                    texts[item_id] = item.text()
            map_item = layout.itemById("Map")
            if map_item:
                # Copy the rectangle so the entry owns its own extent object.
                extent = QgsRectangle(map_item.extent())

        entry = {
            "file": cache_file,
            "texts": texts,
            "extent": extent,
            "layout": layout_name,
        }
        self._cache[template_path] = entry

        # ── Update the index file ──
        self._update_index(template_path, entry)
        logger.info(f" 模板已缓存: {os.path.basename(template_path)} ({len(self._cache)} 个)")

    def restore_template(self, template_path: str) -> tuple:
        """Restore a template from the cache.

        Args:
            template_path: Cache key previously passed to :meth:`save_current`
                (or loaded by :meth:`load_persistent_cache`).

        Returns:
            A ``(project, texts, extent)`` tuple: the ``QgsProject`` instance
            plus the cached texts dict and extent for that template.

        Raises:
            RuntimeError: If the template is not cached, or if the cached
                project file cannot be read. In the latter case the entry is
                evicted so the caller can rebuild it from the template.
        """
        cached = self._cache.get(template_path)
        if not cached:
            raise RuntimeError(f"模板未缓存: {template_path}")

        project = QgsProject.instance()
        t0 = time.time()
        # Most layers are already localized to GPKG, so
        # FlagDontResolveLayers is not needed.
        if not project.read(cached["file"]):
            # A failed read must not be handed back as a restored project.
            self._cache.pop(template_path, None)
            raise RuntimeError(f"缓存恢复失败: {cached['file']}")
        logger.debug(f" 恢复耗时: {time.time() - t0:.1f}s")
        return project, cached["texts"], cached["extent"]

    def reset_project_state(
        self,
        project: QgsProject,
        texts: dict,
        extent,
        layout_name: Optional[str] = None,
    ) -> None:
        """Reset the project to a clean state.

        Clears every non-empty layer subset string, then restores the layout
        label texts and the map extent.

        Args:
            project: Project to reset.
            texts: Mapping of layout item id -> text to restore.
            extent: Extent to zoom the "Map" item to; skipped when falsy.
            layout_name: Layout to operate on. When omitted, the layout of
                the cache entry that owns *texts* is used (matched by object
                identity, which holds for the dict returned by
                :meth:`restore_template`); otherwise the first cache entry's
                layout, or "A4" when the cache is empty.
        """
        for layer in project.mapLayers().values():
            if layer.subsetString():
                layer.setSubsetString("")

        if layout_name is None:
            layout_name = self._resolve_layout_name(texts)

        layout = project.layoutManager().layoutByName(layout_name)
        if layout:
            for item_id, text in texts.items():
                item = layout.itemById(item_id)
                if item:
                    item.setText(text)
            if extent:
                map_item = layout.itemById("Map")
                if map_item:
                    map_item.zoomToExtent(extent)

    def load_persistent_cache(self) -> int:
        """Load all on-disk cache entries into memory.

        Per the original note, this is called when qgis_runner starts.

        Returns:
            The number of entries loaded. Entries whose cached project file
            no longer exists, or whose index record is malformed, are
            skipped. A missing or unreadable index yields 0.
        """
        if not os.path.isfile(_INDEX_FILE):
            logger.info(" 磁盘缓存为空,首次启动需要加载模板")
            return 0

        try:
            with open(_INDEX_FILE, "r", encoding="utf-8") as f:
                index = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f" 缓存索引损坏: {e}")
            return 0

        templates = index.get("templates", {}) if isinstance(index, dict) else None
        if not isinstance(templates, dict):
            logger.warning(" 缓存索引损坏: 结构无效")
            return 0

        count = 0
        for template_path, entry in templates.items():
            cache_file = entry.get("file", "") if isinstance(entry, dict) else ""
            if (
                not cache_file
                or not isinstance(cache_file, str)
                or not os.path.isfile(cache_file)
            ):
                logger.debug(f" 跳过无效缓存: {os.path.basename(template_path)}")
                continue

            texts = entry.get("texts", {})
            if not isinstance(texts, dict):
                texts = {}

            # Deserialize the extent from its {xmin, ymin, xmax, ymax} form.
            extent = entry.get("extent")
            if extent and isinstance(extent, dict):
                extent = QgsRectangle(
                    extent.get("xmin", 0),
                    extent.get("ymin", 0),
                    extent.get("xmax", 0),
                    extent.get("ymax", 0),
                )
            else:
                extent = None

            self._cache[template_path] = {
                "file": cache_file,
                "texts": texts,
                "extent": extent,
                "layout": entry.get("layout", "A4"),
            }
            count += 1

        logger.info(f" 磁盘缓存加载: {count} 个模板")
        return count

    def cleanup(self) -> None:
        """Clear the in-memory cache (the on-disk cache is kept)."""
        self._cache.clear()
        logger.info("已清理内存缓存")

    # ── Internal helpers ──

    @staticmethod
    def _path_key(template_path: str) -> str:
        """Map a template path to its cache file name key (16 hex chars of MD5)."""
        return hashlib.md5(template_path.encode()).hexdigest()[:16]

    def _resolve_layout_name(self, texts: dict) -> str:
        """Pick the layout name for the entry owning *texts*, with fallbacks."""
        entries = list(self._cache.values())
        for cached in entries:
            if cached.get("texts") is texts:
                return cached.get("layout", "A4")
        if entries:
            # Legacy behaviour: fall back to the first cached entry's layout.
            return entries[0].get("layout", "A4")
        return "A4"

    def _update_index(self, template_path: str, entry: dict) -> None:
        """Insert or replace *template_path*'s record in the on-disk index.

        An unreadable or structurally invalid existing index is discarded
        (with a warning) and rebuilt. The new index is written to a temporary
        file and then moved over the old one, so an interrupted write does
        not leave a truncated index behind.

        Raises:
            OSError: If the index file cannot be written or replaced.
        """
        index = {}
        if os.path.isfile(_INDEX_FILE):
            try:
                with open(_INDEX_FILE, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (ValueError, OSError) as e:
                logger.warning(f" 缓存索引损坏,将重建: {e}")
            else:
                if isinstance(loaded, dict):
                    index = loaded
                else:
                    logger.warning(" 缓存索引损坏,将重建: 结构无效")

        templates = index.get("templates")
        if not isinstance(templates, dict):
            templates = index["templates"] = {}

        extent = entry.get("extent")
        templates[template_path] = {
            "file": entry["file"],
            "layout": entry.get("layout", "A4"),
            "texts": entry.get("texts", {}),
            "extent": (
                {
                    "xmin": extent.xMinimum(),
                    "ymin": extent.yMinimum(),
                    "xmax": extent.xMaximum(),
                    "ymax": extent.yMaximum(),
                }
                if extent
                else None
            ),
        }

        os.makedirs(_CACHE_DIR, exist_ok=True)
        # Per-process temp name next to the index so os.replace stays on one
        # filesystem and concurrent writers do not share a temp file.
        tmp_file = f"{_INDEX_FILE}.{os.getpid()}.tmp"
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(index, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, _INDEX_FILE)
        finally:
            # After a successful replace the temp file is gone; on failure
            # remove the leftover. Cleanup is best-effort and must not mask
            # the original error.
            if os.path.exists(tmp_file):
                try:
                    os.remove(tmp_file)
                except OSError:
                    pass