480 lines
19 KiB
Python
480 lines
19 KiB
Python
"""
|
||
QGIS 专题图导出接口
|
||
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
|
||
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
|
||
- 线程池并发产图(默认 4 worker)
|
||
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
|
||
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
|
||
"""
|
||
import asyncio
|
||
import concurrent.futures
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, HTTPException
|
||
from pydantic import BaseModel, Field
|
||
|
||
from app.config.paths import get_logger
|
||
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
|
||
from app.repositories import qgis_repository
|
||
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
|
||
from app.utils.api_deps import get_prediction_semaphore
|
||
from app.utils.db_helper import db_helper
|
||
from config import settings
|
||
|
||
# Module logger and the router exposing the export endpoints under /qgis.
logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])

# Worker pool for map generation; its size comes from settings (4 when unset).
_worker_threads = getattr(settings, "QGIS_WORKER_THREADS", 4)
_thread_pool = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="qgis-worker",
    max_workers=_worker_threads,
)
logger.info(f"QGIS 线程池初始化: {_worker_threads} workers")

# De-duplication locks keyed by disaster timestamp: concurrent requests for the
# same disaster time queue on one lock so the maps are not generated twice.
# _locks_lock guards the creation of entries in _in_progress_locks.
_locks_lock = asyncio.Lock()
_in_progress_locks: dict[str, asyncio.Lock] = {}

# Default map centre [lon, lat] for Xi'an; overridable via settings.XIAN_CENTER.
xian_center = getattr(settings, "XIAN_CENTER", [108.948024, 34.263161])
|
||
|
||
|
||
# ============================================================
|
||
# 时间戳格式化
|
||
# ============================================================
|
||
|
||
def format_disaster_time(occurred_time) -> str:
    """Format ``occurred_time`` as a ``YYYYMMDDHHMMSS`` string used as a folder name.

    Args:
        occurred_time: A ``datetime``, a date/time string (any separators), or a
            falsy value.

    Returns:
        * ``datetime`` input: its ``%Y%m%d%H%M%S`` rendering.
        * Other truthy input: the first 14 ASCII digits of ``str(occurred_time)``.
          Keeping digits only means ISO strings with a ``T`` separator or dates
          written with ``/`` cannot leak letters or path separators into the
          folder name.
        * Falsy input, or input containing no digits: the current local time.
    """
    if isinstance(occurred_time, datetime):
        return occurred_time.strftime("%Y%m%d%H%M%S")

    if occurred_time:
        digits = re.sub(r"[^0-9]", "", str(occurred_time))[:14]
        if digits:
            return digits

    return datetime.now().strftime("%Y%m%d%H%M%S")
|
||
|
||
|
||
# ============================================================
|
||
# region_code → 区县名称映射
|
||
# ============================================================
|
||
|
||
# Lookup table used to turn a region code into a district name.
REGION_CODE_MAP = settings.area.to_dict()


def _resolve_district(condition: dict) -> str:
    """Return the district name for ``condition["region_code"]``.

    The code is looked up by its string form. An empty string is returned when
    the code is missing/falsy or not present in ``REGION_CODE_MAP``.
    """
    region_code = condition.get("region_code")
    if not region_code:
        return ""
    return REGION_CODE_MAP.get(str(region_code), "")
|
||
|
||
|
||
def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
    """Build the map title.

    Format: 陕西西安{district}{magnitude / rainfall}{template name}

    - earthquake with magnitude: 陕西西安临潼区5.1级地震专题图
    - rainfall with an amount:   陕西西安长安区120.0mm降雨专题图
    - no usable value:           陕西西安降雨专题图

    Args:
        event_type: ``"earthquake"`` or ``"rainfall"``; any other value yields a
            title without a measurement.
        condition: Event condition dict; ``region_code``, ``magnitude`` and
            ``rainfall`` are read from it.
        template_name: Template name appended at the end of the title.

    Returns:
        The title string. The measurement is rendered through ``float()``, so
        an integer amount such as 120 appears as ``120.0``.

    Raises:
        ValueError: If the magnitude / rainfall value is present but not numeric.
    """
    district = _resolve_district(condition)
    prefix = f"陕西西安{district}" if district else "陕西西安"

    if event_type == "earthquake":
        value, unit = condition.get("magnitude"), "级"
    elif event_type == "rainfall":
        value, unit = condition.get("rainfall"), "mm"
    else:
        value, unit = None, ""

    # None and "" both mean "not provided". The earthquake branch previously
    # only checked for None, so an empty magnitude crashed in float("").
    if value is None or value == "":
        return f"{prefix}{template_name}"
    return f"{prefix}{float(value)}{unit}{template_name}"
