Files
xian_algorithm_new/app/api/qgis_map_export.py
T

420 lines
16 KiB
Python
Raw Normal View History

2026-06-19 17:04:03 +08:00
"""
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
2026-06-24 11:22:20 +08:00
- Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理)
2026-06-19 17:04:03 +08:00
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
2026-06-24 11:22:20 +08:00
- 异步模式:HTTP 请求立即返回,产图在后台线程执行
2026-06-19 17:04:03 +08:00
"""
import os
import re
2026-06-21 22:30:04 +08:00
import subprocess
2026-06-24 11:22:20 +08:00
import threading
2026-06-19 17:04:03 +08:00
from datetime import datetime
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.config.paths import get_logger
2026-06-20 15:50:24 +08:00
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
2026-06-19 17:04:03 +08:00
from app.repositories import qgis_repository
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
from app.utils.db_helper import db_helper
from config import settings
# Module-level logger for the QGIS export API.
logger = get_logger("api.qgis")
# Router for the thematic-map export endpoints; every route is served under /qgis.
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
2026-06-24 11:22:20 +08:00
# Background task tracking: inference IDs whose export job is currently running.
# Used to de-duplicate submissions; read and modified under _tasks_lock.
_background_tasks: set[int] = set()
_tasks_lock = threading.Lock()
2026-06-19 17:04:03 +08:00
# Xi'an city center as [longitude, latitude]; falls back to the literal below
# when settings does not define XIAN_CENTER.
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
# ============================================================
# region_code -> district name mapping
# ============================================================
REGION_CODE_MAP = settings.area.to_dict()
def _resolve_district(condition: dict) -> str:
    """Return the district name for the condition's ``region_code``.

    The code is stringified before the lookup in ``REGION_CODE_MAP``.
    An empty string is returned when the code is missing, falsy, or unknown.
    """
    region_code = condition.get("region_code")
    if not region_code:
        return ""
    key = str(region_code)
    return REGION_CODE_MAP[key] if key in REGION_CODE_MAP else ""
def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
    """Build the map title: province/city prefix + district + magnitude or rainfall + template name.

    Args:
        event_type: "earthquake" or "rainfall"; any other value yields a title
            without a measurement.
        condition: Event condition dict; reads "region_code" (via
            ``_resolve_district``) and "magnitude" or "rainfall".
        template_name: Template name appended at the end of the title.

    Returns:
        The assembled title string.
    """
    # An unknown district resolves to "", which leaves the bare prefix.
    prefix = f"陕西西安{_resolve_district(condition)}"

    if event_type == "earthquake":
        magnitude = condition.get("magnitude")
        # Treat "" like a missing value (as the rainfall branch does) so that
        # float("") cannot raise.
        if magnitude is not None and magnitude != "":
            return f"{prefix}{float(magnitude)}{template_name}"
    elif event_type == "rainfall":
        rainfall = condition.get("rainfall")
        if rainfall is not None and rainfall != "":
            return f"{prefix}{float(rainfall)}mm{template_name}"

    return f"{prefix}{template_name}"
2026-06-20 15:50:24 +08:00
def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
lines = []
if isinstance(occurred_time, datetime):
time_str = f"{occurred_time.year}{occurred_time.month:02d}{occurred_time.day:02d}{occurred_time.hour:02d}{occurred_time.minute:02d}"
elif occurred_time:
time_str = str(occurred_time)
else:
time_str = datetime.now().strftime("%Y年%m月%d%H时%M分")
lines.append(f"时间:{time_str}")
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
lines.append(f"累计降雨量:{float(rainfall)}mm")
duration = condition.get("duration")
if duration is not None and duration != "":
lines.append(f"已持续:{duration}")
elif event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None and magnitude != "":
lines.append(f"震级:{float(magnitude)}")
lon = condition.get("epicenter_lon")
lat = condition.get("epicenter_lat")
if lon is not None and lat is not None:
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
return "\n".join(lines)
2026-06-19 17:04:03 +08:00
def _derive_model_params(
    inference: dict, batch_folder: str, template_path: str
) -> dict:
    """Derive every parameter needed to render one thematic map.

    Combines the inference record (reads "event_type", "condition",
    "occurred_time" and "id"), layout defaults taken from ``settings`` and the
    template path into the dict that describes a single map job.
    """
    event_type = inference["event_type"]
    condition = inference["condition"]
    occurred_time = inference["occurred_time"]

    # Layout defaults; each falls back to a literal when settings lacks it.
    unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
    layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
    rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
    value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")

    # The template file name without its extension drives both the title and
    # the output file name.
    base_name = os.path.basename(template_path)
    template_name, _ext = os.path.splitext(base_name)
    title = _build_map_title(event_type, condition, template_name)

    # Replace characters that are not allowed in file names.
    file_stem = re.sub(r'[\\/:*?"<>|]', '_', template_name)
    target = os.path.join(batch_folder, f"{file_stem}.jpg").replace("\\", "/")

    # The map date is always today's date.
    today = datetime.now().strftime("%Y-%m-%d")

    lon, lat = _extract_center_from_condition(event_type, condition)
    info = _build_info_text(event_type, condition, occurred_time)

    params = {}
    params["name"] = f"inference_{inference['id']}"
    params["path"] = template_path
    params["outFile"] = target
    params["mapLayout"] = layout
    params["mapTitle"] = title
    params["mapTime"] = today
    params["mapUint"] = unit
    params["info"] = info
    params["centerX"] = lon
    params["centerY"] = lat
    params["event"] = str(inference["id"])
    params["queueId"] = str(inference["id"])
    params["zoomRule"] = rule
    params["zoomValue"] = value
    return params
