"""QGIS thematic-map export API.

- Looks up the inference record for the requested ``inferenceId``.
- All maps of one inference share one output folder; images that already
  exist are skipped and only the missing ones are rendered.
- Rendering is fanned out over parallel ``docker exec`` processes
  (``QGIS_PARALLEL_WORKERS``, default 4).
- Output: ``FILE_STORE_DIR/<QGIS_OUTPUT_DIR>/<template name>.jpg`` where
  ``QGIS_OUTPUT_DIR`` defaults to ``xian/qgis/map/:eventType/:inferenceId``.
- Title format: fixed province/city prefix + district name +
  magnitude or rainfall + template name.
"""
import asyncio
import asyncio as _asyncio  # legacy alias; kept so the module-level name still exists
import concurrent.futures
import json
import math
import os
import re
import subprocess
import tempfile
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

from app.config.paths import get_logger
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
from app.repositories import qgis_repository
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
from app.utils.db_helper import db_helper
from config import settings

logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])

# De-duplication locks: one asyncio.Lock per batch key so that concurrent
# requests for the same inference are serialised.
_in_progress_locks: dict[str, asyncio.Lock] = {}
# Number of requests currently holding or waiting for each batch lock; used to
# drop a lock from ``_in_progress_locks`` once nobody needs it any more.
_lock_users: dict[str, int] = {}
_locks_lock = asyncio.Lock()

# Map centre used when the event carries no usable coordinates.
xian_center = getattr(settings, "XIAN_CENTER", [108.948024, 34.263161])

# Characters replaced by "_" when a template name is used as a file name.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|]')

# ============================================================
# region_code -> district name mapping
# ============================================================
REGION_CODE_MAP = settings.area.to_dict()


def _has_value(value) -> bool:
    """Return True when *value* is neither ``None`` nor an empty string."""
    return value is not None and value != ""


def _output_image_name(template_file: str) -> str:
    """Return the ``.jpg`` file name produced for a template file or path.

    The template stem is sanitised with ``_UNSAFE_FILENAME_CHARS`` so that the
    name written by the renderer and the name looked up by the
    "already rendered" check are always the same.
    """
    stem = os.path.splitext(os.path.basename(template_file))[0]
    return _UNSAFE_FILENAME_CHARS.sub("_", stem) + ".jpg"


def _resolve_district(condition: dict) -> str:
    """Return the district name for ``condition["region_code"]`` or ``""``."""
    code = condition.get("region_code")
    if code and str(code) in REGION_CODE_MAP:
        return REGION_CODE_MAP[str(code)]
    return ""


def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
    """Build the map title: prefix + district + magnitude/rainfall + template name.

    The magnitude (earthquake) or rainfall (rainfall) part is omitted when the
    value is missing or an empty string.
    """
    district = _resolve_district(condition)
    prefix = f"陕西西安{district}" if district else "陕西西安"

    if event_type == "earthquake":
        magnitude = condition.get("magnitude")
        # An empty string is treated like a missing value, matching the
        # rainfall branch and _build_info_text.
        if _has_value(magnitude):
            return f"{prefix}{float(magnitude)}级{template_name}"
    elif event_type == "rainfall":
        rainfall = condition.get("rainfall")
        if _has_value(rainfall):
            return f"{prefix}{float(rainfall)}mm{template_name}"
    return f"{prefix}{template_name}"


def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
    """Build the multi-line info box text (time plus event-specific facts).

    ``occurred_time`` may be a ``datetime``, any other truthy value (rendered
    with ``str``) or falsy, in which case the current local time is used.
    """
    lines = []
    if isinstance(occurred_time, datetime):
        time_str = (
            f"{occurred_time.year}年{occurred_time.month:02d}月"
            f"{occurred_time.day:02d}日{occurred_time.hour:02d}时"
            f"{occurred_time.minute:02d}分"
        )
    elif occurred_time:
        time_str = str(occurred_time)
    else:
        time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分")
    lines.append(f"时间:{time_str}")

    if event_type == "rainfall":
        rainfall = condition.get("rainfall")
        if _has_value(rainfall):
            lines.append(f"累计降雨量:{float(rainfall)}mm")
        duration = condition.get("duration")
        if _has_value(duration):
            lines.append(f"已持续:{duration}")
    elif event_type == "earthquake":
        magnitude = condition.get("magnitude")
        if _has_value(magnitude):
            lines.append(f"震级:{float(magnitude)}级")
        lon = condition.get("epicenter_lon")
        lat = condition.get("epicenter_lat")
        # Both coordinates are required; empty strings count as missing.
        if _has_value(lon) and _has_value(lat):
            lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")

    return "\n".join(lines)


def _derive_model_params(
    inference: dict, batch_folder: str, template_path: str
) -> dict:
    """Derive every parameter the renderer needs for one template.

    Args:
        inference: inference record; reads ``id``, ``event_type``,
            ``condition`` and ``occurred_time``.
        batch_folder: output folder of the batch (host path).
        template_path: path of the ``.qgz`` template.

    Returns:
        The model dict handed to the QGIS runner.
    """
    event_type = inference["event_type"]
    condition = inference["condition"]
    occurred_time = inference["occurred_time"]

    map_unit = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
    map_layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
    zoom_rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
    zoom_value = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")

    # Title and output file name both come from the template file name
    # without its extension.
    template_name = os.path.splitext(os.path.basename(template_path))[0]
    map_title = _build_map_title(event_type, condition, template_name)
    out_file = os.path.join(
        batch_folder, _output_image_name(template_path)
    ).replace("\\", "/")

    # The "map time" is always the current date.
    map_time = datetime.now().strftime("%Y-%m-%d")

    center_x, center_y = _extract_center_from_condition(event_type, condition)
    info_text = _build_info_text(event_type, condition, occurred_time)

    return {
        "name": f"inference_{inference['id']}",
        "path": template_path,
        "outFile": out_file,
        "mapLayout": map_layout,
        "mapTitle": map_title,
        "mapTime": map_time,
        # NOTE(review): key spelling "mapUint" is kept as-is; presumably the
        # runner reads exactly this key - confirm before renaming.
        "mapUint": map_unit,
        "info": info_text,
        "centerX": center_x,
        "centerY": center_y,
        "event": str(inference["id"]),
        "queueId": str(inference["id"]),
        "zoomRule": zoom_rule,
        "zoomValue": zoom_value,
    }


def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
    """Return the map centre ``(x, y)`` for the event.

    Earthquakes use the epicentre; a coordinate that is absent, ``None`` or an
    empty string falls back to the corresponding ``xian_center`` value. Every
    other event type uses ``xian_center``.
    """
    if event_type == "earthquake":
        lon = condition.get("epicenter_lon")
        lat = condition.get("epicenter_lat")
        return (
            float(lon if _has_value(lon) else xian_center[0]),
            float(lat if _has_value(lat) else xian_center[1]),
        )
    return xian_center[0], xian_center[1]


# ============================================================
# QGIS service configuration
# ============================================================
def _is_qgis_container_running() -> bool:
    """Return True when ``docker inspect`` reports the QGIS container as running.

    Blocking (spawns a subprocess, 5 s timeout). Any failure to run the
    command propagates to the caller.
    """
    from app.services.qgis.qgis_env import get_docker_container

    result = subprocess.run(
        ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
        capture_output=True, text=True, timeout=5,
    )
    return result.stdout.strip() == "true"


def _build_qgis_config(batch_folder: str) -> dict:
    """Build the QGIS service configuration, including the batch output folder.

    In Docker mode the paths placed in the config are container paths,
    because the template modifier runs inside the container.
    """
    from app.services.qgis.qgis_env import (
        get_host_file_store,
        get_container_file_store,
        get_docker_project_dir,
    )

    gpkg_dir = get_gpkg_dir()

    try:
        is_docker = _is_qgis_container_running()
    except Exception:
        # Docker missing / not reachable: fall back to local mode.
        is_docker = False

    if is_docker:
        # Prefer the container-local GPKG directory (pre-copied to bypass the
        # WSL2 9P mount); requires script/copy_gpkg_to_container.py to have run.
        gpkg_dir = getattr(settings, "QGIS_DOCKER_GPKG_DIR", "") or ""
        if not gpkg_dir:
            # Fallback: read through the mounted project directory (slow).
            project_dir = get_docker_project_dir()
            gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg")
            gpkg_dir = f"{project_dir}/{gpkg_subdir}"
            logger.warning("GPKG 将从挂载目录读取(慢),建议执行 copy_gpkg_to_container.py")

        # batch_folder: host path -> container path (case-insensitive prefix).
        host_fs = get_host_file_store().rstrip("/")
        container_fs = get_container_file_store().rstrip("/")
        batch_folder_container = batch_folder.replace("\\", "/")
        if batch_folder_container.lower().startswith(host_fs.lower()):
            batch_folder_container = container_fs + batch_folder_container[len(host_fs):]
        batch_folder = batch_folder_container

        logger.info(f"[Docker模式] gpkg_dir={gpkg_dir}, batch_folder={batch_folder}")
    else:
        logger.info(f"[本地模式] gpkg_dir={gpkg_dir}")

    # NOTE: this dict carries the DB password and is later serialised into a
    # temporary request file by _generate_batch_maps.
    return {
        "db": {
            "host": settings.DB_HOST,
            "port": settings.DB_PORT,
            "database": settings.DB_NAME,
            "username": settings.DB_USER,
            "password": settings.DB_PASSWORD,
        },
        "qgis": {
            "exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300),
        },
        "static_layers": build_static_layers_config(gpkg_dir),
        "batch_folder": batch_folder,
    }


