""" QGIS 专题图导出接口 - 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名 - 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次 - Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理) - 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg - 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称} - 异步模式:HTTP 请求立即返回,产图在后台线程执行 """ import os import re import subprocess import threading from datetime import datetime from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field from app.config.paths import get_logger from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir from app.repositories import qgis_repository from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest from app.utils.db_helper import db_helper from config import settings logger = get_logger("api.qgis") router = APIRouter(prefix="/qgis", tags=["专题图导出"]) # 后台任务跟踪(线程安全) _background_tasks: set[int] = set() _tasks_lock = threading.Lock() # 西安市中心 xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161]) # ============================================================ # region_code → 区县名称映射 # ============================================================ REGION_CODE_MAP = settings.area.to_dict() def _resolve_district(condition: dict) -> str: code = condition.get("region_code") if code and str(code) in REGION_CODE_MAP: return REGION_CODE_MAP[str(code)] return "" def _build_map_title(event_type: str, condition: dict, template_name: str) -> str: district = _resolve_district(condition) prefix = f"陕西西安{district}" if district else "陕西西安" if event_type == "earthquake": magnitude = condition.get("magnitude") if magnitude is not None: return f"{prefix}{float(magnitude)}级{template_name}" return f"{prefix}{template_name}" elif event_type == "rainfall": rainfall = condition.get("rainfall") if rainfall is not None and rainfall != "": return f"{prefix}{float(rainfall)}mm{template_name}" return f"{prefix}{template_name}" return f"{prefix}{template_name}" def _build_info_text(event_type: str, condition: dict, occurred_time) -> str: lines = [] if isinstance(occurred_time, datetime): time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分" elif occurred_time: time_str = str(occurred_time) else: time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分") lines.append(f"时间:{time_str}") if event_type == "rainfall": rainfall = condition.get("rainfall") if rainfall is not None and rainfall != "": lines.append(f"累计降雨量:{float(rainfall)}mm") duration = condition.get("duration") if duration is not None and duration != "": lines.append(f"已持续:{duration}") elif event_type == "earthquake": magnitude = condition.get("magnitude") if magnitude is not None and magnitude != "": lines.append(f"震级:{float(magnitude)}级") lon = condition.get("epicenter_lon") lat = condition.get("epicenter_lat") if lon is not None and lat is not None: lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°") return "\n".join(lines) def _derive_model_params( inference: dict, batch_folder: str, template_path: str ) -> dict: """从推理结果 + 模板路径推导专题图生成所需的全部参数。""" event_type = inference["event_type"] condition = inference["condition"] occurred_time = inference["occurred_time"] map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局") map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3") zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11") zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50") # 从模板文件名推导标题和输出文件名(去掉 .qgz 后缀) template_name = os.path.splitext(os.path.basename(template_path))[0] map_title = _build_map_title(event_type, condition, template_name) safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name) out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/") # 制图时间统一用当前日期 map_time = datetime.now().strftime("%Y-%m-%d") center_x, center_y = _extract_center_from_condition( event_type, condition ) info_text = _build_info_text(event_type, condition, occurred_time) return { "name": f"inference_{inference['id']}", "path": template_path, "outFile": out_file, "mapLayout": map_layout, "mapTitle": map_title, "mapTime": map_time, "mapUint": map_unit, "info": info_text, "centerX": center_x, "centerY": center_y, "event": str(inference["id"]), "queueId": str(inference["id"]), "zoomRule": zoom_rule, "zoomValue": zoom_value, } def _extract_center_from_condition(event_type: str, condition: dict) -> tuple: """从 condition 提取地图中心坐标""" if event_type == "earthquake": lon = condition.get("epicenter_lon", xian_center[0]) lat = condition.get("epicenter_lat", xian_center[1]) return float(lon), float(lat) else: return xian_center[0], xian_center[1] # ============================================================ # 构建 QGIS 服务配置字典 # ============================================================ def _build_qgis_config(batch_folder: str) -> dict: """构建 QGIS 服务配置(含批次输出目录)""" from app.services.qgis.qgis_env import ( get_docker_container, get_host_file_store, get_container_file_store, get_docker_project_dir, ) gpkg_dir = get_gpkg_dir() # Docker 模式:config 中的路径必须是容器内路径(模板修改器在容器内运行) try: result = subprocess.run( ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()], capture_output=True, text=True, timeout=5, ) is_docker = result.stdout.strip() == "true" except Exception: is_docker = False if is_docker: # GPKG 目录:优先使用容器内本地路径(预拷贝后绕过 WSL2 9P) # 需先执行 python script/copy_gpkg_to_container.py gpkg_dir = getattr(settings, "QGIS_DOCKER_GPKG_DIR", "") or "" if not gpkg_dir: # fallback: 使用挂载路径(性能差) project_dir = get_docker_project_dir() gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg") gpkg_dir = f"{project_dir}/{gpkg_subdir}" logger.warning(f"GPKG 将从挂载目录读取(慢),建议执行 copy_gpkg_to_container.py") # batch_folder:主机路径 → 容器路径 host_fs = get_host_file_store().rstrip("/") container_fs = get_container_file_store().rstrip("/") batch_folder_container = batch_folder.replace("\\", "/") if batch_folder_container.lower().startswith(host_fs.lower()): batch_folder_container = container_fs + batch_folder_container[len(host_fs):] batch_folder = batch_folder_container logger.info(f"[Docker模式] gpkg_dir={gpkg_dir}, batch_folder={batch_folder}") else: logger.info(f"[本地模式] gpkg_dir={gpkg_dir}") return { "db": { "host": settings.DB_HOST, "port": settings.DB_PORT, "database": settings.DB_NAME, "username": settings.DB_USER, "password": settings.DB_PASSWORD, }, "qgis": { "exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300), }, "static_layers": build_static_layers_config(gpkg_dir), "batch_folder": batch_folder, } # ============================================================ # 接口实现(异步 fire-and-forget 模式) # ============================================================ def _background_export(inference_id: int) -> None: """后台线程:执行产图全流程(DB 查询 + 模板扫描 + 进程池提交)""" from app.services.qgis.qgis_env import get_docker_container from app.services.qgis.qgis_pool import qgis_pool from app.services.qgis.qgis_env import ( map_host_to_container, map_container_to_host, map_template_to_container, ) try: # 1. 查推理结果 inference = qgis_repository.query_inference_result(inference_id) event_type = inference["event_type"] batch_key = str(inference_id) file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/") output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId") output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key) batch_folder = os.path.join(file_store, output_dir).replace("\\", "/") logger.info( f"[后台] 推理结果: id={inference['id']}, " f"type={event_type}, occurred_time={inference['occurred_time']}" ) os.makedirs(batch_folder, exist_ok=True) config = _build_qgis_config(batch_folder) # 2. 扫描模板 template_base = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "app", "data", "template" ) template_dir = os.path.join(template_base, event_type) template_files = sorted([ f for f in os.listdir(template_dir) if f.endswith(".qgz") and not f.startswith("tmp") ]) priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", []) if priority_keywords: def _priority_order(name: str) -> tuple: for i, kw in enumerate(priority_keywords): if kw in name: return (0, i) return (1, name) template_files.sort(key=_priority_order) if not template_files: logger.error(f"[后台] 模板文件夹为空: {template_dir}") return # 3. 增量检查 existing = set() if os.path.isdir(batch_folder): existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")} if len(existing) == len(template_files): logger.info(f"[后台] [跳过] {len(existing)}/{len(template_files)} 张已全部产出") return missing_templates = [ f for f in template_files if os.path.splitext(f)[0] + ".jpg" not in existing ] logger.info( f"[后台] [增量] 已有{len(existing)}张, 缺{len(missing_templates)}张" ) # 4. 构建模型参数 models = [] for tpl_file in missing_templates: tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/") model = _derive_model_params(inference, batch_folder, tpl_path) logger.info( f"[后台] 模板: {tpl_path}, " f"输出: {model['outFile']}, " f"标题: {model['mapTitle']}" ) models.append(model) # 5. 路径映射到容器内 container_models = [] for m in models: cm = dict(m) if "outFile" in cm: cm["outFile"] = map_host_to_container(cm["outFile"]) if "path" in cm: cm["path"] = map_template_to_container(cm["path"]) container_models.append(cm) logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池") # 6. 提交到进程池(阻塞等待产图完成) db_lock = threading.Lock() all_results = [] def _on_progress(r: dict): if "output" in r: r["output"] = map_container_to_host(r["output"]) all_results.append(r) status = "FAIL" if "error" in r else "OK" logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}") if inference_id and file_store and "error" not in r: _write_single_path(inference_id, r["output"], file_store, db_lock) results, summary = qgis_pool.submit_job(config, container_models, _on_progress) success_count = summary.get("ok", 0) fail_count = summary.get("fail", 0) elapsed = summary.get("elapsed", 0) logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s") for r in all_results: if "error" not in r: logger.info(f" OK {r.get('output', 'N/A')}") else: logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}") except Exception as e: logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True) finally: with _tasks_lock: _background_tasks.discard(inference_id) logger.info(f"[后台] 任务结束 (inferenceId={inference_id})") @router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出") async def export_map(req: QgisMapExportRequest): """ 根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。 异步模式:请求立即返回,产图在后台线程执行。 """ from app.services.qgis.qgis_env import get_docker_container # 检查 Docker 容器 try: result = subprocess.run( ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()], capture_output=True, text=True, timeout=5, ) if result.stdout.strip() != "true": raise HTTPException(status_code=503, detail="QGIS Docker 容器未运行") except HTTPException: raise except Exception: raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用") inference_id = req.inferenceId # 去重:同一 inferenceId 不重复提交 with _tasks_lock: if inference_id in _background_tasks: logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过") return QgisMapExportResponse( code=200, message=f"任务已在执行中,请稍后查看结果", data=str(inference_id), ) _background_tasks.add(inference_id) # 启动后台线程执行产图 thread = threading.Thread( target=_background_export, args=(inference_id,), daemon=True, name=f"qgis-export-{inference_id}", ) thread.start() logger.info(f"后台产图任务已启动 (inferenceId={inference_id})") return QgisMapExportResponse( code=200, message=f"任务已提交,正在后台生成专题图", data=str(inference_id), ) # ============================================================ # file_path 数据库记录 # ============================================================ def _relative_path(absolute_path: str, file_store: str) -> str: """绝对路径 → 相对于 FILE_STORE_DIR 的路径""" fs = file_store.replace("\\", "/").rstrip("/") ap = absolute_path.replace("\\", "/") if ap.startswith(fs + "/"): return ap[len(fs) + 1:] return ap def _write_single_path(inference_id: int, out_path: str, file_store: str, lock) -> None: """单张图产出后立即写入进度表""" if not out_path or not os.path.isfile(out_path): return rel = _relative_path(out_path, file_store) if not rel: return try: from app.repositories.qgis_repository import qgis_repository with lock: qgis_repository.insert_file_paths(inference_id, [rel]) except Exception as e: logger.error(f"[实时写入] 失败: {e}")