|
||
|
||
|
||
def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
|
||
"""
|
||
构建信息面板文本(左上角橙色矩形区域)。
|
||
|
||
暴雨:时间 + 累计降雨量(如有)+ 已持续(如有)
|
||
地震:时间 + 震级(如有)+ 位置(如有)
|
||
不显示灾害类型标签(暴雨/地震)。
|
||
"""
|
||
lines = []
|
||
|
||
# 时间
|
||
if isinstance(occurred_time, datetime):
|
||
time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分"
|
||
elif occurred_time:
|
||
time_str = str(occurred_time)
|
||
else:
|
||
time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分")
|
||
lines.append(f"时间:{time_str}")
|
||
|
||
if event_type == "rainfall":
|
||
rainfall = condition.get("rainfall")
|
||
if rainfall is not None and rainfall != "":
|
||
lines.append(f"累计降雨量:{float(rainfall)}mm")
|
||
|
||
duration = condition.get("duration")
|
||
if duration is not None and duration != "":
|
||
lines.append(f"已持续:{duration}")
|
||
|
||
elif event_type == "earthquake":
|
||
magnitude = condition.get("magnitude")
|
||
if magnitude is not None and magnitude != "":
|
||
lines.append(f"震级:{float(magnitude)}级")
|
||
|
||
lon = condition.get("epicenter_lon")
|
||
lat = condition.get("epicenter_lat")
|
||
if lon is not None and lat is not None:
|
||
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _derive_model_params(
    inference: dict, batch_folder: str, template_path: str
) -> dict:
    """Derive the parameter dict for one thematic map.

    Combines the inference record (``event_type``, ``condition``,
    ``occurred_time``, ``id``) with the template path and the QGIS defaults
    read from ``settings``.

    Args:
        inference: Inference record dict.
        batch_folder: Folder that receives the exported image.
        template_path: Path of the template file; its base name without the
            extension becomes the template name.

    Returns:
        Dict with the keys name, path, outFile, mapLayout, mapTitle, mapTime,
        mapUint, info, centerX, centerY, event, queueId, zoomRule, zoomValue.
    """
    event_type = inference["event_type"]
    condition = inference["condition"]
    occurred_time = inference["occurred_time"]

    # Template name = file name without its extension; it feeds both the title
    # and the output file name.
    stem, _ext = os.path.splitext(os.path.basename(template_path))
    title = _build_map_title(event_type, condition, stem)

    # Characters not allowed in file names are replaced with "_".
    jpg_name = re.sub(r'[\\/:*?"<>|]', '_', stem) + ".jpg"
    target = os.path.join(batch_folder, jpg_name).replace("\\", "/")

    # The production date shown on the map is always today's date.
    today = datetime.now().strftime("%Y-%m-%d")

    lon, lat = _extract_center_from_condition(event_type, condition)
    panel_text = _build_info_text(event_type, condition, occurred_time)

    raw_id = inference["id"]
    return {
        "name": f"inference_{raw_id}",
        "path": template_path,
        "outFile": target,
        "mapLayout": getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3"),
        "mapTitle": title,
        "mapTime": today,
        "mapUint": getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局"),
        "info": panel_text,
        "centerX": lon,
        "centerY": lat,
        "event": str(raw_id),
        "queueId": str(raw_id),
        "zoomRule": getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11"),
        "zoomValue": getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50"),
    }
