Files
2026-06-20 21:12:52 +08:00

480 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
- 线程池并发产图(默认 4 worker)
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
"""
import asyncio
import concurrent.futures
import os
import re
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.config.paths import get_logger
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
from app.repositories import qgis_repository
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
from app.utils.api_deps import get_prediction_semaphore
from app.utils.db_helper import db_helper
from config import settings
logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
# 线程池(按配置初始化 worker 数量)
_worker_threads = getattr(settings, "QGIS_WORKER_THREADS", 4)
_thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=_worker_threads,
thread_name_prefix="qgis-worker",
)
logger.info(f"QGIS 线程池初始化: {_worker_threads} workers")
# 去重锁:in_progress_locks[disaster_time] = asyncio.Lock
# 同一灾害时间的并发请求会排队,避免重复产图
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()
# 西安市中心
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
# ============================================================
# 时间戳格式化
# ============================================================
def format_disaster_time(occurred_time) -> str:
"""将 occurred_time 格式化为时间戳字符串(YYYYMMDDHHmmss),作为文件夹名"""
if isinstance(occurred_time, datetime):
return occurred_time.strftime("%Y%m%d%H%M%S")
elif occurred_time:
return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
else:
return datetime.now().strftime("%Y%m%d%H%M%S")
# ============================================================
# region_code → 区县名称映射
# ============================================================
REGION_CODE_MAP = settings.area.to_dict()
def _resolve_district(condition: dict) -> str:
"""从 condition.region_code 映射区县名称"""
code = condition.get("region_code")
if code and str(code) in REGION_CODE_MAP:
return REGION_CODE_MAP[str(code)]
return ""
def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
"""
构建地图标题。
格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
- 地震:陕西西安临潼区5.1级地震专题图
- 暴雨(有降雨量):陕西西安长安区120mm降雨专题图
- 暴雨(无降雨量):陕西西安降雨专题图
"""
district = _resolve_district(condition)
prefix = f"陕西西安{district}" if district else "陕西西安"
if event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None:
return f"{prefix}{float(magnitude)}{template_name}"
return f"{prefix}{template_name}"
elif event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
return f"{prefix}{float(rainfall)}mm{template_name}"
return f"{prefix}{template_name}"
return f"{prefix}{template_name}"
def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
"""
构建信息面板文本(左上角橙色矩形区域)。
暴雨:时间 + 累计降雨量(如有)+ 已持续(如有)
地震:时间 + 震级(如有)+ 位置(如有)
不显示灾害类型标签(暴雨/地震)。
"""
lines = []
# 时间
if isinstance(occurred_time, datetime):
time_str = f"{occurred_time.year}{occurred_time.month:02d}{occurred_time.day:02d}{occurred_time.hour:02d}{occurred_time.minute:02d}"
elif occurred_time:
time_str = str(occurred_time)
else:
time_str = datetime.now().strftime("%Y年%m月%d%H时%M分")
lines.append(f"时间:{time_str}")
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
lines.append(f"累计降雨量:{float(rainfall)}mm")
duration = condition.get("duration")
if duration is not None and duration != "":
lines.append(f"已持续:{duration}")
elif event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None and magnitude != "":
lines.append(f"震级:{float(magnitude)}")
lon = condition.get("epicenter_lon")
lat = condition.get("epicenter_lat")
if lon is not None and lat is not None:
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
return "\n".join(lines)
def _derive_model_params(
inference: dict, batch_folder: str, template_path: str
) -> dict:
"""从推理结果 + 模板路径推导专题图生成所需的全部参数。"""
event_type = inference["event_type"]
condition = inference["condition"]
occurred_time = inference["occurred_time"]
map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")
# 从模板文件名推导标题和输出文件名(去掉 .qgz 后缀)
template_name = os.path.splitext(os.path.basename(template_path))[0]
map_title = _build_map_title(event_type, condition, template_name)
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
# 制图时间统一用当前日期
map_time = datetime.now().strftime("%Y-%m-%d")
center_x, center_y = _extract_center_from_condition(
event_type, condition
)
info_text = _build_info_text(event_type, condition, occurred_time)
return {
"name": f"inference_{inference['id']}",
"path": template_path,
"outFile": out_file,
"mapLayout": map_layout,
"mapTitle": map_title,
"mapTime": map_time,
"mapUint": map_unit,
"info": info_text,
"centerX": center_x,
"centerY": center_y,
"event": str(inference["id"]),
"queueId": str(inference["id"]),
"zoomRule": zoom_rule,
"zoomValue": zoom_value,
}
def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
"""从 condition 提取地图中心坐标"""
if event_type == "earthquake":
lon = condition.get("epicenter_lon", xian_center[0])
lat = condition.get("epicenter_lat", xian_center[1])
return float(lon), float(lat)
else:
return xian_center[0], xian_center[1]
# ============================================================
# 构建 QGIS 服务配置字典
# ============================================================
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
gpkg_dir = get_gpkg_dir()
return {
"db": {
"host": settings.DB_HOST,
"port": settings.DB_PORT,
"database": settings.DB_NAME,
"username": settings.DB_USER,
"password": settings.DB_PASSWORD,
},
"qgis": {
"exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300),
},
"template_override": {
"enabled": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ENABLED", False),
"original": {
"host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST", "localhost"),
"port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT", 5432),
"dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME", "yjzyk_xian"),
"schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA", "base"),
},
"actual": {
"host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST", ""),
"port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT", ""),
"dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME", "xian_new"),
"schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA", "qgis"),
"username": getattr(settings, "DB_USER", "postgres"),
"password": getattr(settings, "DB_PASSWORD", ""),
},
},
"static_layers": build_static_layers_config(gpkg_dir),
"batch_folder": batch_folder,
}
# ============================================================
# 接口实现
# ============================================================
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 occurred_time 视为同一场灾害,共享文件夹
"""
from app.services.qgis.qgis_env import is_qgis_available
if not is_qgis_available():
raise HTTPException(status_code=503, detail="QGIS 环境不可用(未找到 QGIS Python 3.12 解释器)")
semaphore = get_prediction_semaphore()
async with semaphore:
inference_id = req.inferenceId
loop = asyncio.get_event_loop()
try:
# 查询推理记录,获取 occurred_time
inference = await loop.run_in_executor(
None, qgis_repository.query_inference_result, inference_id
)
# 将 occurred_time 格式化为时间戳作为文件夹名
disaster_time = format_disaster_time(inference["occurred_time"])
event_type = inference["event_type"]
# 构建批次文件夹路径
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:disasterTime")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":disasterTime", disaster_time)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重检查:同一 disasterTime 只产图一次
if os.path.isdir(batch_folder):
existing_files = [
f for f in os.listdir(batch_folder) if f.endswith(".jpg")
]
if existing_files and disaster_time not in _in_progress_locks:
logger.info(
f"[去重] disasterTime={disaster_time} 已产图,直接返回 "
f"({len(existing_files)} 张)"
)
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing_files)}",
data=disaster_time,
)
# 去重锁:同一灾害时间排队等待
async with _locks_lock:
if disaster_time not in _in_progress_locks:
_in_progress_locks[disaster_time] = asyncio.Lock()
task_lock = _in_progress_locks[disaster_time]
async with task_lock:
if os.path.isdir(batch_folder):
existing_files = [
f for f in os.listdir(batch_folder) if f.endswith(".jpg")
]
if existing_files:
logger.info(
f"[去重] disasterTime={disaster_time} 已被并发请求产图完成"
)
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing_files)}",
data=disaster_time,
)
logger.info(
f"推理结果查询成功: id={inference['id']}, "
f"type={inference['event_type']}, "
f"occurred_time={inference['occurred_time']}, "
f"disasterTime={disaster_time}"
)
# 推导参数 + 构建配置
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 扫描模板文件夹下所有 .qgz 文件
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
if not template_files:
raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
# 构建所有模型参数(批量模式)
models = []
for tpl_file in template_files:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}, "
f"中心: ({model['centerX']}, {model['centerY']})"
)
models.append(model)
# 一次性提交所有模型到 QGIS 子进程(单次 DLL 加载)
_generate_batch_maps(models, config, disaster_time)
return QgisMapExportResponse(
code=200,
message=f"任务已完成,共{len(models)}张专题图",
data=disaster_time,
)
except ValueError as e:
logger.warning(f"参数错误: {e}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
logger.error(f"模板文件不存在: {e}")
raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
except Exception as e:
logger.error(f"专题图导出失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出失败: {e}")
finally:
pass
def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None:
"""通过 QGIS Python 3.12 子进程批量生成专题图(单次 DLL 加载)"""
import json
import subprocess
import tempfile
from app.services.qgis.qgis_env import build_qgis_command, build_clean_subprocess_env
try:
logger.info(f"[批量产图] 开始: {len(models)} 张专题图, batch={disaster_time}")
# 构建子进程请求 JSON —— 批量格式
request_data = json.dumps(
{"config": config, "models": models},
ensure_ascii=False,
)
# 将请求 JSON 写入临时文件(避免 stdin 管道问题)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
try:
from config import settings
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
cmd = build_qgis_command(qgis_root)
cmd.append(tmp_json.name)
logger.info(f"[批量产图] 启动 QGIS 子进程: {' '.join(cmd[:2])}...")
result = subprocess.run(
cmd,
capture_output=True,
timeout=600, # 10 分钟超时(15 张模板 × ~40s/张 ≈ 600s
env=build_clean_subprocess_env(),
# 使用干净的环境变量:移除 venv 的 PYTHONPATH/VIRTUAL_ENV/PATH 污染,
# 避免 QGIS Python 3.12 的 DLL 加载被干扰导致 0xC0000005 崩溃。
# QGIS Python 3.12 仅通过 sys.path 即可正确加载所有模块和 DLL。
)
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass
# 解析子进程输出 —— 优先检查 stdout 是否有有效结果
stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
parsed_output = None
if stdout_text:
for line in reversed(stdout_text.split("\n")):
line = line.strip()
if line.startswith("{"):
try:
parsed_output = json.loads(line)
break
except json.JSONDecodeError:
continue
if parsed_output is not None:
batch_results = parsed_output.get("results", [])
success_count = sum(1 for r in batch_results if "error" not in r)
fail_count = len(batch_results) - success_count
logger.info(
f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}"
)
for r in batch_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r.get('error', 'unknown')}")
if success_count == 0 and fail_count > 0:
first_err = batch_results[0].get("error", "unknown")
raise RuntimeError(
f"QGIS 子进程所有模型均失败 ({fail_count}张): {first_err}"
)
elif result.returncode != 0:
# stdout 没有有效结果且退出码异常,才报错
stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
logger.error(f"[批量产图] QGIS 子进程失败 (exit={result.returncode}):")
for line in stderr_text.split("\n"):
logger.error(f" {line}")
raise RuntimeError(
f"QGIS 子进程失败: {stderr_text[:300]}"
)
else:
logger.warning("[批量产图] 子进程无有效输出,exit code = 0")
except subprocess.TimeoutExpired:
logger.error(f"[批量产图] QGIS 子进程超时 (300s)")
raise
except Exception as e:
logger.error(f"[批量产图] 产图失败: {e}", exc_info=True)
raise
# ============================================================
# 清理函数
# ============================================================
def shutdown_thread_pool() -> None:
"""关闭线程池(在 server.py lifespan 关闭阶段调用)"""
_thread_pool.shutdown(wait=False)
logger.info("QGIS 线程池已关闭")