21 Commits

Author SHA1 Message Date
zzw 15e6c70103 专题图修改虚拟环境 2026-06-20 21:12:52 +08:00
zzw e115041454 专题图修改 2026-06-20 20:48:39 +08:00
wzy-warehouse 480e793ff8 去掉PYTHONHOME 2026-06-20 17:53:53 +08:00
wzy-warehouse b7a25899b2 Merge remote-tracking branch 'origin/wzy' into wzy 2026-06-20 17:50:21 +08:00
wzy-warehouse b4cd0d4e35 不生成模版文件 2026-06-20 17:50:17 +08:00
wzy-warehouse a267d1fa39 修改路径,上一版正常 2026-06-20 17:44:44 +08:00
wzy-warehouse 293c12fdf7 修复Qt5 2026-06-20 17:37:55 +08:00
wzy-warehouse 7d67096cb6 修复 2026-06-20 17:20:36 +08:00
wzy-warehouse f1e85468c5 回复到之前版本 2026-06-20 17:13:52 +08:00
wzy-warehouse 66f0d18947 PROJ_DATA连接问题 2026-06-20 17:10:42 +08:00
wzy-warehouse 69f45d77b1 硬编码问题 2026-06-20 17:08:49 +08:00
wzy-warehouse 402102cf36 QGIS修改日志报错 2026-06-20 17:07:05 +08:00
wzy-warehouse ca14e931f8 QGIS修改日志报错 2026-06-20 17:04:24 +08:00
wzy-warehouse 4fb65f7f53 QGIS完成初步重构 2026-06-20 16:58:14 +08:00
wzy-warehouse f3c38bbb57 QGIS完成初步重构 2026-06-20 16:34:49 +08:00
wzy-warehouse 943f32cafd QGIS完成初步重构 2026-06-20 16:14:28 +08:00
wzy-warehouse 18d8bcb1a3 QGIS完成初步重构 2026-06-20 15:50:24 +08:00
wzy-warehouse d20b5744bb 移动模板文件 2026-06-19 18:25:36 +08:00
wzy-warehouse fb68864d04 初始化集成qgis 2026-06-19 17:06:53 +08:00
wzy-warehouse b4cce93af0 初始化集成qgis 2026-06-19 17:04:03 +08:00
zxyroy 6a03f66b7d 修改log 2026-06-17 21:47:10 +08:00
86 changed files with 2974 additions and 50 deletions
+4
View File
@@ -55,3 +55,7 @@ htmlcov/
# Ignore dynaconf secret files
.secrets.*
/test/
# QGIS 临时模板文件
app/data/template/*/tmp*.qgz
tmp*.qgz
+2
View File
@@ -8,6 +8,8 @@ def register_routers(application: FastAPI):
"""注册所有路由"""
from app.api.rainfall import router as rainfall_router
from app.api.earthquake import router as earthquake_router
from app.api.qgis_map_export import router as qgis_router
application.include_router(rainfall_router)
application.include_router(earthquake_router)
application.include_router(qgis_router)
+480
View File
@@ -0,0 +1,480 @@
"""
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
- 线程池并发产图(默认 4 worker)
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
"""
import asyncio
import concurrent.futures
import os
import re
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.config.paths import get_logger
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
from app.repositories import qgis_repository
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
from app.utils.api_deps import get_prediction_semaphore
from app.utils.db_helper import db_helper
from config import settings
logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
# 线程池(按配置初始化 worker 数量)
_worker_threads = getattr(settings, "QGIS_WORKER_THREADS", 4)
_thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=_worker_threads,
thread_name_prefix="qgis-worker",
)
logger.info(f"QGIS 线程池初始化: {_worker_threads} workers")
# 去重锁:in_progress_locks[disaster_time] = asyncio.Lock
# 同一灾害时间的并发请求会排队,避免重复产图
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()
# 西安市中心
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
# ============================================================
# 时间戳格式化
# ============================================================
def format_disaster_time(occurred_time) -> str:
"""将 occurred_time 格式化为时间戳字符串(YYYYMMDDHHmmss),作为文件夹名"""
if isinstance(occurred_time, datetime):
return occurred_time.strftime("%Y%m%d%H%M%S")
elif occurred_time:
return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
else:
return datetime.now().strftime("%Y%m%d%H%M%S")
# ============================================================
# region_code → 区县名称映射
# ============================================================
REGION_CODE_MAP = settings.area.to_dict()
def _resolve_district(condition: dict) -> str:
"""从 condition.region_code 映射区县名称"""
code = condition.get("region_code")
if code and str(code) in REGION_CODE_MAP:
return REGION_CODE_MAP[str(code)]
return ""
def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
"""
构建地图标题。
格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
- 地震:陕西西安临潼区5.1级地震专题图
- 暴雨(有降雨量):陕西西安长安区120mm降雨专题图
- 暴雨(无降雨量):陕西西安降雨专题图
"""
district = _resolve_district(condition)
prefix = f"陕西西安{district}" if district else "陕西西安"
if event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None:
return f"{prefix}{float(magnitude)}{template_name}"
return f"{prefix}{template_name}"
elif event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
return f"{prefix}{float(rainfall)}mm{template_name}"
return f"{prefix}{template_name}"
return f"{prefix}{template_name}"
def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
"""
构建信息面板文本(左上角橙色矩形区域)。
暴雨:时间 + 累计降雨量(如有)+ 已持续(如有)
地震:时间 + 震级(如有)+ 位置(如有)
不显示灾害类型标签(暴雨/地震)。
"""
lines = []
# 时间
if isinstance(occurred_time, datetime):
time_str = f"{occurred_time.year}{occurred_time.month:02d}{occurred_time.day:02d}{occurred_time.hour:02d}{occurred_time.minute:02d}"
elif occurred_time:
time_str = str(occurred_time)
else:
time_str = datetime.now().strftime("%Y年%m月%d%H时%M分")
lines.append(f"时间:{time_str}")
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
lines.append(f"累计降雨量:{float(rainfall)}mm")
duration = condition.get("duration")
if duration is not None and duration != "":
lines.append(f"已持续:{duration}")
elif event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None and magnitude != "":
lines.append(f"震级:{float(magnitude)}")
lon = condition.get("epicenter_lon")
lat = condition.get("epicenter_lat")
if lon is not None and lat is not None:
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
return "\n".join(lines)
def _derive_model_params(
inference: dict, batch_folder: str, template_path: str
) -> dict:
"""从推理结果 + 模板路径推导专题图生成所需的全部参数。"""
event_type = inference["event_type"]
condition = inference["condition"]
occurred_time = inference["occurred_time"]
map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")
# 从模板文件名推导标题和输出文件名(去掉 .qgz 后缀)
template_name = os.path.splitext(os.path.basename(template_path))[0]
map_title = _build_map_title(event_type, condition, template_name)
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
# 制图时间统一用当前日期
map_time = datetime.now().strftime("%Y-%m-%d")
center_x, center_y = _extract_center_from_condition(
event_type, condition
)
info_text = _build_info_text(event_type, condition, occurred_time)
return {
"name": f"inference_{inference['id']}",
"path": template_path,
"outFile": out_file,
"mapLayout": map_layout,
"mapTitle": map_title,
"mapTime": map_time,
"mapUint": map_unit,
"info": info_text,
"centerX": center_x,
"centerY": center_y,
"event": str(inference["id"]),
"queueId": str(inference["id"]),
"zoomRule": zoom_rule,
"zoomValue": zoom_value,
}
def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
"""从 condition 提取地图中心坐标"""
if event_type == "earthquake":
lon = condition.get("epicenter_lon", xian_center[0])
lat = condition.get("epicenter_lat", xian_center[1])
return float(lon), float(lat)
else:
return xian_center[0], xian_center[1]
# ============================================================
# 构建 QGIS 服务配置字典
# ============================================================
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
gpkg_dir = get_gpkg_dir()
return {
"db": {
"host": settings.DB_HOST,
"port": settings.DB_PORT,
"database": settings.DB_NAME,
"username": settings.DB_USER,
"password": settings.DB_PASSWORD,
},
"qgis": {
"exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300),
},
"template_override": {
"enabled": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ENABLED", False),
"original": {
"host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST", "localhost"),
"port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT", 5432),
"dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME", "yjzyk_xian"),
"schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA", "base"),
},
"actual": {
"host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST", ""),
"port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT", ""),
"dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME", "xian_new"),
"schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA", "qgis"),
"username": getattr(settings, "DB_USER", "postgres"),
"password": getattr(settings, "DB_PASSWORD", ""),
},
},
"static_layers": build_static_layers_config(gpkg_dir),
"batch_folder": batch_folder,
}
# ============================================================
# 接口实现
# ============================================================
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 occurred_time 视为同一场灾害,共享文件夹
"""
from app.services.qgis.qgis_env import is_qgis_available
if not is_qgis_available():
raise HTTPException(status_code=503, detail="QGIS 环境不可用(未找到 QGIS Python 3.12 解释器)")
semaphore = get_prediction_semaphore()
async with semaphore:
inference_id = req.inferenceId
loop = asyncio.get_event_loop()
try:
# 查询推理记录,获取 occurred_time
inference = await loop.run_in_executor(
None, qgis_repository.query_inference_result, inference_id
)
# 将 occurred_time 格式化为时间戳作为文件夹名
disaster_time = format_disaster_time(inference["occurred_time"])
event_type = inference["event_type"]
# 构建批次文件夹路径
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:disasterTime")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":disasterTime", disaster_time)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重检查:同一 disasterTime 只产图一次
if os.path.isdir(batch_folder):
existing_files = [
f for f in os.listdir(batch_folder) if f.endswith(".jpg")
]
if existing_files and disaster_time not in _in_progress_locks:
logger.info(
f"[去重] disasterTime={disaster_time} 已产图,直接返回 "
f"({len(existing_files)} 张)"
)
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing_files)}",
data=disaster_time,
)
# 去重锁:同一灾害时间排队等待
async with _locks_lock:
if disaster_time not in _in_progress_locks:
_in_progress_locks[disaster_time] = asyncio.Lock()
task_lock = _in_progress_locks[disaster_time]
async with task_lock:
if os.path.isdir(batch_folder):
existing_files = [
f for f in os.listdir(batch_folder) if f.endswith(".jpg")
]
if existing_files:
logger.info(
f"[去重] disasterTime={disaster_time} 已被并发请求产图完成"
)
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing_files)}",
data=disaster_time,
)
logger.info(
f"推理结果查询成功: id={inference['id']}, "
f"type={inference['event_type']}, "
f"occurred_time={inference['occurred_time']}, "
f"disasterTime={disaster_time}"
)
# 推导参数 + 构建配置
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 扫描模板文件夹下所有 .qgz 文件
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
if not template_files:
raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
# 构建所有模型参数(批量模式)
models = []
for tpl_file in template_files:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}, "
f"中心: ({model['centerX']}, {model['centerY']})"
)
models.append(model)
# 一次性提交所有模型到 QGIS 子进程(单次 DLL 加载)
_generate_batch_maps(models, config, disaster_time)
return QgisMapExportResponse(
code=200,
message=f"任务已完成,共{len(models)}张专题图",
data=disaster_time,
)
except ValueError as e:
logger.warning(f"参数错误: {e}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
logger.error(f"模板文件不存在: {e}")
raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
except Exception as e:
logger.error(f"专题图导出失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出失败: {e}")
finally:
pass
def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None:
"""通过 QGIS Python 3.12 子进程批量生成专题图(单次 DLL 加载)"""
import json
import subprocess
import tempfile
from app.services.qgis.qgis_env import build_qgis_command, build_clean_subprocess_env
try:
logger.info(f"[批量产图] 开始: {len(models)} 张专题图, batch={disaster_time}")
# 构建子进程请求 JSON —— 批量格式
request_data = json.dumps(
{"config": config, "models": models},
ensure_ascii=False,
)
# 将请求 JSON 写入临时文件(避免 stdin 管道问题)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
try:
from config import settings
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
cmd = build_qgis_command(qgis_root)
cmd.append(tmp_json.name)
logger.info(f"[批量产图] 启动 QGIS 子进程: {' '.join(cmd[:2])}...")
result = subprocess.run(
cmd,
capture_output=True,
timeout=600, # 10 分钟超时(15 张模板 × ~40s/张 ≈ 600s
env=build_clean_subprocess_env(),
# 使用干净的环境变量:移除 venv 的 PYTHONPATH/VIRTUAL_ENV/PATH 污染,
# 避免 QGIS Python 3.12 的 DLL 加载被干扰导致 0xC0000005 崩溃。
# QGIS Python 3.12 仅通过 sys.path 即可正确加载所有模块和 DLL。
)
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass
# 解析子进程输出 —— 优先检查 stdout 是否有有效结果
stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
parsed_output = None
if stdout_text:
for line in reversed(stdout_text.split("\n")):
line = line.strip()
if line.startswith("{"):
try:
parsed_output = json.loads(line)
break
except json.JSONDecodeError:
continue
if parsed_output is not None:
batch_results = parsed_output.get("results", [])
success_count = sum(1 for r in batch_results if "error" not in r)
fail_count = len(batch_results) - success_count
logger.info(
f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}"
)
for r in batch_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r.get('error', 'unknown')}")
if success_count == 0 and fail_count > 0:
first_err = batch_results[0].get("error", "unknown")
raise RuntimeError(
f"QGIS 子进程所有模型均失败 ({fail_count}张): {first_err}"
)
elif result.returncode != 0:
# stdout 没有有效结果且退出码异常,才报错
stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
logger.error(f"[批量产图] QGIS 子进程失败 (exit={result.returncode}):")
for line in stderr_text.split("\n"):
logger.error(f" {line}")
raise RuntimeError(
f"QGIS 子进程失败: {stderr_text[:300]}"
)
else:
logger.warning("[批量产图] 子进程无有效输出,exit code = 0")
except subprocess.TimeoutExpired:
logger.error(f"[批量产图] QGIS 子进程超时 (300s)")
raise
except Exception as e:
logger.error(f"[批量产图] 产图失败: {e}", exc_info=True)
raise
# ============================================================
# 清理函数
# ============================================================
def shutdown_thread_pool() -> None:
"""关闭线程池(在 server.py lifespan 关闭阶段调用)"""
_thread_pool.shutdown(wait=False)
logger.info("QGIS 线程池已关闭")
+2 -2
View File
@@ -27,5 +27,5 @@ def get_logger(name: str = "algorithm"):
Returns:
logging.Logger 实例
"""
from app.utils.logger import get_logger as _get_logger
return _get_logger(name, str(LOG_DIR))
from app.utils.logger import LoggerManager
return LoggerManager.get_logger(name, str(LOG_DIR))
+108
View File
@@ -0,0 +1,108 @@
"""
QGIS 图层映射配置。
所有模板→目标库的表名/图层映射集中管理,避免散落在业务代码中。
"""
from pathlib import Path
import os
# ============================================================
# 静态底图映射:模板中 PostgreSQL 图层 → 本地 GPKG 文件
# key: 模板图层名(用于匹配 maplayer/layername
# table: 原始数据源表名(schema.table 格式,用于匹配 datasource 替换)
# gpkg: 本地 GPKG 文件名(相对于 GPKG 目录)
# ============================================================
STATIC_LAYERS = {
"水库": {"table": "base.rivers", "gpkg": "rivers.gpkg"},
"市州驻地": {"table": "base.sx_capital", "gpkg": "sx_capital.gpkg"},
"河流": {"table": "base.river", "gpkg": "river.gpkg"},
"active_fault": {"table": "base.active_fault", "gpkg": "active_fault.gpkg"},
"陕西省": {"table": "base.sx", "gpkg": "sx.gpkg"},
"乡镇驻地": {"table": "base.sx_street", "gpkg": "sx_street.gpkg"},
"区县驻地": {"table": "base.sx_xa_county", "gpkg": "sx_xa_county.gpkg"},
"县界": {"table": "base.sx_xa_county_boundary", "gpkg": "sx_xa_county_boundary.gpkg"},
"周边区县": {"table": "base.sx_zb_county_boundary", "gpkg": "sx_zb_county_boundary.gpkg"},
"周边市州": {"table": "base.sx_zb_city", "gpkg": "sx_zb_city.gpkg"},
"周边县区": {"table": "base.sx_zb_county", "gpkg": "sx_zb_county.gpkg"},
"traffic_expressway": {"table": "base.traffic_expressway", "gpkg": "traffic_expressway.gpkg"},
"traffic_provincial": {"table": "base.traffic_provincial", "gpkg": "traffic_provincial.gpkg"},
"traffic_railway": {"table": "base.traffic_railway", "gpkg": "traffic_railway.gpkg"},
"traffic_township": {"table": "base.traffic_township", "gpkg": "traffic_township.gpkg"},
"traffic_trunk_line": {"table": "base.traffic_trunk_line", "gpkg": "traffic_trunk_line.gpkg"},
}
# ============================================================
# OGR 本地图层 → PostgreSQL 转换
# 设计师用本地 JSON/shp 文件制作模板,数据已迁移到 qgis schema
# key: layername(模板 datasource 中 |layername=xxx 的值)
# value: qgis schema 中的目标表名
# ============================================================
OGR_TO_POSTGRES = {
"公园": "shelter_park",
"学校": "shelter_school",
"文化馆": "shelter_cultural",
"人防设施": "shelter_defence",
"体育馆": "shelter_gymnasium",
"广场": "shelter_square",
"住宿": "shelter_stay",
}
# ============================================================
# 表名映射:模板中引用的表名 → 目标库中实际表名
# (模板由多个源库的表拼合,迁移后表名可能不同)
# ============================================================
TABLE_RENAMES = {
"hazard_waterlogging": "hazard_hydrops", # 积水点
}
# ============================================================
# Schema 替换:模板源 schema → 目标 schema
# ============================================================
SCHEMA_REPLACEMENTS = ["base", "kspg", "dzxx"]
# ============================================================
# 图层过滤配置(layer_filter.py 使用)
# ============================================================
EVENT_LAYERS = ["eqcenter", "震中"] # 按 event 字段过滤
QUEUE_LAYERS = [
"intensity", "intensity_mian",
"dz_ryss", "dz_jjss", "dz_rysw", "dz_jzph", "dz_xzjl",
] # 按 eqqueue_id 字段过滤
# ============================================================
# GPKG 目录路径(项目根目录相对路径)
# ============================================================
GPKG_SUBDIR = "app/data/gpkg"
def get_gpkg_dir(project_root: str = None) -> str:
"""获取 GPKG 目录绝对路径"""
if project_root is None:
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
gpkg_dir = os.path.join(project_root, GPKG_SUBDIR)
return os.path.normpath(gpkg_dir).replace("\\", "/")
def build_static_layers_config(gpkg_dir: str = None) -> dict:
"""
构建 template_modifier / qgis_runner 使用的静态底图配置。
返回格式与原有 template_override 的 static_layers 段兼容。
"""
if gpkg_dir is None:
gpkg_dir = get_gpkg_dir()
layers = {}
for name, info in STATIC_LAYERS.items():
layers[name] = {
"file": info["gpkg"],
"table": info["table"],
}
return {
"enabled": True,
"gpkg_dir": gpkg_dir,
"layers": layers,
}
+5 -5
View File
@@ -78,13 +78,13 @@ def check_dependencies(project_root: Path):
],
check=True
)
print(" 依赖安装完成(虚拟环境)")
print("[OK] 依赖安装完成(虚拟环境)")
else:
print(" 所有依赖已安装(虚拟环境)")
print("[OK] 所有依赖已安装(虚拟环境)")
except subprocess.CalledProcessError as e:
print(f" 依赖检查/安装失败: {e}")
print(f"[FAIL] 依赖检查/安装失败: {e}")
sys.exit(1)
except Exception as e:
print(f" 依赖检查出错: {e}")
print(f"[FAIL] 依赖检查出错: {e}")
sys.exit(1)
+1 -1
View File
@@ -32,7 +32,7 @@ def check_environment():
if major == 3 and minor == 10:
return True
else:
print(f" Python版本不符合要求!")
print(f"[FAIL] Python版本不符合要求!")
print(f" 当前版本: {python_version}")
print(f" 要求版本: 3.10.x")
print(f"\n请使用 Python 3.10 版本运行此项目")
+14 -5
View File
@@ -21,9 +21,12 @@ class AppLauncher:
project_root: 项目根目录路径
"""
self.project_root = project_root
# 延迟导入logger
from app.utils.logger import get_logger
self.logger = get_logger()
"""
初始化logger
"""
self.logger = None
def run(self):
"""执行完整的启动流程"""
@@ -67,13 +70,19 @@ class AppLauncher:
# 启动应用
print("\n" + "=" * 50)
print(" 所有检查通过,准备启动应用...")
print("[OK] 所有检查通过,准备启动应用...")
print("=" * 50)
# 延迟导入logger
from app.utils.logger import get_logger
self.logger = get_logger()
self.logger.info("系统环境检查通过,开始执行主程序...")
start()
except Exception as e:
self.logger.error(f"启动失败: {e}")
if self.logger:
self.logger.error(f"启动失败: {e}")
else:
print(f"[FAIL] 启动失败: {e}")
sys.exit(1)
+23 -2
View File
@@ -9,6 +9,7 @@ from fastapi import FastAPI, Request
from app.utils.api_deps import get_rainfall_model, get_earthquake_model, is_model_loaded
from app.schemas.api_schemas import HealthResponse
from app.config.paths import get_logger
from config import settings
logger = get_logger("api")
@@ -20,9 +21,29 @@ async def lifespan(app: FastAPI):
get_rainfall_model()
get_earthquake_model()
logger.info("DBN模型预加载完成")
yield
logger.info("应用关闭")
# 检测 QGIS 子进程环境
qgis_root = getattr(settings, "QGIS_ROOT", None)
if qgis_root:
try:
from app.services.qgis.qgis_env import is_qgis_available
if is_qgis_available(qgis_root):
logger.info("QGIS 环境检测通过(子进程模式)")
else:
logger.warning("QGIS 环境不可用(未找到 Python 3.12 解释器),专题图功能降级")
except Exception as e:
logger.error(f"QGIS 环境检测失败: {e}")
yield
# 清理资源
try:
from app.api.qgis_map_export import shutdown_thread_pool
shutdown_thread_pool()
except Exception:
pass
logger.info("应用关闭")
def create_app() -> FastAPI:
"""创建 FastAPI 应用实例"""
+4 -4
View File
@@ -31,14 +31,14 @@ def check_virtualenv(project_root: Path) -> bool:
python_exe = venv_path / "bin" / "python3"
if not venv_path.exists():
print(f"\n 虚拟环境不存在,正在创建...")
print(f"\n[WARN] 虚拟环境不存在,正在创建...")
try:
subprocess.run([sys.executable, "-m", "venv", str(venv_path)], check=True)
print(" 虚拟环境创建成功")
print("[OK] 虚拟环境创建成功")
return True # 继续执行后续步骤
except subprocess.CalledProcessError as e:
print(f" 虚拟环境创建失败: {e}")
print(f"[FAIL] 虚拟环境创建失败: {e}")
sys.exit(1)
else:
print(f" 虚拟环境已存在: {venv_path}")
print(f"[OK] 虚拟环境已存在: {venv_path}")
return True
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+2 -1
View File
@@ -2,5 +2,6 @@
Repositories package - 数据访问层
"""
from app.repositories.rainfall_repository import rainfall_repository, RainfallRepository
from app.repositories.qgis_repository import qgis_repository, QgisRepository
__all__ = ['rainfall_repository', 'RainfallRepository']
__all__ = ['rainfall_repository', 'RainfallRepository', 'qgis_repository', 'QgisRepository']
+31
View File
@@ -0,0 +1,31 @@
from app.config.paths import get_logger
from app.utils.db_helper import db_helper
logger = get_logger("qgis")
class QgisRepository:
@staticmethod
def query_inference_result(inference_id: int) -> dict:
"""根据 inferenceId 查询 xian_inference_result"""
sql = """
SELECT id, name, event_type, occurred_time, condition
FROM xian_inference_result
WHERE id = %s
"""
rows = db_helper.execute_query(sql, (inference_id,))
if not rows:
raise ValueError(f"推理结果不存在: id={inference_id}")
row = rows[0]
return {
"id": row["id"],
"name": row["name"] or "",
"event_type": row["event_type"] or "",
"occurred_time": row["occurred_time"],
"condition": row["condition"] if isinstance(row["condition"], dict) else {},
}
qgis_repository = QgisRepository()
+15
View File
@@ -44,6 +44,21 @@ class EarthquakePredictRequest(BaseModel):
description="操作类型(如 '模拟', '实时监测', '应急评估'")
# ============================================================
# 专题图产出
# ============================================================
class QgisMapExportRequest(BaseModel):
"""专题图导出请求"""
inferenceId: int = Field(..., description="推理结果IDxian_inference_result.id")
class QgisMapExportResponse(BaseModel):
"""专题图导出响应"""
code: int = Field(200, description="状态码")
message: str = Field("success", description="提示信息")
data: Optional[str] = Field(None, description="导出图片的访问路径")
# ============================================================
# 通用响应
# ============================================================
+174
View File
@@ -0,0 +1,174 @@
"""
一次性脚本:从 PostgreSQL 导出静态底图为 GeoPackage 文件。
优先使用 ogr2ogrQGIS 自带),回退到 geopandas。
运行一次即可,之后服务直接读本地 GPKG。
用法: python -m app.script.export_static_layers
"""
import os
import subprocess
import sys
import time
# 确保项目根目录在 sys.path 中
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.insert(0, project_root)
# ============================================================
# 配置
# ============================================================
QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
ogr2ogr_path = os.path.join(QGIS_ROOT, "bin", "ogr2ogr.exe")
GPKG_DIR = os.path.join(project_root, "app", "data", "gpkg")
# 静态图层定义: {显示名: (schema.table, gpkg文件名)}
STATIC_LAYERS = [
("水库", "qgis.rivers", "rivers.gpkg"),
("市州驻地", "qgis.sx_capital", "sx_capital.gpkg"),
("河流", "qgis.river", "river.gpkg"),
("active_fault", "qgis.active_fault", "active_fault.gpkg"),
("陕西省", "qgis.sx", "sx.gpkg"),
("乡镇驻地", "qgis.sx_street", "sx_street.gpkg"),
("区县驻地", "qgis.sx_xa_county", "sx_xa_county.gpkg"),
("县界", "qgis.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
("周边区县", "qgis.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
("周边市州", "qgis.sx_zb_city", "sx_zb_city.gpkg"),
("周边县区", "qgis.sx_zb_county", "sx_zb_county.gpkg"),
("traffic_expressway", "qgis.traffic_expressway", "traffic_expressway.gpkg"),
("traffic_provincial", "qgis.traffic_provincial", "traffic_provincial.gpkg"),
("traffic_railway", "qgis.traffic_railway", "traffic_railway.gpkg"),
("traffic_township", "qgis.traffic_township", "traffic_township.gpkg"),
("traffic_trunk_line", "qgis.traffic_trunk_line", "traffic_trunk_line.gpkg"),
]
# ============================================================
# 方法一: ogr2ogr(推荐,QGIS 自带)
# ============================================================
def _setup_gdal_env():
"""设置 GDAL/OGR 运行环境"""
gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal")
gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib")
gdal_bin = os.path.join(QGIS_ROOT, "apps", "gdal", "bin")
if os.path.isdir(gdal_data):
os.environ["GDAL_DATA"] = gdal_data
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
paths_to_add = [p for p in [gdal_bin, gdal_lib] if os.path.isdir(p)]
os.environ["PATH"] = ";".join(paths_to_add) + ";" + os.environ.get("PATH", "")
def _export_with_ogr2ogr(host, port, dbname, user, password, schema, table, gpkg_path):
"""用 ogr2ogr 导出单个图层"""
conn = f"PG:host={host} port={port} dbname={dbname} user={user} password={password}"
cmd = [
ogr2ogr_path,
"-f", "GPKG",
gpkg_path,
conn,
"-sql", f'SELECT * FROM "{schema}"."{table}"',
"-nln", table,
"-overwrite",
"-t_srs", "EPSG:4326",
]
result = subprocess.run(cmd, capture_output=True, timeout=120)
if result.returncode != 0:
stderr = result.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(stderr[:300])
return True
# ============================================================
# 方法二: geopandas(回退)
# ============================================================
def _export_with_geopandas(host, port, dbname, user, password, schema, table, gpkg_path):
"""用 geopandas 导出单个图层"""
import geopandas as gpd
from sqlalchemy import create_engine
conn_str = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
engine = create_engine(conn_str)
gdf = gpd.read_postgis(
f'SELECT * FROM "{schema}"."{table}"',
engine,
geom_col="Geometry",
)
if gdf.crs is None:
gdf = gdf.set_crs(epsg=4326)
gdf.to_file(gpkg_path, driver="GPKG")
engine.dispose()
return len(gdf)
# ============================================================
# 主入口
# ============================================================
def main():
# 从 config.settings 读取数据库配置
try:
from config import settings
host = getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST",
getattr(settings, "DB_HOST", "47.92.216.173"))
port = str(getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT",
getattr(settings, "DB_PORT", 7654)))
dbname = getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME",
getattr(settings, "DB_NAME", "xian_new"))
user = getattr(settings, "DB_USER", "postgres")
password = getattr(settings, "DB_PASSWORD", "zhangsan")
except Exception:
host, port, dbname, user, password = "47.92.216.173", "7654", "xian_new", "postgres", "zhangsan"
os.makedirs(GPKG_DIR, exist_ok=True)
# 选择导出方法
use_ogr2ogr = os.path.isfile(ogr2ogr_path)
method = "ogr2ogr" if use_ogr2ogr else "geopandas"
if use_ogr2ogr:
_setup_gdal_env()
print(f"数据库: {host}:{port}/{dbname}")
print(f"输出目录: {GPKG_DIR}")
print(f"导出方法: {method}")
print(f"{len(STATIC_LAYERS)} 个图层\n")
success = 0
failed = 0
for name, table_ref, gpkg_file in STATIC_LAYERS:
schema, table = table_ref.split(".", 1)
gpkg_path = os.path.join(GPKG_DIR, gpkg_file)
print(f"[{success + failed + 1}/{len(STATIC_LAYERS)}] {name} ({table_ref})", end=" ... ", flush=True)
t0 = time.time()
try:
if use_ogr2ogr:
_export_with_ogr2ogr(host, port, dbname, user, password, schema, table, gpkg_path)
size_kb = os.path.getsize(gpkg_path) / 1024
print(f"{size_kb:.0f} KB, {time.time() - t0:.1f}s")
else:
count = _export_with_geopandas(host, port, dbname, user, password, schema, table, gpkg_path)
print(f"{count} 行, {time.time() - t0:.1f}s")
success += 1
except Exception as e:
print(f"✗ 失败: {e}")
failed += 1
print(f"\n{'='*50}")
print(f"完成: 成功={success}, 失败={failed}, 共={len(STATIC_LAYERS)}")
print(f"输出目录: {GPKG_DIR}")
if __name__ == "__main__":
main()
+54
View File
@@ -0,0 +1,54 @@
@echo off
REM ============================================================
REM QGIS 子进程启动脚本(Windows
REM 自动检测 QGIS 安装结构,支持 qgis-ltr/qgis + Python312/Python39
REM ============================================================
if "%QGIS_ROOT%"=="" goto :no_root
if not exist "%QGIS_ROOT%" goto :no_root_dir
REM --- 检测 QGIS 应用目录 ---
set "QGIS_APP=%QGIS_ROOT%\apps\qgis-ltr"
if not exist "%QGIS_APP%" set "QGIS_APP=%QGIS_ROOT%\apps\qgis"
if not exist "%QGIS_APP%" goto :no_qgis_app
REM --- 检测 Python 目录 ---
set "PY=%QGIS_ROOT%\apps\Python312"
if not exist "%PY%" set "PY=%QGIS_ROOT%\apps\Python39"
if not exist "%PY%" set "PY=%QGIS_ROOT%\apps\Python310"
if not exist "%PY%" set "PY=%QGIS_ROOT%\apps\Python311"
if not exist "%PY%" goto :no_python
REM --- 设置环境 ---
set "PYTHONHOME=%PY%"
set "PYTHONPATH=%QGIS_APP%\python"
set "QGIS_PREFIX_PATH=%QGIS_APP%"
set "QT_PLUGIN_PATH=%QGIS_APP%\qtplugins;%QGIS_ROOT%\apps\Qt5\plugins"
set "GDAL_DATA=%QGIS_ROOT%\apps\gdal\share\gdal"
set "PYTHONUTF8=1"
set "GDAL_FILENAME_IS_UTF8=YES"
set "VSI_CACHE=TRUE"
set "VSI_CACHE_SIZE=1000000"
set "PATH=%QGIS_APP%\bin;%QGIS_ROOT%\apps\Qt5\bin;%QGIS_ROOT%\apps\gdal\lib;%PATH%"
REM 强制使用 QGIS 自带的 PROJ,避免 PostgreSQL/PostGIS 的旧 proj.db 干扰
if exist "%QGIS_ROOT%\share\proj" set "PROJ_DATA=%QGIS_ROOT%\share\proj"
REM --- 启动 ---
"%PY%\python3.exe" %*
exit /b %ERRORLEVEL%
:no_root
echo [ERROR] QGIS_ROOT 环境变量未设置
exit /b 1
:no_root_dir
echo [ERROR] QGIS_ROOT 目录不存在: %QGIS_ROOT%
exit /b 1
:no_qgis_app
echo [ERROR] 未找到 QGIS 应用目录
exit /b 1
:no_python
echo [ERROR] 未找到 QGIS Python 目录
exit /b 1
+31
View File
@@ -0,0 +1,31 @@
"""
QGIS 服务模块。
注意:此包的核心模块(map_service、template_cache 等)依赖 qgis.core
仅在 QGIS Python 3.12 子进程中可导入。主进程(Python 3.10)不应直接导入
这些模块,而应通过 qgis_runner.py 子进程调用。
安全导入(不依赖 qgis.core):
- qgis_env: 环境检测与子进程配置
- qgis_runner: 子进程入口脚本路径
"""
def _lazy_import(module_name: str, attr: str):
"""延迟导入,仅在子进程中实际使用时才加载"""
import importlib
mod = importlib.import_module(f".{module_name}", package=__name__)
return getattr(mod, attr)
__all__ = [
"MapService",
"MapExporter",
"TemplateModifier",
"TemplateCache",
"LayerFilter",
]
# 不在顶层导入 QGIS 依赖模块,避免主进程崩溃
# 使用方式:
# from app.services.qgis.map_service import MapService (仅在子进程中)
# from app.services.qgis.qgis_env import is_qgis_available (主进程安全)
+26
View File
@@ -0,0 +1,26 @@
"""
图层过滤模块。按 event 和 eqqueue_id 筛选要素。
"""
from app.config.paths import get_logger
from app.config.qgis_mappings import EVENT_LAYERS, QUEUE_LAYERS
logger = get_logger("qgis.filter")
class LayerFilter:
def apply(self, project, model: dict) -> None:
"""对项目中的图层应用过滤条件"""
event = model.get("event", "")
queue_id = model.get("queueId", "")
logger.info(f"图层过滤: event='{event}', queueId='{queue_id}'")
for name in EVENT_LAYERS:
layers = project.mapLayersByName(name)
if layers:
layers[0].setSubsetString(f"event = '{event}'")
for name in QUEUE_LAYERS:
layers = project.mapLayersByName(name)
if layers:
layers[0].setSubsetString(f"eqqueue_id = '{queue_id}'")
+49
View File
@@ -0,0 +1,49 @@
"""
地图导出模块。布局文本更新、比例尺调整、图片导出。
"""
from qgis.core import QgsLayoutExporter, QgsScaleBarSettings
from app.config.paths import get_logger
logger = get_logger("qgis.exporter")
class MapExporter:
def __init__(self, config: dict, layout):
self.config = config
self.layout = layout
def update_texts(self, model: dict) -> None:
"""更新布局中的文本标签"""
for key in ["mapTitle", "mapTime", "mapUint", "info"]:
label = self.layout.itemById(key)
if label is not None:
label.setText(model[key])
def update_scale_bar(self) -> None:
"""调整比例尺为自适应宽度模式"""
scale_bar = self.layout.itemById("ScaleBar")
if scale_bar is None:
logger.warning("比例尺控件不存在")
return
scale_bar.setSegmentSizeMode(
QgsScaleBarSettings.SegmentSizeMode.SegmentSizeFitWidth
)
scale_bar.setMaximumBarWidth(70)
scale_bar.setMinimumBarWidth(40)
def export(self, path: str) -> None:
"""导出布局为图片"""
dpi = self.config["qgis"]["exportDpi"]
settings = QgsLayoutExporter.ImageExportSettings()
settings.dpi = dpi
exporter = QgsLayoutExporter(self.layout)
result = exporter.exportToImage(path, settings)
if result != QgsLayoutExporter.Success:
raise RuntimeError(f"图片导出失败: {path}")
logger.info(f"图片已导出: {path}")
+176
View File
@@ -0,0 +1,176 @@
"""
地图生成主流程控制器。
协调模板加载、图层过滤、缩放、文本更新、导出。
"""
import os
import time
from qgis.core import QgsProject, QgsDataSourceUri
from app.config.paths import get_logger
from app.config.qgis_mappings import TABLE_RENAMES, SCHEMA_REPLACEMENTS
from .template_cache import TemplateCache
from .template_modifier import TemplateModifier
from .layer_filter import LayerFilter
from .map_exporter import MapExporter
from app.utils.map_zoom import MapZoom
from app.config.paths import get_logger
logger = get_logger("qgis.service")
# 全局模板缓存(跨请求复用)
template_cache = TemplateCache()
class MapService:
def __init__(self, config: dict):
self.config = config
def generate(self, model: dict) -> str:
"""
执行完整的地图生成流程。
Args:
model: 包含地图参数的字典
Returns:
地图名称
"""
t_start = time.time()
template_path = model["path"]
project = QgsProject.instance()
is_cache_hit = template_cache.is_loaded(template_path)
# 加载/恢复模板
if is_cache_hit:
logger.info(f"[缓存命中] {os.path.basename(template_path)}")
project, texts, extent = template_cache.restore_template(template_path)
template_cache.reset_project_state(project, texts, extent)
else:
logger.info(f"[首次加载] {os.path.basename(template_path)}")
# 直接读原始模板,不做文件级修改(避免 ZIP 兼容性问题)
project.read(template_path)
# 更新图层连接 + GPKG/SRID/表名修正(仅首次加载)
if not is_cache_hit:
self._update_db_connections(project)
# 图层过滤
LayerFilter().apply(project, model)
# 地图缩放
layout = project.layoutManager().layoutByName(model["mapLayout"])
if layout is None:
available = [l.name() for l in project.layoutManager().layouts()]
raise RuntimeError(
f"模板中未找到布局 '{model['mapLayout']}',可用布局:{available}"
)
map_item = layout.itemById("Map")
zoom = MapZoom(project, layout, map_item)
zoom.execute(model["zoomRule"], {
"X": model["centerX"],
"Y": model["centerY"],
"value": model["zoomValue"],
})
# 文本更新 + 比例尺 + 导出
exporter = MapExporter(self.config, layout)
exporter.update_texts(model)
exporter.update_scale_bar()
exporter.export(model["outFile"])
elapsed = time.time() - t_start
logger.info(
f"{'[缓存命中]' if is_cache_hit else '[首次加载]'} "
f"导出完成: {model['name']},耗时 {elapsed:.1f}s"
)
return model["name"]
def _update_db_connections(self, project: QgsProject) -> None:
"""更新图层连接 + SRID修正 + GPKG静态层替换"""
db_config = self.config["db"]
override = self.config.get("template_override", {})
actual_schema = override.get("actual", {}).get("schema", "qgis")
static_config = self.config.get("static_layers", {})
static_enabled = static_config.get("enabled", False)
gpkg_dir = static_config.get("gpkg_dir", "")
static_layers_map = static_config.get("layers", {})
static_count = 0
for layer in project.mapLayers().values():
provider = layer.providerType()
# GPKG 静态层替换:postgres 表 → 本地 GPKG 文件
if provider == "postgres" and static_enabled:
uri_str = layer.dataProvider().uri().uri()
for name, info in static_layers_map.items():
table_key = info["table"]
schema, table = table_key.split(".", 1)
if f'table="{schema}"."{table}"' in uri_str:
gpkg_path = os.path.join(gpkg_dir, info["file"]).replace("\\", "/")
layer.setDataSource(gpkg_path, name, "ogr")
static_count += 1
logger.debug(f"静态图层 {name} → GPKG")
break
else:
# 没匹配到静态层,继续处理为 postgres
self._fix_postgres_layer(layer, db_config, actual_schema)
continue
if provider == "ogr":
static_count += 1
continue
if provider == "postgres":
self._fix_postgres_layer(layer, db_config, actual_schema)
if static_count:
logger.info(f"静态底图已本地化: {static_count} 个图层")
@staticmethod
def _fix_postgres_layer(layer, db_config, actual_schema):
"""修正单个 PostgreSQL 图层的连接参数"""
try:
uri = layer.dataProvider().uri()
uri.setConnection(
db_config["host"],
str(db_config["port"]),
db_config["database"],
db_config["username"],
db_config["password"],
)
# Schema 替换
uri_str = uri.uri()
for old_schema in SCHEMA_REPLACEMENTS:
if f'table="{old_schema}".' in uri_str:
uri_str = uri_str.replace(
f'table="{old_schema}".',
f'table="{actual_schema}".',
)
uri = QgsDataSourceUri(uri_str)
break
# 表名映射
uri_str = uri.uri()
for old_name, new_name in TABLE_RENAMES.items():
full_old = f'table="{actual_schema}"."{old_name}"'
full_new = f'table="{actual_schema}"."{new_name}"'
if full_old in uri_str:
uri_str = uri_str.replace(full_old, full_new)
uri = QgsDataSourceUri(uri_str)
# SRID 修正
uri_str = uri.uri()
if " srid=0 " in uri_str:
uri_str = uri_str.replace(" srid=0 ", " srid=4326 ")
uri = QgsDataSourceUri(uri_str)
layer.setDataSource(uri.uri(), layer.name(), "postgres")
if layer.isValid():
fc = layer.featureCount()
logger.info(f"图层 {layer.name()} 连接更新成功 ({fc} features)")
else:
logger.error(f"图层 {layer.name()} 更新后仍无效")
except Exception as e:
logger.error(f"更新图层 {layer.name()} 连接失败: {e}")
+314
View File
@@ -0,0 +1,314 @@
"""
QGIS 环境检测与子进程配置模块。
主进程运行在 Python 3.10,无法直接加载 QGIS 的 Python 3.12 C 扩展。
所有 QGIS 操作通过 subprocess 调用 QGIS Python 3.12 执行。
本模块提供:
- get_qgis_python_path(): 检测 QGIS Python 3.12 解释器路径
- build_qgis_command(): 构建通过 .bat 启动 QGIS 子进程的命令
- is_qgis_available(): 检查 QGIS 是否可用
- get_runner_script(): 获取 qgis_runner.py 的路径
"""
import os
import tempfile
from pathlib import Path
from app.config.paths import get_logger
logger = get_logger("qgis.env")
def get_qgis_python_path(qgis_root: str = None) -> str | None:
"""
检测 QGIS 自带的 Python 3.12 解释器路径。
Windows: {QGIS_ROOT}/apps/Python312/python3.exe
Returns:
解释器绝对路径,不存在则返回 None
"""
if qgis_root is None:
qgis_root = _detect_qgis_root()
if qgis_root is None:
return None
import platform
if platform.system() == "Windows":
for name in ("python3.exe", "python.exe"):
candidate = os.path.join(qgis_root, "apps", "Python312", name)
if os.path.isfile(candidate):
logger.info(f"检测到 QGIS Python: {candidate}")
return candidate
logger.warning(f"未找到 QGIS Python 3.12")
return None
else:
import shutil
# 优先检查环境变量
env_python = os.environ.get("QGIS_PYTHON_PATH")
if env_python and os.path.isfile(env_python):
logger.info(f"QGIS_PYTHON_PATH 指定: {env_python}")
return env_python
# 搜索标准 Linux QGIS 安装路径
linux_paths = [
"/usr/bin/qgis", # Ubuntu/Debian apt 安装
"/usr/bin/qgis.bin",
"/opt/QGIS/apps/Python312/bin/python3", # 独立安装器
"/opt/QGIS/apps/Python3/bin/python3",
"/usr/libexec/qgis/python3", # Fedora/RHEL
]
for candidate in linux_paths:
if os.path.isfile(candidate):
logger.info(f"检测到 QGIS 可执行文件: {candidate}")
return candidate
# 回退到系统 Python(如果 qgis.core 可导入)
sys_python = shutil.which("python3") or shutil.which("python")
if sys_python:
logger.info(f"Linux 环境,使用系统 Python: {sys_python}")
return sys_python
return None
def build_qgis_command(qgis_root: str = None) -> list[str]:
"""
构建 QGIS 子进程启动命令(直接调用 Python,不经过 bat 包装器)。
环境变量通过 build_qgis_env() 构建后传给 subprocess.run(env=...)。
"""
if qgis_root is None:
qgis_root = _detect_qgis_root() or "D:/QGIS"
python_path = get_qgis_python_path(qgis_root)
if not python_path:
raise RuntimeError("未找到 QGIS Python 3.12 解释器")
runner_script = get_runner_script()
if not os.path.isfile(runner_script):
raise RuntimeError(f"QGIS Runner 脚本不存在: {runner_script}")
return [python_path, runner_script]
def build_qgis_env(qgis_root: str = None) -> dict:
"""
构建 QGIS 子进程所需的完整环境变量字典。
基于当前进程环境继承,设置 QGIS 所需的 PYTHONPATH、PATH、
GDAL_DATA、PROJ_DATA 等变量。调用方直接传给 subprocess.run(env=...)。
"""
import platform
if qgis_root is None:
qgis_root = _detect_qgis_root() or "D:/QGIS"
env = dict(os.environ)
if platform.system() != "Windows":
return env
# --- 检测 QGIS 应用目录 ---
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis-ltr")
if not os.path.isdir(qgis_app_dir):
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis")
if not os.path.isdir(qgis_app_dir):
raise RuntimeError(f"未找到 QGIS 应用目录: {qgis_root}\\apps\\qgis-ltr 或 qgis")
# --- 检测 Python 目录 ---
python_dir = os.path.join(qgis_root, "apps", "Python312")
if not os.path.isdir(python_dir):
for name in ("Python39", "Python310", "Python311"):
candidate = os.path.join(qgis_root, "apps", name)
if os.path.isdir(candidate):
python_dir = candidate
break
# --- 核心路径 ---
qtplugins = os.path.join(qgis_app_dir, "qtplugins")
qt5_plugins = os.path.join(qgis_root, "apps", "Qt5", "plugins")
gdal_data = os.path.join(qgis_root, "apps", "gdal", "share", "gdal")
qgis_python = os.path.join(qgis_app_dir, "python")
qgis_bin = os.path.join(qgis_app_dir, "bin")
qt5_bin = os.path.join(qgis_root, "apps", "Qt5", "bin")
gdal_lib = os.path.join(qgis_root, "apps", "gdal", "lib")
env["PYTHONPATH"] = qgis_python
env["QGIS_PREFIX_PATH"] = qgis_app_dir
env["QT_PLUGIN_PATH"] = f"{qtplugins};{qt5_plugins}"
env["GDAL_DATA"] = gdal_data
env["PYTHONUTF8"] = "1"
env["GDAL_FILENAME_IS_UTF8"] = "YES"
env["VSI_CACHE"] = "TRUE"
env["VSI_CACHE_SIZE"] = "1000000"
# --- PROJ 数据目录 ---
for pd in [
os.path.join(qgis_app_dir, "share", "proj"),
os.path.join(qgis_root, "share", "proj"),
os.path.join(qgis_root, "apps", "gdal", "share", "proj"),
]:
if os.path.isfile(os.path.join(pd, "proj.db")):
env["PROJ_DATA"] = pd
break
# --- PATH:前置 QGIS 二进制目录 ---
prepend = f"{qgis_bin};{qt5_bin};{gdal_lib}"
env["PATH"] = f"{prepend};{env.get('PATH', '')}"
logger.debug(f"QGIS env built: QGIS_ROOT={qgis_root}, QGIS_APP={qgis_app_dir}")
return env
def build_clean_subprocess_env() -> dict:
"""
为 QGIS Python 3.12 子进程构建干净的环境变量。
问题背景:
主进程运行在 venv Python 3.10 中,继承了 venv 的环境变量
PYTHONPATH 指向 venv site-packages、VIRTUAL_ENV、PATH 含 venv Scripts 等)。
QGIS Python 3.12 子进程如果继承这些变量,DLL 加载会被干扰,
导致 0xC0000005 (ACCESS_VIOLATION) 崩溃。
同时,QGIS_PREFIX_PATH、QT_PLUGIN_PATH 等变量也会与
QGIS 内置 DLL 加载机制冲突,同样导致崩溃。
策略:
从 os.environ 中移除所有 Python/venv 相关的污染变量,
只保留操作系统正常运行所需的基础变量(SystemRoot、TEMP 等)。
QGIS Python 3.12 仅通过 sys.path 即可正确加载所有模块和 DLL。
"""
env = dict(os.environ)
# 先记录 venv 路径(后续清理 PATH 时需要用)
venv_root = env.get("VIRTUAL_ENV", "").lower()
# 移除 Python/venv 相关变量 —— 这些会污染 QGIS Python 3.12 的 DLL 加载
for key in [
"PYTHONPATH", # venv 设置,指向 site-packages → DLL 冲突
"PYTHONHOME", # 如果存在,改变 Python 查找路径
"VIRTUAL_ENV", # venv 标识,不必要
"PYTHONDONTWRITEBYTECODE",
"QGIS_PREFIX_PATH", # 与 QGIS 内置 DLL 加载冲突
"QT_PLUGIN_PATH", # 与 Qt DLL 加载冲突
"GDAL_DATA", # build_qgis_env 设置,可能导致路径冲突
"PROJ_DATA", # build_qgis_env 设置,可能导致路径冲突
]:
env.pop(key, None)
# 清理 PATH:仅移除 venv 相关路径(避免 DLL 搜索顺序冲突)
# 保留 QGIS/系统路径(测试证明这些是安全的)
if venv_root:
path_parts = env.get("PATH", "").split(";")
clean_parts = []
for p in path_parts:
pl = p.lower().strip()
if not pl:
continue
# 跳过 venv 相关路径(.venv/Scripts, .venv/Lib 等)
if venv_root in pl:
continue
clean_parts.append(p)
env["PATH"] = ";".join(clean_parts)
# 移除 VIRTUAL_ENV 本身(已从 PATH 清理中使用过)
env.pop("VIRTUAL_ENV", None)
logger.debug(
f"Clean env built: removed PYTHONPATH/VIRTUAL_ENV/venv-PATH, "
f"kept {len(env)} vars"
)
return env
def is_qgis_available(qgis_root: str = None) -> bool:
"""检查 QGIS 环境是否可用"""
return get_qgis_python_path(qgis_root) is not None
def get_runner_script() -> str:
"""获取 qgis_runner.py 的绝对路径"""
return str(Path(__file__).parent / "qgis_runner.py")
def _generate_bat_wrapper(qgis_root: str, python_path: str, runner_script: str) -> str:
"""生成 .bat 包装脚本,设置 QGIS 环境变量并启动 runner"""
# 自动检测 QGIS 应用目录(qgis-ltr 或 qgis
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis-ltr")
if not os.path.isdir(qgis_app_dir):
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis")
qgis_app_dir = qgis_app_dir.replace("/", "\\")
# 自动检测 PROJ 目录
proj_data = ""
for pd in [
os.path.join(qgis_app_dir, "share", "proj"),
os.path.join(qgis_root, "share", "proj"),
os.path.join(qgis_root, "apps", "gdal", "share", "proj"),
]:
if os.path.isfile(os.path.join(pd, "proj.db")):
proj_data = pd.replace("/", "\\")
break
python_dir = os.path.join(qgis_root, "apps", "Python312").replace("/", "\\")
qt5_plugins = os.path.join(qgis_root, "apps", "Qt5", "plugins").replace("/", "\\")
qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
gdal_data = os.path.join(qgis_root, "apps", "gdal", "share", "gdal").replace("/", "\\")
qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
qt5_bin = os.path.join(qgis_root, "apps", "Qt5", "bin").replace("/", "\\")
gdal_lib = os.path.join(qgis_root, "apps", "gdal", "lib").replace("/", "\\")
proj_line = f'set "PROJ_DATA={proj_data}"' if proj_data else "REM PROJ_DATA not found"
bat_content = f"""@echo off
chcp 65001 >nul
set "PYTHONPATH={qgis_python_dir}"
set "QGIS_PREFIX_PATH={qgis_app_dir}"
set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
set "GDAL_DATA={gdal_data}"
{proj_line}
set "PYTHONUTF8=1"
set "GDAL_FILENAME_IS_UTF8=YES"
set "VSI_CACHE=TRUE"
set "VSI_CACHE_SIZE=1000000"
set "PATH={qt5_bin};{qgis_bin};{gdal_lib};%PATH%"
"{python_path}" "{runner_script}" %*
"""
bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
os.makedirs(bat_dir, exist_ok=True)
bat_path = os.path.join(bat_dir, "run_qgis.bat")
with open(bat_path, "w", encoding="utf-8") as f:
f.write(bat_content)
logger.debug(f"生成 QGIS 包装脚本: {bat_path}")
return bat_path
def _detect_qgis_root() -> str | None:
"""
自动检测 QGIS 安装根目录。
优先级:
1. 环境变量 QGIS_ROOT
2. Windows 默认路径 D:/QGIS
3. Linux 常见路径
"""
env_root = os.environ.get("QGIS_ROOT")
if env_root and os.path.isdir(env_root):
return env_root
import platform
if platform.system() == "Windows":
for candidate in ["D:/QGIS", "C:/OSGeo4W", "C:/QGIS"]:
if os.path.isdir(candidate):
logger.info(f"检测到 QGIS 根目录: {candidate}")
return candidate
else:
for candidate in ["/usr", "/opt/QGIS", "/home/QGIS"]:
if os.path.isdir(candidate):
logger.info(f"检测到 QGIS 根目录: {candidate}")
return candidate
logger.warning("未检测到 QGIS 安装目录")
return None
+170
View File
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
QGIS 专题图生成子进程入口。
由主进程 (Python 3.10) 通过 subprocess 调用,
运行在 QGIS 自带的 Python 3.12 环境中。
支持两种模式:
- 批量模式(推荐):单次启动 QgsApplication,顺序处理多个模板
输入: { "config": {...}, "models": [{...}, {...}, ...] }
- 单任务模式(兼容):
输入: { "config": {...}, "model": {...} }
输出 JSON (stdout):
批量: { "results": [{"name": "...", "output": "..."}, ...] }
单任务: { "name": "...", "output": "..." }
错误: stderr + exit code 1
"""
import json
import os
import sys
import time
# ============================================================
# 1. 环境初始化(必须在任何 QGIS/Qt import 之前)
# ============================================================
QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
def _setup_python_path():
"""将项目根目录和 QGIS Python 路径加入 sys.path"""
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
if project_root not in sys.path:
sys.path.insert(0, project_root)
qgis_python = os.path.join(QGIS_ROOT, "apps", "qgis-ltr", "python")
if os.path.isdir(qgis_python) and qgis_python not in sys.path:
sys.path.insert(0, qgis_python)
def _setup_environment():
"""设置 QGIS 运行所需的环境变量。
注意:QGIS Python 3.12 自带完整的 DLL 配置,不应设置
QGIS_PREFIX_PATH、QT_PLUGIN_PATH、PATH 等变量,
否则会与 QGIS 内置的 DLL 加载机制冲突,
导致 0xC0000005 (ACCESS_VIOLATION) 崩溃。
仅设置不影响 DLL 加载的辅助变量。
"""
# 仅设置 UTF-8 和 GDAL 相关的编码变量(不影响 DLL 加载)
os.environ["PYTHONUTF8"] = "1"
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
os.environ["VSI_CACHE"] = "TRUE"
os.environ["VSI_CACHE_SIZE"] = "1000000"
# ============================================================
# 2. 主逻辑
# ============================================================
def _init_qgis():
"""初始化 QgsApplication(只做一次)"""
from qgis.core import QgsApplication
qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr")
QgsApplication.setPrefixPath(qgis_app_dir, True)
qgs_app = QgsApplication([], False)
qgs_app.initQgis()
return qgs_app
def _process_single(service, model):
"""处理单个模板,返回结果 dict"""
name = service.generate(model)
return {"name": name, "output": model["outFile"]}
def main():
t_start = time.time()
# 环境初始化
_setup_environment()
_setup_python_path()
# 读取请求 JSON
if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]):
with open(sys.argv[1], "r", encoding="utf-8") as f:
request = json.load(f)
else:
request = json.load(sys.stdin)
config = request["config"]
# 兼容批量和单任务模式
models = request.get("models") or [request["model"]]
# 初始化 QgsApplication(只做一次)
qgs_app = _init_qgis()
try:
from app.services.qgis.map_service import MapService
service = MapService(config)
results = []
for i, model in enumerate(models):
t_model = time.time()
try:
result = _process_single(service, model)
results.append(result)
elapsed = time.time() - t_model
print(
f"[qgis_runner] [{i+1}/{len(models)}] 完成: {result['name']}, "
f"耗时 {elapsed:.1f}s",
file=sys.stderr,
)
except Exception as e:
elapsed = time.time() - t_model
error_msg = f"{e}"
print(
f"[qgis_runner] [{i+1}/{len(models)}] 失败: {model.get('name', '?')}, "
f"耗时 {elapsed:.1f}s — {error_msg}",
file=sys.stderr,
)
results.append({"name": model.get("name", ""), "output": "", "error": error_msg})
# 输出结果
if len(models) == 1 and not request.get("models"):
# 单任务模式兼容
json.dump(results[0], sys.stdout, ensure_ascii=False)
else:
json.dump({"results": results}, sys.stdout, ensure_ascii=False)
sys.stdout.flush()
total = time.time() - t_start
ok = sum(1 for r in results if not r.get("error"))
fail = len(results) - ok
print(
f"\n[qgis_runner] 批量完成: {ok}成功/{fail}失败, 总耗时 {total:.1f}s",
file=sys.stderr,
)
except Exception as e:
elapsed = time.time() - t_start
print(f"[qgis_runner] 致命错误 ({elapsed:.1f}s): {e}", file=sys.stderr)
sys.exit(1)
finally:
# 跳过 qgs_app.exitQgis() — 已知 Qt DLL 在 exitQgis() 时会触发
# STATUS_ACCESS_VIOLATION (0xC0000005) 崩溃,进程通过 os._exit 终止,
# 操作系统会回收所有资源。
pass
if __name__ == "__main__":
try:
main()
except Exception:
pass
# 强制终止进程,避免 Qt/QGIS 后台线程在 ExitProcess 等待期间 DLL 崩溃
try:
import ctypes
ctypes.windll.kernel32.TerminateProcess(
ctypes.windll.kernel32.GetCurrentProcess(), 0
)
except Exception:
os._exit(0)
+120
View File
@@ -0,0 +1,120 @@
"""
模板缓存引擎。解决 QgsProject 单例问题。
流程:
1. 首次请求:project.read() 加载模板(慢,仅一次)
2. 加载后 project.write() 保存到临时文件
3. 后续同模板请求:从临时文件恢复(快,连接复用)
4. 手动恢复文本/过滤/缩放(毫秒级)
"""
import os
import time
import tempfile
from qgis.core import QgsProject, QgsRectangle
from app.config.paths import get_logger
logger = get_logger("qgis.cache")
class TemplateCache:
def __init__(self):
self._cache: dict[str, dict] = {}
def is_loaded(self, template_path: str) -> bool:
return template_path in self._cache
def load_template(self, template_path: str, layout_name: str = "A4") -> None:
"""首次加载模板"""
start = time.time()
project = QgsProject.instance()
logger.info(f"首次加载: {os.path.basename(template_path)}")
project.read(template_path)
logger.info(f"project.read() 耗时: {time.time() - start:.1f}s")
# 保存到临时文件
tmp_file = tempfile.NamedTemporaryFile(
suffix=".qgz", delete=False,
dir=tempfile.gettempdir(),
)
tmp_path = tmp_file.name
tmp_file.close()
t_save = time.time()
project.write(tmp_path)
logger.info(f"项目保存耗时: {time.time() - t_save:.1f}s")
# 记录初始状态
texts = {}
extent = None
layout = project.layoutManager().layoutByName(layout_name)
if layout:
for item_id in ["mapTitle", "mapTime", "mapUint", "info"]:
item = layout.itemById(item_id)
if item:
texts[item_id] = item.text()
map_item = layout.itemById("Map")
if map_item:
extent = QgsRectangle(map_item.extent())
self._cache[template_path] = {
"file": tmp_path,
"texts": texts,
"extent": extent,
"layout": layout_name,
}
logger.info(f"模板加载完成,总耗时: {time.time() - start:.1f}s")
def restore_template(self, template_path: str) -> tuple:
"""从缓存恢复模板"""
cached = self._cache.get(template_path)
if not cached:
raise RuntimeError(f"模板未缓存: {template_path}")
start = time.time()
project = QgsProject.instance()
logger.info(f"恢复模板: {os.path.basename(template_path)}")
project.read(cached["file"])
logger.info(f"project.read() 耗时: {time.time() - start:.1f}s")
return project, cached["texts"], cached["extent"]
def reset_project_state(self, project: QgsProject, texts: dict, extent) -> None:
"""重置项目到干净状态"""
start = time.time()
for layer in project.mapLayers().values():
if layer.subsetString():
layer.setSubsetString("")
# 获取 layout 名称(从缓存中)
layout_name = "A4"
for cached in self._cache.values():
layout_name = cached.get("layout", "A4")
break
layout = project.layoutManager().layoutByName(layout_name)
if layout:
for item_id, text in texts.items():
item = layout.itemById(item_id)
if item:
item.setText(text)
if extent:
map_item = layout.itemById("Map")
if map_item:
map_item.zoomToExtent(extent)
logger.info(f"状态重置耗时: {time.time() - start:.3f}s")
def cleanup(self) -> None:
"""清理所有临时文件"""
for cached in self._cache.values():
try:
os.remove(cached["file"])
except OSError:
pass
self._cache.clear()
logger.info("已清理所有缓存")
+223
View File
@@ -0,0 +1,223 @@
"""
模板 XML 修改模块。在 project.read() 之前:
1. 替换 PostgreSQL 连接参数(host/port/dbname/schema
2. 将静态底图的 datasource 替换为本地 GeoPackage 路径
"""
import os
import re
import zipfile
import tempfile
from app.config.paths import get_logger
from app.config.qgis_mappings import OGR_TO_POSTGRES, TABLE_RENAMES, SCHEMA_REPLACEMENTS
logger = get_logger("qgis.modifier")
class TemplateModifier:
def __init__(self, config: dict):
self.config = config
self._static_map = self._build_static_map()
def _build_static_map(self) -> dict[str, str]:
"""构建 table_key → gpkg_abs_path 的映射"""
sl = self.config.get("static_layers")
if not sl or not sl.get("enabled", False):
return {}
gpkg_dir = sl["gpkg_dir"]
mapping = {}
for info in sl["layers"].values():
schema, table = info["table"].split(".", 1)
xml_key = f'table="{schema}"."{table}"'
gpkg_path = os.path.join(gpkg_dir, info["file"]).replace("\\", "/")
mapping[xml_key] = gpkg_path
return mapping
@staticmethod
def _fix_provider_tags(content: str) -> str:
"""
将 GPKG 文件路径对应的 <provider ...>postgres</provider> 改为 ogr。
策略:按 <maplayer> 块处理,若块内 datasource 是文件路径(盘符开头),
则该块内的 provider 改为 ogr。避免跨层误改。
"""
maplayer_re = re.compile(r'(<maplayer[^>]*>.*?</maplayer>)', re.DOTALL)
provider_re = re.compile(r'(<provider[^>]*>)postgres(</provider>)')
file_ds_re = re.compile(r'<datasource>([A-Za-z]:/[^<]+)</datasource>')
def _fix_layer(m):
layer_xml = m.group(1)
if file_ds_re.search(layer_xml):
layer_xml = provider_re.sub(r'\1ogr\2', layer_xml)
return layer_xml
return maplayer_re.sub(_fix_layer, content)
def modify(self, template_path: str) -> str:
"""修改模板文件,返回修改后的临时 .qgz 路径"""
override = self.config.get("template_override")
has_override = override and override.get("enabled", False)
has_static = bool(self._static_map)
if not has_override and not has_static:
return template_path
orig = override["original"] if has_override else None
actual = override["actual"] if has_override else None
try:
tmp = tempfile.NamedTemporaryFile(
suffix=".qgz", delete=False,
dir=tempfile.gettempdir(),
)
tmp_path = tmp.name
tmp.close()
datasource_re = re.compile(r"(<datasource>)(.*?)(</datasource>)", re.DOTALL)
table_re = re.compile(r'table="(\w+)"\."(\w+)"')
with zipfile.ZipFile(template_path, "r") as zin, \
zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zout:
for item in zin.infolist():
data = zin.read(item.filename)
if item.filename.endswith(".qgs"):
content = data.decode("utf-8")
# ★ 先替换静态底图 datasource(在 schema 替换之前)
# 否则 table="base"."xxx" 会被改为 table="qgis"."xxx",导致匹配失败
if self._static_map:
def _replace(m):
tbl = table_re.search(m.group(2))
if tbl:
key = f'table="{tbl.group(1)}"."{tbl.group(2)}"'
if key in self._static_map:
return f"{m.group(1)}{self._static_map[key]}{m.group(3)}"
return m.group(0)
content = datasource_re.sub(_replace, content)
# ★ 转换避难场所 OGR 图层为 PostgreSQL(模板引用本地 JSON 文件)
_target_schema = (actual or {}).get("schema") or "qgis"
_db = self.config.get("db", {})
content = self._convert_ogr_to_postgres(content, _db, _target_schema)
# 替换所有 PostgreSQL 连接参数(统一指向目标库)
db = self.config.get("db", {})
# host: 模板中 host=localhost 无引号,匹配带/不带引号
if db.get("host"):
content = re.sub(
r"(host\s*=\s*)(?:['\"])?(?:localhost|127\.0\.0\.1)(?:['\"])?(?=\s|$)",
rf"\g<1>{db['host']}",
content,
)
# port: 模板中 port=5432 无引号,匹配带/不带引号
if db.get("port"):
content = re.sub(
r"(port\s*=\s*)(?:['\"])?5432(?:['\"])?(?=\s|$)",
rf"\g<1>{str(db['port'])}",
content,
)
if db.get("database"):
content = re.sub(
r"(dbname\s*=\s*['\"])([^'\"]+)(['\"])",
rf"\g<1>{db['database']}\3",
content,
)
# 移除 authcfg(QGIS 本地认证配置,子进程环境无效)
content = re.sub(r'\s*authcfg\s*=\s*\S+', '', content)
# 移除可能残留的 user=/password=(防止重复注入)
if db.get("username"):
content = re.sub(r"\s*user\s*=\s*'[^']*'", '', content)
if db.get("password"):
content = re.sub(r"\s*password\s*=\s*'[^']*'", '', content)
# 在 port 后注入显式 user + password
if db.get("username") and db.get("password"):
content = re.sub(
r'(port\s*=\s*\d+)',
rf"\1 user='{db['username']}' password='{db['password']}'",
content,
)
# schema 替换:统一指向目标 schema
target_schema = (actual or {}).get("schema") or "qgis"
for old_schema in SCHEMA_REPLACEMENTS:
content = content.replace(
f'table="{old_schema}".',
f'table="{target_schema}".',
)
# 表名映射(模板表名 ≠ 目标库表名)
for old_name, new_name in TABLE_RENAMES.items():
content = content.replace(
f'table="{target_schema}"."{old_name}"',
f'table="{target_schema}"."{new_name}"',
)
# tid 列已通过 add_tid_column.py 添加到所有 qgis 动态表中
# 模板中的 key='tid' 可正常工作,无需移除
# 修正 srid=0 → srid=4326QGIS 无法从 srid=0 自动检测 SRID
content = content.replace(" srid=0 ", " srid=4326 ")
# ★ 修复 provider 标签:GPKG 文件路径的图层必须用 ogr provider
# 否则 QGIS 会将 .gpkg 路径当作 PostgreSQL 连接字符串解析,导致图层无效
if self._static_map:
content = self._fix_provider_tags(content)
data = content.encode("utf-8")
zout.writestr(item, data)
logger.info(f"模板已修改并写入: {tmp_path}")
return tmp_path
except Exception as e:
logger.error(f"模板修改失败,将使用原始模板: {e}")
return template_path
@staticmethod
def _convert_ogr_to_postgres(content: str, db: dict, schema: str) -> str:
"""
将模板中引用本地文件的避难场所 OGR 图层转换为 PostgreSQL 图层。
设计师在本地用 JSON 文件制作了避难场所图层(公园/学校/文化馆等),
这些数据已迁移到目标库 qgis schema 中。
"""
_ogr_shelter_map = OGR_TO_POSTGRES
maplayer_re = re.compile(r'(<maplayer[^>]*>.*?</maplayer>)', re.DOTALL)
provider_re = re.compile(r'(<provider[^>]*>)ogr(</provider>)')
datasource_re = re.compile(r'(<datasource>).*?(</datasource>)', re.DOTALL)
file_ds_re = re.compile(r'<datasource>.*?\|layername=([^<]+)</datasource>', re.DOTALL)
def _convert_layer(m):
layer_xml = m.group(1)
ds_match = file_ds_re.search(layer_xml)
if not ds_match:
return layer_xml
layer_name = ds_match.group(1)
table = _ogr_shelter_map.get(layer_name)
if not table:
return layer_xml
# 构建 PostgreSQL URI
pg_uri = (
f"dbname='{db.get('database', 'xian_new')}' "
f"host={db.get('host', '47.92.216.173')} "
f"port={db.get('port', 7654)} "
f"user='{db.get('username', 'postgres')}' "
f"password='{db.get('password', '')}' "
f"key='tid' srid=4326 type=Point "
f"table=\"{schema}\".\"{table}\" (geometry)"
)
new_ds = f"<datasource>{pg_uri}</datasource>"
layer_xml = datasource_re.sub(new_ds, layer_xml)
layer_xml = provider_re.sub(r'\1postgres\2', layer_xml)
logger.info(f"避难场所图层已转换: {layer_name} → qgis.{table}")
return layer_xml
return maplayer_re.sub(_convert_layer, content)
+25 -15
View File
@@ -1,35 +1,45 @@
"""
日志工具类
使用 loguru 提供增强的日志功能,支持按天分割、自动清理过期日志
主进程使用 loguruQGIS 子进程(无 loguru)回退到标准 logging。
"""
import sys
from pathlib import Path
from loguru import logger
try:
from loguru import logger
_HAS_LOGURU = True
except ImportError:
_HAS_LOGURU = False
import logging
class LoggerManager:
"""日志管理器 - 基于 loguru"""
"""日志管理器"""
_configured = False
@classmethod
def get_logger(cls, name: str = "algorithm", log_dir: str = "logs"):
"""
获取日志记录器(loguru 不需要传统意义上的 logger 实例)
Args:
name: 日志名称(用于文件命名)
log_dir: 日志目录
Returns:
loguru.logger 实例
"""
if not _HAS_LOGURU:
return cls._get_stdlib_logger(name)
if not cls._configured:
cls._configure_logger(name, log_dir)
cls._configure_loguru(name, log_dir)
return logger
@classmethod
def _configure_logger(cls, name: str, log_dir: str):
def _get_stdlib_logger(cls, name: str):
lg = logging.getLogger(name)
if not lg.handlers:
lg.setLevel(logging.INFO)
h = logging.StreamHandler(sys.stderr)
h.setFormatter(logging.Formatter(
"%(asctime)s [%(threadName)s] %(levelname)-5s %(name)s - %(message)s"
))
lg.addHandler(h)
return lg
@classmethod
def _configure_loguru(cls, name: str, log_dir: str):
"""
配置 loguru 日志处理器
+123
View File
@@ -0,0 +1,123 @@
"""
地图缩放策略模块。支持 5 种缩放模式。
"""
from qgis.core import (
QgsPointXY, QgsGeometry, QgsRectangle,
QgsCoordinateTransform, QgsCoordinateReferenceSystem,
)
from app.config.paths import get_logger
logger = get_logger("qgis.zoom")
ZOOM_RULES = {
"11": "pan_to_center",
"12": "by_layer",
"13": "layer_intersect",
"14": "center_distance",
"15": "layer_merged",
}
class MapZoom:
def __init__(self, project, layout, map_item):
self.project = project
self.layout = layout
self.map_item = map_item
def execute(self, rule: str, data: dict) -> None:
"""执行缩放操作"""
method_name = ZOOM_RULES.get(rule, "pan_to_center")
method = getattr(self, method_name)
logger.info(f"缩放: {method_name}, 参数: {data}")
method(data)
def _make_transform(self, source_crs=None) -> QgsCoordinateTransform:
qct = QgsCoordinateTransform()
qct.setDestinationCrs(self.map_item.crs())
qct.setSourceCrs(
source_crs if source_crs
else QgsCoordinateReferenceSystem("EPSG:4326")
)
return qct
def pan_to_center(self, data: dict) -> None:
qct = self._make_transform()
point = QgsPointXY(float(data["X"]), float(data["Y"]))
geom = QgsGeometry.fromWkt(point.asWkt())
geom.transform(qct, QgsCoordinateTransform.ForwardTransform)
center = geom.asPoint()
qr = QgsRectangle.fromCenterAndSize(
center, self.map_item.extent().width(), self.map_item.extent().height()
)
self.map_item.zoomToExtent(qr)
def center_distance(self, data: dict) -> None:
qct = self._make_transform()
point = QgsPointXY(float(data["X"]), float(data["Y"]))
geom = QgsGeometry.fromWkt(point.asWkt())
geom.transform(qct, QgsCoordinateTransform.ForwardTransform)
box = geom.buffer(float(data["value"]) / 1000, 100).boundingBox()
self.map_item.zoomToExtent(box.buffered(box.width() * 0.1))
def by_layer(self, data: dict) -> None:
layers = self.project.mapLayersByName(data["value"])
if not layers:
logger.warning(f"图层不存在: {data['value']}")
return
layer = layers[0]
if layer.featureCount() == 0:
self.pan_to_center(data)
return
qct = self._make_transform(layer.crs())
geom = QgsGeometry.fromWkt(layer.extent().asWktPolygon())
geom.transform(qct, QgsCoordinateTransform.ForwardTransform)
box = geom.boundingBox()
self.map_item.zoomToExtent(box.buffered(box.width() * 0.1))
def layer_merged(self, data: dict) -> None:
names = data["value"].split(",")
combined = None
for name in names:
layers = self.project.mapLayersByName(name.strip())
if not layers:
continue
layer = layers[0]
qct = self._make_transform(layer.crs())
geom = QgsGeometry.fromWkt(layer.extent().asWktPolygon())
geom.transform(qct, QgsCoordinateTransform.ForwardTransform)
box = geom.boundingBox()
if combined is None:
combined = box
else:
combined.combineExtentWith(box)
if combined:
self.map_item.zoomToExtent(combined.buffered(combined.width() * 0.1))
def layer_intersect(self, data: dict) -> None:
names = data["value"].split(",")
intersection = None
for name in names:
layers = self.project.mapLayersByName(name.strip())
if not layers:
continue
layer = layers[0]
qct = self._make_transform(layer.crs())
layer.selectAll()
geom = None
for fid in layer.selectedFeatureIds():
if geom is None:
geom = layer.getGeometry(fid)
else:
geom = geom.combine(layer.getGeometry(fid))
if geom:
qgm = QgsGeometry.fromWkt(geom.asWkt())
qgm.transform(qct, QgsCoordinateTransform.ForwardTransform)
box = qgm.boundingBox()
if intersection is None:
intersection = box
else:
intersection = intersection.intersection(box)
if intersection:
bbox = intersection.boundingBox()
self.map_item.zoomToExtent(bbox.buffered(bbox.width() * 0.1))
+3 -1
View File
@@ -8,4 +8,6 @@ Pillow == 12.2.0
pyyaml == 6.0.3
fastapi == 0.136.3
uvicorn[standard] == 0.48.0
loguru == 0.7.3
loguru == 0.7.3
geopandas == 0.14.4
sqlalchemy == 2.0.51
+100 -14
View File
@@ -1,57 +1,143 @@
# 公共配置
# ============================================================
# 公共配置(所有环境共享)
# ============================================================
[default]
APP_NAME = "西安项目算法服务"
LOG_DIR = "logs"
# 雨量站栅格存储位置,:id会被替换成数据id
RAIN_STATION_GRID_DIR = "/xian/rainfall/grid/images/:id"
# 雨量站栅格存储redis的key
REDIS_RAIN_STATION_GRID_KEY = "xian:rainfall:rain_station_grid"
# 雨量站存储标识符的redis的key
REDIS_RAIN_STATION_IDENTIFIER_KEY = "xian:rainfall:rain_station_identifier"
# 预测结果概率阈值(低于此值不返回给前端)
PREDICT_PROBABILITY_THRESHOLD = 50
# 静态底图 GeoPackage 目录(相对于项目根目录)
QGIS_GPKG_DIR = "app/data/gpkg"
# 专题图输出子目录(相对于 FILE_STORE_DIR
QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime"
# 专题图默认参数
QGIS_DEFAULTS_MAP_LAYOUT = "A3"
QGIS_DEFAULTS_ZOOM_RULE = "11"
QGIS_DEFAULTS_ZOOM_VALUE = "50"
QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 专题图DPI
QGIS_EXPORT_DPI = 300
# 批量产图线程池
QGIS_WORKER_THREADS = 4
# 西安市中心经纬度
XIAN_CENTER = [108.948024, 34.263161]
# ============================================================
# 行政区划代码映射
# ============================================================
[default.area]
"610102" = "新城区"
"610103" = "碑林区"
"610104" = "莲湖区"
"610111" = "灞桥区"
"610112" = "未央区"
"610113" = "雁塔区"
"610114" = "阎良区"
"610115" = "临潼区"
"610116" = "长安区"
"610117" = "高陵区"
"610118" = "鄠邑区"
"610122" = "蓝田县"
"610124" = "周至县"
# ============================================================
# 开发环境
# ============================================================
[development]
DEBUG = true
# 数据库
# ============================================================
# 数据库配置
# ============================================================
DB_HOST = "47.92.216.173"
DB_PORT = 7654
DB_USER = "postgres"
DB_PASSWORD = "zhangsan"
DB_NAME = "xian_new"
# FastAPI配置
# ============================================================
# FastAPI 配置
# ============================================================
API_HOST = "127.0.0.1"
API_PORT = 8082
# ============================================================
# 日志配置
# ============================================================
LOG_LEVEL = "DEBUG"
# redis
# ============================================================
# Redis 配置
# ============================================================
REDIS_HOST = "47.92.216.173"
REDIS_PORT = 7655
REDIS_PASSWORD = "zhangsan"
REDIS_DB = 0
# 文件存储
# ============================================================
# 文件路径配置
# ============================================================
FILE_STORE_DIR = "G:/files"
# ============================================================
# QGIS 配置
# ============================================================
QGIS_ROOT = "D:/QGIS"
# 专题图输出子目录
QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime"
QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 模板数据库覆盖:将模板中硬编码的连接替换为实际环境连接
QGIS_TEMPLATE_OVERRIDE_ENABLED = true
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST = "localhost"
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT = 5432
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME = "yjzyk_xian"
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA = "base"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST = "47.92.216.173"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT = 7654
QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME = "xian_new"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA = "qgis"
# ============================================================
# 生产环境
# ============================================================
[production]
DEBUG = false
# ============================================================
# 数据库配置
# ============================================================
DB_HOST = "10.22.245.138"
DB_PORT = 54321
DB_USER = "zaihailian"
DB_PASSWORD = "XAYJ@gis2603"
DB_NAME = "xianDC"
# FastAPI配置
# ============================================================
# FastAPI 配置
# ============================================================
API_HOST = "127.0.0.1"
API_PORT = 8081
# ============================================================
# 日志配置
# ============================================================
LOG_LEVEL = "WARNING"
# redis
# ============================================================
# Redis 配置
# ============================================================
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD = "XAYJ@gis2603"
REDIS_DB = 0
# 文件存储
FILE_STORE_DIR = "D:/files"
# ============================================================
# 文件路径配置
# ============================================================
FILE_STORE_DIR = "/data"
# ============================================================
# QGIS 配置
# ============================================================
QGIS_ROOT = "/home/QGIS"
# 模板数据库覆盖:将模板中硬编码的连接替换为实际环境连接
QGIS_TEMPLATE_OVERRIDE_ENABLED = true
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST = "localhost"
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT = 5432
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME = "yjzyk_xian"
QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA = "base"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST = "10.22.245.138"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT = 54321
QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME = "xian_new"
QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA = "qgis"
+42
View File
@@ -0,0 +1,42 @@
"""
在所有 qgis 动态表中添加 tid 自增主键列
QGIS 的 datasource key='tid' 需要此列
"""
import psycopg2
TABLES = [
'hazard_hydrops',
'lifeline_outfall',
'lifeline_pipe',
'risk_census_population',
'sx_xa_towns',
]
c = psycopg2.connect(
host='47.92.216.173', port=7654,
user='postgres', password='zhangsan',
database='xian_new'
)
c.autocommit = True
cur = c.cursor()
for t in TABLES:
try:
# 检查 tid 列是否存在
cur.execute(f"""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema='qgis' AND table_name='{t}' AND column_name='tid'
)
""")
exists = cur.fetchone()[0]
if exists:
print(f"qgis.{t}: tid 列已存在, 跳过")
else:
cur.execute(f'ALTER TABLE qgis."{t}" ADD COLUMN tid SERIAL')
print(f"qgis.{t}: tid 列添加成功")
except Exception as e:
print(f"qgis.{t}: 失败 - {e}")
c.close()
+72
View File
@@ -0,0 +1,72 @@
"""
批量检查所有 rainfall 模板的图层,看哪些动态图层需要显示、表是否存在
"""
import zipfile, re, os, psycopg2
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
# 收集所有非GPKG动态图层的表名
all_dynamic = set()
for fname in sorted(os.listdir(TEMPLATE_DIR)):
if not fname.endswith('.qgz') or fname.startswith('tmp'):
continue
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
for m in maplayer_re.finditer(content):
block = m.group(1)
name_m = re.search(r'<layername>([^<]+)</layername>', block)
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
provider = provider_m.group(1) if provider_m else '?'
if provider != 'postgres':
continue
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
layer_name = name_m.group(1) if name_m else '?'
if table_m:
schema = table_m.group(1)
table = table_m.group(2)
key = f"{schema}.{table}"
all_dynamic.add((layer_name, key))
# 已知的GPKG静态层(会被替换为ogr)
static_tables = {
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
}
# DB检查
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
c.autocommit = True
cur = c.cursor()
print(f"{'图层名':20s} {'原表':35s} {'qgis中存在':12s} {'行数':>8s}")
print("-" * 80)
for layer_name, table_key in sorted(all_dynamic):
if table_key in static_tables:
continue # 会被GPKG替换,跳过
schema, table = table_key.split('.', 1)
try:
cur.execute(f'SELECT count(*) FROM qgis.{table}')
count = cur.fetchone()[0]
exists = "YES"
except:
exists = "NO"
count = 0
marker = " !!!" if exists == "NO" else ""
print(f"{layer_name:20s} {table_key:35s} {exists:12s} {count:>8,d}{marker}")
c.close()
+54
View File
@@ -0,0 +1,54 @@
"""检查模板中所有图层的渲染顺序(z-order)"""
import zipfile, re, os
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取所有 maplayer 块(保持顺序 — 这就是渲染顺序)
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
print(f"模板: {fname}")
print(f"{'#':>3s} {'图层名':20s} {'类型':10s} {'可见':4s} {'表名'}")
print("-" * 80)
for i, m in enumerate(maplayer_re.finditer(content)):
block = m.group(1)
# 图层名
name_m = re.search(r'<layername>([^<]+)</layername>', block)
name = name_m.group(1) if name_m else '?'
# provider
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
provider = provider_m.group(1) if provider_m else '?'
# datasource table
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
table = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
# 可见性
visible = 'Y' if 'visible="1"' in m.group(0) or 'visible="1"' in block else 'N'
# 判断图层类型
is_static = table in [
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
]
layer_type = "底图" if is_static else "动态"
marker = " <<<" if not is_static and provider == 'postgres' else ""
print(f"{i+1:>3d} {name:20s} {provider:10s} {visible:4s} {table:35s} [{layer_type}]{marker}")
print()
print("提示: 图层按从上到下的顺序渲染(序号小的在底层,序号大的在顶层)")
print("动态图层如果在底图下方,会被底图完全遮盖")
+65
View File
@@ -0,0 +1,65 @@
"""检查布局 Map 项的图层配置"""
import zipfile, re, os
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取 Layout 元素(找 Map 项)
# QGIS 有两种布局格式:<Composer> (QGIS 2.x style) 和 <Layout> (QGIS 3.x style)
layout_re = re.compile(r'<(?:Composer|Layout)[^>]*name="([^"]*)"[^>]*>(.*?)</(?:Composer|Layout)>', re.DOTALL)
for m in layout_re.finditer(content):
layout_name = m.group(1)
layout_content = m.group(2)
print(f"=== 布局: {layout_name} ===")
# 检查 Map 项
map_item_re = re.compile(r'<ComposerMap[^>]*>(.*?)</ComposerMap>', re.DOTALL)
for mm in map_item_re.finditer(layout_content):
map_content = mm.group(1)
# 检查是否有 lockedLayers
locked = re.findall(r'<lockedLayers>(.*?)</lockedLayers>', map_content, re.DOTALL)
if locked:
print(f" lockedLayers: {locked[0][:500]}")
# 检查 keepLayerSet
keep_set = re.findall(r'keepLayerSet="([^"]*)"', mm.group(0))
if keep_set:
print(f" keepLayerSet: {keep_set[0]}")
# 检查 followPreset
preset = re.findall(r'followPreset="([^"]*)"', mm.group(0))
if preset:
print(f" followPreset: {preset[0]}")
# 检查 followPresetName
preset_name = re.findall(r'followPresetName="([^"]*)"', mm.group(0))
if preset_name:
print(f" followPresetName: {preset_name[0]}")
# 检查 <layerSet>
layer_set = re.findall(r'<layerSet>(.*?)</layerSet>', map_content, re.DOTALL)
if layer_set:
print(f" layerSet: {layer_set[0][:500]}")
# 检查 <ComposerMapGrid>
grids = re.findall(r'<ComposerMapGrid[^>]*/>', map_content)
print(f" grids: {len(grids)}")
# 也检查 <Layout> 格式 (QGIS 3.x)
map_item2_re = re.compile(r'<LayoutItem[^>]*type="[^"]*map[^"]*"[^>]*>(.*?)</LayoutItem>', re.DOTALL | re.IGNORECASE)
for mm in map_item2_re.finditer(layout_content):
map_content = mm.group(1)
print(f" [LayoutItem Map] attributes: {mm.group(0)[:300]}")
# 打印模板中所有的 map theme / visibility preset
presets = re.findall(r'<(?:visibility-presets|map-theme-collection).*?</(?:visibility-presets|map-theme-collection)>', content, re.DOTALL)
if presets:
print("\n=== 可见性预设 ===")
print(presets[0][:500])
+71
View File
@@ -0,0 +1,71 @@
"""检查暴雨避难场所分布图的所有动态图层是否有数据"""
import zipfile, re, os, psycopg2
TEMPLATE_PATH = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz"
z = zipfile.ZipFile(TEMPLATE_PATH)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
# 收集所有 PostgreSQL 动态图层
layers = []
for m in maplayer_re.finditer(content):
block = m.group(1)
name_m = re.search(r'<layername>([^<]+)</layername>', block)
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
provider = provider_m.group(1) if provider_m else '?'
if provider != 'postgres':
continue
name = name_m.group(1) if name_m else '?'
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
table_key = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
layers.append((name, table_key))
# 已知 GPKG 静态层
static_tables = {
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
}
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
c.autocommit = True
cur = c.cursor()
print(f"模板: 暴雨避难场所分布图")
print(f"{'图层名':20s} {'原表':40s} {'qgis有数据':10s} {'行数':>6s}")
print("-" * 85)
for name, table_key in layers:
if table_key in static_tables:
continue
schema, table = table_key.split('.', 1)
mapped_table = 'hazard_hydrops' if table == 'hazard_waterlogging' else table
try:
cur.execute(f'SELECT count(*) FROM qgis.{mapped_table}')
count = cur.fetchone()[0]
exists = "YES" if count > 0 else "EMPTY"
except:
try:
cur.execute(f'SELECT count(*) FROM {schema}.{table}')
count = cur.fetchone()[0]
exists = f"{schema}"
except:
exists = "NO"
count = 0
marker = " <-- 无数据!" if count == 0 else ""
print(f"{name:20s} {table_key:40s} {exists:10s} {count:>6,d}{marker}")
c.close()
+38
View File
@@ -0,0 +1,38 @@
import psycopg2
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
cur = c.cursor()
tables = ['lifeline_outfall', 'lifeline_pipe', 'risk_census_population', 'hazard_waterlogging', 'sx_street', 'sx_xa_county']
print("=== qgis schema ===")
for t in tables:
try:
cur.execute(f'SELECT COUNT(*) FROM qgis."{t}"')
count = cur.fetchone()[0]
cur.execute(f"SELECT GeometryType(geom) FROM qgis.\"{t}\" LIMIT 1")
g = cur.fetchone()
print(f" qgis.{t}: {count} 行, geom={g[0] if g else 'N/A'}")
except Exception as e:
print(f" qgis.{t}: 不存在")
print("\n=== base schema (检查积水点原始表) ===")
for t in ['hazard_waterlogging']:
try:
cur.execute(f'SELECT COUNT(*) FROM base."{t}"')
print(f" base.{t}: {cur.fetchone()[0]}")
except:
print(f" base.{t}: 不存在")
# 查找所有含 water 或 hazard 的表
print("\n=== 搜索含 water/hazard 的表 ===")
cur.execute("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE (table_name LIKE '%water%' OR table_name LIKE '%hazard%')
AND table_schema IN ('qgis', 'base', 'public')
ORDER BY table_schema, table_name
""")
for r in cur.fetchall():
print(f" {r[0]}.{r[1]}")
c.close()
+42
View File
@@ -0,0 +1,42 @@
"""提取模板 .qgs 中所有图层名称、类型、数据源"""
import zipfile, re, os, sys
template_dir = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
for fname in sorted(os.listdir(template_dir)):
if not fname.endswith('.qgz') or fname.startswith('tmp'):
continue
path = os.path.join(template_dir, fname)
z = zipfile.ZipFile(path)
# 找到 .qgs 文件
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取 maplayer 块
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
layers = []
for m in maplayer_re.finditer(content):
block = m.group(1)
# 图层名
name_m = re.search(r'<layername>([^<]+)</layername>', block)
# provider
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
# datasource (截取前120字符)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
name = name_m.group(1) if name_m else '?'
provider = provider_m.group(1) if provider_m else '?'
ds = ds_m.group(1).strip()[:120] if ds_m else '?'
layers.append((name, provider, ds))
print(f"\n{'='*60}")
print(f"模板: {fname}")
print(f"{'='*60}")
for name, provider, ds in layers:
print(f" [{provider:10s}] {name}")
if ds:
print(f" ds: {ds}")
# 只分析第一个模板(所有 rainfall 模板结构相同)
if fname == '暴雨内涝潜在隐患点及人口分布图.qgz':
break
+16
View File
@@ -0,0 +1,16 @@
import zipfile, re
z = zipfile.ZipFile(r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz")
c = z.read([n for n in z.namelist() if n.endswith('.qgs')][0]).decode()
layers = re.findall(r'<maplayer[^>]*>(.*?)</maplayer>', c, re.DOTALL)
for i, l in enumerate(layers):
nm = re.search(r'<layername>([^<]+)</layername>', l)
pv = re.search(r'<provider[^>]*>(\w+)</provider>', l)
ds = re.search(r'<datasource>(.*?)</datasource>', l, re.DOTALL)
name = nm.group(1) if nm else '?'
prov = pv.group(1) if pv else '?'
ds_text = ds.group(1).strip()[:120] if ds else ''
print(f"{i:2d}. [{prov:10s}] {name}")
if ds_text:
print(f" ds: {ds_text}")
+295
View File
@@ -0,0 +1,295 @@
"""
单张专题图测试脚本 - 独立版
直接构建 model/config 并通过 subprocess 调用 qgis_runner.py
"""
import json, os, sys, tempfile, subprocess, re
from datetime import datetime
# ============================================================
# 配置(和 settings.toml 一致)
# ============================================================
DB_CONFIG = {
"host": "47.92.216.173",
"port": 7654,
"username": "postgres",
"password": "zhangsan",
"database": "xian_new",
}
QGIS_ROOT = "D:/QGIS"
XIAN_CENTER = [108.948024, 34.263161]
GPKG_DIR = r"F:\project\xian\xian_algorithm_new\app\data\gpkg".replace("\\", "/")
TEMPLATE_BASE = r"F:\project\xian\xian_algorithm_new\app\data\template"
# ============================================================
# 模拟推理结果(inference_id=50
# ============================================================
# 从实际数据库查询得知:
# event_type='rainfall', condition={'region_code':'610116','rainfall':'120'},
# occurred_time='2025-09-16 20:00:00'
inference = {
"id": 50,
"event_type": "rainfall",
"condition": {"region_code": "610116", "rainfall": "120"},
"occurred_time": datetime(2025, 9, 16, 20, 0, 0),
}
# ============================================================
# 区域映射(从 settings.toml
# ============================================================
AREA_MAP = {
"610102": "新城区", "610103": "碑林区", "610104": "莲湖区",
"610111": "灞桥区", "610112": "未央区", "610113": "雁塔区",
"610114": "阎良区", "610115": "临潼区", "610116": "长安区",
"610117": "高陵区", "610118": "鄠邑区", "610122": "蓝田县",
"610124": "周至县",
}
STATIC_LAYERS = {
"水库": ("base.rivers", "rivers.gpkg"),
"市州驻地": ("base.sx_capital", "sx_capital.gpkg"),
"河流": ("base.river", "river.gpkg"),
"active_fault": ("base.active_fault", "active_fault.gpkg"),
"陕西省": ("base.sx", "sx.gpkg"),
"乡镇驻地": ("base.sx_street", "sx_street.gpkg"),
"区县驻地": ("base.sx_xa_county", "sx_xa_county.gpkg"),
"县界": ("base.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
"周边区县": ("base.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
"周边市州": ("base.sx_zb_city", "sx_zb_city.gpkg"),
"周边县区": ("base.sx_zb_county", "sx_zb_county.gpkg"),
"traffic_expressway": ("base.traffic_expressway", "traffic_expressway.gpkg"),
"traffic_provincial": ("base.traffic_provincial", "traffic_provincial.gpkg"),
"traffic_railway": ("base.traffic_railway", "traffic_railway.gpkg"),
"traffic_township": ("base.traffic_township", "traffic_township.gpkg"),
"traffic_trunk_line": ("base.traffic_trunk_line", "traffic_trunk_line.gpkg"),
}
# ============================================================
# 辅助函数(从 qgis_map_export.py 直接复制)
# ============================================================
def format_disaster_time(occurred_time):
if isinstance(occurred_time, datetime):
return occurred_time.strftime("%Y%m%d%H%M%S")
return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
def resolve_district(condition):
code = condition.get("region_code")
if code and str(code) in AREA_MAP:
return AREA_MAP[str(code)]
return ""
def build_map_title(event_type, condition, template_name):
district = resolve_district(condition)
prefix = f"陕西西安{district}" if district else "陕西西安"
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
return f"{prefix}{float(rainfall)}mm{template_name}"
return f"{prefix}{template_name}"
return f"{prefix}{template_name}"
def build_info_text(event_type, condition, occurred_time):
"""构建信息面板文本(不显示灾害类型标签)"""
lines = []
if isinstance(occurred_time, datetime):
time_str = f"{occurred_time.year}{occurred_time.month:02d}{occurred_time.day:02d}{occurred_time.hour:02d}{occurred_time.minute:02d}"
elif occurred_time:
time_str = str(occurred_time)
else:
time_str = ""
lines.append(f"时间:{time_str}")
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
lines.append(f"累计降雨量:{float(rainfall)}mm")
duration = condition.get("duration")
if duration is not None and duration != "":
lines.append(f"已持续:{duration}")
elif event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None and magnitude != "":
lines.append(f"震级:{float(magnitude)}")
lon = condition.get("epicenter_lon")
lat = condition.get("epicenter_lat")
if lon is not None and lat is not None:
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
return "\n".join(lines)
def derive_model_params(inference, batch_folder, template_path):
event_type = inference["event_type"]
condition = inference["condition"]
occurred_time = inference["occurred_time"]
template_name = os.path.splitext(os.path.basename(template_path))[0]
map_title = build_map_title(event_type, condition, template_name)
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
if isinstance(occurred_time, datetime):
map_time = occurred_time.strftime("%Y-%m-%d %H:%M")
else:
map_time = str(occurred_time)
info_text = build_info_text(event_type, condition, occurred_time)
center_x, center_y = XIAN_CENTER
if event_type == "earthquake":
lon = condition.get("epicenter_lon", XIAN_CENTER[0])
lat = condition.get("epicenter_lat", XIAN_CENTER[1])
center_x, center_y = float(lon), float(lat)
return {
"name": f"test_{inference['id']}_{safe_name}",
"path": template_path,
"outFile": out_file,
"mapLayout": "A3",
"mapTitle": map_title,
"mapTime": map_time,
"mapUnit": "西安市应急管理局",
"info": info_text,
"centerX": center_x,
"centerY": center_y,
"event": str(inference["id"]),
"queueId": str(inference["id"]),
"zoomRule": "11",
"zoomValue": "50",
}
def build_qgis_config(batch_folder):
static_layers_config = {}
for name, (table, gpkg_file) in STATIC_LAYERS.items():
static_layers_config[name] = {"file": gpkg_file, "table": table}
return {
"db": DB_CONFIG,
"qgis": {"exportDpi": 300},
"template_override": {
"enabled": True,
"original": {
"host": "localhost",
"port": 5432,
"dbname": "yjzyk_xian",
"schema": "base",
},
"actual": {
"host": DB_CONFIG["host"],
"port": DB_CONFIG["port"],
"dbname": DB_CONFIG["database"],
"schema": "qgis",
"username": DB_CONFIG["username"],
"password": DB_CONFIG["password"],
},
},
"static_layers": {
"enabled": True,
"gpkg_dir": GPKG_DIR,
"layers": static_layers_config,
},
"batch_folder": batch_folder,
}
# ============================================================
# 主逻辑
# ============================================================
event_type = inference["event_type"]
disaster_time = format_disaster_time(inference["occurred_time"])
batch_folder = os.path.join("G:/files", "xian/qgis/map", disaster_time).replace("\\", "/")
os.makedirs(batch_folder, exist_ok=True)
template_dir = os.path.join(TEMPLATE_BASE, event_type)
template_path = os.path.join(template_dir, "暴雨内涝潜在隐患点及人口分布图.qgz").replace("\\", "/")
model = derive_model_params(inference, batch_folder, template_path)
config = build_qgis_config(batch_folder)
print("=" * 60)
print(f"模板: {template_path}")
print(f"输出: {model['outFile']}")
print(f"标题: {model['mapTitle']}")
print(f"时间: {model['mapTime']}")
print(f"单位: {model['mapUnit']}")
print(f"info:\n{model['info']}")
print(f"中心: ({model['centerX']}, {model['centerY']})")
print(f"event: {model['event']}, queueId: {model['queueId']}")
print("=" * 60)
request_data = json.dumps(
{"config": config, "model": model},
ensure_ascii=False,
)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
runner_script = os.path.join(
os.path.dirname(__file__),
r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
).replace("\\", "/")
# Fix: use absolute path
runner_script = r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
os.makedirs(bat_dir, exist_ok=True)
bat_path = os.path.join(bat_dir, "run_qgis_test.bat")
qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr").replace("/", "\\")
python_dir = os.path.join(QGIS_ROOT, "apps", "Python312").replace("/", "\\")
qt5_plugins = os.path.join(QGIS_ROOT, "apps", "Qt5", "plugins").replace("/", "\\")
qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal").replace("/", "\\")
qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
qt5_bin = os.path.join(QGIS_ROOT, "apps", "Qt5", "bin").replace("/", "\\")
gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib").replace("/", "\\")
python_exe = os.path.join(python_dir, "python3.exe").replace("/", "\\")
bat_content = f"""@echo off
set "PYTHONHOME={python_dir}"
set "PYTHONPATH={qgis_python_dir}"
set "QGIS_PREFIX_PATH={qgis_app_dir}"
set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
set "GDAL_DATA={gdal_data}"
set "PYTHONUTF8=1"
set "GDAL_FILENAME_IS_UTF8=YES"
set "VSI_CACHE=TRUE"
set "VSI_CACHE_SIZE=1000000"
set "PATH={qgis_bin};{qt5_bin};{gdal_lib};%PATH%"
"{python_exe}" "{runner_script}" "{tmp_json.name}"
"""
with open(bat_path, "w", encoding="utf-8") as f:
f.write(bat_content)
cmd = ["cmd.exe", "/c", bat_path]
print(f"执行: {' '.join(cmd[:3])} ...")
print()
try:
result = subprocess.run(cmd, capture_output=True, timeout=300)
stderr_text = result.stderr.decode("utf-8", errors="replace")
stdout_text = result.stdout.decode("utf-8", errors="replace")
print("=== stderr ===")
for line in stderr_text.split("\n"):
print(f" {line}")
print()
print("=== stdout ===")
print(stdout_text[:500] if stdout_text else "(empty)")
if result.returncode != 0:
print(f"\n!!! FAIL: exit={result.returncode}")
else:
print(f"\n=== SUCCESS ===")
if os.path.isfile(model["outFile"]):
print(f"文件: {model['outFile']} ({os.path.getsize(model['outFile'])/1024:.1f} KB)")
else:
print(f"文件不存在: {model['outFile']}")
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass