diff --git a/app/api/qgis_map_export.py b/app/api/qgis_map_export.py index 06a924f..bf70911 100644 --- a/app/api/qgis_map_export.py +++ b/app/api/qgis_map_export.py @@ -36,20 +36,6 @@ _locks_lock = asyncio.Lock() xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161]) -# ============================================================ -# 时间戳格式化 -# ============================================================ - -def format_disaster_time(occurred_time) -> str: - """将 occurred_time 格式化为时间戳字符串(YYYYMMDDHHmmss),作为文件夹名""" - if isinstance(occurred_time, datetime): - return occurred_time.strftime("%Y%m%d%H%M%S") - elif occurred_time: - return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14] - else: - return datetime.now().strftime("%Y%m%d%H%M%S") - - # ============================================================ # region_code → 区县名称映射 # ============================================================ @@ -58,7 +44,6 @@ REGION_CODE_MAP = settings.area.to_dict() def _resolve_district(condition: dict) -> str: - """从 condition.region_code 映射区县名称""" code = condition.get("region_code") if code and str(code) in REGION_CODE_MAP: return REGION_CODE_MAP[str(code)] @@ -66,43 +51,23 @@ def _resolve_district(condition: dict) -> str: def _build_map_title(event_type: str, condition: dict, template_name: str) -> str: - """ - 构建地图标题。 - - 格式:陕西西安{区县名称}{震级/降雨量}{模板名称} - - 地震:陕西西安临潼区5.1级地震专题图 - - 暴雨(有降雨量):陕西西安长安区120mm降雨专题图 - - 暴雨(无降雨量):陕西西安降雨专题图 - """ district = _resolve_district(condition) prefix = f"陕西西安{district}" if district else "陕西西安" - if event_type == "earthquake": magnitude = condition.get("magnitude") if magnitude is not None: return f"{prefix}{float(magnitude)}级{template_name}" return f"{prefix}{template_name}" - elif event_type == "rainfall": rainfall = condition.get("rainfall") if rainfall is not None and rainfall != "": return f"{prefix}{float(rainfall)}mm{template_name}" return f"{prefix}{template_name}" - return f"{prefix}{template_name}" def _build_info_text(event_type: str, condition: dict, occurred_time) -> str: - """ - 构建信息面板文本(左上角橙色矩形区域)。 - - 暴雨:时间 + 累计降雨量(如有)+ 已持续(如有) - 地震:时间 + 震级(如有)+ 位置(如有) - 不显示灾害类型标签(暴雨/地震)。 - """ lines = [] - - # 时间 if isinstance(occurred_time, datetime): time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分" elif occurred_time: @@ -110,26 +75,21 @@ def _build_info_text(event_type: str, condition: dict, occurred_time) -> str: else: time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分") lines.append(f"时间:{time_str}") - if event_type == "rainfall": rainfall = condition.get("rainfall") if rainfall is not None and rainfall != "": lines.append(f"累计降雨量:{float(rainfall)}mm") - duration = condition.get("duration") if duration is not None and duration != "": lines.append(f"已持续:{duration}") - elif event_type == "earthquake": magnitude = condition.get("magnitude") if magnitude is not None and magnitude != "": lines.append(f"震级:{float(magnitude)}级") - lon = condition.get("epicenter_lon") lat = condition.get("epicenter_lat") if lon is not None and lat is not None: lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°") - return "\n".join(lines) @@ -235,66 +195,34 @@ async def export_map(req: QgisMapExportRequest): loop = asyncio.get_event_loop() try: - # 查询推理记录,获取 occurred_time inference = await loop.run_in_executor( None, qgis_repository.query_inference_result, inference_id ) - # 将 occurred_time 格式化为时间戳作为文件夹名 - disaster_time = format_disaster_time(inference["occurred_time"]) event_type = inference["event_type"] + batch_key = str(inference_id) - # 构建批次文件夹路径 - file_store = getattr(settings, "FILE_STORE_DIR", "G:/files") - output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:disasterTime") - output_dir = output_tmpl.replace(":eventType", event_type).replace(":disasterTime", disaster_time) + # 构建批次文件夹路径(用 inferenceId 作为唯一文件夹名) + file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/") + output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId") + output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key) batch_folder = os.path.join(file_store, output_dir).replace("\\", "/") - # 去重检查:同一 disasterTime 只产图一次 - if os.path.isdir(batch_folder): - existing_files = [ - f for f in os.listdir(batch_folder) if f.endswith(".jpg") - ] - if existing_files and disaster_time not in _in_progress_locks: - logger.info( - f"[去重] disasterTime={disaster_time} 已产图,直接返回 " - f"({len(existing_files)} 张)" - ) - return QgisMapExportResponse( - code=200, - message=f"已存在,共{len(existing_files)}张", - data=disaster_time, - ) - - # 去重锁:同一灾害时间排队等待 + # 去重锁 async with _locks_lock: - if disaster_time not in _in_progress_locks: - _in_progress_locks[disaster_time] = asyncio.Lock() - task_lock = _in_progress_locks[disaster_time] + if batch_key not in _in_progress_locks: + _in_progress_locks[batch_key] = asyncio.Lock() + task_lock = _in_progress_locks[batch_key] async with task_lock: - if os.path.isdir(batch_folder): - existing_files = [ - f for f in os.listdir(batch_folder) if f.endswith(".jpg") - ] - if existing_files: - logger.info( - f"[去重] disasterTime={disaster_time} 已被并发请求产图完成" - ) - return QgisMapExportResponse( - code=200, - message=f"已存在,共{len(existing_files)}张", - data=disaster_time, - ) + # 精确判断在模板扫描后(比较文件数 vs 模板数) logger.info( f"推理结果查询成功: id={inference['id']}, " - f"type={inference['event_type']}, " - f"occurred_time={inference['occurred_time']}, " - f"disasterTime={disaster_time}" + f"type={event_type}, " + f"occurred_time={inference['occurred_time']}" ) - # 推导参数 + 构建配置 os.makedirs(batch_folder, exist_ok=True) config = _build_qgis_config(batch_folder) @@ -327,6 +255,31 @@ async def export_map(req: QgisMapExportRequest): if not template_files: raise FileNotFoundError(f"模板文件夹为空: {template_dir}") + # 检查已产出的图片,只生成缺失的 + existing = set() + if os.path.isdir(batch_folder): + existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")} + + if len(existing) == len(template_files): + logger.info(f"[跳过] {len(existing)}/{len(template_files)} 张已全部产出") + return QgisMapExportResponse( + code=200, + message=f"已存在,共{len(existing)}张", + data=batch_key, + ) + + missing_templates = [] + for tpl_file in template_files: + out_name = os.path.splitext(tpl_file)[0] + ".jpg" + if out_name not in existing: + missing_templates.append(tpl_file) + + if len(missing_templates) < len(template_files): + logger.info( + f"[增量] 已有{len(existing)}张, 缺{len(missing_templates)}张" + ) + template_files = missing_templates + # 构建所有模型参数(批量模式) models = [] for tpl_file in template_files: @@ -340,13 +293,13 @@ async def export_map(req: QgisMapExportRequest): ) models.append(model) - # 一次性提交所有模型到 QGIS 子进程(单次 DLL 加载) - _generate_batch_maps(models, config, disaster_time) + # 一次性提交所有模型到 QGIS 子进程(并行多进程,实时写进度) + _generate_batch_maps(models, config, batch_key, inference_id, file_store) return QgisMapExportResponse( code=200, message=f"任务已完成,共{len(models)}张专题图", - data=disaster_time, + data=batch_key, ) except ValueError as e: @@ -362,11 +315,14 @@ async def export_map(req: QgisMapExportRequest): pass -def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None: - """并行启动多个 QGIS 子进程,每个处理一部分模板""" - import json, math, concurrent.futures +def _generate_batch_maps(models: list, config: dict, batch_key: str, + inference_id: int = None, file_store: str = None) -> None: + """并行启动多个 QGIS 子进程,实时读取每张图进度并写 DB""" + import json, math, concurrent.futures, subprocess, tempfile, threading + from app.services.qgis.qgis_env import ( + get_qgis_python_path, get_runner_script, build_qgis_subprocess_env, + ) - # 并行度:取配置的 worker 数和模板数的较小值 max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4) workers = min(max_workers, len(models)) chunk_size = math.ceil(len(models) / workers) @@ -382,16 +338,52 @@ def _generate_batch_maps(models: list, config: dict, disaster_time: str) -> None errors = [] all_results = [] + db_lock = threading.Lock() # 保护 DB 写入 - def _run_chunk(chunk_models: list, chunk_idx: int) -> list: - """单个子进程处理一批模板""" - return _generate_maps_subprocess(chunk_models, config, chunk_idx) + def _run_chunk(chunk_models: list, chunk_idx: int): + """单个子进程,逐张读取进度并实时写 DB""" + request = json.dumps({"config": config, "models": chunk_models}, ensure_ascii=False) + tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w", encoding="utf-8") + tmp.write(request) + tmp.close() + + qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS") + python_path = get_qgis_python_path(qgis_root) + runner = get_runner_script() + cmd = [python_path, runner, tmp.name] + + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, + env=build_qgis_subprocess_env(qgis_root), + text=True, encoding="utf-8", errors="replace", + ) + + chunk_results = [] + for line in proc.stdout: + line = line.strip() + if line.startswith("PROGRESS:"): + try: + r = json.loads(line[len("PROGRESS:"):]) + chunk_results.append(r) + # ★ 每张图产出后立即写 DB + if inference_id and file_store and "error" not in r: + _write_single_path(inference_id, r.get("output", ""), file_store, db_lock) + except json.JSONDecodeError: + pass + + proc.wait() + try: + os.remove(tmp.name) + except OSError: + pass + + if proc.returncode != 0: + raise RuntimeError(f"[子进程{chunk_idx}] 失败 (exit={proc.returncode})") + + return chunk_results with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor: - futures = { - executor.submit(_run_chunk, chunk, i): i - for i, chunk in enumerate(chunks) - } + futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)} for future in concurrent.futures.as_completed(futures): chunk_idx = futures[future] try: @@ -484,3 +476,31 @@ def _generate_maps_subprocess(chunk_models: list, config: dict, chunk_idx: int) logger.warning(f"[子进程{chunk_idx}] 无输出") return [] + + +# ============================================================ +# file_path 数据库记录 +# ============================================================ + +def _relative_path(absolute_path: str, file_store: str) -> str: + """绝对路径 → 相对于 FILE_STORE_DIR 的路径""" + fs = file_store.replace("\\", "/").rstrip("/") + ap = absolute_path.replace("\\", "/") + if ap.startswith(fs + "/"): + return ap[len(fs) + 1:] + return ap + + +def _write_single_path(inference_id: int, out_path: str, file_store: str, lock) -> None: + """单张图产出后立即写入进度表""" + if not out_path or not os.path.isfile(out_path): + return + rel = _relative_path(out_path, file_store) + if not rel: + return + try: + from app.repositories.qgis_repository import qgis_repository + with lock: + qgis_repository.insert_file_paths(inference_id, [rel]) + except Exception as e: + logger.error(f"[实时写入] 失败: {e}") diff --git a/app/repositories/qgis_repository.py b/app/repositories/qgis_repository.py index 3ee17a4..53af6ca 100644 --- a/app/repositories/qgis_repository.py +++ b/app/repositories/qgis_repository.py @@ -27,5 +27,31 @@ class QgisRepository: "condition": row["condition"] if isinstance(row["condition"], dict) else {}, } + @staticmethod + def insert_file_paths(inference_id: int, paths: list[str]) -> None: + """实时写入文件路径到进度表(唯一索引自动去重)""" + if not paths: + return + import os + sql = """ + INSERT INTO xian_inference_result_file (inference_id, file_path, file_name) + VALUES (%s, %s, %s) + ON CONFLICT DO NOTHING + """ + for p in paths: + name = os.path.basename(p) + db_helper.execute_update(sql, (inference_id, p, name)) + + @staticmethod + def get_file_paths(inference_id: int) -> list[str]: + """获取已产出的文件路径列表(从进度表)""" + sql = """ + SELECT file_path FROM xian_inference_result_file + WHERE inference_id = %s AND is_delete = 0 + ORDER BY create_time + """ + rows = db_helper.execute_query(sql, (inference_id,)) + return [r["file_path"] for r in rows] + qgis_repository = QgisRepository() diff --git a/app/services/qgis/qgis_runner.py b/app/services/qgis/qgis_runner.py index dcd9137..48b6fb1 100644 --- a/app/services/qgis/qgis_runner.py +++ b/app/services/qgis/qgis_runner.py @@ -113,7 +113,10 @@ def _init_qgis(): def _process_single(service, model): """处理单个模板,返回结果 dict""" name = service.generate(model) - return {"name": name, "output": model["outFile"]} + result = {"name": name, "output": model["outFile"]} + # ★ 实时进度:每完成一张图就输出到 stdout + print(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}", flush=True) + return result def main(): diff --git a/settings.toml b/settings.toml index 03d3e76..904db54 100644 --- a/settings.toml +++ b/settings.toml @@ -11,7 +11,7 @@ PREDICT_PROBABILITY_THRESHOLD = 50 # 静态底图 GeoPackage 目录(相对于项目根目录) QGIS_GPKG_DIR = "app/data/gpkg" # 专题图输出子目录(相对于 FILE_STORE_DIR) -QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime" +QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:inferenceId" # 专题图默认参数 QGIS_DEFAULTS_MAP_LAYOUT = "A3" QGIS_DEFAULTS_ZOOM_RULE = "11" @@ -83,7 +83,7 @@ FILE_STORE_DIR = "G:/files" # ============================================================ QGIS_ROOT = "D:/QGIS" # 专题图输出子目录 -QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:disasterTime" +QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:inferenceId" QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局" # ============================================================