def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
    """Extract the map center (longitude, latitude) from the condition.

    For "earthquake" events the epicenter coordinates are used; each
    coordinate individually falls back to the matching ``xian_center`` value
    when it is absent, ``None`` or "". Every other event type uses
    ``xian_center`` directly.

    Returns:
        A ``(lon, lat)`` tuple.
    """
    default_lon, default_lat = xian_center[0], xian_center[1]
    if event_type != "earthquake":
        return default_lon, default_lat

    lon = condition.get("epicenter_lon")
    lat = condition.get("epicenter_lat")
    # dict.get(key, default) does not apply the default when the key exists
    # with a None/"" value, which made float() raise; handle those explicitly.
    if lon is None or lon == "":
        lon = default_lon
    if lat is None or lat == "":
        lat = default_lat
    return float(lon), float(lat)
# ============================================================
# 构建 QGIS 服务配置字典
# ============================================================
def _build_qgis_config(batch_folder: str) -> dict:
    """Build the QGIS service configuration, including the batch output folder.

    Probes whether the QGIS Docker container is running. In Docker mode the
    GPKG directory and ``batch_folder`` are rewritten to container-side paths;
    otherwise the local GPKG directory and the given ``batch_folder`` are used
    unchanged.

    Args:
        batch_folder: Host-side output folder for this batch.

    Returns:
        Dict with "db", "qgis", "static_layers" and "batch_folder" entries.
    """
    from app.services.qgis.qgis_env import (
        get_docker_container, get_host_file_store, get_container_file_store,
        get_docker_project_dir,
    )

    gpkg_dir = get_gpkg_dir()

    # Docker mode: paths in the config must be container paths.
    try:
        probe = subprocess.run(
            ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
            capture_output=True, text=True, timeout=5,
        )
        is_docker = probe.stdout.strip() == "true"
    except Exception as exc:
        # Best effort: any probe failure means "not Docker", but leave a trace
        # instead of swallowing the reason silently.
        logger.debug(f"Docker probe failed, falling back to local mode: {exc}")
        is_docker = False

    if is_docker:
        # GPKG directory: prefer the container-local path configured in settings.
        # Requires running python script/copy_gpkg_to_container.py beforehand.
        gpkg_dir = getattr(settings, "QGIS_DOCKER_GPKG_DIR", "") or ""
        if not gpkg_dir:
            # Fallback: read from the mounted project directory (slow).
            project_dir = get_docker_project_dir()
            gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg")
            gpkg_dir = f"{project_dir}/{gpkg_subdir}"
            logger.warning("GPKG 将从挂载目录读取(慢),建议执行 copy_gpkg_to_container.py")

        # batch_folder: host path -> container path.
        # Normalise slashes on both sides before comparing prefixes.
        host_fs = get_host_file_store().replace("\\", "/").rstrip("/")
        container_fs = get_container_file_store().rstrip("/")
        normalized = batch_folder.replace("\\", "/")
        remainder = normalized[len(host_fs):]
        # Only rewrite when host_fs is a whole-directory prefix, so that e.g.
        # ".../files2" is not mistaken for a child of ".../files".
        if normalized.lower().startswith(host_fs.lower()) and remainder[:1] in ("", "/"):
            normalized = container_fs + remainder
        batch_folder = normalized
        logger.info(f"[Docker模式] gpkg_dir={gpkg_dir}, batch_folder={batch_folder}")
    else:
        logger.info(f"[本地模式] gpkg_dir={gpkg_dir}")

    return {
        "db": {
            "host": settings.DB_HOST,
            "port": settings.DB_PORT,
            "database": settings.DB_NAME,
            "username": settings.DB_USER,
            "password": settings.DB_PASSWORD,
        },
        "qgis": {
            "exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300),
        },
        "static_layers": build_static_layers_config(gpkg_dir),
        "batch_folder": batch_folder,
    }
