QGIS设置允许并发

This commit is contained in:
wzy-warehouse
2026-06-24 11:22:20 +08:00
parent d402668a5c
commit cd638d9a5c
4 changed files with 679 additions and 276 deletions
+156 -272
View File
@@ -2,18 +2,16 @@
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
- 线程池并发产图(默认 4 worker
- Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
- 异步模式:HTTP 请求立即返回,产图在后台线程执行
"""
import asyncio
import concurrent.futures
import os
import re
import subprocess
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
@@ -29,9 +27,9 @@ logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
# 去重锁
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()
# 后台任务跟踪(线程安全)
_background_tasks: set[int] = set()
_tasks_lock = threading.Lock()
# 西安市中心
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
@@ -212,21 +210,144 @@ def _build_qgis_config(batch_folder: str) -> dict:
# ============================================================
# 接口实现
# 接口实现(异步 fire-and-forget 模式)
# ============================================================
# 全局并发限制
import asyncio as _asyncio
_concurrent = getattr(settings, "QGIS_MAX_CONCURRENT", 2)
_qgis_semaphore = _asyncio.Semaphore(_concurrent)
def _background_export(inference_id: int) -> None:
"""后台线程:执行产图全流程(DB 查询 + 模板扫描 + 进程池提交)"""
from app.services.qgis.qgis_env import get_docker_container
from app.services.qgis.qgis_pool import qgis_pool
from app.services.qgis.qgis_env import (
map_host_to_container, map_container_to_host, map_template_to_container,
)
try:
# 1. 查推理结果
inference = qgis_repository.query_inference_result(inference_id)
event_type = inference["event_type"]
batch_key = str(inference_id)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
logger.info(
f"[后台] 推理结果: id={inference['id']}, "
f"type={event_type}, occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 2. 扫描模板
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i)
return (1, name)
template_files.sort(key=_priority_order)
if not template_files:
logger.error(f"[后台] 模板文件夹为空: {template_dir}")
return
# 3. 增量检查
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[后台] [跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return
missing_templates = [
f for f in template_files
if os.path.splitext(f)[0] + ".jpg" not in existing
]
logger.info(
f"[后台] [增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
# 4. 构建模型参数
models = []
for tpl_file in missing_templates:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"[后台] 模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}"
)
models.append(model)
# 5. 路径映射到容器内
container_models = []
for m in models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
if "path" in cm:
cm["path"] = map_template_to_container(cm["path"])
container_models.append(cm)
logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池")
# 6. 提交到进程池(阻塞等待产图完成)
db_lock = threading.Lock()
all_results = []
def _on_progress(r: dict):
if "output" in r:
r["output"] = map_container_to_host(r["output"])
all_results.append(r)
status = "FAIL" if "error" in r else "OK"
logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
results, summary = qgis_pool.submit_job(config, container_models, _on_progress)
success_count = summary.get("ok", 0)
fail_count = summary.get("fail", 0)
elapsed = summary.get("elapsed", 0)
logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
except Exception as e:
logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True)
finally:
with _tasks_lock:
_background_tasks.discard(inference_id)
logger.info(f"[后台] 任务结束 (inferenceId={inference_id})")
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
异步模式:请求立即返回,产图在后台线程执行。
"""
from app.services.qgis.qgis_env import get_docker_container
# 检查 Docker 容器
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
@@ -239,272 +360,35 @@ async def export_map(req: QgisMapExportRequest):
except Exception:
raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用")
async with _qgis_semaphore:
inference_id = req.inferenceId
loop = asyncio.get_event_loop()
inference_id = req.inferenceId
try:
inference = await loop.run_in_executor(
None, qgis_repository.query_inference_result, inference_id
# 去重:同一 inferenceId 不重复提交
with _tasks_lock:
if inference_id in _background_tasks:
logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过")
return QgisMapExportResponse(
code=200,
message=f"任务已在执行中,请稍后查看结果",
data=str(inference_id),
)
_background_tasks.add(inference_id)
event_type = inference["event_type"]
batch_key = str(inference_id)
# 构建批次文件夹路径(用 inferenceId 作为唯一文件夹名)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重锁
async with _locks_lock:
if batch_key not in _in_progress_locks:
_in_progress_locks[batch_key] = asyncio.Lock()
task_lock = _in_progress_locks[batch_key]
async with task_lock:
# 精确判断在模板扫描后(比较文件数 vs 模板数)
logger.info(
f"推理结果查询成功: id={inference['id']}, "
f"type={event_type}, "
f"occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 扫描模板文件夹下所有 .qgz 文件
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
# 优先模板排到前面
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i) # 优先组,按配置顺序
return (1, name) # 非优先组,按文件名
template_files.sort(key=_priority_order)
priority_count = sum(
1 for f in template_files
if any(kw in f for kw in priority_keywords)
)
logger.info(f"优先模板: {priority_count} 张排前面")
if not template_files:
raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
# 检查已产出的图片,只生成缺失的
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing)}",
data=batch_key,
)
missing_templates = []
for tpl_file in template_files:
out_name = os.path.splitext(tpl_file)[0] + ".jpg"
if out_name not in existing:
missing_templates.append(tpl_file)
if len(missing_templates) < len(template_files):
logger.info(
f"[增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
template_files = missing_templates
# 构建所有模型参数(批量模式)
models = []
for tpl_file in template_files:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}, "
f"中心: ({model['centerX']}, {model['centerY']})"
)
models.append(model)
# 一次性提交所有模型到 QGIS 子进程(并行多进程,实时写进度)
_generate_batch_maps(models, config, batch_key, inference_id, file_store)
return QgisMapExportResponse(
code=200,
message=f"任务已完成,共{len(models)}张专题图",
data=batch_key,
)
except ValueError as e:
logger.warning(f"参数错误: {e}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
logger.error(f"模板文件不存在: {e}")
raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
except Exception as e:
logger.error(f"专题图导出失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出失败: {e}")
finally:
pass
def _generate_batch_maps(models: list, config: dict, batch_key: str,
inference_id: int = None, file_store: str = None) -> None:
"""并行启动多个 docker exec 子进程,实时读取每张图进度并写 DB"""
import json, math, concurrent.futures, subprocess, tempfile, threading
from app.services.qgis.qgis_env import (
get_docker_project_dir, get_container_python_path, build_docker_exec_cmd,
map_host_to_container, map_container_to_host, map_template_to_container,
# 启动后台线程执行产图
thread = threading.Thread(
target=_background_export,
args=(inference_id,),
daemon=True,
name=f"qgis-export-{inference_id}",
)
thread.start()
logger.info(f"后台产图任务已启动 (inferenceId={inference_id})")
max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
workers = min(max_workers, len(models))
chunk_size = math.ceil(len(models) / workers)
chunks = []
for i in range(0, len(models), chunk_size):
chunks.append(models[i:i + chunk_size])
project_dir = get_docker_project_dir()
python_in_container = get_container_python_path()
runner_in_container = f"{project_dir}/app/services/qgis/qgis_runner.py"
logger.info(
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行 docker exec "
f"(每进程 {chunk_size} 张)"
return QgisMapExportResponse(
code=200,
message=f"任务已提交,正在后台生成专题图",
data=str(inference_id),
)
errors = []
all_results = []
db_lock = threading.Lock() # 保护 DB 写入
def _run_chunk(chunk_models: list, chunk_idx: int):
"""单个 docker exec 子进程,逐张读取进度并实时写 DB"""
# 将主机路径映射为容器内路径(G:/files → /files
container_models = []
for m in chunk_models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
# 模板 path 映射到容器本地路径(预拷贝后绕过 9P)
if "path" in cm:
cm["path"] = map_template_to_container(cm["path"])
container_models.append(cm)
request = json.dumps({"config": config, "models": container_models}, ensure_ascii=False)
# 临时文件写到项目目录内的 tmp/ 子目录(挂载到容器 /app/tmp/),确保容器内可访问
project_dir_host = str(Path(__file__).parent.parent.parent)
tmp_dir = os.path.join(project_dir_host, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
tmp = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8",
dir=tmp_dir,
)
tmp.write(request)
tmp.close()
# 主机临时文件路径 → 容器内路径
tmp_in_container = tmp.name.replace("\\", "/")
project_root = str(Path(__file__).parent.parent.parent).replace("\\", "/")
if tmp_in_container.startswith(project_root):
tmp_in_container = tmp_in_container.replace(project_root, project_dir, 1)
cmd = build_docker_exec_cmd(python_in_container, runner_in_container, tmp_in_container)
logger.info(f"[Docker] chunk{chunk_idx} 命令: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding="utf-8", errors="replace",
)
# 并发读取 stderr 防止管道缓冲区满导致死锁
stderr_lines = []
def _drain_stderr():
for ln in proc.stderr:
stderr_lines.append(ln)
stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
stderr_thread.start()
chunk_results = []
for line in proc.stdout:
line = line.strip()
logger.debug(f"[chunk{chunk_idx}] stdout: {line[:200]}")
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
# 容器内路径映射回主机路径(/files → G:/files
if "output" in r:
r["output"] = map_container_to_host(r["output"])
chunk_results.append(r)
status = "FAIL" if "error" in r else "OK"
logger.info(f"[chunk{chunk_idx}] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
except json.JSONDecodeError:
logger.warning(f"[chunk{chunk_idx}] JSON解析失败: {line[:200]}")
proc.wait()
stderr_thread.join(timeout=5)
stderr_text = "".join(stderr_lines)
if stderr_text.strip():
logger.warning(f"[docker exec chunk{chunk_idx}] stderr:\n{stderr_text[:800]}")
try:
os.remove(tmp.name)
except OSError:
pass
if proc.returncode != 0:
raise RuntimeError(
f"[docker exec chunk{chunk_idx}] 失败 (exit={proc.returncode}): "
f"{stderr_text[:300]}"
)
return chunk_results
with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)}
for future in concurrent.futures.as_completed(futures):
chunk_idx = futures[future]
try:
results = future.result()
all_results.extend(results)
except Exception as e:
logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}")
errors.append(str(e))
if errors and not all_results:
raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}")
success_count = sum(1 for r in all_results if "error" not in r)
fail_count = len(all_results) - success_count
logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
if success_count == 0 and fail_count > 0:
first_err = all_results[0].get("error", "unknown")
raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")
# ============================================================