|
||
|
||
|
||
def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
    """Return the map centre ``(lon, lat)`` for an event.

    For earthquakes the epicentre (``epicenter_lon`` / ``epicenter_lat``) is
    used, converted with ``float()``. Each coordinate independently falls back
    to the configured city centre when its key is absent or its value is
    ``None`` or ``""``. All other event types use the city centre as is.

    Raises:
        ValueError: If an epicentre coordinate is present but not numeric.
    """
    default_lon, default_lat = xian_center[0], xian_center[1]
    if event_type != "earthquake":
        return default_lon, default_lat

    def _coordinate(key: str, fallback) -> float:
        value = condition.get(key)
        # dict.get's default only covers a missing key; a key stored with
        # None or "" previously reached float() and raised.
        if value is None or value == "":
            value = fallback
        return float(value)

    return (
        _coordinate("epicenter_lon", default_lon),
        _coordinate("epicenter_lat", default_lat),
    )
|
||
|
||
|
||
# ============================================================
|
||
# 构建 QGIS 服务配置字典
|
||
# ============================================================
|
||
|
||
def _build_qgis_config(batch_folder: str) -> dict:
    """Assemble the configuration dict passed to the QGIS service.

    Sections: ``db`` (connection settings), ``qgis`` (export DPI),
    ``template_override`` (original vs. actual data-source coordinates),
    ``static_layers`` (built from the GeoPackage directory) and
    ``batch_folder`` (the output directory given by the caller).
    """
    gpkg_dir = get_gpkg_dir()

    db_section = {
        "host": settings.DB_HOST,
        "port": settings.DB_PORT,
        "database": settings.DB_NAME,
        "username": settings.DB_USER,
        "password": settings.DB_PASSWORD,
    }

    qgis_section = {
        "exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300),
    }

    # Data source recorded inside the templates.
    override_original = {
        "host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST", "localhost"),
        "port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT", 5432),
        "dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME", "yjzyk_xian"),
        "schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA", "base"),
    }

    # Data source the templates should point at when the override is enabled.
    override_actual = {
        "host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST", ""),
        "port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT", ""),
        "dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME", "xian_new"),
        "schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA", "qgis"),
        "username": getattr(settings, "DB_USER", "postgres"),
        "password": getattr(settings, "DB_PASSWORD", ""),
    }

    override_section = {
        "enabled": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ENABLED", False),
        "original": override_original,
        "actual": override_actual,
    }

    return {
        "db": db_section,
        "qgis": qgis_section,
        "template_override": override_section,
        "static_layers": build_static_layers_config(gpkg_dir),
        "batch_folder": batch_folder,
    }
|
||
|
||
|
||
# ============================================================
|
||
# 接口实现
|
||
# ============================================================
|
||
|
||
def _list_exported_maps(batch_folder: str) -> list:
    """Return the ``.jpg`` file names in ``batch_folder`` ([] if it is not a directory)."""
    if not os.path.isdir(batch_folder):
        return []
    return [f for f in os.listdir(batch_folder) if f.endswith(".jpg")]


def _already_exported_response(disaster_time: str, existing_files: list) -> QgisMapExportResponse:
    """Build the response returned when maps for ``disaster_time`` already exist."""
    return QgisMapExportResponse(
        code=200,
        message=f"已存在,共{len(existing_files)}张",
        data=disaster_time,
    )


def _build_batch_models(inference: dict, batch_folder: str) -> list:
    """Build one model-parameter dict per ``.qgz`` template of the event type.

    Raises:
        FileNotFoundError: If the template folder is missing or contains no
            usable template.
    """
    template_base = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
        "app", "data", "template"
    )
    template_dir = os.path.join(template_base, inference["event_type"])
    # Files starting with "tmp" are skipped.
    template_files = sorted(
        f for f in os.listdir(template_dir)
        if f.endswith(".qgz") and not f.startswith("tmp")
    )
    if not template_files:
        raise FileNotFoundError(f"模板文件夹为空: {template_dir}")

    models = []
    for tpl_file in template_files:
        tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
        model = _derive_model_params(inference, batch_folder, tpl_path)
        logger.info(
            f"模板: {tpl_path}, "
            f"输出: {model['outFile']}, "
            f"标题: {model['mapTitle']}, "
            f"中心: ({model['centerX']}, {model['centerY']})"
        )
        models.append(model)
    return models


