6 Commits

Author SHA1 Message Date
wzy-warehouse 1f01ceb062 添加降雨量与持续时间接口 2026-06-28 09:26:13 +08:00
wzy-warehouse 54f7300557 扫描QGIS模板 2026-06-27 10:51:13 +08:00
wzy-warehouse 1b08f2c4a2 添加中文字体 2026-06-24 15:52:40 +08:00
wzy-warehouse 7163ca67f9 修改适配Linux 2026-06-24 14:16:23 +08:00
wzy-warehouse e5582bab5d 修改适配Linux 2026-06-24 13:03:15 +08:00
wzy-warehouse cd638d9a5c QGIS设置允许并发 2026-06-24 11:22:20 +08:00
21 changed files with 1274 additions and 381 deletions
+84 -45
View File
@@ -32,15 +32,11 @@ docker load -i qgis-34411.tar
```bash
# ---- Linux / macOS ----
PROJECT_ROOT=/home/xian/xian_algorithm_new
FILE_STORE=/data
docker run -d \
--name qgis-server \
--restart unless-stopped \
-v "${PROJECT_ROOT}:/app:ro" \
-v "${FILE_STORE}:${FILE_STORE}" \
-v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "/www/wwwroot/xian_algorithm_new/files:/files" \
qgis/qgis:3.44.11 \
sleep infinity
@@ -124,60 +120,103 @@ docker exec qgis-server ls -la /data/template/earthquake
- 容器重建后(`/data` 目录丢失)
- 首次部署时
## 5. 安装中文字体(手动)
## 5. 安装中文字体(手动,必须执行
QGIS 模板使用了 SimHei(黑体)、SimSun(宋体)、Microsoft YaHei(微软雅黑)等 Windows 中文字体,
Docker 镜像默认不包含这些字体,会导致中文全部乱码。**字体需手动安装,代码不会自动安装。**
### 步骤
### 5.1 准备字体文件
字体文件存放在项目根目录的 `fonts/` 目录下,已预置 4 个常用中文字体:
```
fonts/
├── simhei.ttf — 黑体(模板默认字体)
├── simsun.ttc — 宋体
├── msyh.ttc — 微软雅黑
└── msyhbd.ttc — 微软雅黑粗体
```
如果字体缺失,从 Windows 主机复制(`C:\Windows\Fonts\`):
```bash
# 1. 创建字体目录
docker exec qgis-server mkdir -p /usr/share/fonts/truetype/winfonts
# Linux 服务器用 SCP 从 Windows 传
scp "C:\Windows\Fonts\simhei.ttf" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\simsun.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyh.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyhbd.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
```
# 2. 从主机复制 Windows 中文字体
# Windows 路径(根据实际字体文件位置调整):
docker cp "C:\Windows\Fonts\simhei.ttf" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\simsun.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\msyh.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\msyhbd.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
### 5.2 一键安装到容器
# Linux 字体路径示例:
# docker cp /usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf qgis-server:/usr/share/fonts/truetype/winfonts/
```bash
python app/script/install_fonts_to_container.py
```
# 3. 刷新字体缓存
输出示例:
```
=== 安装中文字体到 Docker 容器 qgis-server ===
字体目录: /www/wwwroot/xian_algorithm_new/fonts
字体文件: 4 个
- msyh.ttc (4165 KB)
- msyhbd.ttc (4167 KB)
- simhei.ttf (2637 KB)
- simsun.ttc (10126 KB)
容器目标: qgis-server:/usr/share/fonts/truetype/winfonts
[1/3] 复制字体文件...
OK msyh.ttc
OK msyhbd.ttc
OK simhei.ttf
OK simsun.ttc
[2/3] 刷新字体缓存...
OK
[3/3] 验证字体...
中文字体: ['SimHei', 'SimSun', 'Microsoft YaHei', 'Microsoft YaHei UI']
=== 完成,耗时 3.2s ===
```
其他用法:
```bash
python app/script/install_fonts_to_container.py --dry-run # 仅查看信息
python app/script/install_fonts_to_container.py --container my # 指定容器名
```
### 5.3 持久化(推荐)
容器重建后字体丢失,**强烈建议挂载 `fonts/` 目录**。
```bash
# 停掉旧容器
docker stop qgis-server && docker rm qgis-server
# 重建容器,加上字体挂载
docker run -d \
--name qgis-server \
--restart unless-stopped \
-v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "/www/wwwroot/xian_algorithm_new/files:/files" \
-v "/www/wwwroot/xian_algorithm_new/fonts:/usr/share/fonts/truetype/winfonts:ro" \
qgis/qgis:3.44.11 \
sleep infinity
# 挂载后只需刷新一次缓存
docker exec qgis-server fc-cache -fv
# 4. 验证字体已识别
docker exec qgis-server python3 -c "
from PyQt5.QtGui import QFontDatabase
db = QFontDatabase()
zh = [f for f in db.families() if 'SimHei' in f or 'YaHei' in f or 'SimSun' in f]
print('中文字体:', zh)
"
```
### 持久化方案
### 5.4 无法获取 Windows 字体时的替代方案
容器重建后字体丢失。可选方案:
**方案 A:挂载单个字体文件**
```bash
docker run -d ... \
-v "C:\Windows\Fonts\simhei.ttf:/usr/share/fonts/truetype/winfonts/simhei.ttf" \
-v "C:\Windows\Fonts\simsun.ttc:/usr/share/fonts/truetype/winfonts/simsun.ttc" \
...
```
# Linux 服务器安装开源中文字体
yum install wqy-microhei-fonts # CentOS / RHEL
apt install fonts-wqy-microhei # Debian / Ubuntu
**方案 B:挂载整个字体目录(推荐)**
```bash
# 先在主机创建字体目录,放入所需字体文件
mkdir -p /opt/qgis-fonts
cp /usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf /opt/qgis-fonts/
docker run -d ... \
-v "/opt/qgis-fonts:/usr/share/fonts/truetype/winfonts:ro" \
...
# 将系统字体复制到项目 fonts/ 目录
cp /usr/share/fonts/wqy-microhei/wqy-microhei.ttc /www/wwwroot/xian_algorithm_new/fonts/
```
## 6. 验证容器
+1 -1
View File
@@ -134,7 +134,7 @@ async def predict_earthquake(req: EarthquakePredictRequest):
"point_ids": req.point_ids,
"region_code": req.region_code,
"magnitude": req.magnitude,
"depth": req.depth, # 已有默认值 10.0
"depth": req.depth,
"epicenter_lon": req.epicenter_lon,
"epicenter_lat": req.epicenter_lat,
"occurred_time": occurred_time.isoformat() if hasattr(occurred_time, 'isoformat') else str(occurred_time)
+203 -283
View File
@@ -2,18 +2,16 @@
QGIS 专题图导出接口
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
- 线程池并发产图(默认 4 worker
- Worker 进程池串行产图(QGIS 只初始化一次,模板顺序处理
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
- 异步模式:HTTP 请求立即返回,产图在后台线程执行
"""
import asyncio
import concurrent.futures
import os
import re
import subprocess
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
@@ -29,9 +27,9 @@ logger = get_logger("api.qgis")
router = APIRouter(prefix="/qgis", tags=["专题图导出"])
# 去重锁
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()
# 后台任务跟踪(线程安全)
_background_tasks: set[int] = set()
_tasks_lock = threading.Lock()
# 西安市中心
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])
@@ -156,23 +154,51 @@ def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
# 构建 QGIS 服务配置字典
# ============================================================
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录)"""
from app.services.qgis.qgis_env import (
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
# Docker 模式:config 中的路径必须是容器内路径(模板修改器在容器内运行)
def _check_docker_running() -> bool:
"""检测 Docker 容器是否在运行"""
from app.services.qgis.qgis_env import get_docker_container
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5,
)
is_docker = result.stdout.strip() == "true"
return result.stdout.strip() == "true"
except Exception:
is_docker = False
return False
def _list_container_templates(container: str, event_type: str) -> list:
"""Docker 模式下:扫描容器内模板目录,返回模板文件名列表"""
from app.services.qgis.qgis_env import get_docker_container
container_tpl_dir = getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "") or "/data/template"
container_path = f"{container_tpl_dir.rstrip('/')}/{event_type}"
try:
result = subprocess.run(
["docker", "exec", container, "ls", container_path],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
logger.error(f"[Docker] 列出容器模板失败: {result.stderr.strip()}")
return []
files = [
f.strip() for f in result.stdout.splitlines()
if f.strip().endswith(".qgz") and not f.strip().startswith("tmp")
]
logger.info(f"[Docker] 容器内模板扫描: {container_path}{len(files)} 个模板")
return sorted(files)
except Exception as e:
logger.error(f"[Docker] 列出容器模板异常: {e}")
return []
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录和 Docker 模式标志)"""
from app.services.qgis.qgis_env import (
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
is_docker = _check_docker_running()
if is_docker:
# GPKG 目录:优先使用容器内本地路径(预拷贝后绕过 WSL2 9P)
@@ -208,25 +234,156 @@ def _build_qgis_config(batch_folder: str) -> dict:
},
"static_layers": build_static_layers_config(gpkg_dir),
"batch_folder": batch_folder,
"is_docker": is_docker,
}
# ============================================================
# 接口实现
# 接口实现(异步 fire-and-forget 模式)
# ============================================================
# 全局并发限制
import asyncio as _asyncio
_concurrent = getattr(settings, "QGIS_MAX_CONCURRENT", 2)
_qgis_semaphore = _asyncio.Semaphore(_concurrent)
def _background_export(inference_id: int) -> None:
"""后台线程:执行产图全流程(DB 查询 + 模板扫描 + 进程池提交)"""
from app.services.qgis.qgis_env import get_docker_container
from app.services.qgis.qgis_pool import qgis_pool
from app.services.qgis.qgis_env import (
map_host_to_container, map_container_to_host, map_template_to_container,
)
try:
# 1. 查推理结果
inference = qgis_repository.query_inference_result(inference_id)
event_type = inference["event_type"]
batch_key = str(inference_id)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
logger.info(
f"[后台] 推理结果: id={inference['id']}, "
f"type={event_type}, occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 2. 扫描模板
is_docker = config.get("is_docker", False)
if is_docker:
container_tpl = getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "") or "/data/template"
template_dir = f"{container_tpl.rstrip('/')}/{event_type}"
template_files = _list_container_templates(get_docker_container(), event_type)
else:
tpl_subdir = getattr(settings, "QGIS_HOST_TEMPLATE_SUBDIR", "app/data/template")
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
*tpl_subdir.replace("\\", "/").split("/")
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i)
return (1, name)
template_files.sort(key=_priority_order)
if not template_files:
logger.error(f"[后台] 模板文件夹为空: {template_dir}")
return
# 3. 增量检查
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[后台] [跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return
missing_templates = [
f for f in template_files
if os.path.splitext(f)[0] + ".jpg" not in existing
]
logger.info(
f"[后台] [增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
# 4. 构建模型参数
models = []
for tpl_file in missing_templates:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"[后台] 模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}"
)
models.append(model)
# 5. 路径映射到容器内
container_models = []
for m in models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
if "path" in cm:
cm["path"] = map_template_to_container(cm["path"])
container_models.append(cm)
logger.info(f"[后台] 提交 {len(container_models)} 张图到 Worker 进程池")
# 6. 提交到进程池(阻塞等待产图完成)
db_lock = threading.Lock()
all_results = []
def _on_progress(r: dict):
if "output" in r:
r["output"] = map_container_to_host(r["output"])
all_results.append(r)
status = "FAIL" if "error" in r else "OK"
logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r["output"], file_store, db_lock)
results, summary = qgis_pool.submit_job(config, container_models, _on_progress)
success_count = summary.get("ok", 0)
fail_count = summary.get("fail", 0)
elapsed = summary.get("elapsed", 0)
logger.info(f"[后台] 完成: 成功={success_count}, 失败={fail_count}, 耗时={elapsed}s")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
except Exception as e:
logger.error(f"[后台] 产图失败 (inferenceId={inference_id}): {e}", exc_info=True)
finally:
with _tasks_lock:
_background_tasks.discard(inference_id)
logger.info(f"[后台] 任务结束 (inferenceId={inference_id})")
@router.post("/export/map", response_model=QgisMapExportResponse, summary="QGIS 批量专题图导出")
async def export_map(req: QgisMapExportRequest):
"""
根据模拟ID批量导出专题图。同一 inferenceId 共享文件夹,增量产出缺失图片。
异步模式:请求立即返回,产图在后台线程执行。
"""
from app.services.qgis.qgis_env import get_docker_container
# 检查 Docker 容器
try:
result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
@@ -239,272 +396,35 @@ async def export_map(req: QgisMapExportRequest):
except Exception:
raise HTTPException(status_code=503, detail="QGIS Docker 容器不可用")
async with _qgis_semaphore:
inference_id = req.inferenceId
loop = asyncio.get_event_loop()
inference_id = req.inferenceId
try:
inference = await loop.run_in_executor(
None, qgis_repository.query_inference_result, inference_id
# 去重:同一 inferenceId 不重复提交
with _tasks_lock:
if inference_id in _background_tasks:
logger.info(f"inferenceId={inference_id} 已有任务在执行,跳过")
return QgisMapExportResponse(
code=200,
message=f"任务已在执行中,请稍后查看结果",
data=str(inference_id),
)
_background_tasks.add(inference_id)
event_type = inference["event_type"]
batch_key = str(inference_id)
# 构建批次文件夹路径(用 inferenceId 作为唯一文件夹名)
file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")
# 去重锁
async with _locks_lock:
if batch_key not in _in_progress_locks:
_in_progress_locks[batch_key] = asyncio.Lock()
task_lock = _in_progress_locks[batch_key]
async with task_lock:
# 精确判断在模板扫描后(比较文件数 vs 模板数)
logger.info(
f"推理结果查询成功: id={inference['id']}, "
f"type={event_type}, "
f"occurred_time={inference['occurred_time']}"
)
os.makedirs(batch_folder, exist_ok=True)
config = _build_qgis_config(batch_folder)
# 扫描模板文件夹下所有 .qgz 文件
template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template"
)
template_dir = os.path.join(template_base, event_type)
template_files = sorted([
f for f in os.listdir(template_dir)
if f.endswith(".qgz") and not f.startswith("tmp")
])
# 优先模板排到前面
priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
if priority_keywords:
def _priority_order(name: str) -> tuple:
for i, kw in enumerate(priority_keywords):
if kw in name:
return (0, i) # 优先组,按配置顺序
return (1, name) # 非优先组,按文件名
template_files.sort(key=_priority_order)
priority_count = sum(
1 for f in template_files
if any(kw in f for kw in priority_keywords)
)
logger.info(f"优先模板: {priority_count} 张排前面")
if not template_files:
raise FileNotFoundError(f"模板文件夹为空: {template_dir}")
# 检查已产出的图片,只生成缺失的
existing = set()
if os.path.isdir(batch_folder):
existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}
if len(existing) == len(template_files):
logger.info(f"[跳过] {len(existing)}/{len(template_files)} 张已全部产出")
return QgisMapExportResponse(
code=200,
message=f"已存在,共{len(existing)}",
data=batch_key,
)
missing_templates = []
for tpl_file in template_files:
out_name = os.path.splitext(tpl_file)[0] + ".jpg"
if out_name not in existing:
missing_templates.append(tpl_file)
if len(missing_templates) < len(template_files):
logger.info(
f"[增量] 已有{len(existing)}张, 缺{len(missing_templates)}"
)
template_files = missing_templates
# 构建所有模型参数(批量模式)
models = []
for tpl_file in template_files:
tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
model = _derive_model_params(inference, batch_folder, tpl_path)
logger.info(
f"模板: {tpl_path}, "
f"输出: {model['outFile']}, "
f"标题: {model['mapTitle']}, "
f"中心: ({model['centerX']}, {model['centerY']})"
)
models.append(model)
# 一次性提交所有模型到 QGIS 子进程(并行多进程,实时写进度)
_generate_batch_maps(models, config, batch_key, inference_id, file_store)
return QgisMapExportResponse(
code=200,
message=f"任务已完成,共{len(models)}张专题图",
data=batch_key,
)
except ValueError as e:
logger.warning(f"参数错误: {e}")
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
logger.error(f"模板文件不存在: {e}")
raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}")
except Exception as e:
logger.error(f"专题图导出失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出失败: {e}")
finally:
pass
def _generate_batch_maps(models: list, config: dict, batch_key: str,
inference_id: int = None, file_store: str = None) -> None:
"""并行启动多个 docker exec 子进程,实时读取每张图进度并写 DB"""
import json, math, concurrent.futures, subprocess, tempfile, threading
from app.services.qgis.qgis_env import (
get_docker_project_dir, get_container_python_path, build_docker_exec_cmd,
map_host_to_container, map_container_to_host, map_template_to_container,
# 启动后台线程执行产图
thread = threading.Thread(
target=_background_export,
args=(inference_id,),
daemon=True,
name=f"qgis-export-{inference_id}",
)
thread.start()
logger.info(f"后台产图任务已启动 (inferenceId={inference_id})")
max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
workers = min(max_workers, len(models))
chunk_size = math.ceil(len(models) / workers)
chunks = []
for i in range(0, len(models), chunk_size):
chunks.append(models[i:i + chunk_size])
project_dir = get_docker_project_dir()
python_in_container = get_container_python_path()
runner_in_container = f"{project_dir}/app/services/qgis/qgis_runner.py"
logger.info(
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行 docker exec "
f"(每进程 {chunk_size} 张)"
return QgisMapExportResponse(
code=200,
message=f"任务已提交,正在后台生成专题图",
data=str(inference_id),
)
errors = []
all_results = []
db_lock = threading.Lock() # 保护 DB 写入
def _run_chunk(chunk_models: list, chunk_idx: int):
"""单个 docker exec 子进程,逐张读取进度并实时写 DB"""
# 将主机路径映射为容器内路径(G:/files → /files
container_models = []
for m in chunk_models:
cm = dict(m)
if "outFile" in cm:
cm["outFile"] = map_host_to_container(cm["outFile"])
# 模板 path 映射到容器本地路径(预拷贝后绕过 9P)
if "path" in cm:
cm["path"] = map_template_to_container(cm["path"])
container_models.append(cm)
request = json.dumps({"config": config, "models": container_models}, ensure_ascii=False)
# 临时文件写到项目目录内的 tmp/ 子目录(挂载到容器 /app/tmp/),确保容器内可访问
project_dir_host = str(Path(__file__).parent.parent.parent)
tmp_dir = os.path.join(project_dir_host, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
tmp = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8",
dir=tmp_dir,
)
tmp.write(request)
tmp.close()
# 主机临时文件路径 → 容器内路径
tmp_in_container = tmp.name.replace("\\", "/")
project_root = str(Path(__file__).parent.parent.parent).replace("\\", "/")
if tmp_in_container.startswith(project_root):
tmp_in_container = tmp_in_container.replace(project_root, project_dir, 1)
cmd = build_docker_exec_cmd(python_in_container, runner_in_container, tmp_in_container)
logger.info(f"[Docker] chunk{chunk_idx} 命令: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding="utf-8", errors="replace",
)
# 并发读取 stderr 防止管道缓冲区满导致死锁
stderr_lines = []
def _drain_stderr():
for ln in proc.stderr:
stderr_lines.append(ln)
stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
stderr_thread.start()
chunk_results = []
for line in proc.stdout:
line = line.strip()
logger.debug(f"[chunk{chunk_idx}] stdout: {line[:200]}")
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
# 容器内路径映射回主机路径(/files → G:/files
if "output" in r:
r["output"] = map_container_to_host(r["output"])
chunk_results.append(r)
status = "FAIL" if "error" in r else "OK"
logger.info(f"[chunk{chunk_idx}] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
except json.JSONDecodeError:
logger.warning(f"[chunk{chunk_idx}] JSON解析失败: {line[:200]}")
proc.wait()
stderr_thread.join(timeout=5)
stderr_text = "".join(stderr_lines)
if stderr_text.strip():
logger.warning(f"[docker exec chunk{chunk_idx}] stderr:\n{stderr_text[:800]}")
try:
os.remove(tmp.name)
except OSError:
pass
if proc.returncode != 0:
raise RuntimeError(
f"[docker exec chunk{chunk_idx}] 失败 (exit={proc.returncode}): "
f"{stderr_text[:300]}"
)
return chunk_results
with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)}
for future in concurrent.futures.as_completed(futures):
chunk_idx = futures[future]
try:
results = future.result()
all_results.extend(results)
except Exception as e:
logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}")
errors.append(str(e))
if errors and not all_results:
raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}")
success_count = sum(1 for r in all_results if "error" not in r)
fail_count = len(all_results) - success_count
logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}")
for r in all_results:
if "error" not in r:
logger.info(f" OK {r.get('output', 'N/A')}")
else:
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
if success_count == 0 and fail_count > 0:
first_err = all_results[0].get("error", "unknown")
raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")
# ============================================================
+74 -6
View File
@@ -7,11 +7,16 @@ from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictData, UpdateMonitoringTimeRequest
from app.schemas.api_schemas import (
RainfallPredictRequest, PredictResponse, PredictData,
UpdateMonitoringTimeRequest, DistrictSummaryRequest, DistrictSummaryItem
)
from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore
from app.repositories.dbn_repository import dbn_repository
from app.repositories.rainfall_repository import rainfall_repository as rain_repo
from app.core.rainfall_manager import rainfall_manager
from app.config.paths import get_logger
from app.utils.db_helper import db_helper
from app.utils.time_converter import TimeConverter
router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"])
@@ -141,13 +146,17 @@ async def update_monitoring_time(req: UpdateMonitoringTimeRequest):
@router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测")
async def predict_rainfall(req: RainfallPredictRequest):
"""
根据降雨量和持续时间,批量预测隐患点/风险点的灾害概率。
批量预测隐患点/风险点的灾害概率。两种模式(二选一):
**自动推演模式**rainfall、duration、region_code 全部不传
→ 从气象表自动获取各点最近站降雨数据,不限区域
**指定条件模式**rainfall、duration、region_code 全部传入
→ 按指定降雨条件和区域预测
- **disaster_name**: 灾害名称
- **point_ids**: 点位ID列表(可选,不传则查询所有点
- **region_code**: 行政区划代码(可选,不传则不限区域
- **rainfall**: 累计降雨量(mm),不传则从气象表自动获取
- **duration**: 降雨持续时间(h),不传则从气象表自动获取
- **point_ids**: 点位ID列表(可选)
- **occurred_time**: 事件发生时间(可选,不传则为当前时间
- **operation_type**: 操作类型(如 '实时监测', '情景模拟', '应急评估'
"""
semaphore = get_prediction_semaphore()
@@ -180,3 +189,62 @@ async def predict_rainfall(req: RainfallPredictRequest):
logger.error(f"保存推理结果失败: {e}", exc_info=True)
return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location))
@router.post("/district-summary", summary="获取各区降雨概况")
async def district_summary(req: DistrictSummaryRequest):
"""
根据推理结果ID获取各行政区的降雨概况(用于报告生成)。
逻辑:
- 如果预测时传了 rainfall → 直接用 condition 中的值
- 如果预测时没传 rainfall → 从 xian_meteorology 按区聚合实际气象数据
- **inference_id**: 推理结果ID
"""
# 1. 查推理结果
record = dbn_repository.get_inference_result(req.inference_id)
if not record:
raise HTTPException(status_code=404, detail=f"推理结果不存在: {req.inference_id}")
condition = record.get('condition') or {}
occurred_time = record.get('occurred_time')
# 2. 如果传了 rainfall → 直接用 condition 中的值
if condition.get('rainfall') is not None:
region_code = condition.get('region_code')
rainfall_val = float(condition['rainfall'])
duration_val = float(condition.get('duration', 0))
if region_code:
# 查 district 名称
district_row = db_helper.execute_query_one(
"SELECT name FROM xian_district WHERE code = %s AND is_delete = 0", (region_code,)
)
district_name = district_row['name'] if district_row else region_code
items = [{
'district_name': district_name,
'district_code': region_code,
'rainfall': rainfall_val,
'duration_hours': duration_val
}]
else:
items = [{
'district_name': '全市',
'district_code': '',
'rainfall': rainfall_val,
'duration_hours': duration_val
}]
else:
# 3. 自动推演模式 → 从气象表按区聚合
items = rain_repo.get_district_rainfall_summary(occurred_time)
if not items:
return {"code": 200, "message": "success", "data": []}
# 转换为响应格式
result_items = [
DistrictSummaryItem(**item).model_dump()
for item in items
]
return {"code": 200, "message": "success", "data": result_items}
+13 -13
View File
@@ -39,26 +39,26 @@ class AppLauncher:
check_virtualenv(self.project_root)
# 检查是否正在使用虚拟环境运行
import platform
import sys
venv_path = self.project_root / ".venv"
os_name = platform.system()
# sys.prefix != sys.base_prefix 是 Python 检测 venv 的标准方式
# 不依赖路径解析,Windows/Linux 均适用
in_venv = hasattr(sys, 'real_prefix') or (
hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
)
if os_name == 'Windows':
venv_python = venv_path / "Scripts" / "python.exe"
else: # Linux/Mac
venv_python = venv_path / "bin" / "python3"
if not in_venv:
import platform
venv_path = self.project_root / ".venv"
os_name = platform.system()
# 如果当前不是使用虚拟环境的Python,则重新启动
current_python = Path(sys.executable).resolve()
venv_python_resolved = venv_python.resolve()
if os_name == 'Windows':
venv_python = venv_path / "Scripts" / "python.exe"
else: # Linux/Mac
venv_python = venv_path / "bin" / "python3"
if current_python != venv_python_resolved:
print("\n" + "=" * 50)
print("检测到未使用虚拟环境,正在切换到虚拟环境...")
print("=" * 50)
# 使用虚拟环境的Python重新启动应用(不传递参数避免重复检查)
import subprocess
cmd = [str(venv_python)] + sys.argv
subprocess.run(cmd, check=True)
+4 -8
View File
@@ -6,8 +6,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from app.utils.api_deps import get_rainfall_model, get_earthquake_model, is_model_loaded
from app.schemas.api_schemas import HealthResponse
from app.utils.api_deps import get_rainfall_model, get_earthquake_model
from app.config.paths import get_logger
from config import settings
@@ -57,6 +56,9 @@ def create_app() -> FastAPI:
start = time.time()
response = await call_next(request)
elapsed = time.time() - start
# 静默健康检查探针
if request.url.path == "/" and request.method == "GET":
return response
logger.info(f"{request.method} {request.url.path} -> {response.status_code} ({elapsed:.3f}s)")
return response
@@ -64,10 +66,4 @@ def create_app() -> FastAPI:
from app.api import register_routers
register_routers(application)
@application.get("/health", response_model=HealthResponse, tags=["系统"])
async def health_check():
"""健康检查"""
status = is_model_loaded()
return HealthResponse(status="ok", **status)
return application
+20 -1
View File
@@ -195,6 +195,7 @@ class DbnRepository:
COUNT(*) as record_count
FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
AND rainfall_1h IS NOT NULL
AND CAST(rainfall_1h AS DOUBLE PRECISION) > 0
GROUP BY lon, lat
@@ -259,6 +260,7 @@ class DbnRepository:
ST_SetSRID(ST_MakePoint(%s, %s), 4326)
) as dist
FROM xian_meteorology
WHERE is_delete = 0
) t
WHERE dist < 50000
ORDER BY dist
@@ -283,6 +285,7 @@ class DbnRepository:
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall
FROM xian_meteorology
WHERE lon = %s AND lat = %s
AND is_delete = 0
AND datetime BETWEEN %s AND %s
ORDER BY datetime DESC
"""
@@ -326,7 +329,7 @@ class DbnRepository:
"""一次性加载所有气象站点坐标到内存(188个站点,约2KB)"""
if cls._cached_stations is not None:
return cls._cached_stations
sql = "SELECT DISTINCT lon, lat FROM xian_meteorology"
sql = "SELECT DISTINCT lon, lat FROM xian_meteorology WHERE is_delete = 0"
cls._cached_stations = db_helper.execute_query(sql)
logger.info(f"已缓存 {len(cls._cached_stations)} 个气象站点坐标")
return cls._cached_stations
@@ -407,6 +410,7 @@ class DbnRepository:
SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall
FROM xian_meteorology
WHERE (lon, lat) IN ({placeholders})
AND is_delete = 0
AND datetime BETWEEN %s AND %s
ORDER BY lon, lat, datetime DESC
"""
@@ -688,6 +692,21 @@ class DbnRepository:
return float(result['aspect'])
return None
@staticmethod
def get_inference_result(inference_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取推理结果
Returns:
{id, name, event_type, occurred_time, operation_type, condition, result}
"""
sql = """
SELECT id, name, event_type, occurred_time, operation_type, condition, result
FROM xian_inference_result
WHERE id = %s AND is_delete = 0
"""
return db_helper.execute_query_one(sql, (inference_id,))
@staticmethod
def save_inference_result(disaster_name: str, event_type: str, occurred_time, operation_type: str,
condition: dict, result: list) -> int:
+1 -1
View File
@@ -12,7 +12,7 @@ class QgisRepository:
sql = """
SELECT id, name, event_type, occurred_time, condition
FROM xian_inference_result
WHERE id = %s
WHERE id = %s AND is_delete = 0
"""
rows = db_helper.execute_query(sql, (inference_id,))
if not rows:
+97
View File
@@ -28,6 +28,7 @@ class RainfallRepository:
SELECT max(id) as max_id
FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
"""
result = db_helper.execute_query_one(sql, (start_time, end_time))
@@ -58,6 +59,7 @@ class RainfallRepository:
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h
FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
ORDER BY lon, lat, datetime DESC
"""
@@ -100,5 +102,100 @@ class RainfallRepository:
return station_data
def get_district_rainfall_summary(self, query_time, region_code: Optional[str] = None) -> List[Dict[str, Any]]:
"""
按行政区聚合降雨统计(取区内最大站点值)
Args:
query_time: 查询时间
region_code: 行政区划代码,不传则返回所有区
Returns:
[{district_name, district_code, rainfall, duration_hours}, ...]
"""
start_time, end_time = time_converter.to_db_time_range(query_time, hours=72)
# 一次查询:area_code + lon + lat + rainfall_1h
sql = """
SELECT area_code, lon, lat, datetime,
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h
FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
ORDER BY area_code, lon, lat, datetime DESC
"""
rows = db_helper.execute_query(sql, (start_time, end_time))
if not rows:
return []
# 按站点分组,计算 accum_rain + duration_hours(复用现有算法)
from itertools import groupby
station_stats: Dict[tuple, Dict[str, Any]] = {} # (area_code, lon, lat) → stats
for (area_code, lon, lat), group in groupby(rows,
key=lambda r: (r['area_code'], r['lon'], r['lat'])):
accum_rain = 0.0
duration_hours = 0
consecutive_no_rain = 0
for row in group:
rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0
if rainfall > 0:
accum_rain += rainfall
duration_hours += 1
consecutive_no_rain = 0
else:
consecutive_no_rain += 1
if consecutive_no_rain >= 3:
break
if accum_rain > 0:
duration_hours += 1
key = (area_code, lon, lat)
station_stats[key] = {
'area_code': area_code,
'accum_rain': accum_rain,
'duration_hours': duration_hours
}
# 按 area_code 聚合:取区内最大降雨站点
district_max: Dict[str, Dict[str, Any]] = {}
for key, stats in station_stats.items():
code = stats['area_code']
if region_code and code != region_code:
continue
if code not in district_max or stats['accum_rain'] > district_max[code]['rainfall']:
district_max[code] = {
'district_code': code,
'rainfall': round(stats['accum_rain'], 1),
'duration_hours': stats['duration_hours']
}
if not district_max:
return []
# 查 xian_district 获取名称
codes = list(district_max.keys())
placeholders = ', '.join(['%s'] * len(codes))
sql = f"SELECT code, name FROM xian_district WHERE code IN ({placeholders}) AND is_delete = 0"
district_rows = db_helper.execute_query(sql, tuple(codes))
code_to_name = {r['code']: r['name'] for r in district_rows}
result = []
for code, info in district_max.items():
name = code_to_name.get(code)
if name is None:
continue # 跳过 xian_district 中不存在的代码
result.append({
'district_name': name,
'district_code': code,
'rainfall': info['rainfall'],
'duration_hours': info['duration_hours']
})
# 按名称排序
result.sort(key=lambda x: x['district_name'])
return result
# 创建全局实例
rainfall_repository = RainfallRepository()
+47 -7
View File
@@ -3,7 +3,7 @@ API 请求/响应数据模型
"""
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
# ============================================================
@@ -11,19 +11,35 @@ from pydantic import BaseModel, Field
# ============================================================
class RainfallPredictRequest(BaseModel):
"""暴雨灾害链预测请求"""
"""
暴雨灾害链预测请求
参数规则(二选一):
1. 自动推演模式:rainfall、duration、region_code 全部不传 → 从气象表自动获取,不限区域
2. 指定条件模式:rainfall、duration、region_code 全部传入 → 按指定条件预测
"""
disaster_name: str = Field(min_length=1, max_length=255)
point_ids: Optional[List[int]] = Field(None, max_length=500,
description="点位ID列表,不传则查询所有点")
region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104',不传则不限区域")
rainfall: Optional[float] = Field(None, ge=0,
description="累计降雨量(mm),不传则从气象表自动获取")
duration: Optional[float] = Field(None, ge=0,
description="降雨持续时间(h),不传则从气象表自动获取")
region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104'")
rainfall: Optional[float] = Field(None, ge=0, description="累计降雨量(mm)")
duration: Optional[float] = Field(None, ge=0, description="降雨持续时间(h)")
occurred_time: Optional[datetime] = Field(None, description="事件发生时间,不传则为当前时间")
operation_type: str = Field("模拟", min_length=1, max_length=50,
description="操作类型(如 '模拟', '实时监测', '应急评估'")
@model_validator(mode='after')
def validate_mode_exclusivity(self):
"""校验 rainfall / duration / region_code 必须要么全不传,要么全传"""
trio = (self.rainfall, self.duration, self.region_code)
none_count = sum(1 for v in trio if v is None)
if none_count not in (0, 3):
raise ValueError(
"rainfall、duration、region_code 必须全部传入或全部不传,"
"不允许只传部分参数"
)
return self
# ============================================================
# 地震预测
@@ -86,3 +102,27 @@ class HealthResponse(BaseModel):
status: str = "ok"
rainfall_model_loaded: bool = False
earthquake_model_loaded: bool = False
# ============================================================
# 各区降雨概况
# ============================================================
class DistrictSummaryRequest(BaseModel):
"""各区降雨概况请求"""
inference_id: int = Field(..., ge=1, description="推理结果IDxian_inference_result.id")
class DistrictSummaryItem(BaseModel):
"""单个区的降雨概况"""
district_name: str = Field(..., description="行政区名称(如 '碑林区'")
district_code: str = Field(..., description="行政区划代码(如 '610103'")
rainfall: float = Field(..., description="累计降雨量(mm)")
duration_hours: float = Field(..., description="持续降雨时间(h)")
class DistrictSummaryResponse(BaseModel):
"""各区降雨概况响应"""
code: int = Field(200, description="状态码")
message: str = Field("success", description="提示信息")
data: Optional[List[DistrictSummaryItem]] = Field(None, description="各区降雨概况列表")
+159
View File
@@ -0,0 +1,159 @@
"""
将项目 fonts/ 目录下的中文字体安装到 Docker QGIS 容器。
QGIS 官方 Docker 镜像不包含中文字体,模板中的 SimHei/SimSun/YaHei 字体会显示为方块。
本脚本将 fonts/ 目录下的字体文件复制到容器内并刷新字体缓存。
用法:
python app/script/install_fonts_to_container.py [--container qgis-server] [--dry-run]
前置条件:
- Docker 容器已启动(docker start qgis-server
- 项目根目录下 fonts/ 目录包含所需的 .ttf/.ttc 字体文件
"""
import argparse
import subprocess
import sys
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
FONTS_DIR = PROJECT_ROOT / "fonts"
# 容器内字体目录
CONTAINER_FONT_DIR = "/usr/share/fonts/truetype/winfonts"
def _run(cmd, timeout=10, **kwargs):
"""subprocess.run 封装,统一 UTF-8 编码"""
return subprocess.run(
cmd, capture_output=True, timeout=timeout,
encoding="utf-8", errors="replace", **kwargs,
)
def _load_config():
"""从 settings.toml 读取配置"""
try:
from config import settings
return {
"container": getattr(settings, "QGIS_DOCKER_CONTAINER", "qgis-server"),
}
except ImportError:
pass
cfg = {"container": "qgis-server"}
toml_path = PROJECT_ROOT / "settings.toml"
if toml_path.exists():
for line in toml_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line.startswith("QGIS_DOCKER_CONTAINER") and "=" in line:
cfg["container"] = line.split("=", 1)[1].strip().strip('"').strip("'")
return cfg
def _check_container(container: str):
"""检查容器是否运行"""
result = _run(["docker", "inspect", "--format={{.State.Running}}", container], timeout=5)
if (result.stdout or "").strip() != "true":
raise RuntimeError(f"容器 {container} 未运行,请先 docker start {container}")
def install_fonts(container: str = None, dry_run: bool = False):
"""将 fonts/ 目录下的字体安装到容器"""
cfg = _load_config()
if container is None:
container = cfg["container"]
_check_container(container)
# 扫描字体文件
if not FONTS_DIR.is_dir():
print(f"字体目录不存在: {FONTS_DIR}")
print(f"请先在项目根目录下创建 fonts/ 目录,并放入 .ttf/.ttc 字体文件。")
print(f"Windows 字体路径: C:\\Windows\\Fonts\\")
print(f" simhei.ttf — 黑体(模板默认字体)")
print(f" simsun.ttc — 宋体")
print(f" msyh.ttc — 微软雅黑")
print(f" msyhbd.ttc — 微软雅黑粗体")
sys.exit(1)
font_files = [f for f in FONTS_DIR.iterdir()
if f.is_file() and f.suffix.lower() in (".ttf", ".ttc", ".otf")]
if not font_files:
print(f"字体目录为空或无字体文件: {FONTS_DIR}")
print(f"请放入 .ttf/.ttc/.otf 字体文件后重试。")
sys.exit(1)
print(f"=== 安装中文字体到 Docker 容器 {container} ===\n")
print(f" 字体目录: {FONTS_DIR}")
print(f" 字体文件: {len(font_files)}")
for f in sorted(font_files):
size_kb = f.stat().st_size / 1024
print(f" - {f.name} ({size_kb:.0f} KB)")
print(f" 容器目标: {container}:{CONTAINER_FONT_DIR}")
print()
if dry_run:
print(" [dry-run] 跳过安装")
return
# 1. 在容器内创建字体目录
t0 = time.time()
_run(["docker", "exec", container, "mkdir", "-p", CONTAINER_FONT_DIR])
# 2. 逐个复制字体文件到容器
print(" [1/3] 复制字体文件...")
for f in sorted(font_files):
result = _run(
["docker", "cp", str(f), f"{container}:{CONTAINER_FONT_DIR}/{f.name}"],
timeout=30,
)
if result.returncode != 0:
print(f" FAIL {f.name}: {result.stderr.strip()}")
else:
print(f" OK {f.name}")
# 3. 刷新字体缓存
print("\n [2/3] 刷新字体缓存...")
result = _run(["docker", "exec", container, "fc-cache", "-fv"], timeout=30)
if result.returncode != 0:
print(f" WARN fc-cache 输出: {result.stderr.strip()}")
else:
print(" OK")
# 4. 验证字体
print("\n [3/3] 验证字体...")
verify_script = (
"from PyQt5.QtGui import QFontDatabase; "
"db = QFontDatabase(); "
"zh = [f for f in db.families() if any(k in f for k in "
"['SimHei','YaHei','SimSun','WenQuanYi','Noto Sans CJK'])]; "
"print('中文字体:', zh if zh else '未安装!')"
)
result = _run(
["docker", "exec", container, "python3", "-c", verify_script],
timeout=10,
)
print(f" {(result.stdout or '').strip()}")
elapsed = time.time() - t0
print(f"\n=== 完成,耗时 {elapsed:.1f}s ===")
def main():
parser = argparse.ArgumentParser(description="将中文字体安装到 Docker QGIS 容器")
parser.add_argument("--container", default=None, help="Docker 容器名称")
parser.add_argument("--dry-run", action="store_true", help="仅显示信息,不实际安装")
args = parser.parse_args()
try:
install_fonts(container=args.container, dry_run=args.dry_run)
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+12 -5
View File
@@ -172,6 +172,8 @@ def map_template_to_container(host_path: str) -> str:
主机: F:/project/xian/xian_algorithm_new/app/data/template/rainfall/xxx.qgz
容器: /data/template/rainfall/xxx.qgz
已为容器路径时直接返回(幂等)。
"""
try:
from config import settings
@@ -179,16 +181,21 @@ def map_template_to_container(host_path: str) -> str:
except Exception:
container_tpl = ""
# 已经是容器路径,直接返回
normalized = host_path.replace("\\", "/")
if container_tpl and normalized.startswith(container_tpl.rstrip("/") + "/"):
return normalized
if not container_tpl:
# fallback: 用通用映射(走 9P,慢)
return map_host_to_container(host_path)
normalized = host_path.replace("\\", "/").lower()
# 找 "app/data/template/" 后面的部分
marker = "app/data/template/"
idx = normalized.find(marker)
normalized_lower = normalized.lower()
# 找模板子目录后面的相对路径(主机端 marker 从配置读取)
marker = getattr(settings, "QGIS_HOST_TEMPLATE_SUBDIR", "app/data/template").rstrip("/") + "/"
idx = normalized_lower.find(marker.lower())
if idx >= 0:
relative = host_path.replace("\\", "/")[idx + len(marker):]
relative = normalized[idx + len(marker):]
return f"{container_tpl.rstrip('/')}/{relative}"
return map_host_to_container(host_path)
+324
View File
@@ -0,0 +1,324 @@
"""
QGIS Worker 进程池管理器 — 主机侧运行。
管理 N 个长驻 Docker Worker 进程,提供任务提交和结果回收。
架构:
主机 FastAPI ←→ qgis_pool (本模块) ←→ docker exec -i ←→ qgis_worker.py (容器内)
用法:
from app.services.qgis.qgis_pool import qgis_pool
# 提交任务(阻塞等待完成)
results, summary = qgis_pool.submit_job(config, models, progress_callback)
# 关闭池
qgis_pool.shutdown()
"""
import json
import subprocess
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple
from app.config.paths import get_logger
logger = get_logger("qgis.pool")
class _WorkerHandle:
"""单个 Worker 进程的句柄"""
def __init__(self, worker_id: int, proc: subprocess.Popen):
self.worker_id = worker_id
self.proc = proc
self.busy = False
self.lock = threading.Lock()
self._reader_thread: Optional[threading.Thread] = None
def is_alive(self) -> bool:
return self.proc.poll() is None
def mark_busy(self):
with self.lock:
self.busy = True
def mark_free(self):
with self.lock:
self.busy = False
def is_free(self) -> bool:
with self.lock:
return not self.busy and self.is_alive()
class QgisPool:
"""QGIS Worker 进程池"""
def __init__(self, pool_size: int = None):
"""
Args:
pool_size: 池中 Worker 数量,默认从配置读取
"""
if pool_size is None:
try:
from config import settings
pool_size = getattr(settings, "QGIS_POOL_SIZE", 2)
except Exception:
pool_size = 2
self._pool_size = pool_size
self._workers: List[_WorkerHandle] = []
self._lock = threading.Lock()
self._started = False
def _ensure_started(self):
"""懒启动:首次使用时启动 Worker 进程"""
if self._started:
return
with self._lock:
if self._started:
return
self._start_workers()
self._started = True
def _start_workers(self):
"""启动所有 Worker 进程"""
from app.services.qgis.qgis_env import (
get_docker_container,
get_docker_project_dir,
get_container_python_path,
)
container = get_docker_container()
project_dir = get_docker_project_dir()
python_path = get_container_python_path()
worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py"
# Qt 平台设置
try:
from config import settings
qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen")
except Exception:
qt_platform = "offscreen"
for i in range(self._pool_size):
cmd = [
"docker", "exec", "-i",
"-w", project_dir,
"-e", f"QT_QPA_PLATFORM={qt_platform}",
container,
python_path, worker_script,
]
logger.info(f"[Pool] 启动 Worker-{i}: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
worker = _WorkerHandle(i, proc)
self._workers.append(worker)
# 后台线程消费 stderr,防止管道满阻塞
t = threading.Thread(
target=self._drain_stderr, args=(worker,), daemon=True
)
t.start()
logger.info(f"[Pool] 已启动 {self._pool_size} 个 Worker")
@staticmethod
def _drain_stderr(worker: _WorkerHandle):
"""持续读取 Worker stderr 防止管道缓冲区满"""
try:
for line in worker.proc.stderr:
line = line.strip()
if line:
logger.info(f"[Worker-{worker.worker_id}] {line}")
except Exception:
pass
def _find_free_worker(self, timeout: float = 30) -> Optional[_WorkerHandle]:
"""找到一个空闲 Worker,超时返回 None"""
deadline = time.time() + timeout
while time.time() < deadline:
with self._lock:
for w in self._workers:
if w.is_free():
return w
time.sleep(0.1)
return None
def submit_job(
self,
config: dict,
models: list,
progress_callback: Callable[[dict], None] = None,
) -> Tuple[list, dict]:
"""
提交一个批次任务,阻塞等待完成。
Args:
config: QGIS 配置字典
models: 模型参数列表
progress_callback: 每张图完成时的回调(接收 PROGRESS dict
Returns:
(results_list, summary_dict)
"""
self._ensure_started()
worker = self._find_free_worker(timeout=60)
if worker is None:
raise RuntimeError("[Pool] 无可用 Worker(等待 60s 超时)")
worker.mark_busy()
try:
return self._run_job(worker, config, models, progress_callback)
finally:
# 检查 Worker 是否还活着,死了就重启
if not worker.is_alive():
logger.error(f"[Pool] Worker-{worker.worker_id} 已死亡,重启...")
self._restart_worker(worker)
worker.mark_free()
def _run_job(
self,
worker: _WorkerHandle,
config: dict,
models: list,
progress_callback: Callable[[dict], None] = None,
) -> Tuple[list, dict]:
"""向指定 Worker 发送任务并读取结果"""
job = {"config": config, "models": models}
job_json = json.dumps(job, ensure_ascii=False) + "\n"
# 写入 stdin
try:
worker.proc.stdin.write(job_json)
worker.proc.stdin.flush()
except (BrokenPipeError, OSError) as e:
raise RuntimeError(f"[Pool] 写入 Worker-{worker.worker_id} stdin 失败: {e}")
# 读取输出(直到 JOB_DONE 或 FATAL
results = []
summary = {}
while True:
line = worker.proc.stdout.readline()
if not line:
# stdout 关闭,Worker 可能已死亡
logger.error(f"[Pool] Worker-{worker.worker_id} stdout 关闭")
summary = {"error": "Worker stdout 关闭"}
break
line = line.strip()
if not line:
continue
if line.startswith("PROGRESS:"):
try:
r = json.loads(line[len("PROGRESS:"):])
results.append(r)
if progress_callback:
progress_callback(r)
except json.JSONDecodeError:
logger.warning(f"[Pool] Worker-{worker.worker_id} PROGRESS 解析失败: {line[:200]}")
elif line.startswith("JOB_DONE:"):
try:
summary = json.loads(line[len("JOB_DONE:"):])
except json.JSONDecodeError:
summary = {"error": f"JOB_DONE 解析失败: {line[:200]}"}
break
elif line.startswith("FATAL:"):
try:
summary = json.loads(line[len("FATAL:"):])
except json.JSONDecodeError:
summary = {"error": f"FATAL: {line[:200]}"}
logger.error(f"[Pool] Worker-{worker.worker_id} FATAL: {summary}")
break
else:
logger.debug(f"[Pool] Worker-{worker.worker_id} 未知输出: {line[:200]}")
return results, summary
def _restart_worker(self, dead_worker: _WorkerHandle):
"""重启一个死亡的 Worker"""
from app.services.qgis.qgis_env import (
get_docker_container,
get_docker_project_dir,
get_container_python_path,
)
container = get_docker_container()
project_dir = get_docker_project_dir()
python_path = get_container_python_path()
worker_script = f"{project_dir}/app/services/qgis/qgis_worker.py"
try:
from config import settings
qt_platform = getattr(settings, "QGIS_DOCKER_QT_PLATFORM", "offscreen")
except Exception:
qt_platform = "offscreen"
cmd = [
"docker", "exec", "-i",
"-w", project_dir,
"-e", f"QT_QPA_PLATFORM={qt_platform}",
container,
python_path, worker_script,
]
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
dead_worker.proc = proc
dead_worker.busy = False
# 重启 stderr 消费线程
t = threading.Thread(
target=self._drain_stderr, args=(dead_worker,), daemon=True
)
t.start()
logger.info(f"[Pool] Worker-{dead_worker.worker_id} 已重启")
def shutdown(self):
"""关闭所有 Worker"""
with self._lock:
for w in self._workers:
if w.proc.poll() is None:
try:
w.proc.stdin.close()
except Exception:
pass
try:
w.proc.terminate()
w.proc.wait(timeout=5)
except Exception:
try:
w.proc.kill()
except Exception:
pass
self._workers.clear()
self._started = False
logger.info("[Pool] 已关闭")
# 全局单例
qgis_pool = QgisPool()
+198
View File
@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
QGIS 长驻 Worker — Docker 容器内运行,常驻进程模式。
与 qgis_runner.py 的区别:
- qgis_runner.py: 一次性进程,处理完一批任务后退出
- qgis_worker.py: 长驻进程,初始化 QGIS 一次,循环从 stdin 读取任务处理
通信协议(stdin/stdout JSON Lines):
输入: 每行一个 JSON 任务 {"config": {...}, "models": [{...}, ...]}
输出: 每张图一行 PROGRESS PROGRESS:{"name": "...", "output": "..."}
任务结束一行 JOB_DONE:{"total": N, "ok": M, "fail": K, "elapsed": T}
致命错误 FATAL:{"error": "..."}
用法:
docker exec -i qgis-server python3 /app/app/services/qgis/qgis_worker.py
"""
import json
import os
import sys
import time
# ============================================================
# 环境初始化(与 qgis_runner.py 保持一致)
# ============================================================
def _setup_python_path():
"""将项目根目录和 QGIS dist-packages 加入 sys.path"""
project_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
if project_root not in sys.path:
sys.path.insert(0, project_root)
try:
from config import settings
pythonpath = getattr(settings, "QGIS_DOCKER_PYTHONPATH", None) or []
except Exception:
pythonpath = []
for p in pythonpath:
if os.path.isdir(p) and p not in sys.path:
sys.path.insert(0, p)
def _setup_environment():
"""设置 QGIS 运行所需的环境变量"""
os.environ["PYTHONUTF8"] = "1"
os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
os.environ["VSI_CACHE"] = "TRUE"
os.environ["VSI_CACHE_SIZE"] = "1000000"
def _init_qgis():
"""初始化 QgsApplication"""
from qgis.core import QgsApplication
try:
from config import settings
prefix_path = getattr(settings, "QGIS_DOCKER_PREFIX_PATH", "/usr")
except Exception:
prefix_path = "/usr"
QgsApplication.setPrefixPath(prefix_path, True)
qgs_app = QgsApplication([], False)
qgs_app.initQgis()
return qgs_app
# ============================================================
# 单任务处理
# ============================================================
def _emit(line: str):
"""输出一行到 stdout"""
print(line, flush=True)
def _emit_progress(result: dict):
"""输出 PROGRESS 行"""
_emit(f"PROGRESS:{json.dumps(result, ensure_ascii=False)}")
def _process_job(service, job: dict) -> dict:
"""
处理一个批次任务(多个模板),返回汇总结果。
每张图输出一行 PROGRESS,最后返回汇总 dict。
"""
models = job.get("models") or []
if not models:
return {"total": 0, "ok": 0, "fail": 0, "elapsed": 0}
ok = 0
fail = 0
t_start = time.time()
for i, model in enumerate(models):
t_model = time.time()
try:
name = service.generate(model)
result = {"name": name, "output": model["outFile"]}
_emit_progress(result)
ok += 1
elapsed = time.time() - t_model
print(
f"[worker] [{i+1}/{len(models)}] 完成: {name}, 耗时 {elapsed:.1f}s",
file=sys.stderr,
)
except Exception as e:
elapsed = time.time() - t_model
error_msg = f"{e}"
fail_result = {"name": model.get("name", ""), "output": "", "error": error_msg}
_emit_progress(fail_result)
fail += 1
print(
f"[worker] [{i+1}/{len(models)}] 失败: {model.get('name', '?')}, "
f"耗时 {elapsed:.1f}s — {error_msg}",
file=sys.stderr,
)
total_elapsed = time.time() - t_start
summary = {"total": len(models), "ok": ok, "fail": fail, "elapsed": round(total_elapsed, 1)}
print(
f"[worker] 批次完成: {ok}成功/{fail}失败, 总耗时 {total_elapsed:.1f}s",
file=sys.stderr,
)
return summary
# ============================================================
# 主循环
# ============================================================
def main():
_setup_environment()
_setup_python_path()
print("[worker] 正在初始化 QGIS...", file=sys.stderr)
t_init = time.time()
qgs_app = _init_qgis()
print(f"[worker] QGIS 初始化完成 ({time.time() - t_init:.1f}s),等待任务...", file=sys.stderr)
try:
from app.services.qgis.map_service import MapService
# 持有 MapService 实例(config 在第一个任务时设置)
service = None
for line_no, line in enumerate(sys.stdin, 1):
line = line.strip()
if not line:
continue
try:
job = json.loads(line)
except json.JSONDecodeError as e:
print(f"[worker] 第{line_no}行 JSON 解析失败: {e}", file=sys.stderr)
_emit(f'JOB_DONE:{{"error": "JSON解析失败: {e}"}}')
continue
config = job.get("config")
if config:
# 更新 config(每个任务可以带不同的 config)
service = MapService(config)
if service is None:
_emit(f'JOB_DONE:{{"error": "未提供 config"}}')
continue
summary = _process_job(service, job)
_emit(f"JOB_DONE:{json.dumps(summary, ensure_ascii=False)}")
# 清空项目,恢复到初始状态,等待下一个任务
from qgis.core import QgsProject
QgsProject.instance().clear()
print("[worker] 项目已清空,等待下一个任务", file=sys.stderr)
except KeyboardInterrupt:
print("[worker] 收到中断信号,退出", file=sys.stderr)
except Exception as e:
print(f"[worker] 致命错误: {e}", file=sys.stderr)
_emit(f'FATAL:{{"error": "{e}"}}')
finally:
qgs_app.exitQgis()
print("[worker] QGIS 已退出", file=sys.stderr)
if __name__ == "__main__":
try:
main()
except Exception as e:
# 捕获 main() 未处理的异常,确保输出到 stderr
import traceback
print(f"[worker] 未捕获异常: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
sys.exit(1)
+20
View File
@@ -23,6 +23,19 @@ class RedisHelper:
'socket_timeout': 5, # 读写超时时间(秒)
}
self._client: Optional[redis.Redis] = None
self._logged_config = False # 避免重复打印配置
def _log_config_once(self):
"""首次连接失败时打印配置信息,便于排查"""
if not self._logged_config:
from app.utils.logger import get_logger
_logger = get_logger("redis")
_logger.warning(
f"Redis 连接配置: host={self.redis_config['host']}, "
f"port={self.redis_config['port']}, db={self.redis_config['db']}, "
f"password={'***' if self.redis_config['password'] else 'None'}"
)
self._logged_config = True
@property
def client(self) -> redis.Redis:
@@ -32,8 +45,15 @@ class RedisHelper:
self._client = redis.Redis(**self.redis_config)
# 测试连接
self._client.ping()
except redis.AuthenticationError as e:
self._log_config_once()
raise ConnectionError(f"Redis 认证失败(密码错误): {e}")
except redis.ConnectionError as e:
self._log_config_once()
raise ConnectionError(f"无法连接到 Redis 服务器: {e}")
except Exception as e:
self._log_config_once()
raise ConnectionError(f"Redis 连接异常: {e}")
return self._client
def close(self):
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
+10 -4
View File
@@ -12,6 +12,8 @@ PREDICT_PROBABILITY_THRESHOLD = 50
QGIS_GPKG_DIR = "app/data/gpkg"
# 容器内 GPKG 本地路径
QGIS_DOCKER_GPKG_DIR = "/data/gpkg"
# 模板目录(相对于项目根,用于路径映射时的 marker 匹配)
QGIS_HOST_TEMPLATE_SUBDIR = "app/data/template"
# 容器内模板本地路径
QGIS_DOCKER_TEMPLATE_DIR = "/data/template"
# 专题图输出子目录
@@ -23,10 +25,8 @@ QGIS_DEFAULTS_ZOOM_VALUE = "5"
QGIS_DEFAULTS_MAP_UNIT = "制图单位:西安市应急管理局"
# 专题图DPI
QGIS_EXPORT_DPI = 200
# 并行 docker exec 子进程数
QGIS_PARALLEL_WORKERS = 4
# 最大并发请求数
QGIS_MAX_CONCURRENT = 2
# Worker 进程池大小
QGIS_POOL_SIZE = 4
# ============================================================
# Docker QGIS 配置
# ============================================================
@@ -36,6 +36,12 @@ QGIS_DOCKER_CONTAINER = "qgis-server"
QGIS_DOCKER_PROJECT_DIR = "/app"
# 容器内 Python 解释器路径
QGIS_DOCKER_PYTHON = "/usr/bin/python3"
# QGIS 安装根目录
QGIS_DOCKER_PREFIX_PATH = "/usr"
# QGIS Python 包路径(官方镜像为空列表)
QGIS_DOCKER_PYTHONPATH = []
# Qt 无头渲染模式
QGIS_DOCKER_QT_PLATFORM = "offscreen"
# Docker 镜像名称
QGIS_DOCKER_IMAGE = "qgis/qgis:3.44.11"
# 优先产出模板
Binary file not shown.