# ============================================================
# Endpoint implementation
# ============================================================

# Global limit on concurrently running export requests.
_concurrent = getattr(settings, "QGIS_MAX_CONCURRENT", 2)
_qgis_semaphore = asyncio.Semaphore(_concurrent)


async def _checkout_batch_lock(batch_key: str) -> asyncio.Lock:
    """Return the lock for *batch_key*, creating it and counting this user."""
    async with _locks_lock:
        lock = _in_progress_locks.get(batch_key)
        if lock is None:
            lock = _in_progress_locks[batch_key] = asyncio.Lock()
        _lock_users[batch_key] = _lock_users.get(batch_key, 0) + 1
        return lock


async def _return_batch_lock(batch_key: str) -> None:
    """Drop this user's reference; forget the lock once nobody uses it."""
    async with _locks_lock:
        remaining = _lock_users.get(batch_key, 0) - 1
        if remaining > 0:
            _lock_users[batch_key] = remaining
        else:
            _lock_users.pop(batch_key, None)
            _in_progress_locks.pop(batch_key, None)


def _scan_templates(event_type: str) -> tuple[str, list[str]]:
    """List the ``.qgz`` templates of *event_type*, priority templates first.

    Returns:
        ``(template_dir, template_files)``.

    Raises:
        FileNotFoundError: the template folder is missing or contains no
            usable template.
    """
    template_base = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
        "app", "data", "template",
    )
    template_dir = os.path.join(template_base, event_type)
    template_files = sorted(
        f for f in os.listdir(template_dir)
        if f.endswith(".qgz") and not f.startswith("tmp")
    )

    priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
    if priority_keywords:
        def _priority_order(name: str) -> tuple:
            for i, kw in enumerate(priority_keywords):
                if kw in name:
                    return (0, i)   # priority group, in configured order
            return (1, name)        # everything else, by file name

        template_files.sort(key=_priority_order)
        priority_count = sum(
            1 for f in template_files
            if any(kw in f for kw in priority_keywords)
        )
        logger.info(f"优先模板: {priority_count} 张排前面")

    if not template_files:
        raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
    return template_dir, template_files


def _export_batch(inference: dict, inference_id) -> str:
    """Render every missing map of one inference; return the response message.

    Blocking: touches the file system, runs ``docker`` subprocesses and waits
    for rendering, so it must be called from a worker thread.
    """
    event_type = inference["event_type"]
    batch_key = str(inference_id)

    # Batch folder: FILE_STORE_DIR + QGIS_OUTPUT_DIR with placeholders filled.
    file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
    output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
    output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
    batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")

    logger.info(
        f"推理结果查询成功: id={inference['id']}, "
        f"type={event_type}, "
        f"occurred_time={inference['occurred_time']}"
    )

    os.makedirs(batch_folder, exist_ok=True)
    config = _build_qgis_config(batch_folder)
    template_dir, template_files = _scan_templates(event_type)

    # Decide by file NAME which templates still need rendering; comparing
    # counts would be fooled by unrelated .jpg files in the folder.
    existing = set()
    if os.path.isdir(batch_folder):
        existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
    missing_templates = [
        tpl for tpl in template_files
        if _output_image_name(tpl) not in existing
    ]

    total = len(template_files)
    if not missing_templates:
        logger.info(f"[跳过] {total}/{total} 张已全部产出")
        return f"已存在,共{total}张"
    if len(missing_templates) < total:
        logger.info(
            f"[增量] 已有{total - len(missing_templates)}张, "
            f"缺{len(missing_templates)}张"
        )

    # Build the parameters of every model (batch mode).
    models = []
    for tpl_file in missing_templates:
        tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
        model = _derive_model_params(inference, batch_folder, tpl_path)
        logger.info(
            f"模板: {tpl_path}, "
            f"输出: {model['outFile']}, "
            f"标题: {model['mapTitle']}, "
            f"中心: ({model['centerX']}, {model['centerY']})"
        )
        models.append(model)

    # Submit all models to the QGIS subprocesses in one go.
    _generate_batch_maps(models, config, batch_key, inference_id, file_store)
    return f"任务已完成,共{len(models)}张专题图"


