Files
xian_algorithm_new/app/services/qgis/template_modifier.py
T
2026-06-19 17:04:03 +08:00

111 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
模板 XML 修改模块。在 project.read() 之前:
1. 替换 PostgreSQL 连接参数(host/port/dbname/schema)
2. 将静态底图的 datasource 替换为本地 GeoPackage 路径
"""
import os
import re
import zipfile
import tempfile
from app.config.paths import get_logger
logger = get_logger("qgis.modifier")
class TemplateModifier:
    """Rewrite the project XML inside a ``.qgz`` template archive.

    Two independent rewrites are applied to every ``*.qgs`` member of the
    archive, each driven by ``config``:

    * ``config["template_override"]`` (when ``enabled``): replace the quoted
      ``host`` / ``port`` / ``dbname`` values and the ``table="<schema>".``
      prefix, mapping the ``original`` values to the ``actual`` ones.
    * ``config["static_layers"]`` (when ``enabled``): replace the content of
      each ``<datasource>`` element whose ``table="schema"."table"``
      reference is listed in the config with the configured GeoPackage path.
    """

    # A whole <datasource>...</datasource> element; DOTALL lets the content
    # span several lines.
    _DATASOURCE_RE = re.compile(r"(<datasource>)(.*?)(</datasource>)", re.DOTALL)
    # A table reference of the form table="schema"."table".
    _TABLE_RE = re.compile(r'table="(\w+)"\."(\w+)"')

    def __init__(self, config: dict):
        """Store ``config`` and precompute the static-layer lookup table."""
        self.config = config
        self._static_map = self._build_static_map()

    def _build_static_map(self) -> dict[str, str]:
        """Build the ``table="schema"."table"`` -> GeoPackage path mapping.

        Returns:
            An empty dict when ``static_layers`` is missing or not enabled;
            otherwise one entry per configured layer, keyed by the exact
            text searched for in the XML. Paths use forward slashes.

        Raises:
            KeyError: a required key (``gpkg_dir``, ``layers``, ``table``,
                ``file``) is missing from an enabled ``static_layers`` section.
            ValueError: a ``table`` value is not of the form ``schema.table``.
        """
        sl = self.config.get("static_layers")
        if not sl or not sl.get("enabled", False):
            return {}

        gpkg_dir = sl["gpkg_dir"]
        mapping = {}
        for info in sl["layers"].values():
            schema, table = info["table"].split(".", 1)
            xml_key = f'table="{schema}"."{table}"'
            # Normalise Windows separators to forward slashes.
            gpkg_path = os.path.join(gpkg_dir, info["file"]).replace("\\", "/")
            mapping[xml_key] = gpkg_path
        return mapping

    def modify(self, template_path: str) -> str:
        """Write a modified copy of the template and return its path.

        Args:
            template_path: Path of the ``.qgz`` (zip) template.

        Returns:
            ``template_path`` itself when neither rewrite is enabled or when
            the rewrite fails; otherwise the path of a new temporary ``.qgz``
            created in the same directory as the template. The caller owns
            the returned temporary file.

        Raises:
            KeyError: ``template_override`` is enabled but lacks ``original``
                or ``actual``.
        """
        override = self.config.get("template_override")
        has_override = bool(override) and override.get("enabled", False)
        if not has_override and not self._static_map:
            return template_path

        orig = override["original"] if has_override else None
        actual = override["actual"] if has_override else None

        tmp_path = None
        try:
            # delete=False: the file must outlive this handle so that it can
            # be reopened as a zip archive and handed back to the caller.
            with tempfile.NamedTemporaryFile(
                suffix=".qgz", delete=False,
                dir=os.path.dirname(template_path),
            ) as tmp:
                tmp_path = tmp.name

            with zipfile.ZipFile(template_path, "r") as zin, \
                    zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zout:
                for item in zin.infolist():
                    data = zin.read(item.filename)
                    if item.filename.endswith(".qgs"):
                        content = self._rewrite_project_xml(
                            data.decode("utf-8"), orig, actual
                        )
                        data = content.encode("utf-8")
                    # Passing the ZipInfo keeps the member's name and metadata.
                    zout.writestr(item, data)

            logger.info(f"模板已修改并写入: {tmp_path}")
            return tmp_path
        except Exception as e:
            # Deliberate best-effort: any failure falls back to the
            # unmodified template instead of aborting the caller.
            logger.error(f"模板修改失败,将使用原始模板: {e}")
            if tmp_path is not None:
                # Do not leave a half-written temp file beside the template.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass  # cleanup is best-effort; the fallback still applies
            return template_path

    def _rewrite_project_xml(self, content: str, orig, actual) -> str:
        """Apply the connection override and static-layer swap to one .qgs text.

        ``orig`` / ``actual`` are the ``original`` / ``actual`` override dicts,
        or ``None`` when the override is disabled.
        """
        if orig and actual:
            for name in ("host", "port", "dbname"):
                content = self._replace_conn_param(
                    content, name, orig[name], actual[name]
                )
            old_schema = orig["schema"]
            new_schema = actual["schema"]
            content = content.replace(
                f'table="{old_schema}".',
                f'table="{new_schema}".',
            )

        if self._static_map:
            content = self._DATASOURCE_RE.sub(self._swap_static_datasource, content)
        return content

    @staticmethod
    def _replace_conn_param(content: str, name: str, old, new) -> str:
        """Replace every quoted ``name='old'`` / ``name="old"`` value with ``new``.

        A function replacement is used so that ``new`` is inserted literally;
        with a template string, a backslash in the configured value would be
        parsed as a group reference or escape sequence.
        """
        pattern = re.compile(
            rf"({name}\s*=\s*['\"])({re.escape(str(old))})(['\"])"
        )
        new_text = str(new)
        return pattern.sub(
            lambda m: f"{m.group(1)}{new_text}{m.group(3)}", content
        )

    def _swap_static_datasource(self, match) -> str:
        """Return the ``<datasource>`` element with its content swapped if mapped.

        Elements without a table reference, or whose table is not in the
        static map, are returned unchanged.
        """
        tbl = self._TABLE_RE.search(match.group(2))
        if tbl:
            # group(0) is exactly the table="schema"."table" key format.
            gpkg_path = self._static_map.get(tbl.group(0))
            if gpkg_path is not None:
                # NOTE(review): the path is inserted verbatim; a path holding
                # XML-special characters would need escaping - confirm.
                return f"{match.group(1)}{gpkg_path}{match.group(3)}"
        return match.group(0)