QGIS的docker管理

This commit is contained in:
wzy-warehouse
2026-06-21 22:30:04 +08:00
parent 4e459fc203
commit 154f0a968e
11 changed files with 761 additions and 377 deletions
+107 -86
View File
@@ -10,7 +10,9 @@ import asyncio
import concurrent.futures
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException
@@ -156,8 +158,38 @@ def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
from app.services.qgis.qgis_env import (
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
# Docker 模式:config 中的路径必须是容器内路径(模板修改器在容器内运行)
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5,
)
is_docker = result.stdout.strip() == "true"
except Exception:
is_docker = False
if is_docker:
# GPKG 目录:容器内路径 = 项目挂载目录 + GPKG 子目录
project_dir = get_docker_project_dir()
gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg")
gpkg_dir = f"{project_dir}/{gpkg_subdir}"
# batch_folder:主机路径 → 容器路径
host_fs = get_host_file_store().rstrip("/")
container_fs = get_container_file_store().rstrip("/")
batch_folder_container = batch_folder.replace("\\", "/")
if batch_folder_container.lower().startswith(host_fs.lower()):
batch_folder_container = container_fs + batch_folder_container[len(host_fs):]
batch_folder = batch_folder_container
logger.info(f"[Docker模式] gpkg_dir={gpkg_dir}, batch_folder={batch_folder}")
else:
logger.info(f"[本地模式] gpkg_dir={gpkg_dir}")
return {
"db": {
"host": settings.DB_HOST,
@@ -189,9 +221,18 @@ async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
"""
from app.services.qgis.qgis_env import is_qgis_available
if not is_qgis_available():
raise HTTPException(status_code=503, detail="QGIS 环境不可用")
from app.services.qgis.qgis_env import get_docker_container
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() != "true":
raise HTTPException(status_code=503, detail="QGIS Docker 容器未运行")
except HTTPException:
raise
except Exception:
raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用")
async with _qgis_semaphore:
inference_id = req.inferenceId
@@ -320,10 +361,11 @@ async def export_map(req: QgisMapExportRequest):
def _generate_batch_maps(models: list, config: dict, batch_key: str,
inference_id: int = None, file_store: str = None) -> None:
"""并行启动多个 QGIS 子进程,实时读取每张图进度并写 DB"""
"""并行启动多个 docker exec 子进程,实时读取每张图进度并写 DB"""
import json, math, concurrent.futures, subprocess, tempfile, threading
from app.services.qgis.qgis_env import (
get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
get_docker_project_dir, get_container_python_path, build_docker_exec_cmd,
map_host_to_container, map_container_to_host,
)
max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
@@ -334,8 +376,12 @@ def _generate_batch_maps(models: list, config: dict, batch_key: str,
for i in range(0, len(models), chunk_size):
chunks.append(models[i:i + chunk_size])
project_dir = get_docker_project_dir()
python_in_container = get_container_python_path()
runner_in_container = f"{project_dir}/app/services/qgis/qgis_runner.py"
logger.info(
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行子进程 "
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行 docker exec "
f"(每进程 {chunk_size} 张)"
)
@@ -344,44 +390,87 @@ def _generate_batch_maps(models: list, config: dict, batch_key: str,
db_lock = threading.Lock() # 保护 DB 写入
def _run_chunk(chunk_models: list, chunk_idx: int):
"""单个子进程,逐张读取进度并实时写 DB"""
request = json.dumps({"config": config, "models": chunk_models}, ensure_ascii=False)
tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w", encoding="utf-8")
"""单个 docker exec 子进程,逐张读取进度并实时写 DB"""
# 将主机路径映射为容器内路径(G:/files → /files
container_models = []
for m in chunk_models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
# 模板 path 也需要映射:Windows主机路径 → 容器内路径
if "path" in cm:
cm["path"] = map_host_to_container(cm["path"])
container_models.append(cm)
request = json.dumps({"config": config, "models": container_models}, ensure_ascii=False)
# 临时文件写到项目目录内的 tmp/ 子目录(挂载到容器 /app/tmp/),确保容器内可访问
project_dir_host = str(Path(__file__).parent.parent.parent)
tmp_dir = os.path.join(project_dir_host, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
tmp = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8",
dir=tmp_dir,
)
tmp.write(request)
tmp.close()
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
python_path = get_qgis_python_path(qgis_root)
runner = get_runner_script()
cmd = [python_path, runner, tmp.name]
# 主机临时文件路径 → 容器内路径
tmp_in_container = tmp.name.replace("\\", "/")
project_root = str(Path(__file__).parent.parent.parent).replace("\\", "/")
if tmp_in_container.startswith(project_root):
tmp_in_container = tmp_in_container.replace(project_root, project_dir, 1)
cmd = build_docker_exec_cmd(python_in_container, runner_in_container, tmp_in_container)
logger.info(f"[Docker] chunk{chunk_idx} 命令: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
env=build_qgis_subprocess_env(qgis_root),
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding="utf-8", errors="replace",
)
# 并发读取 stderr 防止管道缓冲区满导致死锁
stderr_lines = []
def _drain_stderr():
for ln in proc.stderr:
stderr_lines.append(ln)
stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
stderr_thread.start()
chunk_results = []
for line in proc.stdout:
line = line.strip()
logger.debug(f"[chunk{chunk_idx}] stdout: {line[:200]}")
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
# 容器内路径映射回主机路径(/files → G:/files
if "output" in r:
r["output"] = map_container_to_host(r["output"])
chunk_results.append(r)
# ★ 每张图产出后立即写 DB
status = "FAIL" if "error" in r else "OK"
logger.info(f"[chunk{chunk_idx}] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
except json.JSONDecodeError:
pass
logger.warning(f"[chunk{chunk_idx}] JSON解析失败: {line[:200]}")
proc.wait()
stderr_thread.join(timeout=5)
stderr_text = "".join(stderr_lines)
if stderr_text.strip():
logger.warning(f"[docker exec chunk{chunk_idx}] stderr:\n{stderr_text[:800]}")
try:
os.remove(tmp.name)
except OSError:
pass
if proc.returncode != 0:
raise RuntimeError(f"[子进程{chunk_idx}] 失败 (exit={proc.returncode})")
raise RuntimeError(
f"[docker exec chunk{chunk_idx}] 失败 (exit={proc.returncode}): "
f"{stderr_text[:300]}"
)
return chunk_results
@@ -412,74 +501,6 @@ def _generate_batch_maps(models: list, config: dict, batch_key: str,
raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")
def _generate_maps_subprocess(chunk_models: list, config: dict, chunk_idx: int) -> list:
"""单个 QGIS 子进程,处理一批模板,返回结果列表"""
import json
import subprocess
import tempfile
from app.services.qgis.qgis_env import (
get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
)
request_data = json.dumps(
{"config": config, "models": chunk_models},
ensure_ascii=False,
)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
try:
from config import settings
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
python_path = get_qgis_python_path(qgis_root)
if not python_path:
raise RuntimeError("未找到 QGIS Python 3.12 解释器")
runner_script = get_runner_script()
cmd = [python_path, runner_script, tmp_json.name]
logger.info(f"[子进程{chunk_idx}] 启动: {len(chunk_models)} 张图")
result = subprocess.run(
cmd,
capture_output=True,
timeout=600,
env=build_qgis_subprocess_env(qgis_root),
)
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass
stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
parsed_output = None
if stdout_text:
for line in reversed(stdout_text.split("\n")):
line = line.strip()
if line.startswith("{"):
try:
parsed_output = json.loads(line)
break
except json.JSONDecodeError:
continue
if parsed_output is not None:
results = parsed_output.get("results", [])
ok = sum(1 for r in results if "error" not in r)
logger.info(f"[子进程{chunk_idx}] 完成: {ok}/{len(results)}")
return results
if result.returncode != 0:
stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(f"[子进程{chunk_idx}] 失败: {stderr_text[:200]}")
logger.warning(f"[子进程{chunk_idx}] 无输出")
return []
# ============================================================
# file_path 数据库记录