# The docstring below is kept verbatim: FastAPI publishes it as the OpenAPI
# description of the operation.
#
# Flow: verify the QGIS container is running (503 otherwise), then - under the
# global semaphore and the per-inference lock - load the inference record and
# render the missing maps in a worker thread so the event loop is never
# blocked. ValueError -> 400, FileNotFoundError -> 404, anything else -> 500.
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
    """
    根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
    """
    loop = asyncio.get_running_loop()

    try:
        container_running = await loop.run_in_executor(None, _is_qgis_container_running)
    except Exception as exc:
        raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用") from exc
    if not container_running:
        raise HTTPException(status_code=503, detail="QGIS Docker 容器未运行")

    async with _qgis_semaphore:
        inference_id = req.inferenceId
        batch_key = str(inference_id)

        try:
            inference = await loop.run_in_executor(
                None, qgis_repository.query_inference_result, inference_id
            )

            task_lock = await _checkout_batch_lock(batch_key)
            try:
                async with task_lock:
                    # NOTE(review): if this task is cancelled while waiting,
                    # the worker thread keeps running after the lock is
                    # released - confirm whether cancellation can occur here.
                    message = await loop.run_in_executor(
                        None, _export_batch, inference, inference_id
                    )
            finally:
                await _return_batch_lock(batch_key)

            return QgisMapExportResponse(
                code=200,
                message=message,
                data=batch_key,
            )

        except HTTPException:
            raise
        except ValueError as e:
            logger.warning(f"参数错误: {e}")
            raise HTTPException(status_code=400, detail=str(e))
        except FileNotFoundError as e:
            logger.error(f"模板文件不存在: {e}")
            raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
        except Exception as e:
            logger.error(f"专题图导出失败: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"导出失败: {e}")


def _generate_batch_maps(models: list, config: dict, batch_key: str,
                         inference_id: Optional[int] = None,
                         file_store: Optional[str] = None) -> None:
    """Render *models* with parallel ``docker exec`` processes.

    The models are split into at most ``QGIS_PARALLEL_WORKERS`` chunks; each
    chunk is handled by one subprocess whose ``PROGRESS:`` lines are read as
    they arrive. When both *inference_id* and *file_store* are given, every
    successfully rendered image is recorded via ``_write_single_path``.

    Raises:
        RuntimeError: every subprocess failed, or every model failed.
    """
    from app.services.qgis.qgis_env import (
        get_docker_project_dir,
        get_container_python_path,
        build_docker_exec_cmd,
        map_host_to_container,
        map_container_to_host,
        map_template_to_container,
    )

    if not models:
        # Nothing to render; also avoids dividing by zero below.
        logger.info("[批量产图] 无待生成模型,跳过")
        return

    max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
    # Clamp to >= 1 so a misconfigured 0 / negative value cannot break chunking.
    workers = max(1, min(int(max_workers), len(models)))
    chunk_size = math.ceil(len(models) / workers)
    chunks = [models[i:i + chunk_size] for i in range(0, len(models), chunk_size)]

    project_dir = get_docker_project_dir()
    python_in_container = get_container_python_path()
    runner_in_container = f"{project_dir}/app/services/qgis/qgis_runner.py"

    # Request files go to <project>/tmp, which is expected to be mounted into
    # the container so the runner can read them.
    project_dir_host = str(Path(__file__).parent.parent.parent)
    project_root = project_dir_host.replace("\\", "/")
    tmp_dir = os.path.join(project_dir_host, "tmp")

    logger.info(
        f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行 docker exec "
        f"(每进程 {chunk_size} 张)"
    )

    errors = []
    all_results = []
    db_lock = threading.Lock()  # serialises DB writes across chunk threads

    def _run_chunk(chunk_models: list, chunk_idx: int):
        """Run one ``docker exec`` subprocess and return its progress records."""
        # Map host paths to container paths.
        container_models = []
        for m in chunk_models:
            cm = dict(m)
            if "outFile" in cm:
                cm["outFile"] = map_host_to_container(cm["outFile"])
            # Template path -> container-local path (pre-copied templates).
            if "path" in cm:
                cm["path"] = map_template_to_container(cm["path"])
            container_models.append(cm)

        request = json.dumps({"config": config, "models": container_models}, ensure_ascii=False)

        os.makedirs(tmp_dir, exist_ok=True)
        tmp = tempfile.NamedTemporaryFile(
            suffix=".json", delete=False, mode="w", encoding="utf-8",
            dir=tmp_dir,
        )
        try:
            with tmp:
                tmp.write(request)

            # Host temp-file path -> path inside the container.
            tmp_in_container = tmp.name.replace("\\", "/")
            if tmp_in_container.startswith(project_root):
                tmp_in_container = tmp_in_container.replace(project_root, project_dir, 1)

            cmd = build_docker_exec_cmd(python_in_container, runner_in_container, tmp_in_container)
            logger.info(f"[Docker] chunk{chunk_idx} 命令: {' '.join(cmd)}")

            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )

            # Drain stderr concurrently so a full pipe buffer cannot deadlock
            # the child while we are reading stdout.
            stderr_lines = []

            def _drain_stderr():
                for ln in proc.stderr:
                    stderr_lines.append(ln)

            stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
            stderr_thread.start()

            chunk_results = []
            for line in proc.stdout:
                line = line.strip()
                logger.debug(f"[chunk{chunk_idx}] stdout: {line[:200]}")
                if not line.startswith("PROGRESS:"):
                    continue
                try:
                    r = json.loads(line[len("PROGRESS:"):])
                except json.JSONDecodeError:
                    logger.warning(f"[chunk{chunk_idx}] JSON解析失败: {line[:200]}")
                    continue
                # Container path -> host path.
                if "output" in r:
                    r["output"] = map_container_to_host(r["output"])
                chunk_results.append(r)
                status = "FAIL" if "error" in r else "OK"
                # str(): the error payload is not guaranteed to be a string.
                detail = str(r.get('error', r.get('output', '')))[:100]
                logger.info(f"[chunk{chunk_idx}] {status} {r.get('name', '?')}: {detail}")
                if inference_id and file_store and "error" not in r:
                    _write_single_path(inference_id, r.get("output", ""), file_store, db_lock)

            proc.wait()
            stderr_thread.join(timeout=5)
            stderr_text = "".join(stderr_lines)

            if stderr_text.strip():
                logger.warning(f"[docker exec chunk{chunk_idx}] stderr:\n{stderr_text[:800]}")

            if proc.returncode != 0:
                raise RuntimeError(
                    f"[docker exec chunk{chunk_idx}] 失败 (exit={proc.returncode}): "
                    f"{stderr_text[:300]}"
                )
            return chunk_results
        finally:
            # Always remove the request file (it contains DB credentials),
            # even when the subprocess could not be started.
            try:
                os.remove(tmp.name)
            except OSError:
                pass

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
        futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)}
        for future in concurrent.futures.as_completed(futures):
            chunk_idx = futures[future]
            try:
                all_results.extend(future.result())
            except Exception as e:
                logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}")
                errors.append(str(e))

    if errors and not all_results:
        raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}")

    success_count = sum(1 for r in all_results if "error" not in r)
    fail_count = len(all_results) - success_count

    logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}")
    for r in all_results:
        if "error" not in r:
            logger.info(f" OK {r.get('output', 'N/A')}")
        else:
            logger.error(f" FAIL {r.get('name', '?')}: {r.get('error', 'unknown')}")

    if success_count == 0 and fail_count > 0:
        first_err = all_results[0].get("error", "unknown")
        raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")


# ============================================================
# file_path database records
# ============================================================
def _relative_path(absolute_path: str, file_store: str) -> str:
    """Return *absolute_path* relative to *file_store*.

    Both paths are compared with forward slashes. A path that does not lie
    under *file_store* is returned unchanged (slash-normalised).
    """
    fs = file_store.replace("\\", "/").rstrip("/")
    ap = absolute_path.replace("\\", "/")
    if ap.startswith(fs + "/"):
        return ap[len(fs) + 1:]
    return ap


def _write_single_path(inference_id: int, out_path: str, file_store: str, lock) -> None:
    """Record one rendered image in the progress table right after it is produced.

    Does nothing when *out_path* is empty or not an existing file. Failures are
    logged and swallowed so one bad DB write does not abort the batch.
    """
    if not out_path or not os.path.isfile(out_path):
        return
    rel = _relative_path(out_path, file_store)
    if not rel:
        return
    try:
        # NOTE(review): this import form differs from the module-level
        # ``from app.repositories import qgis_repository`` - confirm both
        # resolve to the same object.
        from app.repositories.qgis_repository import qgis_repository
        with lock:
            qgis_repository.insert_file_paths(inference_id, [rel])
    except Exception as e:
        logger.error(f"[实时写入] 失败: {e}")