# ============================================================
2026-06-24 11:22:20 +08:00
# 接口实现(异步 fire-and-forget 模式)
2026-06-19 17:04:03 +08:00
# ============================================================
2026-06-21 16:39:47 +08:00
2026-06-24 11:22:20 +08:00
def _background_export(inference_id: int) -> None:
    """Background thread body: run the whole map-export pipeline for one inference.

    Steps: load the inference record, resolve the output folder, scan the
    templates for the event type, work out which images are still missing,
    build the per-template parameters, map paths to container paths, and
    submit the job to the QGIS worker pool (blocking until it finishes).

    Any exception is logged rather than propagated, and ``inference_id`` is
    always removed from ``_background_tasks`` when the function exits.

    Args:
        inference_id: ID of the inference result to export maps for.
    """
    try:
        # Lazy imports live inside the try block so that an import failure
        # still reaches the finally clause and releases the dedup entry.
        from app.services.qgis.qgis_pool import qgis_pool
        from app.services.qgis.qgis_env import (
            map_host_to_container, map_container_to_host, map_template_to_container,
        )

        # 1. Load the inference result.
        inference = qgis_repository.query_inference_result(inference_id)
        if inference is None:
            logger.error(f"[后台] 推理结果不存在 (inferenceId={inference_id})")
            return
        event_type = inference["event_type"]
        batch_key = str(inference_id)
        file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
        output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
        output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
        batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
        logger.info(
            f"[后台] 推理结果: id={inference['id']}, "
            f"type={event_type}, occurred_time={inference['occurred_time']}"
        )

        os.makedirs(batch_folder, exist_ok=True)
        config = _build_qgis_config(batch_folder)

        # 2. Scan templates: <project root>/app/data/template/<event_type>/*.qgz
        template_base = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "app", "data", "template"
        )
        template_dir = os.path.join(template_base, event_type)
        template_files = sorted(
            f for f in os.listdir(template_dir)
            if f.endswith(".qgz") and not f.startswith("tmp")
        )
        priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
        if priority_keywords:
            def _priority_order(name: str) -> tuple:
                # Templates matching a priority keyword come first, in keyword
                # order; the rest follow alphabetically.
                for i, kw in enumerate(priority_keywords):
                    if kw in name:
                        return (0, i)
                return (1, name)
            template_files.sort(key=_priority_order)
        if not template_files:
            logger.error(f"[后台] 模板文件夹为空: {template_dir}")
            return

        # 3. Incremental check: decide per template, by name, whether its image
        #    already exists. Comparing only counts would be fooled by stray or
        #    stale .jpg files in the folder.
        existing = set()
        if os.path.isdir(batch_folder):
            existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}

        def _expected_image(tpl_file: str) -> str:
            # Must mirror the file-name sanitising done in _derive_model_params.
            stem = os.path.splitext(tpl_file)[0]
            return re.sub(r'[\\/:*?"<>|]', '_', stem) + ".jpg"

        missing_templates = [
            f for f in template_files
            if _expected_image(f) not in existing
        ]
        done_count = len(template_files) - len(missing_templates)
        if not missing_templates:
            logger.info(f"[后台] [跳过] {done_count}/{len(template_files)} 张已全部产出")
            return
        logger.info(
            f"[后台] [增量] 已有{done_count}张, 缺{len(missing_templates)}"
        )

        # 4. Build the model parameters for every missing template.
        models = []
        for tpl_file in missing_templates:
            tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
            model = _derive_model_params(inference, batch_folder, tpl_path)
            logger.info(
                f"[后台] 模板: {tpl_path}, "
                f"输出: {model['outFile']}, "
                f"标题: {model['mapTitle']}"
            )
            models.append(model)

        # 5. Map host paths to container paths (on copies, originals untouched).
        container_models = []
        for m in models:
            cm = dict(m)
            if "outFile" in cm:
                cm["outFile"] = map_host_to_container(cm["outFile"])
            if "path" in cm:
                cm["path"] = map_template_to_container(cm["path"])
            container_models.append(cm)
        logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池")

        # 6. Submit to the worker pool (blocks until rendering completes).
        db_lock = threading.Lock()
        all_results = []

        def _on_progress(r: dict):
            # Per-image callback: translate the output path back to the host,
            # log the outcome and record successful images immediately.
            if "output" in r:
                r["output"] = map_container_to_host(r["output"])
            all_results.append(r)
            failed = "error" in r
            status = "FAIL" if failed else "OK"
            # str() guards against non-string error/output payloads.
            detail = str(r.get("error", r.get("output", "")))[:100]
            logger.info(f"[Pool] {status} {r.get('name', '?')}: {detail}")
            # Only record results that actually carry an output path.
            if inference_id and file_store and not failed and r.get("output"):
                _write_single_path(inference_id, r["output"], file_store, db_lock)

        _, summary = qgis_pool.submit_job(config, container_models, _on_progress)
        success_count = summary.get("ok", 0)
        fail_count = summary.get("fail", 0)
        elapsed = summary.get("elapsed", 0)
        logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s")
        for r in all_results:
            if "error" not in r:
                logger.info(f"  OK  {r.get('output', 'N/A')}")
            else:
                logger.error(f"  FAIL {r.get('name', '?')}: {r.get('error', 'unknown')}")
    except Exception as e:
        logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True)
    finally:
        # Always release the dedup entry so the same ID can be submitted again.
        with _tasks_lock:
            _background_tasks.discard(inference_id)
        logger.info(f"[后台] 任务结束 (inferenceId={inference_id})")
