6 Commits

Author SHA1 Message Date
wzy-warehouse 1b08f2c4a2 添加中文字体 2026-06-24 15:52:40 +08:00
wzy-warehouse 7163ca67f9 修改适配Linux 2026-06-24 14:16:23 +08:00
wzy-warehouse e5582bab5d 修改适配Linux 2026-06-24 13:03:15 +08:00
wzy-warehouse cd638d9a5c QGIS设置允许并发 2026-06-24 11:22:20 +08:00
wzy-warehouse d402668a5c QGIS的docker管理 2026-06-22 11:01:15 +08:00
wzy-warehouse 154f0a968e QGIS的docker管理 2026-06-21 22:30:04 +08:00
22 changed files with 1895 additions and 621 deletions
+1
View File
@@ -59,3 +59,4 @@ htmlcov/
# QGIS 临时模板文件
app/data/template/*/tmp*.qgz
tmp*.qgz
/tmp/
+385
View File
@@ -0,0 +1,385 @@
# QGIS Docker 部署指南
QGIS 专题图产出通过 Docker 容器执行,主进程(Python 3.10)通过 `docker exec` 调用容器内的 QGIS 环境。
## 1. 环境要求
- DockerWindows: Docker DesktopLinux: docker-ce
- 项目代码已部署到服务器
- 输出文件目录已创建
## 2. 拉取 QGIS 镜像
```bash
# 官方 QGIS 镜像(含 QGIS 3.x + Python + Qt5
docker pull qgis/qgis:3.44.11
# 验证
docker run --rm qgis/qgis:3.44.11 qgis --version
```
### 无外网环境(离线导入)
```bash
# 在有网的机器上导出
docker save qgis/qgis:3.44.11 -o qgis-34411.tar
# 传输到目标服务器后导入
docker load -i qgis-34411.tar
```
## 3. 启动 QGIS 容器
```bash
# ---- Linux / macOS ----
docker run -d \
--name qgis-server \
--restart unless-stopped \
-v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "/www/wwwroot/xian_algorithm_new/files:/files" \
qgis/qgis:3.44.11 \
sleep infinity
# ---- Windows (cmd.exe) ----
# 注意:Windows 路径必须用正斜杠 /,不能用反斜杠 \
# 挂载目标必须是 Linux 路径(容器是 Linux
docker run -d ^
--name qgis-server ^
--restart unless-stopped ^
-v "F:/project/xian/xian_algorithm_new:/app:ro" ^
-v "G:/files:/files" ^
qgis/qgis:3.44.11 ^
sleep infinity
```
### 参数说明
| 参数 | 说明 |
|------|------|
| `--name qgis-server` | 容器名称,需与 `settings.toml``QGIS_DOCKER_CONTAINER` 一致 |
| `-v 项目代码:/app:ro` | 项目代码只读挂载,容器内路径由 `QGIS_DOCKER_PROJECT_DIR` 配置 |
| `-v 输出目录:输出目录` | 文件输出目录读写挂载,保持主机与容器路径一致 |
| `sleep infinity` | 保持容器运行,等待 `docker exec` 调用 |
## 4. 预拷贝静态数据到容器本地 FS(必须,性能关键)
WSL2 9P 文件系统随机读取极慢(GPKG 62MB 耗时 6-10s/模板,模板 ZIP 也慢),拷贝到容器内 `/data/` 后读取仅需 ~0.5s。
### 方式一:Python 脚本(推荐)
```bash
python app/script/copy_data_to_container.py
```
输出示例:
```
=== 预拷贝静态数据到容器 qgis-server ===
[GPKG] 主机目录: F:\project\xian\xian_algorithm_new\app\data\gpkg
文件数: 23, 总大小: 62.3 MB
容器目标: qgis-server:/data/gpkg
拷贝完成: 3.2s, 容器内 23 个文件
[模板] 主机目录: F:\project\xian\xian_algorithm_new\app\data\template
文件数: 34, 总大小: 45.1 MB
容器目标: qgis-server:/data/template
拷贝完成: 2.1s, 容器内 34 个文件
=== 总耗时: 5.4s ===
```
其他用法:
```bash
python app/script/copy_data_to_container.py --dry-run # 仅查看信息
python app/script/copy_data_to_container.py --only gpkg # 只拷贝 GPKG
python app/script/copy_data_to_container.py --only template # 只拷贝模板
```
### 方式二:手动 docker cp
```bash
# 清理旧文件
docker exec qgis-server rm -rf /data/gpkg /data/template
# 拷贝 GPKG
docker cp F:\project\xian\xian_algorithm_new\app\data\gpkg qgis-server:/data/gpkg
# 拷贝模板
docker cp F:\project\xian\xian_algorithm_new\app\data\template qgis-server:/data/template
# 验证
docker exec qgis-server ls -la /data/gpkg
docker exec qgis-server ls -la /data/template/rainfall
docker exec qgis-server ls -la /data/template/earthquake
```
### 何时需要重新执行
- GPKG 或模板文件有更新时
- 容器重建后(`/data` 目录丢失)
- 首次部署时
## 5. 安装中文字体(手动,必须执行)
QGIS 模板使用了 SimHei(黑体)、SimSun(宋体)、Microsoft YaHei(微软雅黑)等 Windows 中文字体,
Docker 镜像默认不包含这些字体,会导致中文全部乱码。**字体需手动安装,代码不会自动安装。**
### 5.1 准备字体文件
字体文件存放在项目根目录的 `fonts/` 目录下,已预置 4 个常用中文字体:
```
fonts/
├── simhei.ttf — 黑体(模板默认字体)
├── simsun.ttc — 宋体
├── msyh.ttc — 微软雅黑
└── msyhbd.ttc — 微软雅黑粗体
```
如果字体缺失,从 Windows 主机复制(`C:\Windows\Fonts\`):
```bash
# Linux 服务器用 SCP 从 Windows 传
scp "C:\Windows\Fonts\simhei.ttf" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\simsun.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyh.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyhbd.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
```
### 5.2 一键安装到容器
```bash
python app/script/install_fonts_to_container.py
```
输出示例:
```
=== 安装中文字体到 Docker 容器 qgis-server ===
字体目录: /www/wwwroot/xian_algorithm_new/fonts
字体文件: 4 个
- msyh.ttc (4165 KB)
- msyhbd.ttc (4167 KB)
- simhei.ttf (2637 KB)
- simsun.ttc (10126 KB)
容器目标: qgis-server:/usr/share/fonts/truetype/winfonts
[1/3] 复制字体文件...
OK msyh.ttc
OK msyhbd.ttc
OK simhei.ttf
OK simsun.ttc
[2/3] 刷新字体缓存...
OK
[3/3] 验证字体...
中文字体: ['SimHei', 'SimSun', 'Microsoft YaHei', 'Microsoft YaHei UI']
=== 完成,耗时 3.2s ===
```
其他用法:
```bash
python app/script/install_fonts_to_container.py --dry-run # 仅查看信息
python app/script/install_fonts_to_container.py --container my # 指定容器名
```
### 5.3 持久化(推荐)
容器重建后字体丢失,**强烈建议挂载 `fonts/` 目录**。
```bash
# 停掉旧容器
docker stop qgis-server && docker rm qgis-server
# 重建容器,加上字体挂载
docker run -d \
--name qgis-server \
--restart unless-stopped \
-v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "/www/wwwroot/xian_algorithm_new/files:/files" \
-v "/www/wwwroot/xian_algorithm_new/fonts:/usr/share/fonts/truetype/winfonts:ro" \
qgis/qgis:3.44.11 \
sleep infinity
# 挂载后只需刷新一次缓存
docker exec qgis-server fc-cache -fv
```
### 5.4 无法获取 Windows 字体时的替代方案
```bash
# Linux 服务器安装开源中文字体
yum install wqy-microhei-fonts # CentOS / RHEL
apt install fonts-wqy-microhei # Debian / Ubuntu
# 将系统字体复制到项目 fonts/ 目录
cp /usr/share/fonts/wqy-microhei/wqy-microhei.ttc /www/wwwroot/xian_algorithm_new/fonts/
```
## 6. 验证容器
```bash
# 检查容器状态
docker ps
# 测试 QGIS 可用性
docker exec qgis-server python3 -c "from qgis.core import QgsApplication; print('OK')"
# 查看容器内项目代码
docker exec qgis-server ls /app/app/services/qgis/
```
## 7. 配置文件
所有 Docker 相关配置集中在 `settings.toml``[default]` 段:
```toml
[default]
# ---- Docker QGIS 配置 ----
QGIS_DOCKER_CONTAINER = "qgis-server" # 容器名称(需与 docker run --name 一致)
QGIS_DOCKER_PROJECT_DIR = "/app" # 容器内项目代码挂载路径
QGIS_DOCKER_PYTHON = "/usr/bin/python3" # 容器内 Python 解释器路径
QGIS_DOCKER_IMAGE = "qgis/qgis:3.44.11" # Docker 镜像名称
QGIS_DOCKER_PREFIX_PATH = "/usr" # QGIS prefixPath(安装根目录)
QGIS_DOCKER_PYTHONPATH = [ # QGIS Python 包路径列表
"/usr/lib/python3/dist-packages",
"/usr/lib/python3.10/dist-packages",
"/usr/lib/python3.11/dist-packages",
"/usr/lib/python3.12/dist-packages",
]
QGIS_DOCKER_QT_PLATFORM = "offscreen" # Qt 无头模式
QGIS_DOCKER_KEEP_ALIVE = "sleep infinity" # 容器保活命令
# ---- 专题图参数 ----
QGIS_GPKG_DIR = "app/data/gpkg" # GPKG 目录(相对于项目根)
QGIS_DOCKER_GPKG_DIR = "/data/gpkg" # 容器内 GPKG 本地路径(预拷贝后读取)
QGIS_DOCKER_TEMPLATE_DIR = "/data/template" # 容器内模板本地路径(预拷贝后读取)
QGIS_EXPORT_DPI = 200 # 导出 DPI
QGIS_PARALLEL_WORKERS = 4 # 并行 docker exec 子进程数
QGIS_MAX_CONCURRENT = 2 # 最大并发请求数
# ---- 文件路径 ----
FILE_STORE_DIR = "G:/files" # 主机端文件输出目录
```
### 环境变量覆盖
```bash
export QGIS_DOCKER_CONTAINER="qgis-server"
export QGIS_DOCKER_PROJECT_DIR="/app"
export QGIS_DOCKER_PYTHON="/usr/bin/python3"
```
### 跨机器适配
不同机器只需修改 `settings.toml` 中的路径配置:
| 配置项 | 开发机 (Windows) | 生产机 (Linux) |
|--------|-----------------|---------------|
| `FILE_STORE_DIR` | `"G:/files"` | `"/data"` |
| `DB_HOST` | `"47.92.216.173"` | `"10.22.245.138"` |
| `DB_PORT` | `7654` | `54321` |
## 8. 目录结构
```
项目根目录/
├── app/
│ ├── data/
│ │ ├── gpkg/ # 静态底图 GeoPackage 文件
│ │ └── template/ # 专题图模板 (.qgz)
│ ├── services/qgis/
│ │ ├── qgis_env.py # Docker 环境配置(路径映射、命令构建)
│ │ ├── qgis_runner.py # 容器内子进程入口
│ │ ├── map_service.py # 地图生成主流程
│ │ ├── template_modifier.py # 模板 XML 修改
│ │ └── map_exporter.py # 图片导出
│ └── api/
│ └── qgis_map_export.py # FastAPI 专题图导出接口
├── script/
│ └── copy_data_to_container.py # GPKG + 模板预拷贝脚本
├── config.py # Dynaconf 配置入口
├── settings.toml # 全部配置
├── requirements.txt # Python 依赖
├── QGIS_DOCKER_README.md # 本文档
└── tmp/ # 临时文件目录(容器内映射为 /app/tmp/)
```
## 9. 临时文件
- 主机端临时 JSON(批量产图配置):写入 `{项目根}/tmp/`,容器内可通过 `/app/tmp/` 访问
- 容器端临时 .qgz(修改后的模板):写入容器内 `/tmp/`,由 runner 自动清理
## 10. 故障排查
```bash
# 容器未运行
docker ps -a | grep qgis-server
# 查看容器日志
docker logs qgis-server
# 手动测试 runner
docker exec qgis-server python3 /app/app/services/qgis/qgis_runner.py --help
# 检查 QGIS 依赖
docker exec qgis-server python3 -c "
from qgis.core import QgsApplication
QgsApplication.setPrefixPath('/usr', True)
app = QgsApplication([], False)
app.initQgis()
print('QGIS', QgsApplication.version())
app.exitQgis()
"
# 检查中文字体
docker exec qgis-server python3 -c "
from PyQt5.QtGui import QFontDatabase
db = QFontDatabase()
zh = [f for f in db.families() if any(k in f for k in ['SimHei','YaHei','SimSun','WenQuanYi'])]
print('中文字体:', zh if zh else '未安装!')
"
# 检查 GPKG 文件(容器本地路径,性能关键)
docker exec qgis-server ls /app/app/data/gpkg/ # 挂载路径(慢)
docker exec qgis-server ls /data/gpkg/ # 本地路径(快,需预拷贝)
# 检查模板文件(容器本地路径)
docker exec qgis-server ls /data/template/rainfall/ # 本地路径(快)
docker exec qgis-server ls /data/template/earthquake/
# 检查临时文件目录
docker exec qgis-server ls /app/tmp/
```
### 静态数据相关问题
| 问题 | 原因 | 解决 |
|------|------|------|
| 产图慢(>60s) | GPKG/模板从挂载目录读取(9P 慢) | 执行 `python app/script/copy_data_to_container.py` |
| 日志显示 `gpkg_dir=/app/...` | 未预拷贝或 `QGIS_DOCKER_GPKG_DIR` 为空 | 检查 settings.toml 配置 |
| 容器重建后变慢 | `/data` 目录丢失 | 重新执行拷贝脚本 |
| `docker cp` 权限错误 | 容器未运行 | `docker start qgis-server` |
## 11. 工作流程
```
用户请求 POST /qgis/export/map
→ docker inspect 检查容器是否运行
→ 查询推理结果
→ 扫描模板文件夹
→ 构建配置(路径映射到容器内路径)
→ 并行启动 docker exec 子进程
→ 容器内 qgis_runner.py
→ 加载 QGIS Python 包
→ 初始化 QgsApplication
→ 逐模板处理:
→ TemplateModifier 修改模板 XML(替换连接参数、静态层→GPKG)
→ project.read() 加载模板
→ 图层过滤、缩放、文本更新
→ 导出 JPG
→ 实时写入进度表
```
+204 -294
View File
@@ -2,16 +2,16 @@
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
- 线程池并发产图(默认 4 worker
- Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
- 异步模式:HTTP 请求立即返回,产图在后台线程执行
"""
import asyncio
import concurrent.futures
import os
import re
import subprocess
import threading
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
@@ -27,9 +27,9 @@ logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
# 去重锁
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()
# 后台任务跟踪(线程安全)
_background_tasks: set[int] = set()
_tasks_lock = threading.Lock()
# 西安市中心
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
@@ -156,8 +156,43 @@ def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
from app.services.qgis.qgis_env import (
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
# Docker 模式:config 中的路径必须是容器内路径(模板修改器在容器内运行)
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5,
)
is_docker = result.stdout.strip() == "true"
except Exception:
is_docker = False
if is_docker:
# GPKG 目录:优先使用容器内本地路径(预拷贝后绕过 WSL2 9P)
# 需先执行 python script/copy_gpkg_to_container.py
gpkg_dir = getattr(settings, "QGIS_DOCKER_GPKG_DIR", "") or ""
if not gpkg_dir:
# fallback: 使用挂载路径(性能差)
project_dir = get_docker_project_dir()
gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg")
gpkg_dir = f"{project_dir}/{gpkg_subdir}"
logger.warning(f"GPKG 将从挂载目录读取(慢),建议执行 copy_gpkg_to_container.py")
# batch_folder:主机路径 → 容器路径
host_fs = get_host_file_store().rstrip("/")
container_fs = get_container_file_store().rstrip("/")
batch_folder_container = batch_folder.replace("\\", "/")
if batch_folder_container.lower().startswith(host_fs.lower()):
batch_folder_container = container_fs + batch_folder_container[len(host_fs):]
batch_folder = batch_folder_container
logger.info(f"[Docker模式] gpkg_dir={gpkg_dir}, batch_folder={batch_folder}")
else:
logger.info(f"[本地模式] gpkg_dir={gpkg_dir}")
return {
"db": {
"host": settings.DB_HOST,
@@ -175,310 +210,185 @@ def _build_qgis_config(batch_folder: str) -> dict:
# ============================================================
# 接口实现
# 接口实现(异步 fire-and-forget 模式)
# ============================================================
# 全局并发限制
import asyncio as _asyncio
_concurrent = getattr(settings, "QGIS_MAX_CONCURRENT", 2)
_qgis_semaphore = _asyncio.Semaphore(_concurrent)
def _background_export(inference_id: int) -> None:
"""后台线程:执行产图全流程(DB 查询 + 模板扫描 + 进程池提交)"""
from app.services.qgis.qgis_env import get_docker_container
from app.services.qgis.qgis_pool import qgis_pool
from app.services.qgis.qgis_env import (
map_host_to_container, map_container_to_host, map_template_to_container,
)
try:
# 1. 查推理结果
inference = qgis_repository.query_inference_result(inference_id)
event_type = inference["event_type"]
batch_key = str(inference_id)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
logger.info(
f"[后台] 推理结果: id={inference['id']}, "
f"type={event_type}, occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 2. 扫描模板
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i)
return (1, name)
template_files.sort(key=_priority_order)
if not template_files:
logger.error(f"[后台] 模板文件夹为空: {template_dir}")
return
# 3. 增量检查
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[后台] [跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return
missing_templates = [
f for f in template_files
if os.path.splitext(f)[0] + ".jpg" not in existing
]
logger.info(
f"[后台] [增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
# 4. 构建模型参数
models = []
for tpl_file in missing_templates:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"[后台] 模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}"
)
models.append(model)
# 5. 路径映射到容器内
container_models = []
for m in models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
if "path" in cm:
cm["path"] = map_template_to_container(cm["path"])
container_models.append(cm)
logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池")
# 6. 提交到进程池(阻塞等待产图完成)
db_lock = threading.Lock()
all_results = []
def _on_progress(r: dict):
if "output" in r:
r["output"] = map_container_to_host(r["output"])
all_results.append(r)
status = "FAIL" if "error" in r else "OK"
logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r["output"], file_store, db_lock)
results, summary = qgis_pool.submit_job(config, container_models, _on_progress)
success_count = summary.get("ok", 0)
fail_count = summary.get("fail", 0)
elapsed = summary.get("elapsed", 0)
logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
except Exception as e:
logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True)
finally:
with _tasks_lock:
_background_tasks.discard(inference_id)
logger.info(f"[后台] 任务结束 (inferenceId={inference_id})")
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
异步模式:请求立即返回,产图在后台线程执行。
"""
from app.services.qgis.qgis_env import is_qgis_available
if not is_qgis_available():
raise HTTPException(status_code=503, detail="QGIS 环境不可用")
async with _qgis_semaphore:
inference_id = req.inferenceId
loop = asyncio.get_event_loop()
try:
inference = await loop.run_in_executor(
None, qgis_repository.query_inference_result, inference_id
)
event_type = inference["event_type"]
batch_key = str(inference_id)
# 构建批次文件夹路径(用 inferenceId 作为唯一文件夹名)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重锁
async with _locks_lock:
if batch_key not in _in_progress_locks:
_in_progress_locks[batch_key] = asyncio.Lock()
task_lock = _in_progress_locks[batch_key]
async with task_lock:
# 精确判断在模板扫描后(比较文件数 vs 模板数)
logger.info(
f"推理结果查询成功: id={inference['id']}, "
f"type={event_type}, "
f"occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 扫描模板文件夹下所有 .qgz 文件
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
# 优先模板排到前面
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i) # 优先组,按配置顺序
return (1, name) # 非优先组,按文件名
template_files.sort(key=_priority_order)
priority_count = sum(
1 for f in template_files
if any(kw in f for kw in priority_keywords)
)
logger.info(f"优先模板: {priority_count} 张排前面")
if not template_files:
raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
# 检查已产出的图片,只生成缺失的
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing)}",
data=batch_key,
)
missing_templates = []
for tpl_file in template_files:
out_name = os.path.splitext(tpl_file)[0] + ".jpg"
if out_name not in existing:
missing_templates.append(tpl_file)
if len(missing_templates) < len(template_files):
logger.info(
f"[增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
template_files = missing_templates
# 构建所有模型参数(批量模式)
models = []
for tpl_file in template_files:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}, "
f"中心: ({model['centerX']}, {model['centerY']})"
)
models.append(model)
# 一次性提交所有模型到 QGIS 子进程(并行多进程,实时写进度)
_generate_batch_maps(models, config, batch_key, inference_id, file_store)
return QgisMapExportResponse(
code=200,
message=f"任务已完成,共{len(models)}张专题图",
data=batch_key,
)
except ValueError as e:
logger.warning(f"参数错误: {e}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
logger.error(f"模板文件不存在: {e}")
raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
except Exception as e:
logger.error(f"专题图导出失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出失败: {e}")
finally:
pass
def _generate_batch_maps(models: list, config: dict, batch_key: str,
inference_id: int = None, file_store: str = None) -> None:
"""并行启动多个 QGIS 子进程,实时读取每张图进度并写 DB"""
import json, math, concurrent.futures, subprocess, tempfile, threading
from app.services.qgis.qgis_env import (
get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
)
max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
workers = min(max_workers, len(models))
chunk_size = math.ceil(len(models) / workers)
chunks = []
for i in range(0, len(models), chunk_size):
chunks.append(models[i:i + chunk_size])
logger.info(
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行子进程 "
f"(每进程 {chunk_size} 张)"
)
errors = []
all_results = []
db_lock = threading.Lock() # 保护 DB 写入
def _run_chunk(chunk_models: list, chunk_idx: int):
"""单个子进程,逐张读取进度并实时写 DB"""
request = json.dumps({"config": config, "models": chunk_models}, ensure_ascii=False)
tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w", encoding="utf-8")
tmp.write(request)
tmp.close()
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
python_path = get_qgis_python_path(qgis_root)
runner = get_runner_script()
cmd = [python_path, runner, tmp.name]
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
env=build_qgis_subprocess_env(qgis_root),
text=True, encoding="utf-8", errors="replace",
)
chunk_results = []
for line in proc.stdout:
line = line.strip()
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
chunk_results.append(r)
# ★ 每张图产出后立即写 DB
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
except json.JSONDecodeError:
pass
proc.wait()
try:
os.remove(tmp.name)
except OSError:
pass
if proc.returncode != 0:
raise RuntimeError(f"[子进程{chunk_idx}] 失败 (exit={proc.returncode})")
return chunk_results
with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)}
for future in concurrent.futures.as_completed(futures):
chunk_idx = futures[future]
try:
results = future.result()
all_results.extend(results)
except Exception as e:
logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}")
errors.append(str(e))
if errors and not all_results:
raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}")
success_count = sum(1 for r in all_results if "error" not in r)
fail_count = len(all_results) - success_count
logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
if success_count == 0 and fail_count > 0:
first_err = all_results[0].get("error", "unknown")
raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")
def _generate_maps_subprocess(chunk_models: list, config: dict, chunk_idx: int) -> list:
"""单个 QGIS 子进程,处理一批模板,返回结果列表"""
import json
import subprocess
import tempfile
from app.services.qgis.qgis_env import (
get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
)
request_data = json.dumps(
{"config": config, "models": chunk_models},
ensure_ascii=False,
)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
from app.services.qgis.qgis_env import get_docker_container
# 检查 Docker 容器
try:
from config import settings
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
python_path = get_qgis_python_path(qgis_root)
if not python_path:
raise RuntimeError("未找到 QGIS Python 3.12 解释器")
runner_script = get_runner_script()
cmd = [python_path, runner_script, tmp_json.name]
logger.info(f"[子进程{chunk_idx}] 启动: {len(chunk_models)} 张图")
result = subprocess.run(
cmd,
capture_output=True,
timeout=600,
env=build_qgis_subprocess_env(qgis_root),
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5,
)
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass
if result.stdout.strip() != "true":
raise HTTPException(status_code=503, detail="QGIS Docker 容器未运行")
except HTTPException:
raise
except Exception:
raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用")
stdout_text = result.stdout.decode("utf-8", errors="replace").strip()
parsed_output = None
if stdout_text:
for line in reversed(stdout_text.split("\n")):
line = line.strip()
if line.startswith("{"):
try:
parsed_output = json.loads(line)
break
except json.JSONDecodeError:
continue
inference_id = req.inferenceId
if parsed_output is not None:
results = parsed_output.get("results", [])
ok = sum(1 for r in results if "error" not in r)
logger.info(f"[子进程{chunk_idx}] 完成: {ok}/{len(results)}")
return results
# 去重:同一 inferenceId 不重复提交
with _tasks_lock:
if inference_id in _background_tasks:
logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过")
return QgisMapExportResponse(
code=200,
message=f"任务已在执行中,请稍后查看结果",
data=str(inference_id),
)
_background_tasks.add(inference_id)
if result.returncode != 0:
stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(f"[子进程{chunk_idx}] 失败: {stderr_text[:200]}")
# 启动后台线程执行产图
thread = threading.Thread(
target=_background_export,
args=(inference_id,),
daemon=True,
name=f"qgis-export-{inference_id}",
)
thread.start()
logger.info(f"后台产图任务已启动 (inferenceId={inference_id})")
return QgisMapExportResponse(
code=200,
message=f"任务已提交,正在后台生成专题图",
data=str(inference_id),
)
logger.warning(f"[子进程{chunk_idx}] 无输出")
return []
# ============================================================
+8 -2
View File
@@ -82,12 +82,18 @@ GPKG_SUBDIR = "app/data/gpkg"
def get_gpkg_dir(project_root: str = None) -> str:
"""获取 GPKG 目录绝对路径"""
"""获取 GPKG 目录绝对路径(从配置读取,避免硬编码)"""
try:
from config import settings
gpkg_subdir = getattr(settings, "QGIS_GPKG_DIR", None) or GPKG_SUBDIR
except Exception:
gpkg_subdir = GPKG_SUBDIR
if project_root is None:
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
gpkg_dir = os.path.join(project_root, GPKG_SUBDIR)
gpkg_dir = os.path.join(project_root, gpkg_subdir)
return os.path.normpath(gpkg_dir).replace("\\", "/")
+17 -17
View File
@@ -39,26 +39,26 @@ class AppLauncher:
check_virtualenv(self.project_root)
# 检查是否正在使用虚拟环境运行
import platform
import sys
venv_path = self.project_root / ".venv"
os_name = platform.system()
if os_name == 'Windows':
venv_python = venv_path / "Scripts" / "python.exe"
else: # Linux/Mac
venv_python = venv_path / "bin" / "python3"
# 如果当前不是使用虚拟环境的Python,则重新启动
current_python = Path(sys.executable).resolve()
venv_python_resolved = venv_python.resolve()
if current_python != venv_python_resolved:
# sys.prefix != sys.base_prefix 是 Python 检测 venv 的标准方式
# 不依赖路径解析,Windows/Linux 均适用
in_venv = hasattr(sys, 'real_prefix') or (
hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
)
if not in_venv:
import platform
venv_path = self.project_root / ".venv"
os_name = platform.system()
if os_name == 'Windows':
venv_python = venv_path / "Scripts" / "python.exe"
else: # Linux/Mac
venv_python = venv_path / "bin" / "python3"
print("\n" + "=" * 50)
print("检测到未使用虚拟环境,正在切换到虚拟环境...")
print("=" * 50)
# 使用虚拟环境的Python重新启动应用(不传递参数避免重复检查)
import subprocess
cmd = [str(venv_python)] + sys.argv
subprocess.run(cmd, check=True)
+19 -19
View File
@@ -6,8 +6,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from app.utils.api_deps import get_rainfall_model, get_earthquake_model, is_model_loaded
from app.schemas.api_schemas import HealthResponse
from app.utils.api_deps import get_rainfall_model, get_earthquake_model
from app.config.paths import get_logger
from config import settings
@@ -22,17 +21,21 @@ async def lifespan(app: FastAPI):
get_earthquake_model()
logger.info("DBN模型预加载完成")
# 检测 QGIS 子进程环境(产图时按需启动子进程)
qgis_root = getattr(settings, "QGIS_ROOT", None)
if qgis_root:
try:
from app.services.qgis.qgis_env import is_qgis_available
if is_qgis_available(qgis_root):
logger.info("QGIS 环境检测通过")
else:
logger.warning("QGIS 环境不可用,专题图功能降级")
except Exception as e:
logger.error(f"QGIS 环境检测失败: {e}")
# 检测 Docker QGIS 容器是否可用
try:
import subprocess
from app.services.qgis.qgis_env import get_docker_container
container = get_docker_container()
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", container],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() == "true":
logger.info(f"Docker QGIS 容器 '{container}' 运行中")
else:
logger.warning(f"Docker QGIS 容器 '{container}' 未运行,专题图功能降级")
except Exception as e:
logger.error(f"Docker QGIS 检测失败: {e}")
yield
@@ -53,6 +56,9 @@ def create_app() -> FastAPI:
start = time.time()
response = await call_next(request)
elapsed = time.time() - start
# 静默健康检查探针
if request.url.path == "/" and request.method == "GET":
return response
logger.info(f"{request.method} {request.url.path} -> {response.status_code} ({elapsed:.3f}s)")
return response
@@ -60,10 +66,4 @@ def create_app() -> FastAPI:
from app.api import register_routers
register_routers(application)
@application.get("/health", response_model=HealthResponse, tags=["系统"])
async def health_check():
"""健康检查"""
status = is_model_loaded()
return HealthResponse(status="ok", **status)
return application
+159
View File
@@ -0,0 +1,159 @@
"""
将主机端 GPKG 和模板文件预拷贝到 Docker 容器本地文件系统。
WSL2 9P 文件系统随机读取极慢(GPKG 62MB 耗时 6-10s,模板 ZIP 也慢),
拷贝到容器内 /data/ 后读取仅需 ~0.5s。
用法:
python app/script/copy_data_to_container.py [--container qgis-server] [--dry-run] [--only gpkg|template]
前置条件:
Docker 容器已启动(docker start qgis-server
"""
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
def _load_config():
"""从 settings.toml 读取配置"""
try:
from config import settings
return {
"gpkg_subdir": getattr(settings, "QGIS_GPKG_DIR", "app/data/gpkg"),
"template_subdir": "app/data/template",
"container": getattr(settings, "QGIS_DOCKER_CONTAINER", "qgis-server"),
"container_gpkg": getattr(settings, "QGIS_DOCKER_GPKG_DIR", "/data/gpkg"),
"container_template": getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "/data/template"),
}
except ImportError:
pass
# fallback: 解析 TOML
cfg = {
"gpkg_subdir": "app/data/gpkg",
"template_subdir": "app/data/template",
"container": "qgis-server",
"container_gpkg": "/data/gpkg",
"container_template": "/data/template",
}
toml_path = PROJECT_ROOT / "settings.toml"
if toml_path.exists():
for line in toml_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
for key, prefix in [
("gpkg_subdir", "QGIS_GPKG_DIR"),
("container", "QGIS_DOCKER_CONTAINER"),
("container_gpkg", "QGIS_DOCKER_GPKG_DIR"),
("container_template", "QGIS_DOCKER_TEMPLATE_DIR"),
]:
if line.startswith(prefix) and "=" in line:
cfg[key] = line.split("=", 1)[1].strip().strip('"').strip("'")
return cfg
def _run(cmd, timeout=10, **kwargs):
"""subprocess.run 封装,统一 UTF-8 编码,避免 Windows GBK 报错"""
result = subprocess.run(
cmd, capture_output=True, timeout=timeout,
encoding="utf-8", errors="replace", **kwargs,
)
return result
def _check_container(container: str):
"""检查容器是否运行"""
result = _run(["docker", "inspect", "--format={{.State.Running}}", container], timeout=5)
if (result.stdout or "").strip() != "true":
raise RuntimeError(f"容器 {container} 未运行,请先 docker start {container}")
def _copy_dir_to_container(host_dir: Path, container: str, container_dest: str, label: str, dry_run: bool):
"""拷贝单个目录到容器"""
if not host_dir.is_dir():
print(f" [{label}] 目录不存在,跳过: {host_dir}")
return 0
files = list(host_dir.rglob("*"))
files = [f for f in files if f.is_file()]
if not files:
print(f" [{label}] 目录为空,跳过: {host_dir}")
return 0
total_size = sum(f.stat().st_size for f in files)
print(f" [{label}] 主机目录: {host_dir}")
print(f" 文件数: {len(files)}, 总大小: {total_size / 1024 / 1024:.1f} MB")
print(f" 容器目标: {container}:{container_dest}")
if dry_run:
print(f" [dry-run] 跳过")
return 0
# 确保目标目录存在
parent_dir = container_dest.rsplit("/", 1)[0]
_run(["docker", "exec", container, "mkdir", "-p", parent_dir])
# 清理旧目录
_run(["docker", "exec", container, "rm", "-rf", container_dest])
# docker cp
t0 = time.time()
result = _run(["docker", "cp", str(host_dir), f"{container}:{container_dest}"], timeout=120)
elapsed = time.time() - t0
if result.returncode != 0:
raise RuntimeError(f"[{label}] docker cp 失败: {result.stderr}")
# 验证
verify = _run(["docker", "exec", container, "find", container_dest, "-type", "f"])
stdout = (verify.stdout or "").strip()
count = len([l for l in stdout.splitlines() if l.strip()])
print(f" 拷贝完成: {elapsed:.1f}s, 容器内 {count} 个文件")
return elapsed
def copy_to_container(container: str = None, dry_run: bool = False, only: str = None):
"""将 GPKG 和模板拷贝到容器本地文件系统"""
cfg = _load_config()
if container is None:
container = cfg["container"]
_check_container(container)
t_total = time.time()
print(f"=== 预拷贝静态数据到容器 {container} ===\n")
if only != "template":
host_gpkg = PROJECT_ROOT / cfg["gpkg_subdir"]
_copy_dir_to_container(host_gpkg, container, cfg["container_gpkg"], "GPKG", dry_run)
print()
if only != "gpkg":
host_template = PROJECT_ROOT / cfg["template_subdir"]
_copy_dir_to_container(host_template, container, cfg["container_template"], "模板", dry_run)
elapsed = time.time() - t_total
print(f"\n=== 总耗时: {elapsed:.1f}s ===")
def main():
parser = argparse.ArgumentParser(description="将 GPKG 和模板预拷贝到 Docker 容器本地 FS")
parser.add_argument("--container", default=None, help="Docker 容器名称")
parser.add_argument("--dry-run", action="store_true", help="仅显示信息,不实际拷贝")
parser.add_argument("--only", choices=["gpkg", "template"], help="只拷贝指定类型")
args = parser.parse_args()
try:
copy_to_container(container=args.container, dry_run=args.dry_run, only=args.only)
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+159
View File
@@ -0,0 +1,159 @@
"""
将项目 fonts/ 目录下的中文字体安装到 Docker QGIS 容器。
QGIS 官方 Docker 镜像不包含中文字体,模板中的 SimHei/SimSun/YaHei 字体会显示为方块。
本脚本将 fonts/ 目录下的字体文件复制到容器内并刷新字体缓存。
用法:
python app/script/install_fonts_to_container.py [--container qgis-server] [--dry-run]
前置条件:
- Docker 容器已启动(docker start qgis-server
- 项目根目录下 fonts/ 目录包含所需的 .ttf/.ttc 字体文件
"""
import argparse
import subprocess
import sys
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
FONTS_DIR = PROJECT_ROOT / "fonts"
# 容器内字体目录
CONTAINER_FONT_DIR = "/usr/share/fonts/truetype/winfonts"
def _run(cmd, timeout=10, **kwargs):
"""subprocess.run 封装,统一 UTF-8 编码"""
return subprocess.run(
cmd, capture_output=True, timeout=timeout,
encoding="utf-8", errors="replace", **kwargs,
)
def _load_config():
"""从 settings.toml 读取配置"""
try:
from config import settings
return {
"container": getattr(settings, "QGIS_DOCKER_CONTAINER", "qgis-server"),
}
except ImportError:
pass
cfg = {"container": "qgis-server"}
toml_path = PROJECT_ROOT / "settings.toml"
if toml_path.exists():
for line in toml_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line.startswith("QGIS_DOCKER_CONTAINER") and "=" in line:
cfg["container"] = line.split("=", 1)[1].strip().strip('"').strip("'")
return cfg
def _check_container(container: str):
"""检查容器是否运行"""
result = _run(["docker", "inspect", "--format={{.State.Running}}", container], timeout=5)
if (result.stdout or "").strip() != "true":
raise RuntimeError(f"容器 {container} 未运行,请先 docker start {container}")
def install_fonts(container: str = None, dry_run: bool = False):
"""将 fonts/ 目录下的字体安装到容器"""
cfg = _load_config()
if container is None:
container = cfg["container"]
_check_container(container)
# 扫描字体文件
if not FONTS_DIR.is_dir():
print(f"字体目录不存在: {FONTS_DIR}")
print(f"请先在项目根目录下创建 fonts/ 目录,并放入 .ttf/.ttc 字体文件。")
print(f"Windows 字体路径: C:\\Windows\\Fonts\\")
print(f" simhei.ttf — 黑体(模板默认字体)")
print(f" simsun.ttc — 宋体")
print(f" msyh.ttc — 微软雅黑")
print(f" msyhbd.ttc — 微软雅黑粗体")
sys.exit(1)
font_files = [f for f in FONTS_DIR.iterdir()
if f.is_file() and f.suffix.lower() in (".ttf", ".ttc", ".otf")]
if not font_files:
print(f"字体目录为空或无字体文件: {FONTS_DIR}")
print(f"请放入 .ttf/.ttc/.otf 字体文件后重试。")
sys.exit(1)
print(f"=== 安装中文字体到 Docker 容器 {container} ===\n")
print(f" 字体目录: {FONTS_DIR}")
print(f" 字体文件: {len(font_files)}")
for f in sorted(font_files):
size_kb = f.stat().st_size / 1024
print(f" - {f.name} ({size_kb:.0f} KB)")
print(f" 容器目标: {container}:{CONTAINER_FONT_DIR}")
print()
if dry_run:
print(" [dry-run] 跳过安装")
return
# 1. 在容器内创建字体目录
t0 = time.time()
_run(["docker", "exec", container, "mkdir", "-p", CONTAINER_FONT_DIR])
# 2. 逐个复制字体文件到容器
print(" [1/3] 复制字体文件...")
for f in sorted(font_files):
result = _run(
["docker", "cp", str(f), f"{container}:{CONTAINER_FONT_DIR}/{f.name}"],
timeout=30,
)
if result.returncode != 0:
print(f" FAIL {f.name}: {result.stderr.strip()}")
else:
print(f" OK {f.name}")
# 3. 刷新字体缓存
print("\n [2/3] 刷新字体缓存...")
result = _run(["docker", "exec", container, "fc-cache", "-fv"], timeout=30)
if result.returncode != 0:
print(f" WARN fc-cache 输出: {result.stderr.strip()}")
else:
print(" OK")
# 4. 验证字体
print("\n [3/3] 验证字体...")
verify_script = (
"from PyQt5.QtGui import QFontDatabase; "
"db = QFontDatabase(); "
"zh = [f for f in db.families() if any(k in f for k in "
"['SimHei','YaHei','SimSun','WenQuanYi','Noto Sans CJK'])]; "
"print('中文字体:', zh if zh else '未安装!')"
)
result = _run(
["docker", "exec", container, "python3", "-c", verify_script],
timeout=10,
)
print(f" {(result.stdout or '').strip()}")
elapsed = time.time() - t0
print(f"\n=== 完成,耗时 {elapsed:.1f}s ===")
def main():
parser = argparse.ArgumentParser(description="将中文字体安装到 Docker QGIS 容器")
parser.add_argument("--container", default=None, help="Docker 容器名称")
parser.add_argument("--dry-run", action="store_true", help="仅显示信息,不实际安装")
args = parser.parse_args()
try:
install_fonts(container=args.container, dry_run=args.dry_run)
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+2 -2
View File
@@ -27,5 +27,5 @@ __all__ = [
# 不在顶层导入 QGIS 依赖模块,避免主进程崩溃
# 使用方式:
# from app.services.qgis.map_service import MapService (仅在子进程中)
# from app.services.qgis.qgis_env import is_qgis_available (主进程安全)
# from app.services.qgis.map_service import MapService (仅在容器子进程中)
# from app.services.qgis.qgis_env import get_docker_container (主进程检查容器状态)
+103 -33
View File
@@ -26,62 +26,132 @@ import time
# 环境初始化(必须在 QGIS import 之前)
# ============================================================
QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
IS_WINDOWS = sys.platform == "win32"
def _detect_qgis_root():
"""自动检测 QGIS 安装根目录(跨平台)"""
root = os.environ.get("QGIS_ROOT")
if root and os.path.isdir(root):
return root
if IS_WINDOWS:
for candidate in ["D:/QGIS", "C:/OSGeo4W", "C:/QGIS"]:
if os.path.isdir(candidate):
return candidate
else:
for candidate in ["/usr", "/opt/QGIS", "/home/QGIS", "/usr/local"]:
if _is_valid_qgis_root_linux(candidate):
return candidate
return None
def _is_valid_qgis_root_linux(path: str) -> bool:
"""验证 Linux 路径是否为有效的 QGIS 安装根目录"""
if not os.path.isdir(path):
return False
return (
os.path.isdir(os.path.join(path, "share", "qgis"))
or any(os.path.isdir(os.path.join(path, "apps", n)) for n in ("qgis-ltr", "qgis"))
or os.path.isfile(os.path.join(path, "bin", "qgis"))
)
QGIS_ROOT = _detect_qgis_root()
def _detect_qgis_app_dir():
"""自动检测 QGIS 应用目录(跨平台)"""
if QGIS_ROOT is None:
raise RuntimeError("未检测到 QGIS 安装目录,请设置 QGIS_ROOT 环境变量")
if IS_WINDOWS:
for name in ("qgis-ltr", "qgis"):
d = os.path.join(QGIS_ROOT, "apps", name)
if os.path.isdir(d):
return d
raise RuntimeError(
f"未找到 QGIS 应用目录: {QGIS_ROOT}\\apps\\qgis-ltr 或 qgis"
)
# Linux: 先检查独立安装器的 app 目录
for name in ("qgis-ltr", "qgis"):
d = os.path.join(QGIS_ROOT, "apps", name)
if os.path.isdir(d):
return d
raise RuntimeError(f"未找到 QGIS 应用目录: {QGIS_ROOT}\\apps\\qgis-ltr 或 qgis")
# Linux apt 安装:返回 prefix 目录
for prefix in ("/usr", "/opt/QGIS"):
if os.path.isdir(os.path.join(prefix, "share", "qgis")):
return prefix
raise RuntimeError(
f"未找到 QGIS 应用目录: {QGIS_ROOT} 下无 apps/qgis-ltr 或 share/qgis"
)
def _setup_environment():
"""设置 QGIS 运行所需的环境变量和 DLL 搜索路径(跨平台)"""
os.environ["PYTHONUTF8"] = "1"
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
os.environ["VSI_CACHE"] = "TRUE"
os.environ["VSI_CACHE_SIZE"] = "1000000"
if sys.platform == "win32":
import ctypes
qgis_app_dir = _detect_qgis_app_dir()
for dll_dir in [
os.path.join(qgis_app_dir, "bin"),
os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
]:
if os.path.isdir(dll_dir):
os.add_dll_directory(dll_dir)
if not IS_WINDOWS:
return
_preload_dlls = [
"qgis_core.dll", "qgispython.dll",
"Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
"Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
"Qt5Concurrent.dll", "Qt5PrintSupport.dll",
]
for dll_dir in [
os.path.join(qgis_app_dir, "bin"),
os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
]:
for dll_name in _preload_dlls:
dll_path = os.path.join(dll_dir, dll_name)
if os.path.isfile(dll_path):
try:
ctypes.WinDLL(dll_path)
except OSError:
pass
# ── Windows: DLL 预加载 ──
import ctypes
qgis_app_dir = _detect_qgis_app_dir()
for dll_dir in [
os.path.join(qgis_app_dir, "bin"),
os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
]:
if os.path.isdir(dll_dir):
os.add_dll_directory(dll_dir)
_preload_dlls = [
"qgis_core.dll", "qgispython.dll",
"Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
"Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
"Qt5Concurrent.dll", "Qt5PrintSupport.dll",
]
for dll_dir in [
os.path.join(qgis_app_dir, "bin"),
os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
]:
for dll_name in _preload_dlls:
dll_path = os.path.join(dll_dir, dll_name)
if os.path.isfile(dll_path):
try:
ctypes.WinDLL(dll_path)
except OSError:
pass
def _setup_python_path():
"""将项目根目录和 QGIS Python 路径加入 sys.path(跨平台)"""
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
if project_root not in sys.path:
sys.path.insert(0, project_root)
qgis_app_dir = _detect_qgis_app_dir()
qgis_python = os.path.join(qgis_app_dir, "python")
if os.path.isdir(qgis_python) and qgis_python not in sys.path:
sys.path.insert(0, qgis_python)
if IS_WINDOWS:
qgis_python = os.path.join(qgis_app_dir, "python")
if os.path.isdir(qgis_python) and qgis_python not in sys.path:
sys.path.insert(0, qgis_python)
else:
for p in [
"/usr/lib/python3/dist-packages",
"/usr/lib/python3.10/dist-packages",
"/usr/lib/python3.11/dist-packages",
"/usr/lib/python3.12/dist-packages",
]:
if os.path.isdir(p) and p not in sys.path:
sys.path.insert(0, p)
def _scan_templates() -> list[str]:
+229 -168
View File
@@ -1,202 +1,263 @@
"""
QGIS 环境检测与子进程配置模块。
QGIS 环境配置模块 — Docker 模式
主进程运行在 Python 3.10,无法直接加载 QGIS 的 Python 3.12 C 扩展。
所有 QGIS 操作通过 subprocess 调用 QGIS Python 3.12 执行
所有 QGIS 操作通过 docker exec 在容器内执行,
主进程(Python 3.10)无需安装 QGIS 或处理 DLL 加载
容器内使用 qgis/qgis 官方镜像(QGIS 3.x + Python 3)。
本模块提供:
- get_qgis_python_path(): 检测 QGIS Python 3.12 解释器路径
- build_qgis_subprocess_env(): 构建子进程完整环境变量(替代 bat 包装器)
- is_qgis_available(): 检查 QGIS 是否可用
- get_runner_script(): 获取 qgis_runner.py 的路径
- get_docker_container(): 获取 Docker 容器名称
- get_docker_project_dir(): 获取容器内项目根目录
- build_docker_exec_cmd(): 构建 docker exec 命令
- build_docker_volume_mounts(): 构建 docker run 卷挂载参数
- get_runner_script(): 获取 qgis_runner.py 的绝对路径
- get_container_python_path(): 获取容器内 Python 解释器路径
"""
import os
import platform
from pathlib import Path
from app.config.paths import get_logger
logger = get_logger("qgis.env")
IS_WINDOWS = platform.system() == "Windows"
def get_qgis_python_path(qgis_root: str = None) -> str | None:
# ============================================================
# Docker 容器配置
# ============================================================
def get_docker_container() -> str:
"""获取 Docker 容器名称/ID"""
try:
from config import settings
container = getattr(settings, "QGIS_DOCKER_CONTAINER", "") or ""
if container.strip():
return container.strip()
except Exception:
pass
# 环境变量覆盖
env_container = os.environ.get("QGIS_DOCKER_CONTAINER", "")
if env_container.strip():
return env_container.strip()
return "qgis-server"
def get_docker_project_dir() -> str:
"""获取容器内项目根目录(代码挂载目标路径)"""
try:
from config import settings
project_dir = getattr(settings, "QGIS_DOCKER_PROJECT_DIR", "") or ""
if project_dir.strip():
return project_dir.strip()
except Exception:
pass
env_dir = os.environ.get("QGIS_DOCKER_PROJECT_DIR", "")
if env_dir.strip():
return env_dir.strip()
return "/app"
def get_container_python_path() -> str:
"""获取容器内 Python 解释器路径"""
try:
from config import settings
python_path = getattr(settings, "QGIS_DOCKER_PYTHON", "") or ""
if python_path.strip():
return python_path.strip()
except Exception:
pass
env_path = os.environ.get("QGIS_DOCKER_PYTHON", "")
if env_path.strip():
return env_path.strip()
# qgis/qgis 官方镜像默认路径
return "/usr/bin/python3"
# ============================================================
# docker exec 命令构建
# ============================================================
def build_docker_exec_cmd(python_path: str, runner_script: str, json_file: str) -> list:
"""
检测 QGIS 自带的 Python 3.12 解释器路径
构建 docker exec 命令
Windows: {QGIS_ROOT}/apps/Python312/python3.exe
Args:
python_path: 容器内 Python 路径(如 /usr/bin/python3
runner_script: 容器内 runner 脚本路径
json_file: 容器内 JSON 请求文件路径
Returns:
解释器绝对路径,不存在则返回 None
docker exec 命令列表
"""
if qgis_root is None:
qgis_root = _detect_qgis_root()
if qgis_root is None:
return None
container = get_docker_container()
project_dir = get_docker_project_dir()
import platform
if platform.system() == "Windows":
for name in ("python3.exe", "python.exe"):
candidate = os.path.join(qgis_root, "apps", "Python312", name)
if os.path.isfile(candidate):
logger.info(f"检测到 QGIS Python: {candidate}")
return candidate
logger.warning(f"未找到 QGIS Python 3.12")
return None
else:
import shutil
# Qt 平台插件(从配置读取,避免硬编码)
try:
from config import settings
qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen")
except Exception:
qt_platform = "offscreen"
# 优先检查环境变量
env_python = os.environ.get("QGIS_PYTHON_PATH")
if env_python and os.path.isfile(env_python):
logger.info(f"QGIS_PYTHON_PATH 指定: {env_python}")
return env_python
# 搜索标准 Linux QGIS 安装路径
linux_paths = [
"/usr/bin/qgis", # Ubuntu/Debian apt 安装
"/usr/bin/qgis.bin",
"/opt/QGIS/apps/Python312/bin/python3", # 独立安装器
"/opt/QGIS/apps/Python3/bin/python3",
"/usr/libexec/qgis/python3", # Fedora/RHEL
]
for candidate in linux_paths:
if os.path.isfile(candidate):
logger.info(f"检测到 QGIS 可执行文件: {candidate}")
return candidate
# 回退到系统 Python(如果 qgis.core 可导入)
sys_python = shutil.which("python3") or shutil.which("python")
if sys_python:
logger.info(f"Linux 环境,使用系统 Python: {sys_python}")
return sys_python
return None
cmd = [
"docker", "exec",
"-w", project_dir,
"-e", f"QT_QPA_PLATFORM={qt_platform}",
container,
python_path, runner_script, json_file,
]
return cmd
def build_qgis_subprocess_env(qgis_root: str = None) -> dict:
# ============================================================
# docker run 卷挂载(首次启动容器用)
# ============================================================
def get_host_file_store() -> str:
"""获取主机端文件输出目录"""
try:
from config import settings
fs = getattr(settings, "FILE_STORE_DIR", "") or ""
if fs.strip():
return fs.strip().replace("\\", "/")
except Exception:
pass
return "G:/files"
def get_container_file_store() -> str:
"""获取容器内文件输出目录(Linux 路径)"""
# 挂载时主机 G:/files → 容器内 /files
host = get_host_file_store()
# 取最后一段目录名作为容器内路径
tail = host.rstrip("/").rsplit("/", 1)[-1]
return f"/{tail}"
def map_host_to_container(path: str) -> str:
"""将主机路径映射为容器内路径。
优先匹配 file_storeG:/files → /files),其次匹配项目根目录(F:/project/... → /app)。
"""
构建 QGIS Python 3.12 子进程的完整环境变量(替代 bat 包装器)。
normalized = path.replace("\\", "/")
策略:
1. 从 os.environ 继承,移除 venv 污染(PYTHONPATH/VIRTUAL_ENV 等)
2. 将 QGIS 的 bin 目录前置到 PATHQt5→qgis→gdal 顺序,与原 bat 一致)
3. 设置 PYTHONPATH / QGIS_PREFIX_PATH / QT_PLUGIN_PATH / GDAL_DATA / PROJ_DATA
4. 子进程 _setup_environment() 负责 os.add_dll_directory + ctypes 预加载
# 1. file_store 路径映射
host_fs = get_host_file_store()
container_fs = get_container_file_store()
if normalized.lower().startswith(host_fs.lower()):
return container_fs + normalized[len(host_fs):]
# 2. 项目根目录映射
project_root = str(Path(__file__).parent.parent.parent.parent).replace("\\", "/")
project_dir = get_docker_project_dir()
if normalized.lower().startswith(project_root.lower()):
return project_dir + normalized[len(project_root):]
return normalized
def map_container_to_host(path: str) -> str:
"""将容器内路径映射回主机路径"""
host = get_host_file_store()
container = get_container_file_store()
if path.startswith(container):
return host + path[len(container):]
return path
def map_template_to_container(host_path: str) -> str:
"""将主机端模板路径映射到容器内本地路径(预拷贝后绕过 9P)。
主机: F:/project/xian/xian_algorithm_new/app/data/template/rainfall/xxx.qgz
容器: /data/template/rainfall/xxx.qgz
"""
import platform
try:
from config import settings
container_tpl = getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "") or ""
except Exception:
container_tpl = ""
# 继承当前环境,清理 venv 污染
env = dict(os.environ)
venv_root = env.get("VIRTUAL_ENV", "").lower()
if not container_tpl:
# fallback: 用通用映射(走 9P,慢)
return map_host_to_container(host_path)
for key in (
"PYTHONPATH",
"PYTHONHOME",
"VIRTUAL_ENV",
"PYTHONDONTWRITEBYTECODE",
):
env.pop(key, None)
normalized = host_path.replace("\\", "/").lower()
# 找 "app/data/template/" 后面的部分
marker = "app/data/template/"
idx = normalized.find(marker)
if idx >= 0:
relative = host_path.replace("\\", "/")[idx + len(marker):]
return f"{container_tpl.rstrip('/')}/{relative}"
if venv_root:
path_parts = env.get("PATH", "").split(";")
clean = []
for p in path_parts:
if not p or venv_root in p.lower():
continue
clean.append(p)
env["PATH"] = ";".join(clean)
# 检测 QGIS 安装路径
if qgis_root is None:
qgis_root = _detect_qgis_root() or "D:/QGIS"
env["QGIS_ROOT"] = qgis_root
if platform.system() != "Windows":
return env
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis-ltr")
if not os.path.isdir(qgis_app_dir):
qgis_app_dir = os.path.join(qgis_root, "apps", "qgis")
if not os.path.isdir(qgis_app_dir):
raise RuntimeError(
f"未找到 QGIS 应用目录: {qgis_root}\\apps\\qgis-ltr 或 qgis"
)
# 前置 QGIS bin 目录到 PATHQt5 必须在最前面,QGIS 依赖它)
qgis_bin = os.path.join(qgis_app_dir, "bin")
qt5_bin = os.path.join(qgis_root, "apps", "Qt5", "bin")
gdal_lib = os.path.join(qgis_root, "apps", "gdal", "lib")
qt5_plugins = os.path.join(qgis_root, "apps", "Qt5", "plugins")
qtplugins = os.path.join(qgis_app_dir, "qtplugins")
qgis_python_dir = os.path.join(qgis_app_dir, "python")
env["PATH"] = f"{qt5_bin};{qgis_bin};{gdal_lib};{env.get('PATH', '')}"
# QGIS 核心环境变量(与 bat 包装器保持一致)
env["PYTHONPATH"] = qgis_python_dir
env["QGIS_PREFIX_PATH"] = qgis_app_dir
env["QT_PLUGIN_PATH"] = f"{qtplugins};{qt5_plugins}"
# GDAL / PROJ 数据目录(避免系统旧版 proj.db 干扰)
gdal_data = os.path.join(qgis_root, "apps", "gdal", "share", "gdal")
if os.path.isdir(gdal_data):
env["GDAL_DATA"] = gdal_data
for pd in (
os.path.join(qgis_app_dir, "share", "proj"),
os.path.join(qgis_root, "share", "proj"),
os.path.join(qgis_root, "apps", "gdal", "share", "proj"),
):
if os.path.isfile(os.path.join(pd, "proj.db")):
env["PROJ_DATA"] = pd
break
# UTF-8 / GDAL 编码辅助变量
env["PYTHONUTF8"] = "1"
env["GDAL_FILENAME_IS_UTF8"] = "YES"
env["VSI_CACHE"] = "TRUE"
env["VSI_CACHE_SIZE"] = "1000000"
logger.debug(
f"QGIS subprocess env built: root={qgis_root}, app={qgis_app_dir}, "
f"PATH prefixed with qgis_bin/qt5_bin/gdal_lib"
)
return env
return map_host_to_container(host_path)
def is_qgis_available(qgis_root: str = None) -> bool:
"""检查 QGIS 环境是否可用"""
return get_qgis_python_path(qgis_root) is not None
def build_docker_volume_mounts() -> list:
"""
构建 Docker 卷挂载参数列表(用于 docker run)。
挂载内容:
1. 项目代码目录 → 容器内 /app(只读)
2. 输出文件目录 → 容器内 /files(读写)
"""
project_root = str(Path(__file__).parent.parent.parent.parent)
host_file_store = get_host_file_store()
container_file_store = get_container_file_store()
project_dir = get_docker_project_dir()
mounts = [
f"{project_root}:{project_dir}:ro",
f"{host_file_store}:{container_file_store}",
]
return mounts
def build_docker_run_cmd(image: str = None) -> list:
"""
构建 docker run 启动命令(首次部署用)。
Returns:
docker run 命令列表
"""
if image is None:
try:
from config import settings
image = getattr(settings, "QGIS_DOCKER_IMAGE", "") or ""
if not image.strip():
image = "qgis/qgis:latest"
except Exception:
image = "qgis/qgis:latest"
# 容器保活命令(从配置读取)
try:
from config import settings
keep_alive = getattr(settings, "QGIS_DOCKER_KEEP_ALIVE", "sleep infinity")
except Exception:
keep_alive = "sleep infinity"
container = get_docker_container()
mounts = build_docker_volume_mounts()
cmd = [
"docker", "run", "-d",
"--name", container,
"--restart", "unless-stopped",
]
for m in mounts:
cmd.extend(["-v", m])
cmd.append(image)
# 保活命令
cmd.extend(keep_alive.split())
return cmd
# ============================================================
# Runner 脚本路径
# ============================================================
def get_runner_script() -> str:
"""获取 qgis_runner.py 的绝对路径"""
"""获取 qgis_runner.py 的绝对路径(主机端)"""
return str(Path(__file__).parent / "qgis_runner.py")
def _detect_qgis_root() -> str | None:
"""
自动检测 QGIS 安装根目录。
优先级:
1. 环境变量 QGIS_ROOT
2. Windows 默认路径 D:/QGIS
3. Linux 常见路径
"""
env_root = os.environ.get("QGIS_ROOT")
if env_root and os.path.isdir(env_root):
return env_root
import platform
if platform.system() == "Windows":
for candidate in ["D:/QGIS", "C:/OSGeo4W", "C:/QGIS"]:
if os.path.isdir(candidate):
logger.info(f"检测到 QGIS 根目录: {candidate}")
return candidate
else:
for candidate in ["/usr", "/opt/QGIS", "/home/QGIS"]:
if os.path.isdir(candidate):
logger.info(f"检测到 QGIS 根目录: {candidate}")
return candidate
logger.warning("未检测到 QGIS 安装目录")
return None
+324
View File
@@ -0,0 +1,324 @@
"""
QGIS Worker 进程池管理器 — 主机侧运行。
管理 N 个长驻 Docker Worker 进程,提供任务提交和结果回收。
架构:
主机 FastAPI ←→ qgis_pool (本模块) ←→ docker exec -i ←→ qgis_worker.py (容器内)
用法:
from app.services.qgis.qgis_pool import qgis_pool
# 提交任务(阻塞等待完成)
results, summary = qgis_pool.submit_job(config, models, progress_callback)
# 关闭池
qgis_pool.shutdown()
"""
import json
import subprocess
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple
from app.config.paths import get_logger
logger = get_logger("qgis.pool")
class _WorkerHandle:
"""单个 Worker 进程的句柄"""
def __init__(self, worker_id: int, proc: subprocess.Popen):
self.worker_id = worker_id
self.proc = proc
self.busy = False
self.lock = threading.Lock()
self._reader_thread: Optional[threading.Thread] = None
def is_alive(self) -> bool:
return self.proc.poll() is None
def mark_busy(self):
with self.lock:
self.busy = True
def mark_free(self):
with self.lock:
self.busy = False
def is_free(self) -> bool:
with self.lock:
return not self.busy and self.is_alive()
class QgisPool:
"""QGIS Worker 进程池"""
def __init__(self, pool_size: int = None):
"""
Args:
pool_size: 池中 Worker 数量,默认从配置读取
"""
if pool_size is None:
try:
from config import settings
pool_size = getattr(settings, "QGIS_POOL_SIZE", 2)
except Exception:
pool_size = 2
self._pool_size = pool_size
self._workers: List[_WorkerHandle] = []
self._lock = threading.Lock()
self._started = False
def _ensure_started(self):
"""懒启动:首次使用时启动 Worker 进程"""
if self._started:
return
with self._lock:
if self._started:
return
self._start_workers()
self._started = True
def _start_workers(self):
"""启动所有 Worker 进程"""
from app.services.qgis.qgis_env import (
get_docker_container,
get_docker_project_dir,
get_container_python_path,
)
container = get_docker_container()
project_dir = get_docker_project_dir()
python_path = get_container_python_path()
worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py"
# Qt 平台设置
try:
from config import settings
qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen")
except Exception:
qt_platform = "offscreen"
for i in range(self._pool_size):
cmd = [
"docker", "exec", "-i",
"-w", project_dir,
"-e", f"QT_QPA_PLATFORM={qt_platform}",
container,
python_path, worker_script,
]
logger.info(f"[Pool] 启动 Worker-{i}: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
worker = _WorkerHandle(i, proc)
self._workers.append(worker)
# 后台线程消费 stderr,防止管道满阻塞
t = threading.Thread(
target=self._drain_stderr, args=(worker,), daemon=True
)
t.start()
logger.info(f"[Pool] 已启动 {self._pool_size} 个 Worker")
@staticmethod
def _drain_stderr(worker: _WorkerHandle):
"""持续读取 Worker stderr 防止管道缓冲区满"""
try:
for line in worker.proc.stderr:
line = line.strip()
if line:
logger.info(f"[Worker-{worker.worker_id}] {line}")
except Exception:
pass
def _find_free_worker(self, timeout: float = 30) -> Optional[_WorkerHandle]:
"""找到一个空闲 Worker,超时返回 None"""
deadline = time.time() + timeout
while time.time() < deadline:
with self._lock:
for w in self._workers:
if w.is_free():
return w
time.sleep(0.1)
return None
def submit_job(
self,
config: dict,
models: list,
progress_callback: Callable[[dict], None] = None,
) -> Tuple[list, dict]:
"""
提交一个批次任务,阻塞等待完成。
Args:
config: QGIS 配置字典
models: 模型参数列表
progress_callback: 每张图完成时的回调(接收 PROGRESS dict
Returns:
(results_list, summary_dict)
"""
self._ensure_started()
worker = self._find_free_worker(timeout=60)
if worker is None:
raise RuntimeError("[Pool] 无可用 Worker(等待 60s 超时)")
worker.mark_busy()
try:
return self._run_job(worker, config, models, progress_callback)
finally:
# 检查 Worker 是否还活着,死了就重启
if not worker.is_alive():
logger.error(f"[Pool] Worker-{worker.worker_id} 已死亡,重启...")
self._restart_worker(worker)
worker.mark_free()
def _run_job(
self,
worker: _WorkerHandle,
config: dict,
models: list,
progress_callback: Callable[[dict], None] = None,
) -> Tuple[list, dict]:
"""向指定 Worker 发送任务并读取结果"""
job = {"config": config, "models": models}
job_json = json.dumps(job, ensure_ascii=False) + "\n"
# 写入 stdin
try:
worker.proc.stdin.write(job_json)
worker.proc.stdin.flush()
except (BrokenPipeError, OSError) as e:
raise RuntimeError(f"[Pool] 写入 Worker-{worker.worker_id} stdin 失败: {e}")
# 读取输出(直到 JOB_DONE 或 FATAL
results = []
summary = {}
while True:
line = worker.proc.stdout.readline()
if not line:
# stdout 关闭,Worker 可能已死亡
logger.error(f"[Pool] Worker-{worker.worker_id} stdout 关闭")
summary = {"error": "Worker stdout 关闭"}
break
line = line.strip()
if not line:
continue
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
results.append(r)
if progress_callback:
progress_callback(r)
except json.JSONDecodeError:
logger.warning(f"[Pool] Worker-{worker.worker_id} PROGRESS 解析失败: {line[:200]}")
elif line.startswith("JOB_DONE:"):
try:
summary = json.loads(line[len("JOB_DONE:"):])
except json.JSONDecodeError:
summary = {"error": f"JOB_DONE 解析失败: {line[:200]}"}
break
elif line.startswith("FATAL:"):
try:
summary = json.loads(line[len("FATAL:"):])
except json.JSONDecodeError:
summary = {"error": f"FATAL: {line[:200]}"}
logger.error(f"[Pool] Worker-{worker.worker_id} FATAL: {summary}")
break
else:
logger.debug(f"[Pool] Worker-{worker.worker_id} 未知输出: {line[:200]}")
return results, summary
def _restart_worker(self, dead_worker: _WorkerHandle):
"""重启一个死亡的 Worker"""
from app.services.qgis.qgis_env import (
get_docker_container,
get_docker_project_dir,
get_container_python_path,
)
container = get_docker_container()
project_dir = get_docker_project_dir()
python_path = get_container_python_path()
worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py"
try:
from config import settings
qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen")
except Exception:
qt_platform = "offscreen"
cmd = [
"docker", "exec", "-i",
"-w", project_dir,
"-e", f"QT_QPA_PLATFORM={qt_platform}",
container,
python_path, worker_script,
]
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
dead_worker.proc = proc
dead_worker.busy = False
# 重启 stderr 消费线程
t = threading.Thread(
target=self._drain_stderr, args=(dead_worker,), daemon=True
)
t.start()
logger.info(f"[Pool] Worker-{dead_worker.worker_id} 已重启")
def shutdown(self):
"""关闭所有 Worker"""
with self._lock:
for w in self._workers:
if w.proc.poll() is None:
try:
w.proc.stdin.close()
except Exception:
pass
try:
w.proc.terminate()
w.proc.wait(timeout=5)
except Exception:
try:
w.proc.kill()
except Exception:
pass
self._workers.clear()
self._started = False
logger.info("[Pool] 已关闭")
# 全局单例
qgis_pool = QgisPool()
+32 -63
View File
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
"""
QGIS 专题图生成子进程入口。
QGIS 专题图生成子进程入口 — Docker 容器内运行
由主进程 (Python 3.10) 通过 subprocess 调用,
运行在 QGIS 自带的 Python 3.12 环境中。
由主进程通过 docker exec 调用,运行在 QGIS Docker 容器内。
支持两种模式:
- 批量模式(推荐):单次启动 QgsApplication,顺序处理多个模板
@@ -23,102 +22,70 @@ import sys
import time
# ============================================================
# 环境初始化(必须在任何 QGIS/Qt import 之前
# 环境初始化(Docker 容器内,QGIS 通过 apt 安装在 /usr
# ============================================================
QGIS_ROOT = os.environ.get("QGIS_ROOT", "D:/QGIS")
def _detect_qgis_app_dir():
"""自动检测 QGIS 应用目录(qgis-ltr 或 qgis"""
for name in ("qgis-ltr", "qgis"):
d = os.path.join(QGIS_ROOT, "apps", name)
if os.path.isdir(d):
return d
raise RuntimeError(
f"未找到 QGIS 应用目录: {QGIS_ROOT}\\apps\\qgis-ltr 或 qgis"
)
def _setup_python_path():
"""将项目根目录和 QGIS Python 路径加入 sys.path"""
"""将项目根目录和 QGIS dist-packages 加入 sys.path"""
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
if project_root not in sys.path:
sys.path.insert(0, project_root)
qgis_app_dir = _detect_qgis_app_dir()
qgis_python = os.path.join(qgis_app_dir, "python")
if os.path.isdir(qgis_python) and qgis_python not in sys.path:
sys.path.insert(0, qgis_python)
# QGIS Python 包路径(从配置读取,避免硬编码)
try:
from config import settings
pythonpath = getattr(settings, "QGIS_DOCKER_PYTHONPATH", None) or []
except Exception:
pythonpath = []
for p in pythonpath:
if os.path.isdir(p) and p not in sys.path:
sys.path.insert(0, p)
def _setup_environment():
"""设置 QGIS 运行所需的环境变量和 DLL 搜索路径。
QGIS_PREFIX_PATH / QT_PLUGIN_PATH / GDAL_DATA / PROJ_DATA
已由主进程通过 build_qgis_subprocess_env() 设置,子进程不重复设。
这里注册 DLL 搜索目录并预加载核心 DLL 以确保正确加载顺序。
"""
"""设置 QGIS 运行所需的环境变量"""
os.environ["PYTHONUTF8"] = "1"
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
os.environ["VSI_CACHE"] = "TRUE"
os.environ["VSI_CACHE_SIZE"] = "1000000"
if sys.platform == "win32":
import ctypes
qgis_app_dir = _detect_qgis_app_dir()
dll_dirs = [
os.path.join(qgis_app_dir, "bin"),
os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
]
for dll_dir in dll_dirs:
if os.path.isdir(dll_dir):
os.add_dll_directory(dll_dir)
# 预加载核心 DLL —— 强制从 QGIS 目录加载,防止系统 PATH 中同名 DLL 干扰
_preload_dlls = [
"qgis_core.dll", "qgispython.dll",
"Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
"Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
"Qt5Concurrent.dll", "Qt5PrintSupport.dll",
]
for dll_dir in dll_dirs:
for dll_name in _preload_dlls:
dll_path = os.path.join(dll_dir, dll_name)
if os.path.isfile(dll_path):
try:
ctypes.WinDLL(dll_path)
except OSError:
pass
# ============================================================
# 主逻辑
# ============================================================
def _init_qgis():
"""初始化 QgsApplication只做一次"""
"""初始化 QgsApplication从配置读取 prefixPath,避免硬编码"""
from qgis.core import QgsApplication
qgis_app_dir = _detect_qgis_app_dir()
QgsApplication.setPrefixPath(qgis_app_dir, True)
try:
from config import settings
prefix_path = getattr(settings, "QGIS_DOCKER_PREFIX_PATH", "/usr")
except Exception:
prefix_path = "/usr"
QgsApplication.setPrefixPath(prefix_path, True)
qgs_app = QgsApplication([], False)
qgs_app.initQgis()
return qgs_app
def _process_single(service, model):
"""处理单个模板,返回结果 dict"""
"""处理单个模板,返回结果 dict。成功/失败均输出 PROGRESS 行。"""
name = service.generate(model)
result = {"name": name, "output": model["outFile"]}
# ★ 实时进度:每完成一张图就输出到 stdout
print(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}", flush=True)
return result
def _emit_progress(result: dict):
"""输出 PROGRESS 行(成功或失败均调用)"""
print(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}", flush=True)
def main():
t_start = time.time()
@@ -166,7 +133,9 @@ def main():
f"耗时 {elapsed:.1f}s — {error_msg}",
file=sys.stderr,
)
results.append({"name": model.get("name", ""), "output": "", "error": error_msg})
fail_result = {"name": model.get("name", ""), "output": "", "error": error_msg}
_emit_progress(fail_result)
results.append(fail_result)
# 输出结果
if len(models) == 1 and not request.get("models"):
+198
View File
@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
QGIS 长驻 Worker — Docker 容器内运行,常驻进程模式。
与 qgis_runner.py 的区别:
- qgis_runner.py: 一次性进程,处理完一批任务后退出
- qgis_worker.py: 长驻进程,初始化 QGIS 一次,循环从 stdin 读取任务处理
通信协议(stdin/stdout JSON Lines):
输入: 每行一个 JSON 任务 {"config": {...}, "models": [{...}, ...]}
输出: 每张图一行 PROGRESS PROGRESS:{"name": "...", "output": "..."}
任务结束一行 JOB_DONE:{"total": N, "ok": M, "fail": K, "elapsed": T}
致命错误 FATAL:{"error": "..."}
用法:
docker exec -i qgis-server python3 /app/app/services/qgis/qgis_worker.py
"""
import json
import os
import sys
import time
# ============================================================
# 环境初始化(与 qgis_runner.py 保持一致)
# ============================================================
def _setup_python_path():
"""将项目根目录和 QGIS dist-packages 加入 sys.path"""
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
if project_root not in sys.path:
sys.path.insert(0, project_root)
try:
from config import settings
pythonpath = getattr(settings, "QGIS_DOCKER_PYTHONPATH", None) or []
except Exception:
pythonpath = []
for p in pythonpath:
if os.path.isdir(p) and p not in sys.path:
sys.path.insert(0, p)
def _setup_environment():
"""设置 QGIS 运行所需的环境变量"""
os.environ["PYTHONUTF8"] = "1"
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
os.environ["VSI_CACHE"] = "TRUE"
os.environ["VSI_CACHE_SIZE"] = "1000000"
def _init_qgis():
"""初始化 QgsApplication"""
from qgis.core import QgsApplication
try:
from config import settings
prefix_path = getattr(settings, "QGIS_DOCKER_PREFIX_PATH", "/usr")
except Exception:
prefix_path = "/usr"
QgsApplication.setPrefixPath(prefix_path, True)
qgs_app = QgsApplication([], False)
qgs_app.initQgis()
return qgs_app
# ============================================================
# 单任务处理
# ============================================================
def _emit(line: str):
"""输出一行到 stdout"""
print(line, flush=True)
def _emit_progress(result: dict):
"""输出 PROGRESS 行"""
_emit(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}")
def _process_job(service, job: dict) -> dict:
"""
处理一个批次任务(多个模板),返回汇总结果。
每张图输出一行 PROGRESS,最后返回汇总 dict。
"""
models = job.get("models") or []
if not models:
return {"total": 0, "ok": 0, "fail": 0, "elapsed": 0}
ok = 0
fail = 0
t_start = time.time()
for i, model in enumerate(models):
t_model = time.time()
try:
name = service.generate(model)
result = {"name": name, "output": model["outFile"]}
_emit_progress(result)
ok += 1
elapsed = time.time() - t_model
print(
f"[worker] [{i+1}/{len(models)}] 完成: {name}, 耗时 {elapsed:.1f}s",
file=sys.stderr,
)
except Exception as e:
elapsed = time.time() - t_model
error_msg = f"{e}"
fail_result = {"name": model.get("name", ""), "output": "", "error": error_msg}
_emit_progress(fail_result)
fail += 1
print(
f"[worker] [{i+1}/{len(models)}] 失败: {model.get('name', '?')}, "
f"耗时 {elapsed:.1f}s — {error_msg}",
file=sys.stderr,
)
total_elapsed = time.time() - t_start
summary = {"total": len(models), "ok": ok, "fail": fail, "elapsed": round(total_elapsed, 1)}
print(
f"[worker] 批次完成: {ok}成功/{fail}失败, 总耗时 {total_elapsed:.1f}s",
file=sys.stderr,
)
return summary
# ============================================================
# 主循环
# ============================================================
def main():
_setup_environment()
_setup_python_path()
print("[worker] 正在初始化 QGIS...", file=sys.stderr)
t_init = time.time()
qgs_app = _init_qgis()
print(f"[worker] QGIS 初始化完成 ({time.time() - t_init:.1f}s),等待任务...", file=sys.stderr)
try:
from app.services.qgis.map_service import MapService
# 持有 MapService 实例(config 在第一个任务时设置)
service = None
for line_no, line in enumerate(sys.stdin, 1):
line = line.strip()
if not line:
continue
try:
job = json.loads(line)
except json.JSONDecodeError as e:
print(f"[worker] 第{line_no}行 JSON 解析失败: {e}", file=sys.stderr)
_emit(f'JOB_DONE:{{"error": "JSON解析失败: {e}"}}')
continue
config = job.get("config")
if config:
# 更新 config(每个任务可以带不同的 config)
service = MapService(config)
if service is None:
_emit(f'JOB_DONE:{{"error": "未提供 config"}}')
continue
summary = _process_job(service, job)
_emit(f"JOB_DONE:{json.dumps(summary, ensure_ascii=False)}")
# 清空项目,恢复到初始状态,等待下一个任务
from qgis.core import QgsProject
QgsProject.instance().clear()
print("[worker] 项目已清空,等待下一个任务", file=sys.stderr)
except KeyboardInterrupt:
print("[worker] 收到中断信号,退出", file=sys.stderr)
except Exception as e:
print(f"[worker] 致命错误: {e}", file=sys.stderr)
_emit(f'FATAL:{{"error": "{e}"}}')
finally:
qgs_app.exitQgis()
print("[worker] QGIS 已退出", file=sys.stderr)
if __name__ == "__main__":
try:
main()
except Exception as e:
# 捕获 main() 未处理的异常,确保输出到 stderr
import traceback
print(f"[worker] 未捕获异常: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
sys.exit(1)
+6 -2
View File
@@ -39,12 +39,13 @@ class TemplateModifier:
"""
将 GPKG 文件路径对应的 <provider ...>postgres</provider> 改为 ogr。
策略:按 <maplayer> 块处理,若块内 datasource 是文件路径(盘符开头),
策略:按 <maplayer> 块处理,若块内 datasource 是文件路径(盘符开头或 Linux 绝对路径),
则该块内的 provider 改为 ogr。避免跨层误改。
"""
maplayer_re = re.compile(r'(<maplayer[^>]*>.*?</maplayer>)', re.DOTALL)
provider_re = re.compile(r'(<provider[^>]*>)postgres(</provider>)')
file_ds_re = re.compile(r'<datasource>([A-Za-z]:/[^<]+)</datasource>')
# 匹配 Windows 盘符路径 (G:/...) 或 Linux 绝对路径 (/app/...)
file_ds_re = re.compile(r'<datasource>(?:[A-Za-z]:/|/)[^<]*</datasource>')
def _fix_layer(m):
layer_xml = m.group(1)
@@ -60,8 +61,11 @@ class TemplateModifier:
has_static = bool(self._static_map)
if not override and not has_static:
logger.info(f"模板无需修改(无 override 且 static_map 为空),直接返回: {template_path}")
return template_path
logger.info(f"模板修改: static_map有{len(self._static_map)}条, override={'' if override else ''}")
orig = override.get("original") if override else None
actual = override.get("actual") if override else None
+23 -3
View File
@@ -10,7 +10,7 @@ from config import settings
class RedisHelper:
"""Redis 数据库帮助类"""
def __init__(self):
"""初始化 Redis 连接配置"""
self.redis_config = {
@@ -23,7 +23,20 @@ class RedisHelper:
'socket_timeout': 5, # 读写超时时间(秒)
}
self._client: Optional[redis.Redis] = None
self._logged_config = False # 避免重复打印配置
def _log_config_once(self):
"""首次连接失败时打印配置信息,便于排查"""
if not self._logged_config:
from app.utils.logger import get_logger
_logger = get_logger("redis")
_logger.warning(
f"Redis 连接配置: host={self.redis_config['host']}, "
f"port={self.redis_config['port']}, db={self.redis_config['db']}, "
f"password={'***' if self.redis_config['password'] else 'None'}"
)
self._logged_config = True
@property
def client(self) -> redis.Redis:
"""获取 Redis 客户端实例(单例模式)"""
@@ -32,10 +45,17 @@ class RedisHelper:
self._client = redis.Redis(**self.redis_config)
# 测试连接
self._client.ping()
except redis.AuthenticationError as e:
self._log_config_once()
raise ConnectionError(f"Redis 认证失败(密码错误): {e}")
except redis.ConnectionError as e:
self._log_config_once()
raise ConnectionError(f"无法连接到 Redis 服务器: {e}")
except Exception as e:
self._log_config_once()
raise ConnectionError(f"Redis 连接异常: {e}")
return self._client
def close(self):
"""关闭 Redis 连接"""
if self._client:
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
+26 -18
View File
@@ -8,9 +8,13 @@ RAIN_STATION_GRID_DIR = "/xian/rainfall/grid/images/:id"
REDIS_RAIN_STATION_GRID_KEY = "xian:rainfall:rain_station_grid"
REDIS_RAIN_STATION_IDENTIFIER_KEY = "xian:rainfall:rain_station_identifier"
PREDICT_PROBABILITY_THRESHOLD = 50
# 静态底图 GeoPackage 目录(相对于项目根目录)
# 静态底图 GeoPackage 目录
QGIS_GPKG_DIR = "app/data/gpkg"
# 专题图输出子目录(相对于 FILE_STORE_DIR
# 容器内 GPKG 本地路径
QGIS_DOCKER_GPKG_DIR = "/data/gpkg"
# 容器内模板本地路径
QGIS_DOCKER_TEMPLATE_DIR = "/data/template"
# 专题图输出子目录
QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:inferenceId"
# 专题图默认参数
QGIS_DEFAULTS_MAP_LAYOUT = "A4"
@@ -19,10 +23,25 @@ QGIS_DEFAULTS_ZOOM_VALUE = "5"
QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 专题图DPI
QGIS_EXPORT_DPI = 200
# 并行子进程数(每进程独立 QGIS 实例)
QGIS_PARALLEL_WORKERS = 4
# 最大并发请求数(防止多人同时触发资源耗尽)
QGIS_MAX_CONCURRENT = 2
# Worker 进程池大小
QGIS_POOL_SIZE = 4
# ============================================================
# Docker QGIS 配置
# ============================================================
# 容器名称/ID
QGIS_DOCKER_CONTAINER = "qgis-server"
# 容器内项目代码挂载目标路径
QGIS_DOCKER_PROJECT_DIR = "/app"
# 容器内 Python 解释器路径
QGIS_DOCKER_PYTHON = "/usr/bin/python3"
# QGIS 安装根目录
QGIS_DOCKER_PREFIX_PATH = "/usr"
# QGIS Python 包路径(官方镜像为空列表)
QGIS_DOCKER_PYTHONPATH = []
# Qt 无头渲染模式
QGIS_DOCKER_QT_PLATFORM = "offscreen"
# Docker 镜像名称
QGIS_DOCKER_IMAGE = "qgis/qgis:3.44.11"
# 优先产出模板
QGIS_PRIORITY_TEMPLATES = ["暴雨地质灾害风险区分布图", "暴雨滑坡潜在隐患点及人口分布图", "暴雨山洪潜在隐患点及人口分布图", "暴雨泥石流潜在隐患点及人口分布图", "暴雨内涝潜在隐患点及人口分布图", "暴雨避难场所分布图"]
@@ -80,13 +99,6 @@ REDIS_DB = 0
# 文件路径配置
# ============================================================
FILE_STORE_DIR = "G:/files"
# ============================================================
# QGIS 配置
# ============================================================
QGIS_ROOT = "D:/QGIS"
# 专题图输出子目录
QGIS_OUTPUT_DIR = "xian/qgis/map/:eventType/:inferenceId"
QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# ============================================================
# 生产环境
@@ -120,8 +132,4 @@ REDIS_DB = 0
# ============================================================
# 文件路径配置
# ============================================================
FILE_STORE_DIR = "/data"
# ============================================================
# QGIS 配置
# ============================================================
QGIS_ROOT = "/home/QGIS"
FILE_STORE_DIR = "/data"
Binary file not shown.