diff --git a/.gitignore b/.gitignore
index 555c24f..923f8b8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,3 +55,7 @@ htmlcov/
# Ignore dynaconf secret files
.secrets.*
/test/
+
+# QGIS 临时模板文件
+app/data/template/*/tmp*.qgz
+tmp*.qgz
diff --git a/app/api/qgis_map_export.py b/app/api/qgis_map_export.py
index ea49d8f..b153a5a 100644
--- a/app/api/qgis_map_export.py
+++ b/app/api/qgis_map_export.py
@@ -17,6 +17,7 @@ from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.config.paths import get_logger
+from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
from app.repositories import qgis_repository
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
from app.utils.api_deps import get_prediction_semaphore
@@ -100,44 +101,77 @@ def _build_map_title(event_type: str, condition: dict, template_name: str) -> st
return f"{prefix}{template_name}"
+def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
+ """
+ 构建信息面板文本(左上角橙色矩形区域)。
+
+ 暴雨:时间 + 累计降雨量(如有)+ 已持续(如有)
+ 地震:时间 + 震级(如有)+ 位置(如有)
+ 不显示灾害类型标签(暴雨/地震)。
+ """
+ lines = []
+
+ # 时间
+ if isinstance(occurred_time, datetime):
+ time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分"
+ elif occurred_time:
+ time_str = str(occurred_time)
+ else:
+ time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分")
+ lines.append(f"时间:{time_str}")
+
+ if event_type == "rainfall":
+ rainfall = condition.get("rainfall")
+ if rainfall is not None and rainfall != "":
+ lines.append(f"累计降雨量:{float(rainfall)}mm")
+
+ duration = condition.get("duration")
+ if duration is not None and duration != "":
+ lines.append(f"已持续:{duration}")
+
+ elif event_type == "earthquake":
+ magnitude = condition.get("magnitude")
+ if magnitude is not None and magnitude != "":
+ lines.append(f"震级:{float(magnitude)}级")
+
+ lon = condition.get("epicenter_lon")
+ lat = condition.get("epicenter_lat")
+ if lon is not None and lat is not None:
+ lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
+
+ return "\n".join(lines)
+
+
def _derive_model_params(
- inference: dict, batch_folder: str, out_filename: str
+ inference: dict, batch_folder: str, template_path: str
) -> dict:
- """从推理结果 + 配置文件推导专题图生成所需的全部参数。"""
+ """从推理结果 + 模板路径推导专题图生成所需的全部参数。"""
event_type = inference["event_type"]
condition = inference["condition"]
occurred_time = inference["occurred_time"]
+ map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
+
map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")
- map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
- template_base = os.path.join(
- os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
- "app", "data", "template"
- )
-
- template_path = os.path.join(template_base, event_type, f"{map_layout}.qgz")
-
- template_name_map = {"earthquake": "地震专题图", "rainfall": "降雨专题图"}
- template_name = template_name_map.get(event_type, f"{event_type}专题图")
+ # 从模板文件名推导标题和输出文件名(去掉 .qgz 后缀)
+ template_name = os.path.splitext(os.path.basename(template_path))[0]
map_title = _build_map_title(event_type, condition, template_name)
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
- if occurred_time and isinstance(occurred_time, datetime):
- map_time = occurred_time.strftime("%Y-%m-%d %H:%M")
- elif occurred_time:
- map_time = str(occurred_time)
- else:
- map_time = datetime.now().strftime("%Y-%m-%d %H:%M")
+ # 制图时间统一用当前日期
+ map_time = datetime.now().strftime("%Y-%m-%d")
center_x, center_y = _extract_center_from_condition(
event_type, condition
)
+ info_text = _build_info_text(event_type, condition, occurred_time)
+
return {
"name": f"inference_{inference['id']}",
"path": template_path,
@@ -146,7 +180,7 @@ def _derive_model_params(
"mapTitle": map_title,
"mapTime": map_time,
"mapUint": map_unit,
- "info": "",
+ "info": info_text,
"centerX": center_x,
"centerY": center_y,
"event": str(inference["id"]),
@@ -172,11 +206,7 @@ def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
- gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg")
- project_root = os.path.dirname(
- os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- )
- gpkg_dir = os.path.join(project_root, gpkg_subdir).replace("\\", "/")
+ gpkg_dir = get_gpkg_dir()
return {
"db": {
@@ -202,41 +232,15 @@ def _build_qgis_config(batch_folder: str) -> dict:
"port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT", ""),
"dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME", "xian_new"),
"schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA", "qgis"),
+ "username": getattr(settings, "DB_USER", "postgres"),
+ "password": getattr(settings, "DB_PASSWORD", ""),
},
},
- "static_layers": _build_static_layers_config(gpkg_dir),
+ "static_layers": build_static_layers_config(gpkg_dir),
"batch_folder": batch_folder,
}
-def _build_static_layers_config(gpkg_dir: str) -> dict:
- """构建静态底图配置"""
- layer_defs = {
- "水库": ("qgis.rivers", "rivers.gpkg"),
- "市州驻地": ("qgis.sx_capital", "sx_capital.gpkg"),
- "河流": ("qgis.river", "river.gpkg"),
- "active_fault": ("qgis.active_fault", "active_fault.gpkg"),
- "陕西省": ("qgis.sx", "sx.gpkg"),
- "乡镇驻地": ("qgis.sx_street", "sx_street.gpkg"),
- "区县驻地": ("qgis.sx_xa_county", "sx_xa_county.gpkg"),
- "县界": ("qgis.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
- "周边区县": ("qgis.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
- "周边市州": ("qgis.sx_zb_city", "sx_zb_city.gpkg"),
- "周边县区": ("qgis.sx_zb_county", "sx_zb_county.gpkg"),
- "traffic_expressway": ("qgis.traffic_expressway", "traffic_expressway.gpkg"),
- "traffic_provincial": ("qgis.traffic_provincial", "traffic_provincial.gpkg"),
- "traffic_railway": ("qgis.traffic_railway", "traffic_railway.gpkg"),
- "traffic_township": ("qgis.traffic_township", "traffic_township.gpkg"),
- "traffic_trunk_line": ("qgis.traffic_trunk_line", "traffic_trunk_line.gpkg"),
- }
-
- layers = {}
- for name, (table, gpkg_file) in layer_defs.items():
- layers[name] = {"file": gpkg_file, "table": table}
-
- return {"enabled": True, "gpkg_dir": gpkg_dir, "layers": layers}
-
-
# ============================================================
# 接口实现
# ============================================================
@@ -246,29 +250,30 @@ async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 occurred_time 视为同一场灾害,共享文件夹
"""
- from app.services.qgis.qgis_env import is_qgis_ready
- if not is_qgis_ready():
- raise HTTPException(status_code=503, detail="QGIS 环境未初始化,专题图功能不可用")
+ from app.services.qgis.qgis_env import is_qgis_available
+ if not is_qgis_available():
+ raise HTTPException(status_code=503, detail="QGIS 环境不可用(未找到 QGIS Python 3.12 解释器)")
semaphore = get_prediction_semaphore()
async with semaphore:
- simulation_id = req.simulationId
+ inference_id = req.inferenceId
loop = asyncio.get_event_loop()
try:
# 查询推理记录,获取 occurred_time
inference = await loop.run_in_executor(
- None, qgis_repository.query_inference_result, simulation_id
+ None, qgis_repository.query_inference_result, inference_id
)
# 将 occurred_time 格式化为时间戳作为文件夹名
disaster_time = format_disaster_time(inference["occurred_time"])
+ event_type = inference["event_type"]
# 构建批次文件夹路径
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files")
- output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:disasterTime")
- output_dir = output_tmpl.replace(":disasterTime", disaster_time)
+ output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:disasterTime")
+ output_dir = output_tmpl.replace(":eventType", event_type).replace(":disasterTime", disaster_time)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重检查:同一 disasterTime 只产图一次
@@ -318,29 +323,41 @@ async def export_map(req: QgisMapExportRequest):
# 推导参数 + 构建配置
os.makedirs(batch_folder, exist_ok=True)
- event_type = inference["event_type"]
- template_name_map = {"earthquake": "地震专题图", "rainfall": "降雨专题图"}
- template_name = template_name_map.get(event_type, f"{event_type}专题图")
- out_filename = f"{template_name}.jpg"
-
- model = _derive_model_params(inference, batch_folder, out_filename)
config = _build_qgis_config(batch_folder)
- logger.info(
- f"模板路径: {model['path']}, "
- f"输出: {model['outFile']}, "
- f"标题: {model['mapTitle']}, "
- f"中心: ({model['centerX']}, {model['centerY']})"
+ # 扫描模板文件夹下所有 .qgz 文件
+ template_base = os.path.join(
+ os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
+ "app", "data", "template"
)
+ template_dir = os.path.join(template_base, event_type)
+ template_files = sorted([
+ f for f in os.listdir(template_dir)
+ if f.endswith(".qgz") and not f.startswith("tmp")
+ ])
- # 提交到线程池
- _thread_pool.submit(
- _generate_single_map, model, config, disaster_time
- )
+ if not template_files:
+ raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
+
+ # 构建所有模型参数(批量模式)
+ models = []
+ for tpl_file in template_files:
+ tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
+ model = _derive_model_params(inference, batch_folder, tpl_path)
+ logger.info(
+ f"模板: {tpl_path}, "
+ f"输出: {model['outFile']}, "
+ f"标题: {model['mapTitle']}, "
+ f"中心: ({model['centerX']}, {model['centerY']})"
+ )
+ models.append(model)
+
+ # 一次性提交所有模型到 QGIS 子进程(单次 DLL 加载)
+ _generate_batch_maps(models, config, disaster_time)
return QgisMapExportResponse(
code=200,
- message="任务已提交",
+ message=f"任务已完成,共{len(models)}张专题图",
data=disaster_time,
)
@@ -357,22 +374,85 @@ async def export_map(req: QgisMapExportRequest):
pass
-def _generate_single_map(model: dict, config: dict, disaster_time: str) -> None:
- """线程池工作函数:在工作线程中生成单张专题图"""
- from app.services.qgis.map_service import MapService
+def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None:
+ """通过 QGIS Python 3.12 子进程批量生成专题图(单次 DLL 加载)"""
+ import json
+ import subprocess
+ import tempfile
+ from app.services.qgis.qgis_env import build_qgis_command
try:
- logger.info(f"[线程池] 开始产图: {model['mapTitle']} → {model['outFile']}")
- service = MapService(config)
- service.generate(model)
+ logger.info(f"[批量产图] 开始: {len(models)} 张专题图, batch={disaster_time}")
- if os.path.exists(model["outFile"]):
- logger.info(f"[线程池] 产图成功: {model['outFile']}")
+ # 构建子进程请求 JSON —— 批量格式
+ request_data = json.dumps(
+ {"config": config, "models": models},
+ ensure_ascii=False,
+ )
+
+ # 将请求 JSON 写入临时文件(避免 stdin 管道问题)
+ tmp_json = tempfile.NamedTemporaryFile(
+ suffix=".json", delete=False, mode="w", encoding="utf-8"
+ )
+ tmp_json.write(request_data)
+ tmp_json.close()
+
+ try:
+ from config import settings
+ qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
+ cmd = build_qgis_command(qgis_root)
+ cmd.append(tmp_json.name)
+
+ logger.info(f"[批量产图] 启动 QGIS 子进程: {' '.join(cmd[:3])}...")
+
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ timeout=300, # 5 分钟超时(批量处理多个模板)
+ )
+ finally:
+ try:
+ os.remove(tmp_json.name)
+ except OSError:
+ pass
+
+ if result.returncode != 0:
+ stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
+ logger.error(f"[批量产图] QGIS 子进程失败 (exit={result.returncode}):")
+ for line in stderr_text.split("\n"):
+ logger.error(f" {line}")
+ raise RuntimeError(
+ f"QGIS 子进程失败: {stderr_text[:300]}"
+ )
+
+ # 解析子进程输出
+ stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
+ if stdout_text:
+ for line in reversed(stdout_text.split("\n")):
+ line = line.strip()
+ if line.startswith("{"):
+ output = json.loads(line)
+ batch_results = output.get("results", [])
+ success_count = sum(1 for r in batch_results if r.get("success"))
+ fail_count = len(batch_results) - success_count
+ logger.info(
+ f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}"
+ )
+ for r in batch_results:
+ if r.get("success"):
+ logger.info(f" ✓ {r.get('output', 'N/A')}")
+ else:
+ logger.error(f" ✗ {r.get('error', 'unknown')}")
+ break
+ else:
+ logger.warning("[批量产图] 子进程输出中未找到 JSON 结果")
else:
- logger.error(f"[线程池] 产图后文件不存在: {model['outFile']}")
+ logger.warning("[批量产图] 子进程无输出,但 exit code = 0")
+ except subprocess.TimeoutExpired:
+ logger.error(f"[批量产图] QGIS 子进程超时 (300s)")
except Exception as e:
- logger.error(f"[线程池] 产图失败: {model['name']} — {e}", exc_info=True)
+ logger.error(f"[批量产图] 产图失败: {e}", exc_info=True)
# ============================================================
diff --git a/app/config/qgis_mappings.py b/app/config/qgis_mappings.py
new file mode 100644
index 0000000..5e6d782
--- /dev/null
+++ b/app/config/qgis_mappings.py
@@ -0,0 +1,108 @@
+"""
+QGIS 图层映射配置。
+
+所有模板→目标库的表名/图层映射集中管理,避免散落在业务代码中。
+"""
+from pathlib import Path
+import os
+
+# ============================================================
+# 静态底图映射:模板中 PostgreSQL 图层 → 本地 GPKG 文件
+# key: 模板图层名(用于匹配 maplayer/layername)
+# table: 原始数据源表名(schema.table 格式,用于匹配 datasource 替换)
+# gpkg: 本地 GPKG 文件名(相对于 GPKG 目录)
+# ============================================================
+STATIC_LAYERS = {
+ "水库": {"table": "base.rivers", "gpkg": "rivers.gpkg"},
+ "市州驻地": {"table": "base.sx_capital", "gpkg": "sx_capital.gpkg"},
+ "河流": {"table": "base.river", "gpkg": "river.gpkg"},
+ "active_fault": {"table": "base.active_fault", "gpkg": "active_fault.gpkg"},
+ "陕西省": {"table": "base.sx", "gpkg": "sx.gpkg"},
+ "乡镇驻地": {"table": "base.sx_street", "gpkg": "sx_street.gpkg"},
+ "区县驻地": {"table": "base.sx_xa_county", "gpkg": "sx_xa_county.gpkg"},
+ "县界": {"table": "base.sx_xa_county_boundary", "gpkg": "sx_xa_county_boundary.gpkg"},
+ "周边区县": {"table": "base.sx_zb_county_boundary", "gpkg": "sx_zb_county_boundary.gpkg"},
+ "周边市州": {"table": "base.sx_zb_city", "gpkg": "sx_zb_city.gpkg"},
+ "周边县区": {"table": "base.sx_zb_county", "gpkg": "sx_zb_county.gpkg"},
+ "traffic_expressway": {"table": "base.traffic_expressway", "gpkg": "traffic_expressway.gpkg"},
+ "traffic_provincial": {"table": "base.traffic_provincial", "gpkg": "traffic_provincial.gpkg"},
+ "traffic_railway": {"table": "base.traffic_railway", "gpkg": "traffic_railway.gpkg"},
+ "traffic_township": {"table": "base.traffic_township", "gpkg": "traffic_township.gpkg"},
+ "traffic_trunk_line": {"table": "base.traffic_trunk_line", "gpkg": "traffic_trunk_line.gpkg"},
+}
+
+# ============================================================
+# OGR 本地图层 → PostgreSQL 转换
+# 设计师用本地 JSON/shp 文件制作模板,数据已迁移到 qgis schema
+# key: layername(模板 datasource 中 |layername=xxx 的值)
+# value: qgis schema 中的目标表名
+# ============================================================
+OGR_TO_POSTGRES = {
+ "公园": "shelter_park",
+ "学校": "shelter_school",
+ "文化馆": "shelter_cultural",
+ "人防设施": "shelter_defence",
+ "体育馆": "shelter_gymnasium",
+ "广场": "shelter_square",
+ "住宿": "shelter_stay",
+}
+
+# ============================================================
+# 表名映射:模板中引用的表名 → 目标库中实际表名
+# (模板由多个源库的表拼合,迁移后表名可能不同)
+# ============================================================
+TABLE_RENAMES = {
+ "hazard_waterlogging": "hazard_hydrops", # 积水点
+}
+
+# ============================================================
+# Schema 替换:模板源 schema → 目标 schema
+# ============================================================
+SCHEMA_REPLACEMENTS = ["base", "kspg", "dzxx"]
+
+# ============================================================
+# 图层过滤配置(layer_filter.py 使用)
+# ============================================================
+EVENT_LAYERS = ["eqcenter", "震中"] # 按 event 字段过滤
+QUEUE_LAYERS = [
+ "intensity", "intensity_mian",
+ "dz_ryss", "dz_jjss", "dz_rysw", "dz_jzph", "dz_xzjl",
+] # 按 eqqueue_id 字段过滤
+
+# ============================================================
+# GPKG 目录路径(项目根目录相对路径)
+# ============================================================
+GPKG_SUBDIR = "app/data/gpkg"
+
+
+def get_gpkg_dir(project_root: str = None) -> str:
+ """获取 GPKG 目录绝对路径"""
+ if project_root is None:
+ project_root = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ )
+ gpkg_dir = os.path.join(project_root, GPKG_SUBDIR)
+ return os.path.normpath(gpkg_dir).replace("\\", "/")
+
+
+def build_static_layers_config(gpkg_dir: str = None) -> dict:
+ """
+ 构建 template_modifier / qgis_runner 使用的静态底图配置。
+
+ 返回格式与原有 template_override 的 static_layers 段兼容。
+ """
+ if gpkg_dir is None:
+ gpkg_dir = get_gpkg_dir()
+
+ layers = {}
+ for name, info in STATIC_LAYERS.items():
+ layers[name] = {
+ "file": info["gpkg"],
+ "table": info["table"],
+ }
+
+ return {
+ "enabled": True,
+ "gpkg_dir": gpkg_dir,
+ "layers": layers,
+ }
diff --git a/app/core/dependency_manager.py b/app/core/dependency_manager.py
index c724e8a..00d10ba 100644
--- a/app/core/dependency_manager.py
+++ b/app/core/dependency_manager.py
@@ -78,13 +78,13 @@ def check_dependencies(project_root: Path):
],
check=True
)
- print("✓ 依赖安装完成(虚拟环境)")
+ print("[OK] 依赖安装完成(虚拟环境)")
else:
- print("✓ 所有依赖已安装(虚拟环境)")
-
+ print("[OK] 所有依赖已安装(虚拟环境)")
+
except subprocess.CalledProcessError as e:
- print(f"✗ 依赖检查/安装失败: {e}")
+ print(f"[FAIL] 依赖检查/安装失败: {e}")
sys.exit(1)
except Exception as e:
- print(f"✗ 依赖检查出错: {e}")
+ print(f"[FAIL] 依赖检查出错: {e}")
sys.exit(1)
diff --git a/app/core/env_checker.py b/app/core/env_checker.py
index 1bde570..62829f8 100644
--- a/app/core/env_checker.py
+++ b/app/core/env_checker.py
@@ -32,7 +32,7 @@ def check_environment():
if major == 3 and minor == 10:
return True
else:
- print(f"✗ Python版本不符合要求!")
+ print(f"[FAIL] Python版本不符合要求!")
print(f" 当前版本: {python_version}")
print(f" 要求版本: 3.10.x")
print(f"\n请使用 Python 3.10 版本运行此项目")
diff --git a/app/core/launcher.py b/app/core/launcher.py
index 45a17ff..184a7aa 100644
--- a/app/core/launcher.py
+++ b/app/core/launcher.py
@@ -70,7 +70,7 @@ class AppLauncher:
# 启动应用
print("\n" + "=" * 50)
- print("✓ 所有检查通过,准备启动应用...")
+ print("[OK] 所有检查通过,准备启动应用...")
print("=" * 50)
# 延迟导入logger
from app.utils.logger import get_logger
@@ -79,7 +79,10 @@ class AppLauncher:
start()
except Exception as e:
- self.logger.error(f"启动失败: {e}")
+ if self.logger:
+ self.logger.error(f"启动失败: {e}")
+ else:
+ print(f"[FAIL] 启动失败: {e}")
sys.exit(1)
diff --git a/app/core/server.py b/app/core/server.py
index 8b97fa1..e0568fd 100644
--- a/app/core/server.py
+++ b/app/core/server.py
@@ -22,24 +22,24 @@ async def lifespan(app: FastAPI):
get_earthquake_model()
logger.info("DBN模型预加载完成")
- # 初始化 QGIS 环境
+ # 检测 QGIS 子进程环境
qgis_root = getattr(settings, "QGIS_ROOT", None)
if qgis_root:
try:
- from app.services.qgis.qgis_env import init_qgis_env
- init_qgis_env(qgis_root)
- logger.info("QGIS 环境初始化完成")
+ from app.services.qgis.qgis_env import is_qgis_available
+ if is_qgis_available(qgis_root):
+ logger.info("QGIS 环境检测通过(子进程模式)")
+ else:
+ logger.warning("QGIS 环境不可用(未找到 Python 3.12 解释器),专题图功能降级")
except Exception as e:
- logger.error(f"QGIS 环境初始化失败(专题图功能不可用): {e}")
+ logger.error(f"QGIS 环境检测失败: {e}")
yield
- # 清理 QGIS 资源
+ # 清理资源
try:
- from app.services.qgis.qgis_env import cleanup_qgis_env
- from app.services.qgis.map_service import template_cache
- template_cache.cleanup()
- cleanup_qgis_env()
+ from app.api.qgis_map_export import shutdown_thread_pool
+ shutdown_thread_pool()
except Exception:
pass
diff --git a/app/core/venv_manager.py b/app/core/venv_manager.py
index 65fba56..7bc2954 100644
--- a/app/core/venv_manager.py
+++ b/app/core/venv_manager.py
@@ -31,14 +31,14 @@ def check_virtualenv(project_root: Path) -> bool:
python_exe = venv_path / "bin" / "python3"
if not venv_path.exists():
- print(f"\n⚠ 虚拟环境不存在,正在创建...")
+ print(f"\n[WARN] 虚拟环境不存在,正在创建...")
try:
subprocess.run([sys.executable, "-m", "venv", str(venv_path)], check=True)
- print("✓ 虚拟环境创建成功")
+ print("[OK] 虚拟环境创建成功")
return True # 继续执行后续步骤
except subprocess.CalledProcessError as e:
- print(f"✗ 虚拟环境创建失败: {e}")
+ print(f"[FAIL] 虚拟环境创建失败: {e}")
sys.exit(1)
else:
- print(f"✓ 虚拟环境已存在: {venv_path}")
+ print(f"[OK] 虚拟环境已存在: {venv_path}")
return True
diff --git a/app/data/gpkg/active_fault.gpkg b/app/data/gpkg/active_fault.gpkg
new file mode 100644
index 0000000..cdcaef5
Binary files /dev/null and b/app/data/gpkg/active_fault.gpkg differ
diff --git a/app/data/gpkg/river.gpkg b/app/data/gpkg/river.gpkg
new file mode 100644
index 0000000..21f1efc
Binary files /dev/null and b/app/data/gpkg/river.gpkg differ
diff --git a/app/data/gpkg/rivers.gpkg b/app/data/gpkg/rivers.gpkg
new file mode 100644
index 0000000..cec8de3
Binary files /dev/null and b/app/data/gpkg/rivers.gpkg differ
diff --git a/app/data/gpkg/sx.gpkg b/app/data/gpkg/sx.gpkg
new file mode 100644
index 0000000..4739dea
Binary files /dev/null and b/app/data/gpkg/sx.gpkg differ
diff --git a/app/data/gpkg/sx_capital.gpkg b/app/data/gpkg/sx_capital.gpkg
new file mode 100644
index 0000000..3357556
Binary files /dev/null and b/app/data/gpkg/sx_capital.gpkg differ
diff --git a/app/data/gpkg/sx_street.gpkg b/app/data/gpkg/sx_street.gpkg
new file mode 100644
index 0000000..ab14972
Binary files /dev/null and b/app/data/gpkg/sx_street.gpkg differ
diff --git a/app/data/gpkg/sx_xa_county.gpkg b/app/data/gpkg/sx_xa_county.gpkg
new file mode 100644
index 0000000..c542cf1
Binary files /dev/null and b/app/data/gpkg/sx_xa_county.gpkg differ
diff --git a/app/data/gpkg/sx_xa_county_boundary.gpkg b/app/data/gpkg/sx_xa_county_boundary.gpkg
new file mode 100644
index 0000000..f1aea2b
Binary files /dev/null and b/app/data/gpkg/sx_xa_county_boundary.gpkg differ
diff --git a/app/data/gpkg/sx_zb_city.gpkg b/app/data/gpkg/sx_zb_city.gpkg
new file mode 100644
index 0000000..4560f90
Binary files /dev/null and b/app/data/gpkg/sx_zb_city.gpkg differ
diff --git a/app/data/gpkg/sx_zb_county.gpkg b/app/data/gpkg/sx_zb_county.gpkg
new file mode 100644
index 0000000..6d0ad32
Binary files /dev/null and b/app/data/gpkg/sx_zb_county.gpkg differ
diff --git a/app/data/gpkg/sx_zb_county_boundary.gpkg b/app/data/gpkg/sx_zb_county_boundary.gpkg
new file mode 100644
index 0000000..bedbac3
Binary files /dev/null and b/app/data/gpkg/sx_zb_county_boundary.gpkg differ
diff --git a/app/data/gpkg/traffic_expressway.gpkg b/app/data/gpkg/traffic_expressway.gpkg
new file mode 100644
index 0000000..6a8559f
Binary files /dev/null and b/app/data/gpkg/traffic_expressway.gpkg differ
diff --git a/app/data/gpkg/traffic_provincial.gpkg b/app/data/gpkg/traffic_provincial.gpkg
new file mode 100644
index 0000000..93e6e29
Binary files /dev/null and b/app/data/gpkg/traffic_provincial.gpkg differ
diff --git a/app/data/gpkg/traffic_railway.gpkg b/app/data/gpkg/traffic_railway.gpkg
new file mode 100644
index 0000000..7988d75
Binary files /dev/null and b/app/data/gpkg/traffic_railway.gpkg differ
diff --git a/app/data/gpkg/traffic_township.gpkg b/app/data/gpkg/traffic_township.gpkg
new file mode 100644
index 0000000..f51bdd4
Binary files /dev/null and b/app/data/gpkg/traffic_township.gpkg differ
diff --git a/app/data/gpkg/traffic_trunk_line.gpkg b/app/data/gpkg/traffic_trunk_line.gpkg
new file mode 100644
index 0000000..9ae1b27
Binary files /dev/null and b/app/data/gpkg/traffic_trunk_line.gpkg differ
diff --git a/app/repositories/qgis_repository.py b/app/repositories/qgis_repository.py
index 724ce6d..3ee17a4 100644
--- a/app/repositories/qgis_repository.py
+++ b/app/repositories/qgis_repository.py
@@ -20,11 +20,11 @@ class QgisRepository:
row = rows[0]
return {
- "id": row[0],
- "name": row[1] or "",
- "event_type": row[2] or "",
- "occurred_time": row[3],
- "condition": row[4] if isinstance(row[4], dict) else {},
+ "id": row["id"],
+ "name": row["name"] or "",
+ "event_type": row["event_type"] or "",
+ "occurred_time": row["occurred_time"],
+ "condition": row["condition"] if isinstance(row["condition"], dict) else {},
}
diff --git a/app/script/export_static_layers.py b/app/script/export_static_layers.py
index 03727c6..ac8a346 100644
--- a/app/script/export_static_layers.py
+++ b/app/script/export_static_layers.py
@@ -1,10 +1,13 @@
"""
一次性脚本:从 PostgreSQL 导出静态底图为 GeoPackage 文件。
+优先使用 ogr2ogr(QGIS 自带),回退到 geopandas。
+
运行一次即可,之后服务直接读本地 GPKG。
用法: python -m app.script.export_static_layers
"""
import os
+import subprocess
import sys
import time
@@ -13,68 +16,159 @@ project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(_
if project_root not in sys.path:
sys.path.insert(0, project_root)
-import geopandas as gpd
-from sqlalchemy import create_engine
-from config import settings
+# ============================================================
+# 配置
+# ============================================================
-# GPKG 输出目录(相对于项目根目录)
-GPKG_DIR = os.path.join(project_root, getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg"))
+QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
+ogr2ogr_path = os.path.join(QGIS_ROOT, "bin", "ogr2ogr.exe")
-# 静态图层定义: {显示名: (schema.table, geom_column, srid)}
-STATIC_LAYERS = {
- "水库": ("qgis.rivers", "Geometry", 0),
- "市州驻地": ("qgis.sx_capital", "geometry", 0),
- "河流": ("qgis.river", "Geometry", 0),
- "active_fault": ("qgis.active_fault", "Geometry", 0),
- "陕西省": ("qgis.sx", "Geometry", 0),
- "乡镇驻地": ("qgis.sx_street", "dgeom", 0),
- "区县驻地": ("qgis.sx_xa_county", "Geometry", 0),
- "县界": ("qgis.sx_xa_county_boundary", "Geometry", 0),
- "周边区县": ("qgis.sx_zb_county_boundary", "Geometry", 0),
- "周边市州": ("qgis.sx_zb_city", "Geometry", 4326),
- "周边县区": ("qgis.sx_zb_county", "Geometry", 0),
- "traffic_expressway": ("qgis.traffic_expressway", "Geometry", 0),
- "traffic_provincial": ("qgis.traffic_provincial", "Geometry", 0),
- "traffic_railway": ("qgis.traffic_railway", "Geometry", 0),
- "traffic_township": ("qgis.traffic_township", "Geometry", 0),
- "traffic_trunk_line": ("qgis.traffic_trunk_line", "Geometry", 0),
-}
+GPKG_DIR = os.path.join(project_root, "app", "data", "gpkg")
+
+# 静态图层定义: {显示名: (schema.table, gpkg文件名)}
+STATIC_LAYERS = [
+ ("水库", "qgis.rivers", "rivers.gpkg"),
+ ("市州驻地", "qgis.sx_capital", "sx_capital.gpkg"),
+ ("河流", "qgis.river", "river.gpkg"),
+ ("active_fault", "qgis.active_fault", "active_fault.gpkg"),
+ ("陕西省", "qgis.sx", "sx.gpkg"),
+ ("乡镇驻地", "qgis.sx_street", "sx_street.gpkg"),
+ ("区县驻地", "qgis.sx_xa_county", "sx_xa_county.gpkg"),
+ ("县界", "qgis.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
+ ("周边区县", "qgis.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
+ ("周边市州", "qgis.sx_zb_city", "sx_zb_city.gpkg"),
+ ("周边县区", "qgis.sx_zb_county", "sx_zb_county.gpkg"),
+ ("traffic_expressway", "qgis.traffic_expressway", "traffic_expressway.gpkg"),
+ ("traffic_provincial", "qgis.traffic_provincial", "traffic_provincial.gpkg"),
+ ("traffic_railway", "qgis.traffic_railway", "traffic_railway.gpkg"),
+ ("traffic_township", "qgis.traffic_township", "traffic_township.gpkg"),
+ ("traffic_trunk_line", "qgis.traffic_trunk_line", "traffic_trunk_line.gpkg"),
+]
-def export_layers():
- os.makedirs(GPKG_DIR, exist_ok=True)
+# ============================================================
+# 方法一: ogr2ogr(推荐,QGIS 自带)
+# ============================================================
- conn_str = (
- f"postgresql://{settings.DB_USER}:{settings.DB_PASSWORD}"
- f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
- )
+def _setup_gdal_env():
+ """设置 GDAL/OGR 运行环境"""
+ gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal")
+ gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib")
+ gdal_bin = os.path.join(QGIS_ROOT, "apps", "gdal", "bin")
+
+ if os.path.isdir(gdal_data):
+ os.environ["GDAL_DATA"] = gdal_data
+ os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
+
+ paths_to_add = [p for p in [gdal_bin, gdal_lib] if os.path.isdir(p)]
+ os.environ["PATH"] = ";".join(paths_to_add) + ";" + os.environ.get("PATH", "")
+
+
+def _export_with_ogr2ogr(host, port, dbname, user, password, schema, table, gpkg_path):
+ """用 ogr2ogr 导出单个图层"""
+ conn = f"PG:host={host} port={port} dbname={dbname} user={user} password={password}"
+ cmd = [
+ ogr2ogr_path,
+ "-f", "GPKG",
+ gpkg_path,
+ conn,
+ "-sql", f'SELECT * FROM "{schema}"."{table}"',
+ "-nln", table,
+ "-overwrite",
+ "-t_srs", "EPSG:4326",
+ ]
+ result = subprocess.run(cmd, capture_output=True, timeout=120)
+ if result.returncode != 0:
+ stderr = result.stderr.decode("utf-8", errors="replace").strip()
+ raise RuntimeError(stderr[:300])
+ return True
+
+
+# ============================================================
+# 方法二: geopandas(回退)
+# ============================================================
+
+def _export_with_geopandas(host, port, dbname, user, password, schema, table, gpkg_path):
+ """用 geopandas 导出单个图层"""
+ import geopandas as gpd
+ from sqlalchemy import create_engine
+
+ conn_str = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
engine = create_engine(conn_str)
- for layer_name, (table_ref, geom_col, srid) in STATIC_LAYERS.items():
- t0 = time.time()
+ gdf = gpd.read_postgis(
+ f'SELECT * FROM "{schema}"."{table}"',
+ engine,
+ geom_col="Geometry",
+ )
+ if gdf.crs is None:
+ gdf = gdf.set_crs(epsg=4326)
+
+ gdf.to_file(gpkg_path, driver="GPKG")
+ engine.dispose()
+ return len(gdf)
+
+
+# ============================================================
+# 主入口
+# ============================================================
+
+def main():
+ # 从 config.settings 读取数据库配置
+ try:
+ from config import settings
+ host = getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST",
+ getattr(settings, "DB_HOST", "47.92.216.173"))
+ port = str(getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT",
+ getattr(settings, "DB_PORT", 7654)))
+ dbname = getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME",
+ getattr(settings, "DB_NAME", "xian_new"))
+ user = getattr(settings, "DB_USER", "postgres")
+ password = getattr(settings, "DB_PASSWORD", "zhangsan")
+ except Exception:
+ host, port, dbname, user, password = "47.92.216.173", "7654", "xian_new", "postgres", "zhangsan"
+
+ os.makedirs(GPKG_DIR, exist_ok=True)
+
+ # 选择导出方法
+ use_ogr2ogr = os.path.isfile(ogr2ogr_path)
+ method = "ogr2ogr" if use_ogr2ogr else "geopandas"
+
+ if use_ogr2ogr:
+ _setup_gdal_env()
+
+ print(f"数据库: {host}:{port}/{dbname}")
+ print(f"输出目录: {GPKG_DIR}")
+ print(f"导出方法: {method}")
+ print(f"共 {len(STATIC_LAYERS)} 个图层\n")
+
+ success = 0
+ failed = 0
+
+ for name, table_ref, gpkg_file in STATIC_LAYERS:
schema, table = table_ref.split(".", 1)
- print(f"导出 {layer_name} ({table_ref}) ...", end=" ", flush=True)
+ gpkg_path = os.path.join(GPKG_DIR, gpkg_file)
+
+ print(f"[{success + failed + 1}/{len(STATIC_LAYERS)}] {name} ({table_ref})", end=" ... ", flush=True)
+ t0 = time.time()
try:
- sql = f'SELECT * FROM "{schema}"."{table}"'
- gdf = gpd.read_postgis(sql, engine, geom_col=geom_col)
-
- if srid and srid > 0:
- gdf = gdf.set_crs(epsg=srid, allow_override=True)
- elif gdf.crs is None:
- gdf = gdf.set_crs(epsg=4326)
-
- gpkg_path = os.path.join(GPKG_DIR, f"{table}.gpkg")
- gdf.to_file(gpkg_path, driver="GPKG")
-
- print(f"✓ {len(gdf)} 行, {time.time() - t0:.1f}s")
-
+ if use_ogr2ogr:
+ _export_with_ogr2ogr(host, port, dbname, user, password, schema, table, gpkg_path)
+ size_kb = os.path.getsize(gpkg_path) / 1024
+ print(f"✓ {size_kb:.0f} KB, {time.time() - t0:.1f}s")
+ else:
+ count = _export_with_geopandas(host, port, dbname, user, password, schema, table, gpkg_path)
+ print(f"✓ {count} 行, {time.time() - t0:.1f}s")
+ success += 1
except Exception as e:
print(f"✗ 失败: {e}")
+ failed += 1
- engine.dispose()
- print(f"\n导出完成!文件目录: {GPKG_DIR}")
+ print(f"\n{'='*50}")
+ print(f"完成: 成功={success}, 失败={failed}, 共={len(STATIC_LAYERS)}")
+ print(f"输出目录: {GPKG_DIR}")
if __name__ == "__main__":
- export_layers()
\ No newline at end of file
+ main()
\ No newline at end of file
diff --git a/app/services/qgis/__init__.py b/app/services/qgis/__init__.py
index 3bc724e..53e4f57 100644
--- a/app/services/qgis/__init__.py
+++ b/app/services/qgis/__init__.py
@@ -1,13 +1,31 @@
-from app.services.qgis.map_service import MapService
-from app.services.qgis.map_exporter import MapExporter
-from app.services.qgis.template_modifier import TemplateModifier
-from app.services.qgis.template_cache import TemplateCache
-from app.services.qgis.layer_filter import LayerFilter
+"""
+QGIS 服务模块。
+
+注意:此包的核心模块(map_service、template_cache 等)依赖 qgis.core,
+仅在 QGIS Python 3.12 子进程中可导入。主进程(Python 3.10)不应直接导入
+这些模块,而应通过 qgis_runner.py 子进程调用。
+
+安全导入(不依赖 qgis.core):
+ - qgis_env: 环境检测与子进程配置
+ - qgis_runner: 子进程入口脚本路径
+"""
+
+def _lazy_import(module_name: str, attr: str):
+ """延迟导入,仅在子进程中实际使用时才加载"""
+ import importlib
+ mod = importlib.import_module(f".{module_name}", package=__name__)
+ return getattr(mod, attr)
+
__all__ = [
- 'MapService',
- 'MapExporter',
- 'TemplateModifier',
- 'TemplateCache',
- 'LayerFilter',
+ "MapService",
+ "MapExporter",
+ "TemplateModifier",
+ "TemplateCache",
+ "LayerFilter",
]
+
+# 不在顶层导入 QGIS 依赖模块,避免主进程崩溃
+# 使用方式:
+# from app.services.qgis.map_service import MapService (仅在子进程中)
+# from app.services.qgis.qgis_env import is_qgis_available (主进程安全)
diff --git a/app/services/qgis/layer_filter.py b/app/services/qgis/layer_filter.py
index 8e975b2..af27295 100644
--- a/app/services/qgis/layer_filter.py
+++ b/app/services/qgis/layer_filter.py
@@ -2,15 +2,10 @@
图层过滤模块。按 event 和 eqqueue_id 筛选要素。
"""
from app.config.paths import get_logger
+from app.config.qgis_mappings import EVENT_LAYERS, QUEUE_LAYERS
logger = get_logger("qgis.filter")
-EVENT_LAYERS = ["eqcenter", "震中"]
-QUEUE_LAYERS = [
- "intensity", "intensity_mian",
- "dz_ryss", "dz_jjss", "dz_rysw", "dz_jzph", "dz_xzjl",
-]
-
class LayerFilter:
def apply(self, project, model: dict) -> None:
diff --git a/app/services/qgis/map_exporter.py b/app/services/qgis/map_exporter.py
index 0282894..fadaf95 100644
--- a/app/services/qgis/map_exporter.py
+++ b/app/services/qgis/map_exporter.py
@@ -15,7 +15,7 @@ class MapExporter:
def update_texts(self, model: dict) -> None:
"""更新布局中的文本标签"""
- for key in ["mapTitle", "mapTime", "mapUnit", "info"]:
+ for key in ["mapTitle", "mapTime", "mapUint", "info"]:
label = self.layout.itemById(key)
if label is not None:
label.setText(model[key])
diff --git a/app/services/qgis/map_service.py b/app/services/qgis/map_service.py
index 5a85338..d26a2ad 100644
--- a/app/services/qgis/map_service.py
+++ b/app/services/qgis/map_service.py
@@ -7,6 +7,8 @@ import time
from qgis.core import QgsProject, QgsDataSourceUri
+from app.config.paths import get_logger
+from app.config.qgis_mappings import TABLE_RENAMES, SCHEMA_REPLACEMENTS
from .template_cache import TemplateCache
from .template_modifier import TemplateModifier
from .layer_filter import LayerFilter
@@ -115,17 +117,32 @@ class MapService:
db_config["username"],
db_config["password"],
)
- # 更新 schema(table="base"."xxx" → table="qgis"."xxx")
+ # 更新 schema
old_uri = uri.uri()
if f'table="{actual_schema}".' not in old_uri:
- new_uri = old_uri.replace('table="base".', f'table="{actual_schema}".')
- if new_uri != old_uri:
- uri = QgsDataSourceUri(new_uri)
+ for old_schema in SCHEMA_REPLACEMENTS:
+ new_uri = old_uri.replace(
+ f'table="{old_schema}".',
+ f'table="{actual_schema}".',
+ )
+ if new_uri != old_uri:
+ uri = QgsDataSourceUri(new_uri)
+ break
+
+ # 表名映射(模板表名 ≠ 目标库表名)
+ uri_str = uri.uri()
+ for old_name, new_name in TABLE_RENAMES.items():
+ full_old = f'table="{actual_schema}"."{old_name}"'
+ full_new = f'table="{actual_schema}"."{new_name}"'
+ if full_old in uri_str:
+ uri_str = uri_str.replace(full_old, full_new)
+ uri = QgsDataSourceUri(uri_str)
layer.setDataSource(uri.uri(), layer.name(), "postgres")
if layer.isValid():
- logger.info(f"图层 {layer.name()} 连接更新成功")
+ fc = layer.featureCount()
+ logger.info(f"图层 {layer.name()} 连接更新成功 ({fc} features)")
else:
logger.error(f"图层 {layer.name()} 更新后仍无效")
except Exception as e:
diff --git a/app/services/qgis/qgis_env.py b/app/services/qgis/qgis_env.py
index 920365e..233023d 100644
--- a/app/services/qgis/qgis_env.py
+++ b/app/services/qgis/qgis_env.py
@@ -1,266 +1,188 @@
"""
-QGIS 环境初始化模块(跨平台:Windows / Linux)。
-在 server.py lifespan 启动阶段调用 init_qgis_env(),
-完成共享库注入、环境变量设置、QgsApplication 初始化。
+QGIS 环境检测与子进程配置模块。
+
+主进程运行在 Python 3.10,无法直接加载 QGIS 的 Python 3.12 C 扩展。
+所有 QGIS 操作通过 subprocess 调用 QGIS Python 3.12 执行。
+
+本模块提供:
+ - get_qgis_python_path(): 检测 QGIS Python 3.12 解释器路径
+ - build_qgis_command(): 构建通过 .bat 启动 QGIS 子进程的命令
+ - is_qgis_available(): 检查 QGIS 是否可用
+ - get_runner_script(): 获取 qgis_runner.py 的路径
"""
import os
-import sys
-import platform
+import tempfile
from pathlib import Path
from app.config.paths import get_logger
logger = get_logger("qgis.env")
-_qgs_app = None
-_initialized = False
-_IS_WINDOWS = platform.system() == "Windows"
-
-def init_qgis_env(qgis_root: str) -> None:
+def get_qgis_python_path(qgis_root: str = None) -> str | None:
"""
- 初始化 QGIS 运行环境。整个应用生命周期只需调用一次。
+ 检测 QGIS 自带的 Python 3.12 解释器路径。
- Args:
- qgis_root: QGIS 安装根目录
- Windows: "D:/QGIS"
- Linux: "/usr" 或 "/opt/QGIS"
+ Windows: {QGIS_ROOT}/apps/Python312/python3.exe
+
+ Returns:
+ 解释器绝对路径,不存在则返回 None
"""
- global _qgs_app, _initialized
- if _initialized:
- return
+ if qgis_root is None:
+ qgis_root = _detect_qgis_root()
+ if qgis_root is None:
+ return None
- qgis_app_dir = _find_qgis_app_dir(qgis_root)
-
- # 共享库搜索路径(平台相关)
- # Windows: os.add_dll_directory() 显式注册 DLL 目录
- # Linux: LD_LIBRARY_PATH 追加 .so 搜索目录
- _add_library_paths(qgis_root, qgis_app_dir)
-
- # 环境变量
- _set_environment(qgis_root, qgis_app_dir)
-
- # Python 模块路径
- _add_python_paths(qgis_root, qgis_app_dir)
-
- # 初始化 QgsApplication
- _init_qgs_application(qgis_app_dir)
-
- _initialized = True
- logger.info(f"QGIS 环境初始化完成 ({platform.system()})")
-
-
-def cleanup_qgis_env() -> None:
- """清理 QGIS 资源(应用退出时调用)"""
- global _qgs_app, _initialized
- if _qgs_app is not None:
- _qgs_app.exitQgis()
- _qgs_app = None
- _initialized = False
- logger.info("QGIS 资源已清理")
-
-
-def is_qgis_ready() -> bool:
- """检查 QGIS 是否已初始化"""
- return _initialized
-
-
-# ─────────────────────────────────────────────────────────────
-# 平台检测
-# ─────────────────────────────────────────────────────────────
-
-def _find_qgis_app_dir(root: str) -> str:
- """
- 自动检测 QGIS 应用目录。
-
- Windows(OSGeo4W 安装): {root}/apps/qgis-ltr/
- Linux(包管理器安装): {root}/share/qgis/(Python 在 {root}/share/qgis/python)
- """
- if _IS_WINDOWS:
- return _find_qgis_app_dir_windows(root)
+ import platform
+ if platform.system() == "Windows":
+ for name in ("python3.exe", "python.exe"):
+ candidate = os.path.join(qgis_root, "apps", "Python312", name)
+ if os.path.isfile(candidate):
+ logger.info(f"检测到 QGIS Python: {candidate}")
+ return candidate
+ logger.warning(f"未找到 QGIS Python 3.12")
+ return None
else:
- return _find_qgis_app_dir_linux(root)
+ import shutil
+ # 优先检查环境变量
+ env_python = os.environ.get("QGIS_PYTHON_PATH")
+ if env_python and os.path.isfile(env_python):
+ logger.info(f"QGIS_PYTHON_PATH 指定: {env_python}")
+ return env_python
-def _find_qgis_app_dir_windows(root: str) -> str:
- """Windows: 在 {root}/apps/ 下查找 qgis* 目录"""
- apps_dir = Path(root) / "apps"
- if apps_dir.is_dir():
- for name in sorted(apps_dir.iterdir()):
- if name.name.startswith("qgis") and name.is_dir():
- logger.info(f"检测到 QGIS 应用目录: {name}")
- return str(name)
- fallback = str(apps_dir / "qgis")
- logger.warning(f"未检测到 qgis* 目录,使用默认路径: {fallback}")
- return fallback
-
-
-def _find_qgis_app_dir_linux(root: str) -> str:
- """
- Linux: 返回 QGIS_PREFIX_PATH。
- 包管理器安装的标准路径:
- Debian/Ubuntu: /usr (qgis-core 在 /usr/lib/qgis/)
- RHEL/CentOS: /usr
- QGIS.org 官方: /usr 或 /opt/QGIS
- """
- candidates = [
- Path(root) / "share" / "qgis", # /usr/share/qgis
- Path(root) / "lib" / "qgis", # /usr/lib/qgis
- Path(root) / "share" / "qgis-ltr", # qgis-ltr 变体
- ]
- for c in candidates:
- if c.is_dir():
- logger.info(f"检测到 QGIS 应用目录: {c}")
- return str(c)
-
- # 回退:用 root 本身(/usr),由 QgsApplication 自行探测
- logger.warning(f"未找到 QGIS 标准目录,使用 root: {root}")
- return str(root)
-
-
-# ─────────────────────────────────────────────────────────────
-# 共享库注入
-# ─────────────────────────────────────────────────────────────
-
-def _add_library_paths(root: str, qgis_app: str) -> None:
- """
- 注入共享库搜索路径。
-
- Windows: 调用 os.add_dll_directory() 显式注册每个 DLL 目录,
- Python 3.8+ 不再自动搜索 PATH 中的 DLL。
- Linux: 追加 LD_LIBRARY_PATH,让动态链接器 (ld-linux) 找到 .so。
- """
- if _IS_WINDOWS:
- _add_dll_directories_windows(root, qgis_app)
- else:
- _add_ld_library_path_linux(root, qgis_app)
-
-
-def _add_dll_directories_windows(root: str, qgis_app: str) -> None:
- """Windows: 显式注册 DLL 搜索目录"""
- if not hasattr(os, "add_dll_directory"):
- return # Python < 3.8
-
- dll_dirs = [
- os.path.join(root, "apps", "Qt5", "bin"),
- os.path.join(qgis_app, "bin"),
- os.path.join(root, "bin"),
- os.path.join(root, "apps", "gdal", "bin"),
- os.path.join(root, "apps", "gdal", "lib"),
- ]
- for d in dll_dirs:
- if os.path.isdir(d):
- os.add_dll_directory(d)
- logger.debug(f"注册 DLL 目录: {d}")
-
-
-def _add_ld_library_path_linux(root: str, qgis_app: str) -> None:
- """
- Linux: 追加 LD_LIBRARY_PATH,使动态链接器找到 QGIS/GDAL 的 .so。
-
- QGIS 包管理器安装时的标准 .so 路径:
- /usr/lib/ — libqgis_core.so, libqgis_analysis.so
- /usr/lib/qgis/ — 插件 .so
- /usr/lib/qgis/plugins/ — provider .so
- /usr/share/qgis/python/ — Python 模块
- """
- ld_dirs = [
- os.path.join(root, "lib"), # /usr/lib
- os.path.join(root, "lib", "qgis"), # /usr/lib/qgis
- os.path.join(root, "lib", "qgis", "plugins"), # /usr/lib/qgis/plugins
- os.path.join(root, "share", "qgis", "lib"), # 某些安装方式
- os.path.join(root, "apps", "gdal", "lib"), # OSGeo4W 风格
- ]
- existing = os.environ.get("LD_LIBRARY_PATH", "")
- new_dirs = [d for d in ld_dirs if os.path.isdir(d) and d not in existing]
- if new_dirs:
- joined = ":".join(new_dirs)
- os.environ["LD_LIBRARY_PATH"] = f"{joined}:{existing}" if existing else joined
- logger.info(f"LD_LIBRARY_PATH 追加: {new_dirs}")
-
-
-# ─────────────────────────────────────────────────────────────
-# 环境变量
-# ─────────────────────────────────────────────────────────────
-
-def _set_environment(root: str, qgis_app: str) -> None:
- """设置 QGIS 和 GDAL 相关环境变量(平台自适应)"""
- env_vars = {
- "QGIS_PREFIX_PATH": qgis_app,
- "PYTHONUTF8": "1",
- "GDAL_FILENAME_IS_UTF8": "YES",
- "VSI_CACHE": "TRUE",
- "VSI_CACHE_SIZE": "1000000",
- }
-
- if _IS_WINDOWS:
- env_vars["QT_PLUGIN_PATH"] = (
- f"{os.path.join(qgis_app, 'qtplugins')};"
- f"{os.path.join(root, 'apps', 'Qt5', 'plugins')}"
- )
- env_vars["GDAL_DATA"] = os.path.join(root, "apps", "gdal", "share", "gdal")
- else:
- # Linux: GDAL 数据通常在系统路径 /usr/share/gdal/ 下
- gdal_candidates = [
- os.path.join(root, "share", "gdal"),
- os.path.join(root, "share", "qgis", "resources"),
+ # 搜索标准 Linux QGIS 安装路径
+ linux_paths = [
+ "/usr/bin/qgis", # Ubuntu/Debian apt 安装
+ "/usr/bin/qgis.bin",
+ "/opt/QGIS/apps/Python312/bin/python3", # 独立安装器
+ "/opt/QGIS/apps/Python3/bin/python3",
+ "/usr/libexec/qgis/python3", # Fedora/RHEL
]
- for p in gdal_candidates:
- if os.path.isdir(p):
- env_vars["GDAL_DATA"] = p
- break
+ for candidate in linux_paths:
+ if os.path.isfile(candidate):
+ logger.info(f"检测到 QGIS 可执行文件: {candidate}")
+ return candidate
- # QGIS 插件路径(Linux 标准)
- qt_plugin = os.path.join(root, "lib", "qt", "plugins")
- if os.path.isdir(qt_plugin):
- env_vars["QT_PLUGIN_PATH"] = qt_plugin
-
- for key, value in env_vars.items():
- os.environ[key] = value
- logger.debug(f"环境变量: {key}={value}")
+ # 回退到系统 Python(如果 qgis.core 可导入)
+ sys_python = shutil.which("python3") or shutil.which("python")
+ if sys_python:
+ logger.info(f"Linux 环境,使用系统 Python: {sys_python}")
+ return sys_python
+ return None
-# ─────────────────────────────────────────────────────────────
-# Python 模块路径
-# ─────────────────────────────────────────────────────────────
+def build_qgis_command(qgis_root: str = None) -> list[str]:
+ """
+ 构建通过 .bat 包装器启动 QGIS 子进程的命令列表。
-def _add_python_paths(root: str, qgis_app: str) -> None:
- """将 QGIS Python 模块路径加入 sys.path(平台自适应)"""
- if _IS_WINDOWS:
- python_paths = [
- os.path.join(qgis_app, "python"),
- os.path.join(root, "apps", "Python312", "Lib", "site-packages"),
- ]
+ .bat 文件设置环境变量并启动 QGIS Python 3.12。
+ DLL 加载由 qgis_runner.py 中的 ctypes 预加载机制处理。
+
+ Returns:
+ 可直接传给 subprocess.run() 的命令列表
+ """
+ import platform
+ if platform.system() != "Windows":
+ # Linux: 直接用 Python,不需要 .bat 包装
+ python_path = get_qgis_python_path(qgis_root)
+ if not python_path:
+ raise RuntimeError("未找到 QGIS Python 解释器")
+ return [python_path, get_runner_script()]
+
+ if qgis_root is None:
+ qgis_root = _detect_qgis_root() or "D:/QGIS"
+
+ python_path = get_qgis_python_path(qgis_root)
+ if not python_path:
+ raise RuntimeError("未找到 QGIS Python 3.12 解释器")
+
+ runner_script = get_runner_script()
+ if not os.path.isfile(runner_script):
+ raise RuntimeError(f"QGIS Runner 脚本不存在: {runner_script}")
+
+ # 生成 .bat 包装脚本
+ bat_path = _generate_bat_wrapper(qgis_root, python_path, runner_script)
+ return ["cmd.exe", "/c", bat_path]
+
+
+def is_qgis_available(qgis_root: str = None) -> bool:
+ """检查 QGIS 环境是否可用"""
+ return get_qgis_python_path(qgis_root) is not None
+
+
+def get_runner_script() -> str:
+ """获取 qgis_runner.py 的绝对路径"""
+ return str(Path(__file__).parent / "qgis_runner.py")
+
+
+def _generate_bat_wrapper(qgis_root: str, python_path: str, runner_script: str) -> str:
+ """
+ 生成 .bat 包装脚本,设置 QGIS 环境变量并启动 runner。
+
+ .bat 文件通过 cmd.exe 执行,设置必要的环境变量。
+ DLL 加载由 qgis_runner.py 的 ctypes 预加载处理。
+ """
+ qgis_app_dir = os.path.join(qgis_root, "apps", "qgis-ltr").replace("/", "\\")
+ python_dir = os.path.join(qgis_root, "apps", "Python312").replace("/", "\\")
+ qt5_plugins = os.path.join(qgis_root, "apps", "Qt5", "plugins").replace("/", "\\")
+ qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
+ gdal_data = os.path.join(qgis_root, "apps", "gdal", "share", "gdal").replace("/", "\\")
+ qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
+ qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
+ qt5_bin = os.path.join(qgis_root, "apps", "Qt5", "bin").replace("/", "\\")
+ gdal_lib = os.path.join(qgis_root, "apps", "gdal", "lib").replace("/", "\\")
+
+ bat_content = f"""@echo off
+set "PYTHONHOME={python_dir}"
+set "PYTHONPATH={qgis_python_dir}"
+set "QGIS_PREFIX_PATH={qgis_app_dir}"
+set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
+set "GDAL_DATA={gdal_data}"
+set "PYTHONUTF8=1"
+set "GDAL_FILENAME_IS_UTF8=YES"
+set "VSI_CACHE=TRUE"
+set "VSI_CACHE_SIZE=1000000"
+set "PATH={qgis_bin};{qt5_bin};{gdal_lib};%PATH%"
+"{python_path}" "{runner_script}" %*
+"""
+
+ # 写入临时 .bat 文件
+ bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
+ os.makedirs(bat_dir, exist_ok=True)
+ bat_path = os.path.join(bat_dir, "run_qgis.bat")
+
+ with open(bat_path, "w", encoding="utf-8") as f:
+ f.write(bat_content)
+
+ logger.debug(f"生成 QGIS 包装脚本: {bat_path}")
+ return bat_path
+
+
+def _detect_qgis_root() -> str | None:
+ """
+ 自动检测 QGIS 安装根目录。
+
+ 优先级:
+ 1. 环境变量 QGIS_ROOT
+ 2. Windows 默认路径 D:/QGIS
+ 3. Linux 常见路径
+ """
+ env_root = os.environ.get("QGIS_ROOT")
+ if env_root and os.path.isdir(env_root):
+ return env_root
+
+ import platform
+ if platform.system() == "Windows":
+ for candidate in ["D:/QGIS", "C:/OSGeo4W", "C:/QGIS"]:
+ if os.path.isdir(candidate):
+ logger.info(f"检测到 QGIS 根目录: {candidate}")
+ return candidate
else:
- python_paths = [
- os.path.join(qgis_app, "python"), # /usr/share/qgis/python
- os.path.join(root, "lib", "python3", "dist-packages"), # Debian/Ubuntu
- os.path.join(root, "lib", "python3.10", "site-packages"), # 通用
- ]
+ for candidate in ["/usr", "/opt/QGIS", "/home/QGIS"]:
+ if os.path.isdir(candidate):
+ logger.info(f"检测到 QGIS 根目录: {candidate}")
+ return candidate
- for p in python_paths:
- if os.path.isdir(p) and p not in sys.path:
- sys.path.insert(0, p)
- logger.info(f"添加 Python 路径: {p}")
-
-
-# ─────────────────────────────────────────────────────────────
-# QgsApplication 初始化
-# ─────────────────────────────────────────────────────────────
-
-def _init_qgs_application(qgis_app: str) -> None:
- """创建并初始化 QgsApplication 实例"""
- global _qgs_app
-
- from qgis.core import QgsApplication, QgsSettings
-
- QgsApplication.setPrefixPath(qgis_app, True)
- _qgs_app = QgsApplication([], False) # False = 不启动 GUI
-
- settings = QgsSettings()
- settings.setValue("/qgis/render_decorations", False)
- settings.setValue("/qgis/parallel_rendering", True)
- settings.setValue("/qgis/use_spatial_index", True)
-
- _qgs_app.initQgis()
- logger.info("QgsApplication 初始化完成")
\ No newline at end of file
+ logger.warning("未检测到 QGIS 安装目录")
+ return None
diff --git a/app/services/qgis/qgis_runner.py b/app/services/qgis/qgis_runner.py
new file mode 100644
index 0000000..0b4d243
--- /dev/null
+++ b/app/services/qgis/qgis_runner.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+"""
+QGIS 专题图生成子进程入口。
+
+由主进程 (Python 3.10) 通过 subprocess 调用,
+运行在 QGIS 自带的 Python 3.12 环境中。
+
+支持两种模式:
+ - 批量模式(推荐):单次启动 QgsApplication,顺序处理多个模板
+ 输入: { "config": {...}, "models": [{...}, {...}, ...] }
+ - 单任务模式(兼容):
+ 输入: { "config": {...}, "model": {...} }
+
+输出 JSON (stdout):
+ 批量: { "results": [{"name": "...", "output": "..."}, ...] }
+ 单任务: { "name": "...", "output": "..." }
+
+错误: stderr + exit code 1
+"""
+import json
+import os
+import sys
+import time
+
+# ============================================================
+# 1. 环境初始化(必须在任何 QGIS/Qt import 之前)
+# ============================================================
+
+QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
+
+
+def _setup_python_path():
+ """将项目根目录和 QGIS Python 路径加入 sys.path"""
+ project_root = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ )
+ if project_root not in sys.path:
+ sys.path.insert(0, project_root)
+
+ qgis_python = os.path.join(QGIS_ROOT, "apps", "qgis-ltr", "python")
+ if os.path.isdir(qgis_python) and qgis_python not in sys.path:
+ sys.path.insert(0, qgis_python)
+
+
+def _setup_environment():
+ """设置 QGIS 运行所需的环境变量"""
+ qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr")
+ os.environ["QGIS_PREFIX_PATH"] = qgis_app_dir
+ os.environ["PYTHONUTF8"] = "1"
+ os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
+ os.environ["VSI_CACHE"] = "TRUE"
+ os.environ["VSI_CACHE_SIZE"] = "1000000"
+
+ if sys.platform == "win32":
+ os.environ["QT_PLUGIN_PATH"] = (
+ f"{os.path.join(qgis_app_dir, 'qtplugins')};"
+ f"{os.path.join(QGIS_ROOT, 'apps', 'Qt5', 'plugins')}"
+ )
+ gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal")
+ if os.path.isdir(gdal_data):
+ os.environ["GDAL_DATA"] = gdal_data
+
+ import ctypes
+ _dll_dirs = [
+ os.path.join(qgis_app_dir, "bin"),
+ os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
+ os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
+ ]
+ _preload_dlls = [
+ "qgis_core.dll", "qgispython.dll",
+ "Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
+ "Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
+ "Qt5Concurrent.dll", "Qt5PrintSupport.dll",
+ ]
+ for dll_dir in _dll_dirs:
+ if not os.path.isdir(dll_dir):
+ continue
+ os.add_dll_directory(dll_dir)
+ for dll_name in _preload_dlls:
+ dll_path = os.path.join(dll_dir, dll_name)
+ if os.path.isfile(dll_path):
+ try:
+ ctypes.WinDLL(dll_path)
+ except OSError:
+ pass
+
+
+# ============================================================
+# 2. 主逻辑
+# ============================================================
+
+def _init_qgis():
+ """初始化 QgsApplication(只做一次)"""
+ from qgis.core import QgsApplication
+
+ qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr")
+ QgsApplication.setPrefixPath(qgis_app_dir, True)
+ qgs_app = QgsApplication([], False)
+ qgs_app.initQgis()
+ return qgs_app
+
+
+def _process_single(service, model):
+ """处理单个模板,返回结果 dict"""
+ name = service.generate(model)
+ return {"name": name, "output": model["outFile"]}
+
+
+def main():
+ t_start = time.time()
+
+ # 环境初始化
+ _setup_environment()
+ _setup_python_path()
+
+ # 读取请求 JSON
+ if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]):
+ with open(sys.argv[1], "r", encoding="utf-8") as f:
+ request = json.load(f)
+ else:
+ request = json.load(sys.stdin)
+
+ config = request["config"]
+
+ # 兼容批量和单任务模式
+ models = request.get("models") or [request["model"]]
+
+ # 初始化 QgsApplication(只做一次)
+ qgs_app = _init_qgis()
+
+ try:
+ from app.services.qgis.map_service import MapService
+
+ service = MapService(config)
+ results = []
+
+ for i, model in enumerate(models):
+ t_model = time.time()
+ try:
+ result = _process_single(service, model)
+ results.append(result)
+ elapsed = time.time() - t_model
+ print(
+ f"[qgis_runner] [{i+1}/{len(models)}] 完成: {result['name']}, "
+ f"耗时 {elapsed:.1f}s",
+ file=sys.stderr,
+ )
+ except Exception as e:
+ elapsed = time.time() - t_model
+ error_msg = f"{e}"
+ print(
+ f"[qgis_runner] [{i+1}/{len(models)}] 失败: {model.get('name', '?')}, "
+ f"耗时 {elapsed:.1f}s — {error_msg}",
+ file=sys.stderr,
+ )
+ results.append({"name": model.get("name", ""), "output": "", "error": error_msg})
+
+ # 输出结果
+ if len(models) == 1 and not request.get("models"):
+ # 单任务模式兼容
+ json.dump(results[0], sys.stdout, ensure_ascii=False)
+ else:
+ json.dump({"results": results}, sys.stdout, ensure_ascii=False)
+ sys.stdout.flush()
+
+ total = time.time() - t_start
+ ok = sum(1 for r in results if not r.get("error"))
+ fail = len(results) - ok
+ print(
+ f"\n[qgis_runner] 批量完成: {ok}成功/{fail}失败, 总耗时 {total:.1f}s",
+ file=sys.stderr,
+ )
+ except Exception as e:
+ elapsed = time.time() - t_start
+ print(f"[qgis_runner] 致命错误 ({elapsed:.1f}s): {e}", file=sys.stderr)
+ sys.exit(1)
+ finally:
+ qgs_app.exitQgis()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/app/services/qgis/template_cache.py b/app/services/qgis/template_cache.py
index 3587116..198eff2 100644
--- a/app/services/qgis/template_cache.py
+++ b/app/services/qgis/template_cache.py
@@ -51,7 +51,7 @@ class TemplateCache:
extent = None
layout = project.layoutManager().layoutByName(layout_name)
if layout:
- for item_id in ["mapTitle", "mapTime", "mapUnit", "info"]:
+ for item_id in ["mapTitle", "mapTime", "mapUint", "info"]:
item = layout.itemById(item_id)
if item:
texts[item_id] = item.text()
diff --git a/app/services/qgis/template_modifier.py b/app/services/qgis/template_modifier.py
index 7e10f79..28ef608 100644
--- a/app/services/qgis/template_modifier.py
+++ b/app/services/qgis/template_modifier.py
@@ -9,6 +9,7 @@ import zipfile
import tempfile
from app.config.paths import get_logger
+from app.config.qgis_mappings import OGR_TO_POSTGRES, TABLE_RENAMES, SCHEMA_REPLACEMENTS
logger = get_logger("qgis.modifier")
@@ -33,6 +34,26 @@ class TemplateModifier:
mapping[xml_key] = gpkg_path
return mapping
+ @staticmethod
+ def _fix_provider_tags(content: str) -> str:
+ """
+ 将 GPKG 文件路径对应的 postgres 改为 ogr。
+
+ 策略:按 块处理,若块内 datasource 是文件路径(盘符开头),
+ 则该块内的 provider 改为 ogr。避免跨层误改。
+ """
+ maplayer_re = re.compile(r'(]*>.*?)', re.DOTALL)
+ provider_re = re.compile(r'(]*>)postgres()')
+ file_ds_re = re.compile(r'([A-Za-z]:/[^<]+)')
+
+ def _fix_layer(m):
+ layer_xml = m.group(1)
+ if file_ds_re.search(layer_xml):
+ layer_xml = provider_re.sub(r'\1ogr\2', layer_xml)
+ return layer_xml
+
+ return maplayer_re.sub(_fix_layer, content)
+
def modify(self, template_path: str) -> str:
"""修改模板文件,返回修改后的临时 .qgz 路径"""
override = self.config.get("template_override")
@@ -65,29 +86,8 @@ class TemplateModifier:
if item.filename.endswith(".qgs"):
content = data.decode("utf-8")
- # 替换 host/port/dbname/schema
- if orig and actual:
- content = re.sub(
- rf"(host\s*=\s*['\"])({re.escape(orig['host'])})(['\"])",
- rf"\g<1>{actual['host']}\3",
- content,
- )
- content = re.sub(
- rf"(port\s*=\s*['\"])({re.escape(str(orig['port']))})(['\"])",
- rf"\g<1>{actual['port']}\3",
- content,
- )
- content = re.sub(
- rf"(dbname\s*=\s*['\"])({re.escape(orig['dbname'])})(['\"])",
- rf"\g<1>{actual['dbname']}\3",
- content,
- )
- content = content.replace(
- f'table="{orig["schema"]}".',
- f'table="{actual["schema"]}".',
- )
-
- # 替换静态底图 datasource
+ # ★ 先替换静态底图 datasource(在 schema 替换之前)
+ # 否则 table="base"."xxx" 会被改为 table="qgis"."xxx",导致匹配失败
if self._static_map:
def _replace(m):
tbl = table_re.search(m.group(2))
@@ -99,6 +99,74 @@ class TemplateModifier:
content = datasource_re.sub(_replace, content)
+ # ★ 转换避难场所 OGR 图层为 PostgreSQL(模板引用本地 JSON 文件)
+ _target_schema = (actual or {}).get("schema") or "qgis"
+ _db = self.config.get("db", {})
+ content = self._convert_ogr_to_postgres(content, _db, _target_schema)
+
+ # 替换所有 PostgreSQL 连接参数(统一指向目标库)
+ db = self.config.get("db", {})
+
+ # host: 模板中 host=localhost 无引号,匹配带/不带引号
+ if db.get("host"):
+ content = re.sub(
+ r"(host\s*=\s*)(?:['\"])?(?:localhost|127\.0\.0\.1)(?:['\"])?(?=\s|$)",
+ rf"\g<1>{db['host']}",
+ content,
+ )
+ # port: 模板中 port=5432 无引号,匹配带/不带引号
+ if db.get("port"):
+ content = re.sub(
+ r"(port\s*=\s*)(?:['\"])?5432(?:['\"])?(?=\s|$)",
+ rf"\g<1>{str(db['port'])}",
+ content,
+ )
+ if db.get("database"):
+ content = re.sub(
+ r"(dbname\s*=\s*['\"])([^'\"]+)(['\"])",
+ rf"\g<1>{db['database']}\3",
+ content,
+ )
+ # 移除 authcfg(QGIS 本地认证配置,子进程环境无效)
+ content = re.sub(r'\s*authcfg\s*=\s*\S+', '', content)
+ # 移除可能残留的 user=/password=(防止重复注入)
+ if db.get("username"):
+ content = re.sub(r"\s*user\s*=\s*'[^']*'", '', content)
+ if db.get("password"):
+ content = re.sub(r"\s*password\s*=\s*'[^']*'", '', content)
+ # 在 port 后注入显式 user + password
+ if db.get("username") and db.get("password"):
+ content = re.sub(
+ r'(port\s*=\s*\d+)',
+ rf"\1 user='{db['username']}' password='{db['password']}'",
+ content,
+ )
+ # schema 替换:统一指向目标 schema
+ target_schema = (actual or {}).get("schema") or "qgis"
+ for old_schema in SCHEMA_REPLACEMENTS:
+ content = content.replace(
+ f'table="{old_schema}".',
+ f'table="{target_schema}".',
+ )
+
+ # 表名映射(模板表名 ≠ 目标库表名)
+ for old_name, new_name in TABLE_RENAMES.items():
+ content = content.replace(
+ f'table="{target_schema}"."{old_name}"',
+ f'table="{target_schema}"."{new_name}"',
+ )
+
+ # tid 列已通过 add_tid_column.py 添加到所有 qgis 动态表中
+ # 模板中的 key='tid' 可正常工作,无需移除
+
+ # 修正 srid=0 → srid=4326(QGIS 无法从 srid=0 自动检测 SRID)
+ content = content.replace(" srid=0 ", " srid=4326 ")
+
+ # ★ 修复 provider 标签:GPKG 文件路径的图层必须用 ogr provider
+ # 否则 QGIS 会将 .gpkg 路径当作 PostgreSQL 连接字符串解析,导致图层无效
+ if self._static_map:
+ content = self._fix_provider_tags(content)
+
data = content.encode("utf-8")
zout.writestr(item, data)
@@ -108,4 +176,48 @@ class TemplateModifier:
except Exception as e:
logger.error(f"模板修改失败,将使用原始模板: {e}")
- return template_path
\ No newline at end of file
+ return template_path
+
+ @staticmethod
+ def _convert_ogr_to_postgres(content: str, db: dict, schema: str) -> str:
+ """
+ 将模板中引用本地文件的避难场所 OGR 图层转换为 PostgreSQL 图层。
+
+ 设计师在本地用 JSON 文件制作了避难场所图层(公园/学校/文化馆等),
+ 这些数据已迁移到目标库 qgis schema 中。
+ """
+ _ogr_shelter_map = OGR_TO_POSTGRES
+
+ maplayer_re = re.compile(r'(]*>.*?)', re.DOTALL)
+ provider_re = re.compile(r'(]*>)ogr()')
+ datasource_re = re.compile(r'().*?()', re.DOTALL)
+ file_ds_re = re.compile(r'.*?\|layername=([^<]+)', re.DOTALL)
+
+ def _convert_layer(m):
+ layer_xml = m.group(1)
+ ds_match = file_ds_re.search(layer_xml)
+ if not ds_match:
+ return layer_xml
+ layer_name = ds_match.group(1)
+ table = _ogr_shelter_map.get(layer_name)
+ if not table:
+ return layer_xml
+
+ # 构建 PostgreSQL URI
+ pg_uri = (
+ f"dbname='{db.get('database', 'xian_new')}' "
+ f"host={db.get('host', '47.92.216.173')} "
+ f"port={db.get('port', 7654)} "
+ f"user='{db.get('username', 'postgres')}' "
+ f"password='{db.get('password', '')}' "
+ f"key='tid' srid=4326 type=Point "
+ f"table=\"{schema}\".\"{table}\" (geometry)"
+ )
+ new_ds = f"{pg_uri}"
+
+ layer_xml = datasource_re.sub(new_ds, layer_xml)
+ layer_xml = provider_re.sub(r'\1postgres\2', layer_xml)
+ logger.info(f"避难场所图层已转换: {layer_name} → qgis.{table}")
+ return layer_xml
+
+ return maplayer_re.sub(_convert_layer, content)
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index b8bcc23..267c609 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,5 @@ pyyaml == 6.0.3
fastapi == 0.136.3
uvicorn[standard] == 0.48.0
loguru == 0.7.3
-geopandas == 0.14.4
\ No newline at end of file
+geopandas == 0.14.4
+sqlalchemy == 2.0.51
\ No newline at end of file
diff --git a/settings.toml b/settings.toml
index cb6dae0..5382d2e 100644
--- a/settings.toml
+++ b/settings.toml
@@ -11,12 +11,12 @@ PREDICT_PROBABILITY_THRESHOLD = 50
# 静态底图 GeoPackage 目录(相对于项目根目录)
QGIS_GPKG_DIR = "app/data/gpkg"
# 专题图输出子目录(相对于 FILE_STORE_DIR)
-QGIS_OUTPUT_DIR = "xian/qgis/map/:disasterTime"
+QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime"
# 专题图默认参数
QGIS_DEFAULTS_MAP_LAYOUT = "A3"
QGIS_DEFAULTS_ZOOM_RULE = "11"
QGIS_DEFAULTS_ZOOM_VALUE = "50"
-QGIS_DEFAULTS_MAP_UNIT = "西安市应急管理局"
+QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 专题图DPI
QGIS_EXPORT_DPI = 300
# 批量产图线程池
@@ -80,6 +80,9 @@ FILE_STORE_DIR = "G:/files"
# QGIS 配置
# ============================================================
QGIS_ROOT = "D:/QGIS"
+# 专题图输出子目录
+QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime"
+QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 模板数据库覆盖:将模板中硬编码的连接替换为实际环境连接
QGIS_TEMPLATE_OVERRIDE_ENABLED = true
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST = "localhost"
diff --git a/tools/add_tid_column.py b/tools/add_tid_column.py
new file mode 100644
index 0000000..d696bd2
--- /dev/null
+++ b/tools/add_tid_column.py
@@ -0,0 +1,42 @@
+"""
+在所有 qgis 动态表中添加 tid 自增主键列
+QGIS 的 datasource key='tid' 需要此列
+"""
+import psycopg2
+
+TABLES = [
+ 'hazard_hydrops',
+ 'lifeline_outfall',
+ 'lifeline_pipe',
+ 'risk_census_population',
+ 'sx_xa_towns',
+]
+
+c = psycopg2.connect(
+ host='47.92.216.173', port=7654,
+ user='postgres', password='zhangsan',
+ database='xian_new'
+)
+c.autocommit = True
+cur = c.cursor()
+
+for t in TABLES:
+ try:
+ # 检查 tid 列是否存在
+ cur.execute(f"""
+ SELECT EXISTS (
+ SELECT FROM information_schema.columns
+ WHERE table_schema='qgis' AND table_name='{t}' AND column_name='tid'
+ )
+ """)
+ exists = cur.fetchone()[0]
+
+ if exists:
+ print(f"qgis.{t}: tid 列已存在, 跳过")
+ else:
+ cur.execute(f'ALTER TABLE qgis."{t}" ADD COLUMN tid SERIAL')
+ print(f"qgis.{t}: tid 列添加成功")
+ except Exception as e:
+ print(f"qgis.{t}: 失败 - {e}")
+
+c.close()
diff --git a/tools/check_dynamic_layers.py b/tools/check_dynamic_layers.py
new file mode 100644
index 0000000..9d234ad
--- /dev/null
+++ b/tools/check_dynamic_layers.py
@@ -0,0 +1,72 @@
+"""
+批量检查所有 rainfall 模板的图层,看哪些动态图层需要显示、表是否存在
+"""
+import zipfile, re, os, psycopg2
+
+TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
+
+# 收集所有非GPKG动态图层的表名
+all_dynamic = set()
+for fname in sorted(os.listdir(TEMPLATE_DIR)):
+ if not fname.endswith('.qgz') or fname.startswith('tmp'):
+ continue
+ path = os.path.join(TEMPLATE_DIR, fname)
+ z = zipfile.ZipFile(path)
+ qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
+ content = z.read(qgs_name).decode('utf-8')
+
+ maplayer_re = re.compile(r']*>(.*?)', re.DOTALL)
+ for m in maplayer_re.finditer(content):
+ block = m.group(1)
+ name_m = re.search(r'([^<]+)', block)
+ provider_m = re.search(r']*>(\w+)', block)
+ ds_m = re.search(r'(.*?)', block, re.DOTALL)
+
+ provider = provider_m.group(1) if provider_m else '?'
+ if provider != 'postgres':
+ continue
+
+ ds = ds_m.group(1).strip() if ds_m else ''
+ table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
+ layer_name = name_m.group(1) if name_m else '?'
+
+ if table_m:
+ schema = table_m.group(1)
+ table = table_m.group(2)
+ key = f"{schema}.{table}"
+ all_dynamic.add((layer_name, key))
+
+# 已知的GPKG静态层(会被替换为ogr)
+static_tables = {
+ 'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
+ 'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
+ 'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
+ 'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
+ 'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
+}
+
+# DB检查
+c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
+c.autocommit = True
+cur = c.cursor()
+
+print(f"{'图层名':20s} {'原表':35s} {'qgis中存在':12s} {'行数':>8s}")
+print("-" * 80)
+
+for layer_name, table_key in sorted(all_dynamic):
+ if table_key in static_tables:
+ continue # 会被GPKG替换,跳过
+
+ schema, table = table_key.split('.', 1)
+ try:
+ cur.execute(f'SELECT count(*) FROM qgis.{table}')
+ count = cur.fetchone()[0]
+ exists = "YES"
+ except:
+ exists = "NO"
+ count = 0
+
+ marker = " !!!" if exists == "NO" else ""
+ print(f"{layer_name:20s} {table_key:35s} {exists:12s} {count:>8,d}{marker}")
+
+c.close()
diff --git a/tools/check_layer_order.py b/tools/check_layer_order.py
new file mode 100644
index 0000000..8cf061b
--- /dev/null
+++ b/tools/check_layer_order.py
@@ -0,0 +1,54 @@
+"""检查模板中所有图层的渲染顺序(z-order)"""
+import zipfile, re, os
+
+TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
+fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
+path = os.path.join(TEMPLATE_DIR, fname)
+
+z = zipfile.ZipFile(path)
+qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
+content = z.read(qgs_name).decode('utf-8')
+
+# 提取所有 maplayer 块(保持顺序 — 这就是渲染顺序)
+maplayer_re = re.compile(r']*>(.*?)', re.DOTALL)
+
+print(f"模板: {fname}")
+print(f"{'#':>3s} {'图层名':20s} {'类型':10s} {'可见':4s} {'表名'}")
+print("-" * 80)
+
+for i, m in enumerate(maplayer_re.finditer(content)):
+ block = m.group(1)
+
+ # 图层名
+ name_m = re.search(r'([^<]+)', block)
+ name = name_m.group(1) if name_m else '?'
+
+ # provider
+ provider_m = re.search(r']*>(\w+)', block)
+ provider = provider_m.group(1) if provider_m else '?'
+
+ # datasource table
+ ds_m = re.search(r'(.*?)', block, re.DOTALL)
+ ds = ds_m.group(1).strip() if ds_m else ''
+ table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
+ table = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
+
+ # 可见性
+ visible = 'Y' if 'visible="1"' in m.group(0) or 'visible="1"' in block else 'N'
+
+ # 判断图层类型
+ is_static = table in [
+ 'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
+ 'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
+ 'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
+ 'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
+ 'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
+ ]
+ layer_type = "底图" if is_static else "动态"
+
+ marker = " <<<" if not is_static and provider == 'postgres' else ""
+ print(f"{i+1:>3d} {name:20s} {provider:10s} {visible:4s} {table:35s} [{layer_type}]{marker}")
+
+print()
+print("提示: 图层按从上到下的顺序渲染(序号小的在底层,序号大的在顶层)")
+print("动态图层如果在底图下方,会被底图完全遮盖")
diff --git a/tools/check_layout.py b/tools/check_layout.py
new file mode 100644
index 0000000..b480b4a
--- /dev/null
+++ b/tools/check_layout.py
@@ -0,0 +1,65 @@
+"""检查布局 Map 项的图层配置"""
+import zipfile, re, os
+
+TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
+fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
+path = os.path.join(TEMPLATE_DIR, fname)
+
+z = zipfile.ZipFile(path)
+qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
+content = z.read(qgs_name).decode('utf-8')
+
+# 提取 Layout 元素(找 Map 项)
+# QGIS 有两种布局格式: (QGIS 2.x style) 和 (QGIS 3.x style)
+layout_re = re.compile(r'<(?:Composer|Layout)[^>]*name="([^"]*)"[^>]*>(.*?)(?:Composer|Layout)>', re.DOTALL)
+
+for m in layout_re.finditer(content):
+ layout_name = m.group(1)
+ layout_content = m.group(2)
+ print(f"=== 布局: {layout_name} ===")
+
+ # 检查 Map 项
+ map_item_re = re.compile(r']*>(.*?)', re.DOTALL)
+ for mm in map_item_re.finditer(layout_content):
+ map_content = mm.group(1)
+
+ # 检查是否有 lockedLayers
+ locked = re.findall(r'(.*?)', map_content, re.DOTALL)
+ if locked:
+ print(f" lockedLayers: {locked[0][:500]}")
+
+ # 检查 keepLayerSet
+ keep_set = re.findall(r'keepLayerSet="([^"]*)"', mm.group(0))
+ if keep_set:
+ print(f" keepLayerSet: {keep_set[0]}")
+
+ # 检查 followPreset
+ preset = re.findall(r'followPreset="([^"]*)"', mm.group(0))
+ if preset:
+ print(f" followPreset: {preset[0]}")
+
+ # 检查 followPresetName
+ preset_name = re.findall(r'followPresetName="([^"]*)"', mm.group(0))
+ if preset_name:
+ print(f" followPresetName: {preset_name[0]}")
+
+ # 检查
+ layer_set = re.findall(r'(.*?)', map_content, re.DOTALL)
+ if layer_set:
+ print(f" layerSet: {layer_set[0][:500]}")
+
+ # 检查
+ grids = re.findall(r']*/>', map_content)
+ print(f" grids: {len(grids)}")
+
+ # 也检查 格式 (QGIS 3.x)
+ map_item2_re = re.compile(r']*type="[^"]*map[^"]*"[^>]*>(.*?)', re.DOTALL | re.IGNORECASE)
+ for mm in map_item2_re.finditer(layout_content):
+ map_content = mm.group(1)
+ print(f" [LayoutItem Map] attributes: {mm.group(0)[:300]}")
+
+# 打印模板中所有的 map theme / visibility preset
+presets = re.findall(r'<(?:visibility-presets|map-theme-collection).*?(?:visibility-presets|map-theme-collection)>', content, re.DOTALL)
+if presets:
+ print("\n=== 可见性预设 ===")
+ print(presets[0][:500])
diff --git a/tools/check_shelter.py b/tools/check_shelter.py
new file mode 100644
index 0000000..21fa7ef
--- /dev/null
+++ b/tools/check_shelter.py
@@ -0,0 +1,71 @@
+"""检查暴雨避难场所分布图的所有动态图层是否有数据"""
+import zipfile, re, os, psycopg2
+
+TEMPLATE_PATH = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz"
+
+z = zipfile.ZipFile(TEMPLATE_PATH)
+qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
+content = z.read(qgs_name).decode('utf-8')
+
+maplayer_re = re.compile(r']*>(.*?)', re.DOTALL)
+
+# 收集所有 PostgreSQL 动态图层
+layers = []
+for m in maplayer_re.finditer(content):
+ block = m.group(1)
+ name_m = re.search(r'([^<]+)', block)
+ provider_m = re.search(r']*>(\w+)', block)
+ ds_m = re.search(r'(.*?)', block, re.DOTALL)
+
+ provider = provider_m.group(1) if provider_m else '?'
+ if provider != 'postgres':
+ continue
+
+ name = name_m.group(1) if name_m else '?'
+ ds = ds_m.group(1).strip() if ds_m else ''
+ table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
+ table_key = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
+
+ layers.append((name, table_key))
+
+# 已知 GPKG 静态层
+static_tables = {
+ 'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
+ 'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
+ 'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
+ 'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
+ 'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
+}
+
+c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
+c.autocommit = True
+cur = c.cursor()
+
+print(f"模板: 暴雨避难场所分布图")
+print(f"{'图层名':20s} {'原表':40s} {'qgis有数据':10s} {'行数':>6s}")
+print("-" * 85)
+
+for name, table_key in layers:
+ if table_key in static_tables:
+ continue
+
+ schema, table = table_key.split('.', 1)
+ mapped_table = 'hazard_hydrops' if table == 'hazard_waterlogging' else table
+
+ try:
+ cur.execute(f'SELECT count(*) FROM qgis.{mapped_table}')
+ count = cur.fetchone()[0]
+ exists = "YES" if count > 0 else "EMPTY"
+ except:
+ try:
+ cur.execute(f'SELECT count(*) FROM {schema}.{table}')
+ count = cur.fetchone()[0]
+ exists = f"在{schema}"
+ except:
+ exists = "NO"
+ count = 0
+
+ marker = " <-- 无数据!" if count == 0 else ""
+ print(f"{name:20s} {table_key:40s} {exists:10s} {count:>6,d}{marker}")
+
+c.close()
diff --git a/tools/check_tables.py b/tools/check_tables.py
new file mode 100644
index 0000000..d65ad83
--- /dev/null
+++ b/tools/check_tables.py
@@ -0,0 +1,38 @@
+import psycopg2
+c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
+cur = c.cursor()
+
+tables = ['lifeline_outfall', 'lifeline_pipe', 'risk_census_population', 'hazard_waterlogging', 'sx_street', 'sx_xa_county']
+
+print("=== qgis schema ===")
+for t in tables:
+ try:
+ cur.execute(f'SELECT COUNT(*) FROM qgis."{t}"')
+ count = cur.fetchone()[0]
+ cur.execute(f"SELECT GeometryType(geom) FROM qgis.\"{t}\" LIMIT 1")
+ g = cur.fetchone()
+ print(f" qgis.{t}: {count} 行, geom={g[0] if g else 'N/A'}")
+ except Exception as e:
+ print(f" qgis.{t}: 不存在")
+
+print("\n=== base schema (检查积水点原始表) ===")
+for t in ['hazard_waterlogging']:
+ try:
+ cur.execute(f'SELECT COUNT(*) FROM base."{t}"')
+ print(f" base.{t}: {cur.fetchone()[0]} 行")
+ except:
+ print(f" base.{t}: 不存在")
+
+# 查找所有含 water 或 hazard 的表
+print("\n=== 搜索含 water/hazard 的表 ===")
+cur.execute("""
+ SELECT table_schema, table_name
+ FROM information_schema.tables
+ WHERE (table_name LIKE '%water%' OR table_name LIKE '%hazard%')
+ AND table_schema IN ('qgis', 'base', 'public')
+ ORDER BY table_schema, table_name
+""")
+for r in cur.fetchall():
+ print(f" {r[0]}.{r[1]}")
+
+c.close()
diff --git a/tools/inspect_layers.py b/tools/inspect_layers.py
new file mode 100644
index 0000000..34eedb3
--- /dev/null
+++ b/tools/inspect_layers.py
@@ -0,0 +1,42 @@
+"""提取模板 .qgs 中所有图层名称、类型、数据源"""
+import zipfile, re, os, sys
+
+template_dir = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
+for fname in sorted(os.listdir(template_dir)):
+ if not fname.endswith('.qgz') or fname.startswith('tmp'):
+ continue
+ path = os.path.join(template_dir, fname)
+ z = zipfile.ZipFile(path)
+ # 找到 .qgs 文件
+ qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
+ content = z.read(qgs_name).decode('utf-8')
+
+ # 提取 maplayer 块
+ maplayer_re = re.compile(r']*>(.*?)', re.DOTALL)
+ layers = []
+ for m in maplayer_re.finditer(content):
+ block = m.group(1)
+ # 图层名
+ name_m = re.search(r'([^<]+)', block)
+ # provider
+ provider_m = re.search(r']*>(\w+)', block)
+ # datasource (截取前120字符)
+ ds_m = re.search(r'(.*?)', block, re.DOTALL)
+
+ name = name_m.group(1) if name_m else '?'
+ provider = provider_m.group(1) if provider_m else '?'
+ ds = ds_m.group(1).strip()[:120] if ds_m else '?'
+
+ layers.append((name, provider, ds))
+
+ print(f"\n{'='*60}")
+ print(f"模板: {fname}")
+ print(f"{'='*60}")
+ for name, provider, ds in layers:
+ print(f" [{provider:10s}] {name}")
+ if ds:
+ print(f" ds: {ds}")
+
+ # 只分析第一个模板(所有 rainfall 模板结构相同)
+ if fname == '暴雨内涝潜在隐患点及人口分布图.qgz':
+ break
diff --git a/tools/list_shelter_layers.py b/tools/list_shelter_layers.py
new file mode 100644
index 0000000..39d93dc
--- /dev/null
+++ b/tools/list_shelter_layers.py
@@ -0,0 +1,16 @@
+import zipfile, re
+
+z = zipfile.ZipFile(r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz")
+c = z.read([n for n in z.namelist() if n.endswith('.qgs')][0]).decode()
+layers = re.findall(r']*>(.*?)', c, re.DOTALL)
+
+for i, l in enumerate(layers):
+ nm = re.search(r'([^<]+)', l)
+ pv = re.search(r']*>(\w+)', l)
+ ds = re.search(r'(.*?)', l, re.DOTALL)
+ name = nm.group(1) if nm else '?'
+ prov = pv.group(1) if pv else '?'
+ ds_text = ds.group(1).strip()[:120] if ds else ''
+ print(f"{i:2d}. [{prov:10s}] {name}")
+ if ds_text:
+ print(f" ds: {ds_text}")
diff --git a/tools/test_single_map.py b/tools/test_single_map.py
new file mode 100644
index 0000000..66183c2
--- /dev/null
+++ b/tools/test_single_map.py
@@ -0,0 +1,295 @@
+"""
+单张专题图测试脚本 - 独立版
+直接构建 model/config 并通过 subprocess 调用 qgis_runner.py
+"""
+import json, os, sys, tempfile, subprocess, re
+from datetime import datetime
+
+# ============================================================
+# 配置(和 settings.toml 一致)
+# ============================================================
+DB_CONFIG = {
+ "host": "47.92.216.173",
+ "port": 7654,
+ "username": "postgres",
+ "password": "zhangsan",
+ "database": "xian_new",
+}
+QGIS_ROOT = "D:/QGIS"
+XIAN_CENTER = [108.948024, 34.263161]
+GPKG_DIR = r"F:\project\xian\xian_algorithm_new\app\data\gpkg".replace("\\", "/")
+TEMPLATE_BASE = r"F:\project\xian\xian_algorithm_new\app\data\template"
+
+# ============================================================
+# 模拟推理结果(inference_id=50)
+# ============================================================
+# 从实际数据库查询得知:
+# event_type='rainfall', condition={'region_code':'610116','rainfall':'120'},
+# occurred_time='2025-09-16 20:00:00'
+inference = {
+ "id": 50,
+ "event_type": "rainfall",
+ "condition": {"region_code": "610116", "rainfall": "120"},
+ "occurred_time": datetime(2025, 9, 16, 20, 0, 0),
+}
+
+# ============================================================
+# 区域映射(从 settings.toml)
+# ============================================================
+AREA_MAP = {
+ "610102": "新城区", "610103": "碑林区", "610104": "莲湖区",
+ "610111": "灞桥区", "610112": "未央区", "610113": "雁塔区",
+ "610114": "阎良区", "610115": "临潼区", "610116": "长安区",
+ "610117": "高陵区", "610118": "鄠邑区", "610122": "蓝田县",
+ "610124": "周至县",
+}
+
+STATIC_LAYERS = {
+ "水库": ("base.rivers", "rivers.gpkg"),
+ "市州驻地": ("base.sx_capital", "sx_capital.gpkg"),
+ "河流": ("base.river", "river.gpkg"),
+ "active_fault": ("base.active_fault", "active_fault.gpkg"),
+ "陕西省": ("base.sx", "sx.gpkg"),
+ "乡镇驻地": ("base.sx_street", "sx_street.gpkg"),
+ "区县驻地": ("base.sx_xa_county", "sx_xa_county.gpkg"),
+ "县界": ("base.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
+ "周边区县": ("base.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
+ "周边市州": ("base.sx_zb_city", "sx_zb_city.gpkg"),
+ "周边县区": ("base.sx_zb_county", "sx_zb_county.gpkg"),
+ "traffic_expressway": ("base.traffic_expressway", "traffic_expressway.gpkg"),
+ "traffic_provincial": ("base.traffic_provincial", "traffic_provincial.gpkg"),
+ "traffic_railway": ("base.traffic_railway", "traffic_railway.gpkg"),
+ "traffic_township": ("base.traffic_township", "traffic_township.gpkg"),
+ "traffic_trunk_line": ("base.traffic_trunk_line", "traffic_trunk_line.gpkg"),
+}
+
+# ============================================================
+# 辅助函数(从 qgis_map_export.py 直接复制)
+# ============================================================
+def format_disaster_time(occurred_time):
+ if isinstance(occurred_time, datetime):
+ return occurred_time.strftime("%Y%m%d%H%M%S")
+ return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
+
+def resolve_district(condition):
+ code = condition.get("region_code")
+ if code and str(code) in AREA_MAP:
+ return AREA_MAP[str(code)]
+ return ""
+
+def build_map_title(event_type, condition, template_name):
+ district = resolve_district(condition)
+ prefix = f"陕西西安{district}" if district else "陕西西安"
+ if event_type == "rainfall":
+ rainfall = condition.get("rainfall")
+ if rainfall is not None and rainfall != "":
+ return f"{prefix}{float(rainfall)}mm{template_name}"
+ return f"{prefix}{template_name}"
+ return f"{prefix}{template_name}"
+
+def build_info_text(event_type, condition, occurred_time):
+ """构建信息面板文本(不显示灾害类型标签)"""
+ lines = []
+ if isinstance(occurred_time, datetime):
+ time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分"
+ elif occurred_time:
+ time_str = str(occurred_time)
+ else:
+ time_str = ""
+ lines.append(f"时间:{time_str}")
+
+ if event_type == "rainfall":
+ rainfall = condition.get("rainfall")
+ if rainfall is not None and rainfall != "":
+ lines.append(f"累计降雨量:{float(rainfall)}mm")
+ duration = condition.get("duration")
+ if duration is not None and duration != "":
+ lines.append(f"已持续:{duration}")
+ elif event_type == "earthquake":
+ magnitude = condition.get("magnitude")
+ if magnitude is not None and magnitude != "":
+ lines.append(f"震级:{float(magnitude)}级")
+ lon = condition.get("epicenter_lon")
+ lat = condition.get("epicenter_lat")
+ if lon is not None and lat is not None:
+ lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
+
+ return "\n".join(lines)
+
+def derive_model_params(inference, batch_folder, template_path):
+ event_type = inference["event_type"]
+ condition = inference["condition"]
+ occurred_time = inference["occurred_time"]
+
+ template_name = os.path.splitext(os.path.basename(template_path))[0]
+ map_title = build_map_title(event_type, condition, template_name)
+ safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
+ out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
+
+ if isinstance(occurred_time, datetime):
+ map_time = occurred_time.strftime("%Y-%m-%d %H:%M")
+ else:
+ map_time = str(occurred_time)
+
+ info_text = build_info_text(event_type, condition, occurred_time)
+
+ center_x, center_y = XIAN_CENTER
+ if event_type == "earthquake":
+ lon = condition.get("epicenter_lon", XIAN_CENTER[0])
+ lat = condition.get("epicenter_lat", XIAN_CENTER[1])
+ center_x, center_y = float(lon), float(lat)
+
+ return {
+ "name": f"test_{inference['id']}_{safe_name}",
+ "path": template_path,
+ "outFile": out_file,
+ "mapLayout": "A3",
+ "mapTitle": map_title,
+ "mapTime": map_time,
+ "mapUnit": "西安市应急管理局",
+ "info": info_text,
+ "centerX": center_x,
+ "centerY": center_y,
+ "event": str(inference["id"]),
+ "queueId": str(inference["id"]),
+ "zoomRule": "11",
+ "zoomValue": "50",
+ }
+
+def build_qgis_config(batch_folder):
+ static_layers_config = {}
+ for name, (table, gpkg_file) in STATIC_LAYERS.items():
+ static_layers_config[name] = {"file": gpkg_file, "table": table}
+
+ return {
+ "db": DB_CONFIG,
+ "qgis": {"exportDpi": 300},
+ "template_override": {
+ "enabled": True,
+ "original": {
+ "host": "localhost",
+ "port": 5432,
+ "dbname": "yjzyk_xian",
+ "schema": "base",
+ },
+ "actual": {
+ "host": DB_CONFIG["host"],
+ "port": DB_CONFIG["port"],
+ "dbname": DB_CONFIG["database"],
+ "schema": "qgis",
+ "username": DB_CONFIG["username"],
+ "password": DB_CONFIG["password"],
+ },
+ },
+ "static_layers": {
+ "enabled": True,
+ "gpkg_dir": GPKG_DIR,
+ "layers": static_layers_config,
+ },
+ "batch_folder": batch_folder,
+ }
+
+# ============================================================
+# 主逻辑
+# ============================================================
+event_type = inference["event_type"]
+disaster_time = format_disaster_time(inference["occurred_time"])
+batch_folder = os.path.join("G:/files", "xian/qgis/map", disaster_time).replace("\\", "/")
+os.makedirs(batch_folder, exist_ok=True)
+
+template_dir = os.path.join(TEMPLATE_BASE, event_type)
+template_path = os.path.join(template_dir, "暴雨内涝潜在隐患点及人口分布图.qgz").replace("\\", "/")
+
+model = derive_model_params(inference, batch_folder, template_path)
+config = build_qgis_config(batch_folder)
+
+print("=" * 60)
+print(f"模板: {template_path}")
+print(f"输出: {model['outFile']}")
+print(f"标题: {model['mapTitle']}")
+print(f"时间: {model['mapTime']}")
+print(f"单位: {model['mapUnit']}")
+print(f"info:\n{model['info']}")
+print(f"中心: ({model['centerX']}, {model['centerY']})")
+print(f"event: {model['event']}, queueId: {model['queueId']}")
+print("=" * 60)
+
+request_data = json.dumps(
+ {"config": config, "model": model},
+ ensure_ascii=False,
+)
+
+tmp_json = tempfile.NamedTemporaryFile(
+ suffix=".json", delete=False, mode="w", encoding="utf-8"
+)
+tmp_json.write(request_data)
+tmp_json.close()
+
+runner_script = os.path.join(
+ os.path.dirname(__file__),
+ r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
+).replace("\\", "/")
+# Fix: use absolute path
+runner_script = r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
+
+bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
+os.makedirs(bat_dir, exist_ok=True)
+bat_path = os.path.join(bat_dir, "run_qgis_test.bat")
+
+qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr").replace("/", "\\")
+python_dir = os.path.join(QGIS_ROOT, "apps", "Python312").replace("/", "\\")
+qt5_plugins = os.path.join(QGIS_ROOT, "apps", "Qt5", "plugins").replace("/", "\\")
+qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
+gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal").replace("/", "\\")
+qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
+qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
+qt5_bin = os.path.join(QGIS_ROOT, "apps", "Qt5", "bin").replace("/", "\\")
+gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib").replace("/", "\\")
+python_exe = os.path.join(python_dir, "python3.exe").replace("/", "\\")
+
+bat_content = f"""@echo off
+set "PYTHONHOME={python_dir}"
+set "PYTHONPATH={qgis_python_dir}"
+set "QGIS_PREFIX_PATH={qgis_app_dir}"
+set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
+set "GDAL_DATA={gdal_data}"
+set "PYTHONUTF8=1"
+set "GDAL_FILENAME_IS_UTF8=YES"
+set "VSI_CACHE=TRUE"
+set "VSI_CACHE_SIZE=1000000"
+set "PATH={qgis_bin};{qt5_bin};{gdal_lib};%PATH%"
+"{python_exe}" "{runner_script}" "{tmp_json.name}"
+"""
+
+with open(bat_path, "w", encoding="utf-8") as f:
+ f.write(bat_content)
+
+cmd = ["cmd.exe", "/c", bat_path]
+print(f"执行: {' '.join(cmd[:3])} ...")
+print()
+
+try:
+ result = subprocess.run(cmd, capture_output=True, timeout=300)
+ stderr_text = result.stderr.decode("utf-8", errors="replace")
+ stdout_text = result.stdout.decode("utf-8", errors="replace")
+
+ print("=== stderr ===")
+ for line in stderr_text.split("\n"):
+ print(f" {line}")
+ print()
+ print("=== stdout ===")
+ print(stdout_text[:500] if stdout_text else "(empty)")
+
+ if result.returncode != 0:
+ print(f"\n!!! FAIL: exit={result.returncode}")
+ else:
+ print(f"\n=== SUCCESS ===")
+ if os.path.isfile(model["outFile"]):
+ print(f"文件: {model['outFile']} ({os.path.getsize(model['outFile'])/1024:.1f} KB)")
+ else:
+ print(f"文件不存在: {model['outFile']}")
+finally:
+ try:
+ os.remove(tmp_json.name)
+ except OSError:
+ pass