From cd638d9a5c6ba185e211df55e6e503831a5c332a Mon Sep 17 00:00:00 2001 From: wzy-warehouse <18135009705@163.com> Date: Wed, 24 Jun 2026 11:22:20 +0800 Subject: [PATCH] =?UTF-8?q?QGIS=E8=AE=BE=E7=BD=AE=E5=85=81=E8=AE=B8?= =?UTF-8?q?=E5=B9=B6=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/qgis_map_export.py | 428 +++++++++++-------------------- app/services/qgis/qgis_pool.py | 324 +++++++++++++++++++++++ app/services/qgis/qgis_worker.py | 191 ++++++++++++++ settings.toml | 12 +- 4 files changed, 679 insertions(+), 276 deletions(-) create mode 100644 app/services/qgis/qgis_pool.py create mode 100644 app/services/qgis/qgis_worker.py diff --git a/app/api/qgis_map_export.py b/app/api/qgis_map_export.py index e8d9d5e..3771978 100644 --- a/app/api/qgis_map_export.py +++ b/app/api/qgis_map_export.py @@ -2,18 +2,16 @@ QGIS 专题图导出接口 - 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名 - 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次 -- 线程池并发产图(默认 4 worker) +- Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理) - 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg - 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称} +- 异步模式:HTTP 请求立即返回,产图在后台线程执行 """ -import asyncio -import concurrent.futures import os import re import subprocess +import threading from datetime import datetime -from pathlib import Path -from typing import Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field @@ -29,9 +27,9 @@ logger = get_logger("api.qgis") router = APIRouter(prefix="/qgis", tags=["专题图导出"]) -# 去重锁 -_in_progress_locks: dict[str, asyncio.Lock] = {} -_locks_lock = asyncio.Lock() +# 后台任务跟踪(线程安全) +_background_tasks: set[int] = set() +_tasks_lock = threading.Lock() # 西安市中心 xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161]) @@ -212,21 +210,144 @@ def _build_qgis_config(batch_folder: str) -> dict: # ============================================================ -# 接口实现 +# 接口实现(异步 fire-and-forget 模式) # ============================================================ -# 全局并发限制 -import asyncio as _asyncio -_concurrent = getattr(settings, "QGIS_MAX_CONCURRENT", 2) -_qgis_semaphore = _asyncio.Semaphore(_concurrent) + +def _background_export(inference_id: int) -> None: + """后台线程:执行产图全流程(DB 查询 + 模板扫描 + 进程池提交)""" + from app.services.qgis.qgis_env import get_docker_container + from app.services.qgis.qgis_pool import qgis_pool + from app.services.qgis.qgis_env import ( + map_host_to_container, map_container_to_host, map_template_to_container, + ) + + try: + # 1. 查推理结果 + inference = qgis_repository.query_inference_result(inference_id) + event_type = inference["event_type"] + batch_key = str(inference_id) + + file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/") + output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId") + output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key) + batch_folder = os.path.join(file_store, output_dir).replace("\\", "/") + + logger.info( + f"[后台] 推理结果: id={inference['id']}, " + f"type={event_type}, occurred_time={inference['occurred_time']}" + ) + + os.makedirs(batch_folder, exist_ok=True) + config = _build_qgis_config(batch_folder) + + # 2. 扫描模板 + template_base = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "app", "data", "template" + ) + template_dir = os.path.join(template_base, event_type) + template_files = sorted([ + f for f in os.listdir(template_dir) + if f.endswith(".qgz") and not f.startswith("tmp") + ]) + priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", []) + if priority_keywords: + def _priority_order(name: str) -> tuple: + for i, kw in enumerate(priority_keywords): + if kw in name: + return (0, i) + return (1, name) + template_files.sort(key=_priority_order) + + if not template_files: + logger.error(f"[后台] 模板文件夹为空: {template_dir}") + return + + # 3. 增量检查 + existing = set() + if os.path.isdir(batch_folder): + existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")} + + if len(existing) == len(template_files): + logger.info(f"[后台] [跳过] {len(existing)}/{len(template_files)} 张已全部产出") + return + + missing_templates = [ + f for f in template_files + if os.path.splitext(f)[0] + ".jpg" not in existing + ] + logger.info( + f"[后台] [增量] 已有{len(existing)}张, 缺{len(missing_templates)}张" + ) + + # 4. 构建模型参数 + models = [] + for tpl_file in missing_templates: + tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/") + model = _derive_model_params(inference, batch_folder, tpl_path) + logger.info( + f"[后台] 模板: {tpl_path}, " + f"输出: {model['outFile']}, " + f"标题: {model['mapTitle']}" + ) + models.append(model) + + # 5. 路径映射到容器内 + container_models = [] + for m in models: + cm = dict(m) + if "outFile" in cm: + cm["outFile"] = map_host_to_container(cm["outFile"]) + if "path" in cm: + cm["path"] = map_template_to_container(cm["path"]) + container_models.append(cm) + + logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池") + + # 6. 提交到进程池(阻塞等待产图完成) + db_lock = threading.Lock() + all_results = [] + + def _on_progress(r: dict): + if "output" in r: + r["output"] = map_container_to_host(r["output"]) + all_results.append(r) + status = "FAIL" if "error" in r else "OK" + logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}") + if inference_id and file_store and "error" not in r: + _write_single_path(inference_id, r.get("output", ""), file_store, db_lock) + + results, summary = qgis_pool.submit_job(config, container_models, _on_progress) + + success_count = summary.get("ok", 0) + fail_count = summary.get("fail", 0) + elapsed = summary.get("elapsed", 0) + logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s") + + for r in all_results: + if "error" not in r: + logger.info(f" OK {r.get('output', 'N/A')}") + else: + logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}") + + except Exception as e: + logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True) + finally: + with _tasks_lock: + _background_tasks.discard(inference_id) + logger.info(f"[后台] 任务结束 (inferenceId={inference_id})") @router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出") async def export_map(req: QgisMapExportRequest): """ 根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。 + 异步模式:请求立即返回,产图在后台线程执行。 """ from app.services.qgis.qgis_env import get_docker_container + + # 检查 Docker 容器 try: result = subprocess.run( ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()], @@ -239,272 +360,35 @@ async def export_map(req: QgisMapExportRequest): except Exception: raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用") - async with _qgis_semaphore: - inference_id = req.inferenceId - loop = asyncio.get_event_loop() + inference_id = req.inferenceId - try: - inference = await loop.run_in_executor( - None, qgis_repository.query_inference_result, inference_id + # 去重:同一 inferenceId 不重复提交 + with _tasks_lock: + if inference_id in _background_tasks: + logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过") + return QgisMapExportResponse( + code=200, + message=f"任务已在执行中,请稍后查看结果", + data=str(inference_id), ) + _background_tasks.add(inference_id) - event_type = inference["event_type"] - batch_key = str(inference_id) - - # 构建批次文件夹路径(用 inferenceId 作为唯一文件夹名) - file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/") - output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId") - output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key) - batch_folder = os.path.join(file_store, output_dir).replace("\\", "/") - - # 去重锁 - async with _locks_lock: - if batch_key not in _in_progress_locks: - _in_progress_locks[batch_key] = asyncio.Lock() - task_lock = _in_progress_locks[batch_key] - - async with task_lock: - # 精确判断在模板扫描后(比较文件数 vs 模板数) - - logger.info( - f"推理结果查询成功: id={inference['id']}, " - f"type={event_type}, " - f"occurred_time={inference['occurred_time']}" - ) - - os.makedirs(batch_folder, exist_ok=True) - - config = _build_qgis_config(batch_folder) - - # 扫描模板文件夹下所有 .qgz 文件 - template_base = os.path.join( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), - "app", "data", "template" - ) - template_dir = os.path.join(template_base, event_type) - template_files = sorted([ - f for f in os.listdir(template_dir) - if f.endswith(".qgz") and not f.startswith("tmp") - ]) - # 优先模板排到前面 - priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", []) - if priority_keywords: - def _priority_order(name: str) -> tuple: - for i, kw in enumerate(priority_keywords): - if kw in name: - return (0, i) # 优先组,按配置顺序 - return (1, name) # 非优先组,按文件名 - template_files.sort(key=_priority_order) - priority_count = sum( - 1 for f in template_files - if any(kw in f for kw in priority_keywords) - ) - logger.info(f"优先模板: {priority_count} 张排前面") - - if not template_files: - raise FileNotFoundError(f"模板文件夹为空: {template_dir}") - - # 检查已产出的图片,只生成缺失的 - existing = set() - if os.path.isdir(batch_folder): - existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")} - - if len(existing) == len(template_files): - logger.info(f"[跳过] {len(existing)}/{len(template_files)} 张已全部产出") - return QgisMapExportResponse( - code=200, - message=f"已存在,共{len(existing)}张", - data=batch_key, - ) - - missing_templates = [] - for tpl_file in template_files: - out_name = os.path.splitext(tpl_file)[0] + ".jpg" - if out_name not in existing: - missing_templates.append(tpl_file) - - if len(missing_templates) < len(template_files): - logger.info( - f"[增量] 已有{len(existing)}张, 缺{len(missing_templates)}张" - ) - template_files = missing_templates - - # 构建所有模型参数(批量模式) - models = [] - for tpl_file in template_files: - tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/") - model = _derive_model_params(inference, batch_folder, tpl_path) - logger.info( - f"模板: {tpl_path}, " - f"输出: {model['outFile']}, " - f"标题: {model['mapTitle']}, " - f"中心: ({model['centerX']}, {model['centerY']})" - ) - models.append(model) - - # 一次性提交所有模型到 QGIS 子进程(并行多进程,实时写进度) - _generate_batch_maps(models, config, batch_key, inference_id, file_store) - - return QgisMapExportResponse( - code=200, - message=f"任务已完成,共{len(models)}张专题图", - data=batch_key, - ) - - except ValueError as e: - logger.warning(f"参数错误: {e}") - raise HTTPException(status_code=400, detail=str(e)) - except FileNotFoundError as e: - logger.error(f"模板文件不存在: {e}") - raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}") - except Exception as e: - logger.error(f"专题图导出失败: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"导出失败: {e}") - finally: - pass - - -def _generate_batch_maps(models: list, config: dict, batch_key: str, - inference_id: int = None, file_store: str = None) -> None: - """并行启动多个 docker exec 子进程,实时读取每张图进度并写 DB""" - import json, math, concurrent.futures, subprocess, tempfile, threading - from app.services.qgis.qgis_env import ( - get_docker_project_dir, get_container_python_path, build_docker_exec_cmd, - map_host_to_container, map_container_to_host, map_template_to_container, + # 启动后台线程执行产图 + thread = threading.Thread( + target=_background_export, + args=(inference_id,), + daemon=True, + name=f"qgis-export-{inference_id}", ) + thread.start() + logger.info(f"后台产图任务已启动 (inferenceId={inference_id})") - max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4) - workers = min(max_workers, len(models)) - chunk_size = math.ceil(len(models) / workers) - - chunks = [] - for i in range(0, len(models), chunk_size): - chunks.append(models[i:i + chunk_size]) - - project_dir = get_docker_project_dir() - python_in_container = get_container_python_path() - runner_in_container = f"{project_dir}/app/services/qgis/qgis_runner.py" - - logger.info( - f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行 docker exec " - f"(每进程 {chunk_size} 张)" + return QgisMapExportResponse( + code=200, + message=f"任务已提交,正在后台生成专题图", + data=str(inference_id), ) - errors = [] - all_results = [] - db_lock = threading.Lock() # 保护 DB 写入 - - def _run_chunk(chunk_models: list, chunk_idx: int): - """单个 docker exec 子进程,逐张读取进度并实时写 DB""" - # 将主机路径映射为容器内路径(G:/files → /files) - container_models = [] - for m in chunk_models: - cm = dict(m) - if "outFile" in cm: - cm["outFile"] = map_host_to_container(cm["outFile"]) - # 模板 path 映射到容器本地路径(预拷贝后绕过 9P) - if "path" in cm: - cm["path"] = map_template_to_container(cm["path"]) - container_models.append(cm) - - request = json.dumps({"config": config, "models": container_models}, ensure_ascii=False) - # 临时文件写到项目目录内的 tmp/ 子目录(挂载到容器 /app/tmp/),确保容器内可访问 - project_dir_host = str(Path(__file__).parent.parent.parent) - tmp_dir = os.path.join(project_dir_host, "tmp") - os.makedirs(tmp_dir, exist_ok=True) - tmp = tempfile.NamedTemporaryFile( - suffix=".json", delete=False, mode="w", encoding="utf-8", - dir=tmp_dir, - ) - tmp.write(request) - tmp.close() - - # 主机临时文件路径 → 容器内路径 - tmp_in_container = tmp.name.replace("\\", "/") - project_root = str(Path(__file__).parent.parent.parent).replace("\\", "/") - if tmp_in_container.startswith(project_root): - tmp_in_container = tmp_in_container.replace(project_root, project_dir, 1) - - cmd = build_docker_exec_cmd(python_in_container, runner_in_container, tmp_in_container) - logger.info(f"[Docker] chunk{chunk_idx} 命令: {' '.join(cmd)}") - - proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True, encoding="utf-8", errors="replace", - ) - - # 并发读取 stderr 防止管道缓冲区满导致死锁 - stderr_lines = [] - def _drain_stderr(): - for ln in proc.stderr: - stderr_lines.append(ln) - stderr_thread = threading.Thread(target=_drain_stderr, daemon=True) - stderr_thread.start() - - chunk_results = [] - for line in proc.stdout: - line = line.strip() - logger.debug(f"[chunk{chunk_idx}] stdout: {line[:200]}") - if line.startswith("PROGRESS:"): - try: - r = json.loads(line[len("PROGRESS:"):]) - # 容器内路径映射回主机路径(/files → G:/files) - if "output" in r: - r["output"] = map_container_to_host(r["output"]) - chunk_results.append(r) - status = "FAIL" if "error" in r else "OK" - logger.info(f"[chunk{chunk_idx}] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}") - if inference_id and file_store and "error" not in r: - _write_single_path(inference_id, r.get("output", ""), file_store, db_lock) - except json.JSONDecodeError: - logger.warning(f"[chunk{chunk_idx}] JSON解析失败: {line[:200]}") - - proc.wait() - stderr_thread.join(timeout=5) - - stderr_text = "".join(stderr_lines) - if stderr_text.strip(): - logger.warning(f"[docker exec chunk{chunk_idx}] stderr:\n{stderr_text[:800]}") - - try: - os.remove(tmp.name) - except OSError: - pass - - if proc.returncode != 0: - raise RuntimeError( - f"[docker exec chunk{chunk_idx}] 失败 (exit={proc.returncode}): " - f"{stderr_text[:300]}" - ) - - return chunk_results - - with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor: - futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)} - for future in concurrent.futures.as_completed(futures): - chunk_idx = futures[future] - try: - results = future.result() - all_results.extend(results) - except Exception as e: - logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}") - errors.append(str(e)) - - if errors and not all_results: - raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}") - - success_count = sum(1 for r in all_results if "error" not in r) - fail_count = len(all_results) - success_count - logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}") - for r in all_results: - if "error" not in r: - logger.info(f" OK {r.get('output', 'N/A')}") - else: - logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}") - if success_count == 0 and fail_count > 0: - first_err = all_results[0].get("error", "unknown") - raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}") - # ============================================================ diff --git a/app/services/qgis/qgis_pool.py b/app/services/qgis/qgis_pool.py new file mode 100644 index 0000000..e2752ac --- /dev/null +++ b/app/services/qgis/qgis_pool.py @@ -0,0 +1,324 @@ +""" +QGIS Worker 进程池管理器 — 主机侧运行。 + +管理 N 个长驻 Docker Worker 进程,提供任务提交和结果回收。 + +架构: + 主机 FastAPI ←→ qgis_pool (本模块) ←→ docker exec -i ←→ qgis_worker.py (容器内) + +用法: + from app.services.qgis.qgis_pool import qgis_pool + + # 提交任务(阻塞等待完成) + results, summary = qgis_pool.submit_job(config, models, progress_callback) + + # 关闭池 + qgis_pool.shutdown() +""" +import json +import subprocess +import threading +import time +from typing import Callable, Dict, List, Optional, Tuple + +from app.config.paths import get_logger + +logger = get_logger("qgis.pool") + + +class _WorkerHandle: + """单个 Worker 进程的句柄""" + + def __init__(self, worker_id: int, proc: subprocess.Popen): + self.worker_id = worker_id + self.proc = proc + self.busy = False + self.lock = threading.Lock() + self._reader_thread: Optional[threading.Thread] = None + + def is_alive(self) -> bool: + return self.proc.poll() is None + + def mark_busy(self): + with self.lock: + self.busy = True + + def mark_free(self): + with self.lock: + self.busy = False + + def is_free(self) -> bool: + with self.lock: + return not self.busy and self.is_alive() + + +class QgisPool: + """QGIS Worker 进程池""" + + def __init__(self, pool_size: int = None): + """ + Args: + pool_size: 池中 Worker 数量,默认从配置读取 + """ + if pool_size is None: + try: + from config import settings + pool_size = getattr(settings, "QGIS_POOL_SIZE", 2) + except Exception: + pool_size = 2 + + self._pool_size = pool_size + self._workers: List[_WorkerHandle] = [] + self._lock = threading.Lock() + self._started = False + + def _ensure_started(self): + """懒启动:首次使用时启动 Worker 进程""" + if self._started: + return + with self._lock: + if self._started: + return + self._start_workers() + self._started = True + + def _start_workers(self): + """启动所有 Worker 进程""" + from app.services.qgis.qgis_env import ( + get_docker_container, + get_docker_project_dir, + get_container_python_path, + ) + + container = get_docker_container() + project_dir = get_docker_project_dir() + python_path = get_container_python_path() + worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py" + + # Qt 平台设置 + try: + from config import settings + qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen") + except Exception: + qt_platform = "offscreen" + + for i in range(self._pool_size): + cmd = [ + "docker", "exec", "-i", + "-w", project_dir, + "-e", f"QT_QPA_PLATFORM={qt_platform}", + container, + python_path, worker_script, + ] + logger.info(f"[Pool] 启动 Worker-{i}: {' '.join(cmd)}") + + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + ) + worker = _WorkerHandle(i, proc) + self._workers.append(worker) + + # 后台线程消费 stderr,防止管道满阻塞 + t = threading.Thread( + target=self._drain_stderr, args=(worker,), daemon=True + ) + t.start() + + logger.info(f"[Pool] 已启动 {self._pool_size} 个 Worker") + + @staticmethod + def _drain_stderr(worker: _WorkerHandle): + """持续读取 Worker stderr 防止管道缓冲区满""" + try: + for line in worker.proc.stderr: + line = line.strip() + if line: + logger.debug(f"[Worker-{worker.worker_id}] stderr: {line}") + except Exception: + pass + + def _find_free_worker(self, timeout: float = 30) -> Optional[_WorkerHandle]: + """找到一个空闲 Worker,超时返回 None""" + deadline = time.time() + timeout + while time.time() < deadline: + with self._lock: + for w in self._workers: + if w.is_free(): + return w + time.sleep(0.1) + return None + + def submit_job( + self, + config: dict, + models: list, + progress_callback: Callable[[dict], None] = None, + ) -> Tuple[list, dict]: + """ + 提交一个批次任务,阻塞等待完成。 + + Args: + config: QGIS 配置字典 + models: 模型参数列表 + progress_callback: 每张图完成时的回调(接收 PROGRESS dict) + + Returns: + (results_list, summary_dict) + """ + self._ensure_started() + + worker = self._find_free_worker(timeout=60) + if worker is None: + raise RuntimeError("[Pool] 无可用 Worker(等待 60s 超时)") + + worker.mark_busy() + try: + return self._run_job(worker, config, models, progress_callback) + finally: + # 检查 Worker 是否还活着,死了就重启 + if not worker.is_alive(): + logger.error(f"[Pool] Worker-{worker.worker_id} 已死亡,重启...") + self._restart_worker(worker) + worker.mark_free() + + def _run_job( + self, + worker: _WorkerHandle, + config: dict, + models: list, + progress_callback: Callable[[dict], None] = None, + ) -> Tuple[list, dict]: + """向指定 Worker 发送任务并读取结果""" + job = {"config": config, "models": models} + job_json = json.dumps(job, ensure_ascii=False) + "\n" + + # 写入 stdin + try: + worker.proc.stdin.write(job_json) + worker.proc.stdin.flush() + except (BrokenPipeError, OSError) as e: + raise RuntimeError(f"[Pool] 写入 Worker-{worker.worker_id} stdin 失败: {e}") + + # 读取输出(直到 JOB_DONE 或 FATAL) + results = [] + summary = {} + + while True: + line = worker.proc.stdout.readline() + if not line: + # stdout 关闭,Worker 可能已死亡 + logger.error(f"[Pool] Worker-{worker.worker_id} stdout 关闭") + summary = {"error": "Worker stdout 关闭"} + break + + line = line.strip() + if not line: + continue + + if line.startswith("PROGRESS:"): + try: + r = json.loads(line[len("PROGRESS:"):]) + results.append(r) + if progress_callback: + progress_callback(r) + except json.JSONDecodeError: + logger.warning(f"[Pool] Worker-{worker.worker_id} PROGRESS 解析失败: {line[:200]}") + + elif line.startswith("JOB_DONE:"): + try: + summary = json.loads(line[len("JOB_DONE:"):]) + except json.JSONDecodeError: + summary = {"error": f"JOB_DONE 解析失败: {line[:200]}"} + break + + elif line.startswith("FATAL:"): + try: + summary = json.loads(line[len("FATAL:"):]) + except json.JSONDecodeError: + summary = {"error": f"FATAL: {line[:200]}"} + logger.error(f"[Pool] Worker-{worker.worker_id} FATAL: {summary}") + break + + else: + logger.debug(f"[Pool] Worker-{worker.worker_id} 未知输出: {line[:200]}") + + return results, summary + + def _restart_worker(self, dead_worker: _WorkerHandle): + """重启一个死亡的 Worker""" + from app.services.qgis.qgis_env import ( + get_docker_container, + get_docker_project_dir, + get_container_python_path, + ) + + container = get_docker_container() + project_dir = get_docker_project_dir() + python_path = get_container_python_path() + worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py" + + try: + from config import settings + qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen") + except Exception: + qt_platform = "offscreen" + + cmd = [ + "docker", "exec", "-i", + "-w", project_dir, + "-e", f"QT_QPA_PLATFORM={qt_platform}", + container, + python_path, worker_script, + ] + + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + ) + + dead_worker.proc = proc + dead_worker.busy = False + + # 重启 stderr 消费线程 + t = threading.Thread( + target=self._drain_stderr, args=(dead_worker,), daemon=True + ) + t.start() + + logger.info(f"[Pool] Worker-{dead_worker.worker_id} 已重启") + + def shutdown(self): + """关闭所有 Worker""" + with self._lock: + for w in self._workers: + if w.proc.poll() is None: + try: + w.proc.stdin.close() + except Exception: + pass + try: + w.proc.terminate() + w.proc.wait(timeout=5) + except Exception: + try: + w.proc.kill() + except Exception: + pass + self._workers.clear() + self._started = False + logger.info("[Pool] 已关闭") + + +# 全局单例 +qgis_pool = QgisPool() diff --git a/app/services/qgis/qgis_worker.py b/app/services/qgis/qgis_worker.py new file mode 100644 index 0000000..c3efd0b --- /dev/null +++ b/app/services/qgis/qgis_worker.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +""" +QGIS 长驻 Worker — Docker 容器内运行,常驻进程模式。 + +与 qgis_runner.py 的区别: + - qgis_runner.py: 一次性进程,处理完一批任务后退出 + - qgis_worker.py: 长驻进程,初始化 QGIS 一次,循环从 stdin 读取任务处理 + +通信协议(stdin/stdout JSON Lines): + 输入: 每行一个 JSON 任务 {"config": {...}, "models": [{...}, ...]} + 输出: 每张图一行 PROGRESS PROGRESS:{"name": "...", "output": "..."} + 任务结束一行 JOB_DONE:{"total": N, "ok": M, "fail": K, "elapsed": T} + 致命错误 FATAL:{"error": "..."} + +用法: + docker exec -i qgis-server python3 /app/app/services/qgis/qgis_worker.py +""" +import json +import os +import sys +import time + + +# ============================================================ +# 环境初始化(与 qgis_runner.py 保持一致) +# ============================================================ + +def _setup_python_path(): + """将项目根目录和 QGIS dist-packages 加入 sys.path""" + project_root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + try: + from config import settings + pythonpath = getattr(settings, "QGIS_DOCKER_PYTHONPATH", None) or [] + except Exception: + pythonpath = [] + + for p in pythonpath: + if os.path.isdir(p) and p not in sys.path: + sys.path.insert(0, p) + + +def _setup_environment(): + """设置 QGIS 运行所需的环境变量""" + os.environ["PYTHONUTF8"] = "1" + os.environ["GDAL_FILENAME_IS_UTF8"] = "YES" + os.environ["VSI_CACHE"] = "TRUE" + os.environ["VSI_CACHE_SIZE"] = "1000000" + + +def _init_qgis(): + """初始化 QgsApplication""" + from qgis.core import QgsApplication + + try: + from config import settings + prefix_path = getattr(settings, "QGIS_DOCKER_PREFIX_PATH", "/usr") + except Exception: + prefix_path = "/usr" + + QgsApplication.setPrefixPath(prefix_path, True) + qgs_app = QgsApplication([], False) + qgs_app.initQgis() + return qgs_app + + +# ============================================================ +# 单任务处理 +# ============================================================ + +def _emit(line: str): + """输出一行到 stdout""" + print(line, flush=True) + + +def _emit_progress(result: dict): + """输出 PROGRESS 行""" + _emit(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}") + + +def _process_job(service, job: dict) -> dict: + """ + 处理一个批次任务(多个模板),返回汇总结果。 + + 每张图输出一行 PROGRESS,最后返回汇总 dict。 + """ + models = job.get("models") or [] + if not models: + return {"total": 0, "ok": 0, "fail": 0, "elapsed": 0} + + ok = 0 + fail = 0 + t_start = time.time() + + for i, model in enumerate(models): + t_model = time.time() + try: + name = service.generate(model) + result = {"name": name, "output": model["outFile"]} + _emit_progress(result) + ok += 1 + elapsed = time.time() - t_model + print( + f"[worker] [{i+1}/{len(models)}] 完成: {name}, 耗时 {elapsed:.1f}s", + file=sys.stderr, + ) + except Exception as e: + elapsed = time.time() - t_model + error_msg = f"{e}" + fail_result = {"name": model.get("name", ""), "output": "", "error": error_msg} + _emit_progress(fail_result) + fail += 1 + print( + f"[worker] [{i+1}/{len(models)}] 失败: {model.get('name', '?')}, " + f"耗时 {elapsed:.1f}s — {error_msg}", + file=sys.stderr, + ) + + total_elapsed = time.time() - t_start + summary = {"total": len(models), "ok": ok, "fail": fail, "elapsed": round(total_elapsed, 1)} + print( + f"[worker] 批次完成: {ok}成功/{fail}失败, 总耗时 {total_elapsed:.1f}s", + file=sys.stderr, + ) + return summary + + +# ============================================================ +# 主循环 +# ============================================================ + +def main(): + _setup_environment() + _setup_python_path() + + print("[worker] 正在初始化 QGIS...", file=sys.stderr) + t_init = time.time() + qgs_app = _init_qgis() + print(f"[worker] QGIS 初始化完成 ({time.time() - t_init:.1f}s),等待任务...", file=sys.stderr) + + try: + from app.services.qgis.map_service import MapService + + # 持有 MapService 实例(config 在第一个任务时设置) + service = None + + for line_no, line in enumerate(sys.stdin, 1): + line = line.strip() + if not line: + continue + + try: + job = json.loads(line) + except json.JSONDecodeError as e: + print(f"[worker] 第{line_no}行 JSON 解析失败: {e}", file=sys.stderr) + _emit(f'JOB_DONE:{{"error": "JSON解析失败: {e}"}}') + continue + + config = job.get("config") + if config: + # 更新 config(每个任务可以带不同的 config) + service = MapService(config) + + if service is None: + _emit(f'JOB_DONE:{{"error": "未提供 config"}}') + continue + + summary = _process_job(service, job) + _emit(f"JOB_DONE:{json.dumps(summary, ensure_ascii=False)}") + + # 清空项目,恢复到初始状态,等待下一个任务 + from qgis.core import QgsProject + QgsProject.instance().clear() + print("[worker] 项目已清空,等待下一个任务", file=sys.stderr) + + except KeyboardInterrupt: + print("[worker] 收到中断信号,退出", file=sys.stderr) + except Exception as e: + print(f"[worker] 致命错误: {e}", file=sys.stderr) + _emit(f'FATAL:{{"error": "{e}"}}') + finally: + qgs_app.exitQgis() + print("[worker] QGIS 已退出", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/settings.toml b/settings.toml index 291f775..df68cd9 100644 --- a/settings.toml +++ b/settings.toml @@ -23,10 +23,8 @@ QGIS_DEFAULTS_ZOOM_VALUE = "5" QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局" # 专题图DPI QGIS_EXPORT_DPI = 200 -# 并行 docker exec 子进程数 -QGIS_PARALLEL_WORKERS = 4 -# 最大并发请求数 -QGIS_MAX_CONCURRENT = 2 +# Worker 进程池大小 +QGIS_POOL_SIZE = 4 # ============================================================ # Docker QGIS 配置 # ============================================================ @@ -36,6 +34,12 @@ QGIS_DOCKER_CONTAINER = "qgis-server" QGIS_DOCKER_PROJECT_DIR = "/app" # 容器内 Python 解释器路径 QGIS_DOCKER_PYTHON = "/usr/bin/python3" +# QGIS 安装根目录 +QGIS_DOCKER_PREFIX_PATH = "/usr" +# QGIS Python 包路径(官方镜像为空列表) +QGIS_DOCKER_PYTHONPATH = [] +# Qt 无头渲染模式 +QGIS_DOCKER_QT_PLATFORM = "offscreen" # Docker 镜像名称 QGIS_DOCKER_IMAGE = "qgis/qgis:3.44.11" # 优先产出模板