@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
    """Export all thematic maps for the inference record ``req.inferenceId``.

    Records with the same occurred_time are treated as the same disaster and
    share one output folder. If that folder already contains ``.jpg`` files the
    request returns immediately; otherwise requests for the same disaster time
    queue on a per-timestamp lock so the maps are generated only once.

    The blocking QGIS subprocess call runs on the module's worker thread pool
    so the event loop stays responsive while maps are rendered.

    NOTE(review): entries in ``_in_progress_locks`` are never removed, so the
    dict grows by one lock per distinct disaster time for the process lifetime.

    Raises:
        HTTPException: 503 when QGIS is unavailable, 400 on ``ValueError``,
            404 on ``FileNotFoundError`` (missing/empty template folder),
            500 on any other failure.
    """
    from app.services.qgis.qgis_env import is_qgis_available
    if not is_qgis_available():
        raise HTTPException(status_code=503, detail="QGIS 环境不可用(未找到 QGIS Python 3.12 解释器)")

    semaphore = get_prediction_semaphore()

    async with semaphore:
        inference_id = req.inferenceId
        # Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()

        try:
            # Load the inference record (blocking DB call, default executor).
            inference = await loop.run_in_executor(
                None, qgis_repository.query_inference_result, inference_id
            )

            # The formatted occurred_time doubles as the batch folder name.
            disaster_time = format_disaster_time(inference["occurred_time"])
            event_type = inference["event_type"]

            file_store = getattr(settings, "FILE_STORE_DIR", "G:/files")
            output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:disasterTime")
            output_dir = output_tmpl.replace(":eventType", event_type).replace(":disasterTime", disaster_time)
            batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")

            # Fast path: maps exist and no lock has been registered for this
            # timestamp, so nothing can be mid-generation in this process.
            existing_files = _list_exported_maps(batch_folder)
            if existing_files and disaster_time not in _in_progress_locks:
                logger.info(
                    f"[去重] disasterTime={disaster_time} 已产图,直接返回 "
                    f"({len(existing_files)} 张)"
                )
                return _already_exported_response(disaster_time, existing_files)

            # Get or create the per-timestamp lock.
            async with _locks_lock:
                if disaster_time not in _in_progress_locks:
                    _in_progress_locks[disaster_time] = asyncio.Lock()
                task_lock = _in_progress_locks[disaster_time]

            async with task_lock:
                # Re-check under the lock: a concurrent request may have
                # finished the export while this one was waiting.
                existing_files = _list_exported_maps(batch_folder)
                if existing_files:
                    logger.info(
                        f"[去重] disasterTime={disaster_time} 已被并发请求产图完成"
                    )
                    return _already_exported_response(disaster_time, existing_files)

                logger.info(
                    f"推理结果查询成功: id={inference['id']}, "
                    f"type={inference['event_type']}, "
                    f"occurred_time={inference['occurred_time']}, "
                    f"disasterTime={disaster_time}"
                )

                os.makedirs(batch_folder, exist_ok=True)
                config = _build_qgis_config(batch_folder)
                models = _build_batch_models(inference, batch_folder)

                # One subprocess renders every model. It can run for minutes,
                # so it is pushed to the worker pool instead of being called
                # directly on the event loop thread.
                await loop.run_in_executor(
                    _thread_pool, _generate_batch_maps, models, config, disaster_time
                )

                return QgisMapExportResponse(
                    code=200,
                    message=f"任务已完成,共{len(models)}张专题图",
                    data=disaster_time,
                )

        except ValueError as e:
            logger.warning(f"参数错误: {e}")
            raise HTTPException(status_code=400, detail=str(e))
        except FileNotFoundError as e:
            logger.error(f"模板文件不存在: {e}")
            raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
        except Exception as e:
            logger.error(f"专题图导出失败: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"导出失败: {e}")
|
||
|
||
|
||
def _parse_runner_output(stdout_text: str):
    """Return the last stdout line that parses as a JSON object, or ``None``."""
    import json

    for line in reversed(stdout_text.split("\n")):
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            continue
    return None


def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None:
    """Render all ``models`` in a single QGIS Python subprocess.

    The request (``{"config": ..., "models": ...}``) is written to a temporary
    JSON file whose path is appended to the QGIS command line. The result is
    read from the last JSON line on the subprocess's stdout.

    Args:
        models: Model-parameter dicts, one per map.
        config: QGIS service configuration dict.
        disaster_time: Batch identifier, used for logging only.

    Raises:
        RuntimeError: If every model failed, or if the subprocess exited with a
            non-zero code without printing a parsable result.
        subprocess.TimeoutExpired: If the subprocess exceeds the timeout.
    """
    import json
    import subprocess
    import tempfile
    from app.services.qgis.qgis_env import build_qgis_command

    # 10 minutes: sized by the original author for ~15 templates at ~40 s each.
    timeout_seconds = 600

    try:
        logger.info(f"[批量产图] 开始: {len(models)} 张专题图, batch={disaster_time}")

        request_data = json.dumps(
            {"config": config, "models": models},
            ensure_ascii=False,
        )

        # The request goes through a temp file rather than stdin (the original
        # author's note cites stdin pipe problems).
        tmp_json = tempfile.NamedTemporaryFile(
            suffix=".json", delete=False, mode="w", encoding="utf-8"
        )
        try:
            # Writing inside the cleanup scope guarantees the file is removed
            # even when the write itself fails; the `with` closes the handle
            # before the subprocess opens the file.
            with tmp_json:
                tmp_json.write(request_data)

            qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
            cmd = build_qgis_command(qgis_root)
            cmd.append(tmp_json.name)

            logger.info(f"[批量产图] 启动 QGIS 子进程: {' '.join(cmd[:2])}...")

            # No `env` is passed on purpose: the child inherits the parent's
            # environment and, per the original author's note, the runner's
            # _setup_environment() sets the QGIS variables itself. The
            # PYTHONPATH/PATH/QGIS_PREFIX_PATH set by build_qgis_env() were
            # reported to conflict with QGIS DLL loading (0xC0000005 crash).
            result = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout_seconds,
            )
        finally:
            try:
                os.remove(tmp_json.name)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass

        # A valid JSON result on stdout takes precedence over the exit code.
        stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
        parsed_output = _parse_runner_output(stdout_text) if stdout_text else None

        if parsed_output is not None:
            batch_results = parsed_output.get("results", [])
            success_count = sum(1 for r in batch_results if "error" not in r)
            fail_count = len(batch_results) - success_count
            logger.info(
                f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}"
            )
            for r in batch_results:
                if "error" not in r:
                    logger.info(f" OK {r.get('output', 'N/A')}")
                else:
                    logger.error(f" FAIL {r.get('error', 'unknown')}")
            if success_count == 0 and fail_count > 0:
                first_err = batch_results[0].get("error", "unknown")
                raise RuntimeError(
                    f"QGIS 子进程所有模型均失败 ({fail_count}张): {first_err}"
                )
        elif result.returncode != 0:
            # No usable stdout result and a failing exit code: report stderr.
            stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
            logger.error(f"[批量产图] QGIS 子进程失败 (exit={result.returncode}):")
            for line in stderr_text.split("\n"):
                logger.error(f" {line}")
            raise RuntimeError(
                f"QGIS 子进程失败: {stderr_text[:300]}"
            )
        else:
            logger.warning("[批量产图] 子进程无有效输出,exit code = 0")

    except subprocess.TimeoutExpired:
        # The message now reports the real timeout (it used to say 300s).
        logger.error(f"[批量产图] QGIS 子进程超时 ({timeout_seconds}s)")
        raise
    except Exception as e:
        logger.error(f"[批量产图] 产图失败: {e}", exc_info=True)
        raise
|
||
|
||
|
||
# ============================================================
|
||
# 清理函数
|
||
# ============================================================
|
||
|
||
def shutdown_thread_pool() -> None:
    """Shut down the QGIS worker pool without waiting for running tasks.

    Per the original note, this is meant to be called from the shutdown phase
    of the lifespan handler in server.py.
    """
    pool = _thread_pool
    pool.shutdown(wait=False)
    logger.info("QGIS 线程池已关闭")