""" QGIS 专题图导出接口 - 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名 - 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次 - 线程池并发产图(默认 4 worker) - 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg - 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称} """ import asyncio import concurrent.futures import os import re from datetime import datetime from typing import Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field from app.config.paths import get_logger from app.repositories import qgis_repository from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest from app.utils.api_deps import get_prediction_semaphore from app.utils.db_helper import db_helper from config import settings logger = get_logger("api.qgis") router = APIRouter(prefix="/qgis", tags=["专题图导出"]) # 线程池(按配置初始化 worker 数量) _worker_threads = getattr(settings, "QGIS_WORKER_THREADS", 4) _thread_pool = concurrent.futures.ThreadPoolExecutor( max_workers=_worker_threads, thread_name_prefix="qgis-worker", ) logger.info(f"QGIS 线程池初始化: {_worker_threads} workers") # 去重锁:in_progress_locks[disaster_time] = asyncio.Lock # 同一灾害时间的并发请求会排队,避免重复产图 _in_progress_locks: dict[str, asyncio.Lock] = {} _locks_lock = asyncio.Lock() # 西安市中心 xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161]) # ============================================================ # 时间戳格式化 # ============================================================ def format_disaster_time(occurred_time) -> str: """将 occurred_time 格式化为时间戳字符串(YYYYMMDDHHmmss),作为文件夹名""" if isinstance(occurred_time, datetime): return occurred_time.strftime("%Y%m%d%H%M%S") elif occurred_time: return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14] else: return datetime.now().strftime("%Y%m%d%H%M%S") # ============================================================ # region_code → 区县名称映射 # ============================================================ REGION_CODE_MAP = settings.area.to_dict() def _resolve_district(condition: dict) -> str: """从 condition.region_code 映射区县名称""" code = condition.get("region_code") if code and str(code) in REGION_CODE_MAP: return REGION_CODE_MAP[str(code)] return "" def _build_map_title(event_type: str, condition: dict, template_name: str) -> str: """ 构建地图标题。 格式:陕西西安{区县名称}{震级/降雨量}{模板名称} - 地震:陕西西安临潼区5.1级地震专题图 - 暴雨(有降雨量):陕西西安长安区120mm降雨专题图 - 暴雨(无降雨量):陕西西安降雨专题图 """ district = _resolve_district(condition) prefix = f"陕西西安{district}" if district else "陕西西安" if event_type == "earthquake": magnitude = condition.get("magnitude") if magnitude is not None: return f"{prefix}{float(magnitude)}级{template_name}" return f"{prefix}{template_name}" elif event_type == "rainfall": rainfall = condition.get("rainfall") if rainfall is not None and rainfall != "": return f"{prefix}{float(rainfall)}mm{template_name}" return f"{prefix}{template_name}" return f"{prefix}{template_name}" def _derive_model_params( inference: dict, batch_folder: str, out_filename: str ) -> dict: """从推理结果 + 配置文件推导专题图生成所需的全部参数。""" event_type = inference["event_type"] condition = inference["condition"] occurred_time = inference["occurred_time"] map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3") zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11") zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50") map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局") template_base = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "app", "data", "template" ) template_path = os.path.join(template_base, event_type, f"{map_layout}.qgz") template_name_map = {"earthquake": "地震专题图", "rainfall": "降雨专题图"} template_name = template_name_map.get(event_type, f"{event_type}专题图") map_title = _build_map_title(event_type, condition, template_name) safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name) out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/") if occurred_time and isinstance(occurred_time, datetime): map_time = occurred_time.strftime("%Y-%m-%d %H:%M") elif occurred_time: map_time = str(occurred_time) else: map_time = datetime.now().strftime("%Y-%m-%d %H:%M") center_x, center_y = _extract_center_from_condition( event_type, condition ) return { "name": f"inference_{inference['id']}", "path": template_path, "outFile": out_file, "mapLayout": map_layout, "mapTitle": map_title, "mapTime": map_time, "mapUint": map_unit, "info": "", "centerX": center_x, "centerY": center_y, "event": str(inference["id"]), "queueId": str(inference["id"]), "zoomRule": zoom_rule, "zoomValue": zoom_value, } def _extract_center_from_condition(event_type: str, condition: dict) -> tuple: """从 condition 提取地图中心坐标""" if event_type == "earthquake": lon = condition.get("epicenter_lon", xian_center[0]) lat = condition.get("epicenter_lat", xian_center[1]) return float(lon), float(lat) else: return xian_center[0], xian_center[1] # ============================================================ # 构建 QGIS 服务配置字典 # ============================================================ def _build_qgis_config(batch_folder: str) -> dict: """构建 QGIS 服务配置(含批次输出目录)""" gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg") project_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) gpkg_dir = os.path.join(project_root, gpkg_subdir).replace("\\", "/") return { "db": { "host": settings.DB_HOST, "port": settings.DB_PORT, "database": settings.DB_NAME, "username": settings.DB_USER, "password": settings.DB_PASSWORD, }, "qgis": { "exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300), }, "template_override": { "enabled": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ENABLED", False), "original": { "host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_HOST", "localhost"), "port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_PORT", 5432), "dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_DB_NAME", "yjzyk_xian"), "schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ORIGINAL_SCHEMA", "base"), }, "actual": { "host": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_HOST", ""), "port": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_PORT", ""), "dbname": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_DB_NAME", "xian_new"), "schema": getattr(settings, "QGIS_TEMPLATE_OVERRIDE_ACTUAL_SCHEMA", "qgis"), }, }, "static_layers": _build_static_layers_config(gpkg_dir), "batch_folder": batch_folder, } def _build_static_layers_config(gpkg_dir: str) -> dict: """构建静态底图配置""" layer_defs = { "水库": ("qgis.rivers", "rivers.gpkg"), "市州驻地": ("qgis.sx_capital", "sx_capital.gpkg"), "河流": ("qgis.river", "river.gpkg"), "active_fault": ("qgis.active_fault", "active_fault.gpkg"), "陕西省": ("qgis.sx", "sx.gpkg"), "乡镇驻地": ("qgis.sx_street", "sx_street.gpkg"), "区县驻地": ("qgis.sx_xa_county", "sx_xa_county.gpkg"), "县界": ("qgis.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"), "周边区县": ("qgis.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"), "周边市州": ("qgis.sx_zb_city", "sx_zb_city.gpkg"), "周边县区": ("qgis.sx_zb_county", "sx_zb_county.gpkg"), "traffic_expressway": ("qgis.traffic_expressway", "traffic_expressway.gpkg"), "traffic_provincial": ("qgis.traffic_provincial", "traffic_provincial.gpkg"), "traffic_railway": ("qgis.traffic_railway", "traffic_railway.gpkg"), "traffic_township": ("qgis.traffic_township", "traffic_township.gpkg"), "traffic_trunk_line": ("qgis.traffic_trunk_line", "traffic_trunk_line.gpkg"), } layers = {} for name, (table, gpkg_file) in layer_defs.items(): layers[name] = {"file": gpkg_file, "table": table} return {"enabled": True, "gpkg_dir": gpkg_dir, "layers": layers} # ============================================================ # 接口实现 # ============================================================ @router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出") async def export_map(req: QgisMapExportRequest): """ 根据模拟ID批量导出专题图。同一 occurred_time 视为同一场灾害,共享文件夹 """ from app.services.qgis.qgis_env import is_qgis_ready if not is_qgis_ready(): raise HTTPException(status_code=503, detail="QGIS 环境未初始化,专题图功能不可用") semaphore = get_prediction_semaphore() async with semaphore: simulation_id = req.simulationId loop = asyncio.get_event_loop() try: # 查询推理记录,获取 occurred_time inference = await loop.run_in_executor( None, qgis_repository.query_inference_result, simulation_id ) # 将 occurred_time 格式化为时间戳作为文件夹名 disaster_time = format_disaster_time(inference["occurred_time"]) # 构建批次文件夹路径 file_store = getattr(settings, "FILE_STORE_DIR", "G:/files") output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:disasterTime") output_dir = output_tmpl.replace(":disasterTime", disaster_time) batch_folder = os.path.join(file_store, output_dir).replace("\\", "/") # 去重检查:同一 disasterTime 只产图一次 if os.path.isdir(batch_folder): existing_files = [ f for f in os.listdir(batch_folder) if f.endswith(".jpg") ] if existing_files and disaster_time not in _in_progress_locks: logger.info( f"[去重] disasterTime={disaster_time} 已产图,直接返回 " f"({len(existing_files)} 张)" ) return QgisMapExportResponse( code=200, message=f"已存在,共{len(existing_files)}张", data=disaster_time, ) # 去重锁:同一灾害时间排队等待 async with _locks_lock: if disaster_time not in _in_progress_locks: _in_progress_locks[disaster_time] = asyncio.Lock() task_lock = _in_progress_locks[disaster_time] async with task_lock: if os.path.isdir(batch_folder): existing_files = [ f for f in os.listdir(batch_folder) if f.endswith(".jpg") ] if existing_files: logger.info( f"[去重] disasterTime={disaster_time} 已被并发请求产图完成" ) return QgisMapExportResponse( code=200, message=f"已存在,共{len(existing_files)}张", data=disaster_time, ) logger.info( f"推理结果查询成功: id={inference['id']}, " f"type={inference['event_type']}, " f"occurred_time={inference['occurred_time']}, " f"disasterTime={disaster_time}" ) # 推导参数 + 构建配置 os.makedirs(batch_folder, exist_ok=True) event_type = inference["event_type"] template_name_map = {"earthquake": "地震专题图", "rainfall": "降雨专题图"} template_name = template_name_map.get(event_type, f"{event_type}专题图") out_filename = f"{template_name}.jpg" model = _derive_model_params(inference, batch_folder, out_filename) config = _build_qgis_config(batch_folder) logger.info( f"模板路径: {model['path']}, " f"输出: {model['outFile']}, " f"标题: {model['mapTitle']}, " f"中心: ({model['centerX']}, {model['centerY']})" ) # 提交到线程池 _thread_pool.submit( _generate_single_map, model, config, disaster_time ) return QgisMapExportResponse( code=200, message="任务已提交", data=disaster_time, ) except ValueError as e: logger.warning(f"参数错误: {e}") raise HTTPException(status_code=400, detail=str(e)) except FileNotFoundError as e: logger.error(f"模板文件不存在: {e}") raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}") except Exception as e: logger.error(f"专题图导出失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"导出失败: {e}") finally: pass def _generate_single_map(model: dict, config: dict, disaster_time: str) -> None: """线程池工作函数:在工作线程中生成单张专题图""" from app.services.qgis.map_service import MapService try: logger.info(f"[线程池] 开始产图: {model['mapTitle']} → {model['outFile']}") service = MapService(config) service.generate(model) if os.path.exists(model["outFile"]): logger.info(f"[线程池] 产图成功: {model['outFile']}") else: logger.error(f"[线程池] 产图后文件不存在: {model['outFile']}") except Exception as e: logger.error(f"[线程池] 产图失败: {model['name']} — {e}", exc_info=True) # ============================================================ # 清理函数 # ============================================================ def shutdown_thread_pool() -> None: """关闭线程池(在 server.py lifespan 关闭阶段调用)""" _thread_pool.shutdown(wait=False) logger.info("QGIS 线程池已关闭")