2026-06-21 16:37:21 +08:00
2026-06-21 22:30:04 +08:00
2026-06-24 11:22:20 +08:00
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
    """
    根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
    异步模式:请求立即返回,产图在后台线程执行。
    """
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # endpoint description. In English: batch-export thematic maps for an
    # inference ID; requests for the same ID share one output folder and only
    # missing images are produced. The request returns immediately and the
    # rendering runs in a background thread.
    #
    # Raises HTTPException(503) when the QGIS Docker container is not running
    # or cannot be probed.
    import asyncio
    from app.services.qgis.qgis_env import get_docker_container

    # Check the Docker container. The probe can block for up to 5 seconds, so
    # it is run in a worker thread to keep the event loop responsive.
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
            capture_output=True, text=True, timeout=5,
        )
    except Exception as exc:
        raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用") from exc
    if result.stdout.strip() != "true":
        raise HTTPException(status_code=503, detail="QGIS Docker 容器未运行")

    inference_id = req.inferenceId

    # De-duplicate: never run two jobs for the same inferenceId at once.
    with _tasks_lock:
        if inference_id in _background_tasks:
            logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过")
            return QgisMapExportResponse(
                code=200,
                message="任务已在执行中,请稍后查看结果",
                data=str(inference_id),
            )
        _background_tasks.add(inference_id)

    # Start the background thread that performs the export.
    thread = threading.Thread(
        target=_background_export,
        args=(inference_id,),
        daemon=True,
        name=f"qgis-export-{inference_id}",
    )
    try:
        thread.start()
    except RuntimeError:
        # The thread never ran, so its finally-clause cannot release the entry;
        # release it here or this ID would be blocked until restart.
        with _tasks_lock:
            _background_tasks.discard(inference_id)
        raise
    logger.info(f"后台产图任务已启动 (inferenceId={inference_id})")

    return QgisMapExportResponse(
        code=200,
        message="任务已提交,正在后台生成专题图",
        data=str(inference_id),
    )
2026-06-21 14:52:23 +08:00
2026-06-21 16:37:21 +08:00
# ============================================================
# file_path 数据库记录
# ============================================================
def _relative_path(absolute_path: str, file_store: str) -> str:
"""绝对路径 → 相对于 FILE_STORE_DIR 的路径"""
fs = file_store.replace("\\", "/").rstrip("/")
ap = absolute_path.replace("\\", "/")
if ap.startswith(fs + "/"):
return ap[len(fs) + 1:]
return ap
def _write_single_path(inference_id: int, out_path: str, file_store: str, lock) -> None:
"""单张图产出后立即写入进度表"""
if not out_path or not os.path.isfile(out_path):
return
rel = _relative_path(out_path, file_store)
if not rel:
return
try:
from app.repositories.qgis_repository import qgis_repository
with lock:
qgis_repository.insert_file_paths(inference_id, [rel])
except Exception as e:
logger.error(f"[实时写入] 失败: {e}")