7 Commits

Author SHA1 Message Date
wzy-warehouse b1191d39aa 进行路径计算 2026-06-29 11:38:48 +08:00
wzy-warehouse e9169dfd13 进行路径计算 2026-06-29 11:37:04 +08:00
wzy-warehouse 1f01ceb062 添加降雨量与持续时间接口 2026-06-28 09:26:13 +08:00
wzy-warehouse 54f7300557 扫描QGIS模板 2026-06-27 10:51:13 +08:00
wzy-warehouse 1b08f2c4a2 添加中文字体 2026-06-24 15:52:40 +08:00
wzy-warehouse 7163ca67f9 修改适配Linux 2026-06-24 14:16:23 +08:00
wzy-warehouse e5582bab5d 修改适配Linux 2026-06-24 13:03:15 +08:00
27 changed files with 1342 additions and 117 deletions
+84 -45
View File
@@ -32,15 +32,11 @@ docker load -i qgis-34411.tar
```bash ```bash
# ---- Linux / macOS ---- # ---- Linux / macOS ----
PROJECT_ROOT=/home/xian/xian_algorithm_new
FILE_STORE=/data
docker run -d \ docker run -d \
--name qgis-server \ --name qgis-server \
--restart unless-stopped \ --restart unless-stopped \
-v "${PROJECT_ROOT}:/app:ro" \ -v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "${FILE_STORE}:${FILE_STORE}" \ -v "/www/wwwroot/xian_algorithm_new/files:/files" \
qgis/qgis:3.44.11 \ qgis/qgis:3.44.11 \
sleep infinity sleep infinity
@@ -124,60 +120,103 @@ docker exec qgis-server ls -la /data/template/earthquake
- 容器重建后(`/data` 目录丢失) - 容器重建后(`/data` 目录丢失)
- 首次部署时 - 首次部署时
## 5. 安装中文字体(手动) ## 5. 安装中文字体(手动,必须执行
QGIS 模板使用了 SimHei(黑体)、SimSun(宋体)、Microsoft YaHei(微软雅黑)等 Windows 中文字体, QGIS 模板使用了 SimHei(黑体)、SimSun(宋体)、Microsoft YaHei(微软雅黑)等 Windows 中文字体,
Docker 镜像默认不包含这些字体,会导致中文全部乱码。**字体需手动安装,代码不会自动安装。** Docker 镜像默认不包含这些字体,会导致中文全部乱码。**字体需手动安装,代码不会自动安装。**
### 步骤 ### 5.1 准备字体文件
字体文件存放在项目根目录的 `fonts/` 目录下,已预置 4 个常用中文字体:
```
fonts/
├── simhei.ttf — 黑体(模板默认字体)
├── simsun.ttc — 宋体
├── msyh.ttc — 微软雅黑
└── msyhbd.ttc — 微软雅黑粗体
```
如果字体缺失,从 Windows 主机复制(`C:\Windows\Fonts\`):
```bash ```bash
# 1. 创建字体目录 # Linux 服务器用 SCP 从 Windows 传
docker exec qgis-server mkdir -p /usr/share/fonts/truetype/winfonts scp "C:\Windows\Fonts\simhei.ttf" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\simsun.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyh.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
scp "C:\Windows\Fonts\msyhbd.ttc" root@服务器IP:/www/wwwroot/xian_algorithm_new/fonts/
```
# 2. 从主机复制 Windows 中文字体 ### 5.2 一键安装到容器
# Windows 路径(根据实际字体文件位置调整):
docker cp "C:\Windows\Fonts\simhei.ttf" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\simsun.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\msyh.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
docker cp "C:\Windows\Fonts\msyhbd.ttc" qgis-server:/usr/share/fonts/truetype/winfonts/
# Linux 字体路径示例: ```bash
# docker cp /usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf qgis-server:/usr/share/fonts/truetype/winfonts/ python app/script/install_fonts_to_container.py
```
# 3. 刷新字体缓存 输出示例:
```
=== 安装中文字体到 Docker 容器 qgis-server ===
字体目录: /www/wwwroot/xian_algorithm_new/fonts
字体文件: 4 个
- msyh.ttc (4165 KB)
- msyhbd.ttc (4167 KB)
- simhei.ttf (2637 KB)
- simsun.ttc (10126 KB)
容器目标: qgis-server:/usr/share/fonts/truetype/winfonts
[1/3] 复制字体文件...
OK msyh.ttc
OK msyhbd.ttc
OK simhei.ttf
OK simsun.ttc
[2/3] 刷新字体缓存...
OK
[3/3] 验证字体...
中文字体: ['SimHei', 'SimSun', 'Microsoft YaHei', 'Microsoft YaHei UI']
=== 完成,耗时 3.2s ===
```
其他用法:
```bash
python app/script/install_fonts_to_container.py --dry-run # 仅查看信息
python app/script/install_fonts_to_container.py --container my # 指定容器名
```
### 5.3 持久化(推荐)
容器重建后字体丢失,**强烈建议挂载 `fonts/` 目录**。
```bash
# 停掉旧容器
docker stop qgis-server && docker rm qgis-server
# 重建容器,加上字体挂载
docker run -d \
--name qgis-server \
--restart unless-stopped \
-v "/www/wwwroot/xian_algorithm_new:/app:ro" \
-v "/www/wwwroot/xian_algorithm_new/files:/files" \
-v "/www/wwwroot/xian_algorithm_new/fonts:/usr/share/fonts/truetype/winfonts:ro" \
qgis/qgis:3.44.11 \
sleep infinity
# 挂载后只需刷新一次缓存
docker exec qgis-server fc-cache -fv docker exec qgis-server fc-cache -fv
# 4. 验证字体已识别
docker exec qgis-server python3 -c "
from PyQt5.QtGui import QFontDatabase
db = QFontDatabase()
zh = [f for f in db.families() if 'SimHei' in f or 'YaHei' in f or 'SimSun' in f]
print('中文字体:', zh)
"
``` ```
### 持久化方案 ### 5.4 无法获取 Windows 字体时的替代方案
容器重建后字体丢失。可选方案:
**方案 A:挂载单个字体文件**
```bash ```bash
docker run -d ... \ # Linux 服务器安装开源中文字体
-v "C:\Windows\Fonts\simhei.ttf:/usr/share/fonts/truetype/winfonts/simhei.ttf" \ yum install wqy-microhei-fonts # CentOS / RHEL
-v "C:\Windows\Fonts\simsun.ttc:/usr/share/fonts/truetype/winfonts/simsun.ttc" \ apt install fonts-wqy-microhei # Debian / Ubuntu
...
```
**方案 B:挂载整个字体目录(推荐)** # 将系统字体复制到项目 fonts/ 目录
```bash cp /usr/share/fonts/wqy-microhei/wqy-microhei.ttc /www/wwwroot/xian_algorithm_new/fonts/
# 先在主机创建字体目录,放入所需字体文件
mkdir -p /opt/qgis-fonts
cp /usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf /opt/qgis-fonts/
docker run -d ... \
-v "/opt/qgis-fonts:/usr/share/fonts/truetype/winfonts:ro" \
...
``` ```
## 6. 验证容器 ## 6. 验证容器
+1 -1
View File
@@ -134,7 +134,7 @@ async def predict_earthquake(req: EarthquakePredictRequest):
"point_ids": req.point_ids, "point_ids": req.point_ids,
"region_code": req.region_code, "region_code": req.region_code,
"magnitude": req.magnitude, "magnitude": req.magnitude,
"depth": req.depth, # 已有默认值 10.0 "depth": req.depth,
"epicenter_lon": req.epicenter_lon, "epicenter_lon": req.epicenter_lon,
"epicenter_lat": req.epicenter_lat, "epicenter_lat": req.epicenter_lat,
"occurred_time": occurred_time.isoformat() if hasattr(occurred_time, 'isoformat') else str(occurred_time) "occurred_time": occurred_time.isoformat() if hasattr(occurred_time, 'isoformat') else str(occurred_time)
+49 -13
View File
@@ -154,23 +154,51 @@ def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
# 构建 QGIS 服务配置字典 # 构建 QGIS 服务配置字典
# ============================================================ # ============================================================
def _build_qgis_config(batch_folder: str) -> dict: def _check_docker_running() -> bool:
"""构建 QGIS 服务配置(含批次输出目录)""" """检测 Docker 容器是否在运行"""
from app.services.qgis.qgis_env import ( from app.services.qgis.qgis_env import get_docker_container
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
# Docker 模式:config 中的路径必须是容器内路径(模板修改器在容器内运行)
try: try:
result = subprocess.run( result = subprocess.run(
["docker", "inspect", "--format={{.State.Running}}", get_docker_container()], ["docker", "inspect", "--format={{.State.Running}}", get_docker_container()],
capture_output=True, text=True, timeout=5, capture_output=True, text=True, timeout=5,
) )
is_docker = result.stdout.strip() == "true" return result.stdout.strip() == "true"
except Exception: except Exception:
is_docker = False return False
def _list_container_templates(container: str, event_type: str) -> list:
"""Docker 模式下:扫描容器内模板目录,返回模板文件名列表"""
from app.services.qgis.qgis_env import get_docker_container
container_tpl_dir = getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "") or "/data/template"
container_path = f"{container_tpl_dir.rstrip('/')}/{event_type}"
try:
result = subprocess.run(
["docker", "exec", container, "ls", container_path],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
logger.error(f"[Docker] 列出容器模板失败: {result.stderr.strip()}")
return []
files = [
f.strip() for f in result.stdout.splitlines()
if f.strip().endswith(".qgz") and not f.strip().startswith("tmp")
]
logger.info(f"[Docker] 容器内模板扫描: {container_path}{len(files)} 个模板")
return sorted(files)
except Exception as e:
logger.error(f"[Docker] 列出容器模板异常: {e}")
return []
def _build_qgis_config(batch_folder: str) -> dict:
"""构建 QGIS 服务配置(含批次输出目录和 Docker 模式标志)"""
from app.services.qgis.qgis_env import (
get_docker_container, get_host_file_store, get_container_file_store,
get_docker_project_dir,
)
gpkg_dir = get_gpkg_dir()
is_docker = _check_docker_running()
if is_docker: if is_docker:
# GPKG 目录:优先使用容器内本地路径(预拷贝后绕过 WSL2 9P) # GPKG 目录:优先使用容器内本地路径(预拷贝后绕过 WSL2 9P)
@@ -206,6 +234,7 @@ def _build_qgis_config(batch_folder: str) -> dict:
}, },
"static_layers": build_static_layers_config(gpkg_dir), "static_layers": build_static_layers_config(gpkg_dir),
"batch_folder": batch_folder, "batch_folder": batch_folder,
"is_docker": is_docker,
} }
@@ -242,9 +271,16 @@ def _background_export(inference_id: int) -> None:
config = _build_qgis_config(batch_folder) config = _build_qgis_config(batch_folder)
# 2. 扫描模板 # 2. 扫描模板
is_docker = config.get("is_docker", False)
if is_docker:
container_tpl = getattr(settings, "QGIS_DOCKER_TEMPLATE_DIR", "") or "/data/template"
template_dir = f"{container_tpl.rstrip('/')}/{event_type}"
template_files = _list_container_templates(get_docker_container(), event_type)
else:
tpl_subdir = getattr(settings, "QGIS_HOST_TEMPLATE_SUBDIR", "app/data/template")
template_base = os.path.join( template_base = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"app", "data", "template" *tpl_subdir.replace("\\", "/").split("/")
) )
template_dir = os.path.join(template_base, event_type) template_dir = os.path.join(template_base, event_type)
template_files = sorted([ template_files = sorted([
@@ -316,7 +352,7 @@ def _background_export(inference_id: int) -> None:
status = "FAIL" if "error" in r else "OK" status = "FAIL" if "error" in r else "OK"
logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}") logger.info(f"[Pool] {status} {r.get('name', '?')}: {r.get('error', r.get('output', ''))[:100]}")
if inference_id and file_store and "error" not in r: if inference_id and file_store and "error" not in r:
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock) _write_single_path(inference_id, r["output"], file_store, db_lock)
results, summary = qgis_pool.submit_job(config, container_models, _on_progress) results, summary = qgis_pool.submit_job(config, container_models, _on_progress)
+74 -6
View File
@@ -7,11 +7,16 @@ from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictData, UpdateMonitoringTimeRequest from app.schemas.api_schemas import (
RainfallPredictRequest, PredictResponse, PredictData,
UpdateMonitoringTimeRequest, DistrictSummaryRequest, DistrictSummaryItem
)
from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore
from app.repositories.dbn_repository import dbn_repository from app.repositories.dbn_repository import dbn_repository
from app.repositories.rainfall_repository import rainfall_repository as rain_repo
from app.core.rainfall_manager import rainfall_manager from app.core.rainfall_manager import rainfall_manager
from app.config.paths import get_logger from app.config.paths import get_logger
from app.utils.db_helper import db_helper
from app.utils.time_converter import TimeConverter from app.utils.time_converter import TimeConverter
router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"]) router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"])
@@ -141,13 +146,17 @@ async def update_monitoring_time(req: UpdateMonitoringTimeRequest):
@router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测") @router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测")
async def predict_rainfall(req: RainfallPredictRequest): async def predict_rainfall(req: RainfallPredictRequest):
""" """
根据降雨量和持续时间,批量预测隐患点/风险点的灾害概率。 批量预测隐患点/风险点的灾害概率。两种模式(二选一):
**自动推演模式**rainfall、duration、region_code 全部不传
→ 从气象表自动获取各点最近站降雨数据,不限区域
**指定条件模式**rainfall、duration、region_code 全部传入
→ 按指定降雨条件和区域预测
- **disaster_name**: 灾害名称 - **disaster_name**: 灾害名称
- **point_ids**: 点位ID列表(可选,不传则查询所有点 - **point_ids**: 点位ID列表(可选)
- **region_code**: 行政区划代码(可选,不传则不限区域 - **occurred_time**: 事件发生时间(可选,不传则为当前时间
- **rainfall**: 累计降雨量(mm),不传则从气象表自动获取
- **duration**: 降雨持续时间(h),不传则从气象表自动获取
- **operation_type**: 操作类型(如 '实时监测', '情景模拟', '应急评估' - **operation_type**: 操作类型(如 '实时监测', '情景模拟', '应急评估'
""" """
semaphore = get_prediction_semaphore() semaphore = get_prediction_semaphore()
@@ -180,3 +189,62 @@ async def predict_rainfall(req: RainfallPredictRequest):
logger.error(f"保存推理结果失败: {e}", exc_info=True) logger.error(f"保存推理结果失败: {e}", exc_info=True)
return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location)) return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location))
@router.post("/district-summary", summary="获取各区降雨概况")
async def district_summary(req: DistrictSummaryRequest):
"""
根据推理结果ID获取各行政区的降雨概况(用于报告生成)。
逻辑:
- 如果预测时传了 rainfall → 直接用 condition 中的值
- 如果预测时没传 rainfall → 从 xian_meteorology 按区聚合实际气象数据
- **inference_id**: 推理结果ID
"""
# 1. 查推理结果
record = dbn_repository.get_inference_result(req.inference_id)
if not record:
raise HTTPException(status_code=404, detail=f"推理结果不存在: {req.inference_id}")
condition = record.get('condition') or {}
occurred_time = record.get('occurred_time')
# 2. 如果传了 rainfall → 直接用 condition 中的值
if condition.get('rainfall') is not None:
region_code = condition.get('region_code')
rainfall_val = float(condition['rainfall'])
duration_val = float(condition.get('duration', 0))
if region_code:
# 查 district 名称
district_row = db_helper.execute_query_one(
"SELECT name FROM xian_district WHERE code = %s AND is_delete = 0", (region_code,)
)
district_name = district_row['name'] if district_row else region_code
items = [{
'district_name': district_name,
'district_code': region_code,
'rainfall': rainfall_val,
'duration_hours': duration_val
}]
else:
items = [{
'district_name': '全市',
'district_code': '',
'rainfall': rainfall_val,
'duration_hours': duration_val
}]
else:
# 3. 自动推演模式 → 从气象表按区聚合
items = rain_repo.get_district_rainfall_summary(occurred_time)
if not items:
return {"code": 200, "message": "success", "data": []}
# 转换为响应格式
result_items = [
DistrictSummaryItem(**item).model_dump()
for item in items
]
return {"code": 200, "message": "success", "data": result_items}
+1
View File
@@ -0,0 +1 @@
# hazard 配置模块
+83
View File
@@ -0,0 +1,83 @@
# 灾害影响范围计算参数配置
# ============================================
# 滑坡 — 经验到达角法 + 地形修正
# ============================================
landslide:
height_drop_m: 50 # 假设崩滑高差(m)
reach_angle: # 到达角经验值(度)
小型: 31
中型: 28
大型: 25
特大型: 23
fan_angle: 45 # 侧向扇形展开角(度)
river_erosion_enhance: 1.3 # 河流距离<200m时的增强系数
river_distance_threshold: 200 # 河流侵蚀增强触发距离(m)
fault_enhance: 1.2 # 断裂带距离<500m时的增强系数
fault_distance_threshold: 500 # 断裂带增强触发距离(m)
min_radius_m: 100 # 最小影响半径
max_radius_m: 2000 # 最大影响半径
# ============================================
# 泥石流 — 河流关联缓冲区
# ============================================
debris_flow:
base_radius_m: 200 # 基础影响半径
slope_factor: 10 # 坡度影响系数 (半径 = base + slope * factor)
river_buffer_m: 80 # 河道缓冲区宽度
max_river_search_m: 2000 # 最近河流搜索范围
min_radius_m: 100
max_radius_m: 2000
# ============================================
# 山洪 — 多级河流缓冲区
# ============================================
flash_flood:
river_dist_thresholds: [100, 300, 500] # 距河流距离阈值(m)
buffer_by_level: # 对应各级缓冲区(m)
1: 500 # 干流
2: 300 # 一级支流
3: 150 # 二级支流
4: 80 # 三级以下
5: 80
max_river_search_m: 3000 # 最近河流搜索范围
# ============================================
# 内涝 — TWI简化 + 不透水率 + 管网修正
# ============================================
waterlogging:
base_radius_m: 100 # 基础积水半径
impervious_factor: 3 # 不透水率修正系数
pipe_density_factor: 400 # 管网密度修正系数
pipe_lower_bound: 0.3 # 管网修正下限
min_radius_m: 50
max_radius_m: 800
# ============================================
# 崩塌 — 锥体传播模型 (CONEFALL简化)
# ============================================
rockfall:
height_drop_m: 50 # 假设崩落高差(m)
reach_angle: # 到达角经验值(度)
小型: 33
中型: 30
大型: 27
特大型: 25
fault_angle_penalty: 5 # 断裂带<300m时到达角减少量(度)
fault_penalty_threshold: 300 # 断裂带惩罚触发距离(m)
fault_radius_enhance: 1.5 # 断裂带<200m时半径增强系数
fault_enhance_threshold: 200 # 半径增强触发距离(m)
min_radius_m: 50
max_radius_m: 1000
# ============================================
# 默认参数 — 未匹配灾害类型时的兜底
# ============================================
default:
radius_m: 200
+7 -7
View File
@@ -39,8 +39,14 @@ class AppLauncher:
check_virtualenv(self.project_root) check_virtualenv(self.project_root)
# 检查是否正在使用虚拟环境运行 # 检查是否正在使用虚拟环境运行
# sys.prefix != sys.base_prefix 是 Python 检测 venv 的标准方式
# 不依赖路径解析,Windows/Linux 均适用
in_venv = hasattr(sys, 'real_prefix') or (
hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
)
if not in_venv:
import platform import platform
import sys
venv_path = self.project_root / ".venv" venv_path = self.project_root / ".venv"
os_name = platform.system() os_name = platform.system()
@@ -49,16 +55,10 @@ class AppLauncher:
else: # Linux/Mac else: # Linux/Mac
venv_python = venv_path / "bin" / "python3" venv_python = venv_path / "bin" / "python3"
# 如果当前不是使用虚拟环境的Python,则重新启动
current_python = Path(sys.executable).resolve()
venv_python_resolved = venv_python.resolve()
if current_python != venv_python_resolved:
print("\n" + "=" * 50) print("\n" + "=" * 50)
print("检测到未使用虚拟环境,正在切换到虚拟环境...") print("检测到未使用虚拟环境,正在切换到虚拟环境...")
print("=" * 50) print("=" * 50)
# 使用虚拟环境的Python重新启动应用(不传递参数避免重复检查)
import subprocess import subprocess
cmd = [str(venv_python)] + sys.argv cmd = [str(venv_python)] + sys.argv
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
+4 -8
View File
@@ -6,8 +6,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from app.utils.api_deps import get_rainfall_model, get_earthquake_model, is_model_loaded from app.utils.api_deps import get_rainfall_model, get_earthquake_model
from app.schemas.api_schemas import HealthResponse
from app.config.paths import get_logger from app.config.paths import get_logger
from config import settings from config import settings
@@ -57,6 +56,9 @@ def create_app() -> FastAPI:
start = time.time() start = time.time()
response = await call_next(request) response = await call_next(request)
elapsed = time.time() - start elapsed = time.time() - start
# 静默健康检查探针
if request.url.path == "/" and request.method == "GET":
return response
logger.info(f"{request.method} {request.url.path} -> {response.status_code} ({elapsed:.3f}s)") logger.info(f"{request.method} {request.url.path} -> {response.status_code} ({elapsed:.3f}s)")
return response return response
@@ -64,10 +66,4 @@ def create_app() -> FastAPI:
from app.api import register_routers from app.api import register_routers
register_routers(application) register_routers(application)
@application.get("/health", response_model=HealthResponse, tags=["系统"])
async def health_check():
"""健康检查"""
status = is_model_loaded()
return HealthResponse(status="ok", **status)
return application return application
+3
View File
@@ -0,0 +1,3 @@
from app.models.hazard.hazard_zone_calculator import HazardZoneCalculator, hazard_zone_calculator
__all__ = ['HazardZoneCalculator', 'hazard_zone_calculator']
+413
View File
@@ -0,0 +1,413 @@
"""
灾害影响范围计算器
根据灾害类型和静态因子计算隐患点的影响范围(zone)和影响路径(path)
论文支撑:
- 滑坡: He et al. (2023) Landslides — 经验到达角法
- 泥石流: Baggio et al. (2021) NHESS — 河流关联缓冲区
- 山洪: Costache et al. (2022) STOTEN — 多级缓冲区
- 内涝: Wang et al. (2022) Water Res. Mgmt — TWI简化法
- 崩塌: Guerin et al. (2022) Eng. Geology — 锥体传播模型
"""
import math
import os
from typing import Optional, Tuple, Dict, Any
import yaml
from app.config.paths import CONFIG_DIR
# 配置文件路径
_PARAMS_PATH = os.path.join(CONFIG_DIR, 'hazard', 'hazard_zone_params.yaml')
# 地球半径 (WGS84)
_EARTH_RADIUS_M = 6371000.0
def _load_params() -> Dict[str, Any]:
"""加载灾害影响范围计算参数"""
with open(_PARAMS_PATH, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _to_wkt_polygon(coords: list) -> str:
"""
将坐标列表转换为 WKT POLYGON 字符串
Args:
coords: [(lon1, lat1), (lon2, lat2), ...] 首尾需闭合
"""
points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords])
return f'POLYGON(({points}))'
def _to_wkt_linestring(coords: list) -> str:
"""将坐标列表转换为 WKT LINESTRING 字符串"""
points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords])
return f'LINESTRING({points})'
def _offset_to_lonlat(lon: float, lat: float, dx_m: float, dy_m: float) -> Tuple[float, float]:
"""
将米级偏移转换回经纬度
Args:
lon, lat: 中心点经纬度
dx_m: 东向偏移(m)
dy_m: 北向偏移(m)
Returns:
(new_lon, new_lat)
"""
lat_rad = math.radians(lat)
dlon = math.degrees(dx_m / (_EARTH_RADIUS_M * math.cos(lat_rad)))
dlat = math.degrees(dy_m / _EARTH_RADIUS_M)
return lon + dlon, lat + dlat
def _make_fan_wkt(lon: float, lat: float, direction_deg: float,
spread_deg: float, radius_m: float,
segments: int = 24) -> str:
"""
创建扇形/锥形影响范围 WKT
Args:
lon, lat: 隐患点坐标
direction_deg: 扇形主方向(坡向, 0=北, 90=东)
spread_deg: 扇形总开角(度)
radius_m: 半径(m)
segments: 弧线段数
Returns:
POLYGON WKT 字符串
"""
half_spread = spread_deg / 2.0
coords = [(lon, lat)] # 中心点
for i in range(segments + 1):
angle_deg = direction_deg - half_spread + (spread_deg * i / segments)
angle_rad = math.radians(angle_deg)
dx = radius_m * math.sin(angle_rad)
dy = radius_m * math.cos(angle_rad)
coords.append(_offset_to_lonlat(lon, lat, dx, dy))
coords.append((lon, lat)) # 闭合回中心
return _to_wkt_polygon(coords)
def _make_circle_wkt(lon: float, lat: float, radius_m: float,
segments: int = 48) -> str:
"""
创建圆形缓冲区 WKT(用正多边形近似圆)
Args:
lon, lat: 中心点
radius_m: 半径(m)
segments: 边数
"""
coords = []
for i in range(segments):
angle_rad = 2 * math.pi * i / segments
dx = radius_m * math.sin(angle_rad)
dy = radius_m * math.cos(angle_rad)
coords.append(_offset_to_lonlat(lon, lat, dx, dy))
coords.append(coords[0]) # 闭合
return _to_wkt_polygon(coords)
class HazardZoneCalculator:
"""
灾害影响范围计算器
根据隐患点的灾害类型和静态因子,计算影响范围(WKT多边形)和影响路径(WKT线)。
每种灾害类型使用不同的经验公式,参数从 hazard_zone_params.yaml 读取。
"""
def __init__(self):
self.params = _load_params()
# ==================== 公开接口 ====================
def calculate(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
lon: float, lat: float,
static_factors: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
"""
计算单个隐患点的影响范围
Args:
source_id: 隐患点ID
disaster_type: 灾害类型 (滑坡/泥石流/山洪/内涝/崩塌)
scale_grade: 规模等级 (小型/中型/大型)
lon: 经度
lat: 纬度
static_factors: 静态因子字典,从 xian_risk_factors.static_factors 获取
Returns:
(zone_wkt, path_wkt) — zone为POLYGON WKT, path为LINESTRING WKT或None
"""
# 选择对应灾害类型的计算方法
method_map = {
'滑坡': self._calc_landslide,
'泥石流': self._calc_debris_flow,
'山洪': self._calc_flash_flood,
'内涝': self._calc_waterlogging,
'崩塌': self._calc_rockfall,
}
method = method_map.get(disaster_type, self._calc_default)
return method(source_id, disaster_type, scale_grade, static_factors, lon, lat)
# ==================== 滑坡 ====================
def _calc_landslide(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""滑坡: 经验到达角法 + 扇形展开"""
p = self.params['landslide']
dem = float(factors.get('dem_value', 0))
aspect = float(factors.get('aspect_value', 0))
river_dist = float(factors.get('river_distance', 9999))
fault_dist = float(factors.get('fault_distance', 9999))
# 到达角
angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型'
reach_angle = p['reach_angle'][angle_key]
# 半径: H / tan(到达角)
H = p['height_drop_m']
radius = H / math.tan(math.radians(reach_angle))
# 修正
if river_dist < p['river_distance_threshold']:
radius *= p['river_erosion_enhance']
if fault_dist < p['fault_distance_threshold']:
radius *= p['fault_enhance']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
# 扇形 — 沿坡向展开
if aspect and aspect > 0:
zone_wkt = _make_fan_wkt(lon, lat, aspect, p['fan_angle'], radius)
# 滑坡路径: 从隐患点沿坡向延伸到堆积区边缘
end_lon, end_lat = _offset_to_lonlat(
lon, lat,
radius * math.sin(math.radians(aspect)),
radius * math.cos(math.radians(aspect))
)
path_wkt = _to_wkt_linestring([(lon, lat), (end_lon, end_lat)])
else:
zone_wkt = _make_circle_wkt(lon, lat, radius)
path_wkt = None
return zone_wkt, path_wkt
# ==================== 泥石流 ====================
def _calc_debris_flow(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""
泥石流: 沿最近河流方向建立影响区
path_geom 为隐患点到最近河流的连线,代表潜在流动路径
"""
p = self.params['debris_flow']
slope = float(factors.get('slope_value', 0))
radius = p['base_radius_m'] + slope * p['slope_factor']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
zone_wkt = _make_circle_wkt(lon, lat, radius)
# 查询最近河流段 + 构建完整流动路径
river_geom_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m'])
if river_geom_wkt:
# 取河流上最近点
river_point = self._get_nearest_river_point(lon, lat, p['max_river_search_m'])
if river_point:
path_wkt = _to_wkt_linestring([
(lon, lat),
(river_point[0], river_point[1])
])
else:
path_wkt = river_geom_wkt # 降级:直接展示河流段
else:
path_wkt = None
return zone_wkt, path_wkt
# ==================== 山洪 ====================
def _calc_flash_flood(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""
山洪: 根据距河流距离 + 河流等级确定缓冲区
path_geom 为关联的河流段
"""
p = self.params['flash_flood']
river_dist = float(factors.get('river_distance', 9999))
thresholds = p['river_dist_thresholds']
buffer_map = p['buffer_by_level']
# 按距离选缓冲半径
if river_dist < thresholds[0]:
radius = buffer_map[1]
elif river_dist < thresholds[1]:
radius = buffer_map[2]
elif river_dist < thresholds[2]:
radius = buffer_map[3]
else:
radius = buffer_map.get(4, 80)
zone_wkt = _make_circle_wkt(lon, lat, radius)
# 山洪路径: 关联的最近河流段(淹没路径沿河展开)
path_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m'])
return zone_wkt, path_wkt
# ==================== 内涝 ====================
def _calc_waterlogging(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""内涝: TWI简化 + 不透水率 + 管网修正"""
p = self.params['waterlogging']
slope = float(factors.get('slope_value', 1))
impervious = float(factors.get('impervious_surface', 0))
pipe_density = float(factors.get('pipe_density', 0))
slope_rad = max(math.radians(slope), 0.001)
# TWI_proxy = ln(1 / tanβ)
twi = math.log(1.0 / math.tan(slope_rad))
radius = (p['base_radius_m'] * twi
* (1.0 + impervious * p['impervious_factor'])
* max(p['pipe_lower_bound'], 1.0 - pipe_density * p['pipe_density_factor']))
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
zone_wkt = _make_circle_wkt(lon, lat, radius)
return zone_wkt, None
# ==================== 崩塌 ====================
def _calc_rockfall(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""崩塌: 锥体传播模型 (CONEFALL简化)"""
p = self.params['rockfall']
aspect = float(factors.get('aspect_value', 0))
fault_dist = float(factors.get('fault_distance', 9999))
# 到达角
angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型'
reach_angle = p['reach_angle'][angle_key]
# 断裂带惩罚:减少到达角 = 更平缓 = 影响更大
if fault_dist < p['fault_penalty_threshold']:
reach_angle -= p['fault_angle_penalty']
# 半径
H = p['height_drop_m']
radius = H / math.tan(math.radians(max(reach_angle, 10)))
# 断裂带增强
if fault_dist < p['fault_enhance_threshold']:
radius *= p['fault_radius_enhance']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
# 锥形 — 沿坡向,窄扇形
if aspect and aspect > 0:
zone_wkt = _make_fan_wkt(lon, lat, aspect, 45, radius)
else:
zone_wkt = _make_circle_wkt(lon, lat, radius)
return zone_wkt, None
# ==================== 默认 ====================
def _calc_default(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""未匹配灾害类型的兜底:固定半径圆形"""
p = self.params['default']
zone_wkt = _make_circle_wkt(lon, lat, p['radius_m'])
return zone_wkt, None
# ==================== 数据库查询辅助 ====================
@staticmethod
def _get_nearest_river_point(lon: float, lat: float,
max_dist_m: float) -> Optional[Tuple[float, float]]:
"""
查询最近河流上距离隐患点最近的点的坐标
"""
from app.utils.db_helper import db_helper
sql = """
SELECT ST_X(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS rx,
ST_Y(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS ry
FROM xian_rivers r
WHERE r.is_delete = 0
AND ST_DWithin(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography,
%s)
ORDER BY ST_Distance(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography)
LIMIT 1
"""
row = db_helper.execute_query_one(
sql, (lon, lat, lon, lat, lon, lat, max_dist_m, lon, lat)
)
if row and row['rx'] and row['ry']:
return float(row['rx']), float(row['ry'])
return None
@staticmethod
def _get_nearest_river_geom(lon: float, lat: float,
max_dist_m: float) -> Optional[str]:
"""
查询最近河流段的 WKT
Returns:
LINESTRING WKT 字符串或 None
"""
from app.utils.db_helper import db_helper
sql = """
SELECT ST_AsText(r.geom) AS wkt
FROM xian_rivers r
WHERE r.is_delete = 0
AND ST_DWithin(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography,
%s)
ORDER BY ST_Distance(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography)
LIMIT 1
"""
row = db_helper.execute_query_one(sql, (lon, lat, max_dist_m, lon, lat))
if row and row['wkt']:
return row['wkt']
return None
# 全局实例
hazard_zone_calculator = HazardZoneCalculator()
+20 -1
View File
@@ -195,6 +195,7 @@ class DbnRepository:
COUNT(*) as record_count COUNT(*) as record_count
FROM xian_meteorology FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
AND rainfall_1h IS NOT NULL AND rainfall_1h IS NOT NULL
AND CAST(rainfall_1h AS DOUBLE PRECISION) > 0 AND CAST(rainfall_1h AS DOUBLE PRECISION) > 0
GROUP BY lon, lat GROUP BY lon, lat
@@ -259,6 +260,7 @@ class DbnRepository:
ST_SetSRID(ST_MakePoint(%s, %s), 4326) ST_SetSRID(ST_MakePoint(%s, %s), 4326)
) as dist ) as dist
FROM xian_meteorology FROM xian_meteorology
WHERE is_delete = 0
) t ) t
WHERE dist < 50000 WHERE dist < 50000
ORDER BY dist ORDER BY dist
@@ -283,6 +285,7 @@ class DbnRepository:
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall
FROM xian_meteorology FROM xian_meteorology
WHERE lon = %s AND lat = %s WHERE lon = %s AND lat = %s
AND is_delete = 0
AND datetime BETWEEN %s AND %s AND datetime BETWEEN %s AND %s
ORDER BY datetime DESC ORDER BY datetime DESC
""" """
@@ -326,7 +329,7 @@ class DbnRepository:
"""一次性加载所有气象站点坐标到内存(188个站点,约2KB)""" """一次性加载所有气象站点坐标到内存(188个站点,约2KB)"""
if cls._cached_stations is not None: if cls._cached_stations is not None:
return cls._cached_stations return cls._cached_stations
sql = "SELECT DISTINCT lon, lat FROM xian_meteorology" sql = "SELECT DISTINCT lon, lat FROM xian_meteorology WHERE is_delete = 0"
cls._cached_stations = db_helper.execute_query(sql) cls._cached_stations = db_helper.execute_query(sql)
logger.info(f"已缓存 {len(cls._cached_stations)} 个气象站点坐标") logger.info(f"已缓存 {len(cls._cached_stations)} 个气象站点坐标")
return cls._cached_stations return cls._cached_stations
@@ -407,6 +410,7 @@ class DbnRepository:
SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall
FROM xian_meteorology FROM xian_meteorology
WHERE (lon, lat) IN ({placeholders}) WHERE (lon, lat) IN ({placeholders})
AND is_delete = 0
AND datetime BETWEEN %s AND %s AND datetime BETWEEN %s AND %s
ORDER BY lon, lat, datetime DESC ORDER BY lon, lat, datetime DESC
""" """
@@ -688,6 +692,21 @@ class DbnRepository:
return float(result['aspect']) return float(result['aspect'])
return None return None
@staticmethod
def get_inference_result(inference_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取推理结果
Returns:
{id, name, event_type, occurred_time, operation_type, condition, result}
"""
sql = """
SELECT id, name, event_type, occurred_time, operation_type, condition, result
FROM xian_inference_result
WHERE id = %s AND is_delete = 0
"""
return db_helper.execute_query_one(sql, (inference_id,))
@staticmethod @staticmethod
def save_inference_result(disaster_name: str, event_type: str, occurred_time, operation_type: str, def save_inference_result(disaster_name: str, event_type: str, occurred_time, operation_type: str,
condition: dict, result: list) -> int: condition: dict, result: list) -> int:
+120
View File
@@ -0,0 +1,120 @@
"""
灾害影响范围数据仓库
负责 xian_hazard_zones 表的数据读取和写入
"""
from typing import List, Dict, Any, Optional
from app.utils.db_helper import db_helper
class HazardRepository:
"""灾害影响范围数据访问层"""
# ==================== 隐患点查询 ====================
@staticmethod
def get_all_hidden_danger_spots() -> List[Dict[str, Any]]:
"""
获取所有隐患点(含静态因子)
Returns:
隐患点列表,每个元素含: source_id, disaster_name, disaster_type,
scale_grade, county, lon, lat, static_factors
"""
sql = """
SELECT
h.id AS source_id,
h.disaster_name,
h.disaster_type,
h.scale_grade,
h.county,
rf.lon,
rf.lat,
rf.static_factors
FROM xian_hidden_danger_spots h
JOIN xian_risk_factors rf
ON rf.source_id = h.id AND rf.source_type = 1
WHERE h.is_delete = 0
AND rf.is_delete = 0
AND h.disaster_type IN ('滑坡', '泥石流', '山洪', '内涝', '崩塌')
AND rf.lon IS NOT NULL
AND rf.lat IS NOT NULL
ORDER BY h.id
"""
rows = db_helper.execute_query(sql)
return [{
'source_id': r['source_id'],
'disaster_name': r['disaster_name'],
'disaster_type': r['disaster_type'],
'scale_grade': r['scale_grade'],
'county': r['county'],
'lon': float(r['lon']),
'lat': float(r['lat']),
'static_factors': r.get('static_factors') or {}
} for r in rows]
# ==================== 影响范围写入 ====================
@staticmethod
def delete_existing_zones() -> int:
"""逻辑删除已有影响范围记录"""
sql = "UPDATE xian_hazard_zones SET is_delete = 1 WHERE is_delete = 0"
return db_helper.execute_update(sql)
@staticmethod
def insert_hazard_zone(source_id: int, disaster_type: str,
zone_wkt: str, path_wkt: Optional[str]) -> int:
"""
插入单条影响范围记录
Args:
source_id: 隐患点ID
disaster_type: 灾害类型
zone_wkt: 影响范围 POLYGON WKT
path_wkt: 影响路径 LINESTRING WKT (可为None)
Returns:
新插入记录的ID
"""
if path_wkt:
sql = """
INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom, path_geom)
VALUES (%s, %s, ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326))
RETURNING id
"""
row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt, path_wkt))
else:
sql = """
INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom)
VALUES (%s, %s, ST_GeomFromText(%s, 4326))
RETURNING id
"""
row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt))
return row['id'] if row else 0
@staticmethod
def batch_insert_zones(records: List[Dict[str, Any]]) -> int:
"""
批量插入影响范围记录
Args:
records: [{source_id, disaster_type, zone_wkt, path_wkt}, ...]
Returns:
成功插入的记录数
"""
count = 0
for rec in records:
hid = HazardRepository.insert_hazard_zone(
rec['source_id'],
rec['disaster_type'],
rec['zone_wkt'],
rec.get('path_wkt')
)
if hid > 0:
count += 1
return count
# 全局实例
hazard_repository = HazardRepository()
+1 -1
View File
@@ -12,7 +12,7 @@ class QgisRepository:
sql = """ sql = """
SELECT id, name, event_type, occurred_time, condition SELECT id, name, event_type, occurred_time, condition
FROM xian_inference_result FROM xian_inference_result
WHERE id = %s WHERE id = %s AND is_delete = 0
""" """
rows = db_helper.execute_query(sql, (inference_id,)) rows = db_helper.execute_query(sql, (inference_id,))
if not rows: if not rows:
+97
View File
@@ -28,6 +28,7 @@ class RainfallRepository:
SELECT max(id) as max_id SELECT max(id) as max_id
FROM xian_meteorology FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
""" """
result = db_helper.execute_query_one(sql, (start_time, end_time)) result = db_helper.execute_query_one(sql, (start_time, end_time))
@@ -58,6 +59,7 @@ class RainfallRepository:
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h
FROM xian_meteorology FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
ORDER BY lon, lat, datetime DESC ORDER BY lon, lat, datetime DESC
""" """
@@ -100,5 +102,100 @@ class RainfallRepository:
return station_data return station_data
def get_district_rainfall_summary(self, query_time, region_code: Optional[str] = None) -> List[Dict[str, Any]]:
"""
按行政区聚合降雨统计(取区内最大站点值)
Args:
query_time: 查询时间
region_code: 行政区划代码,不传则返回所有区
Returns:
[{district_name, district_code, rainfall, duration_hours}, ...]
"""
start_time, end_time = time_converter.to_db_time_range(query_time, hours=72)
# 一次查询:area_code + lon + lat + rainfall_1h
sql = """
SELECT area_code, lon, lat, datetime,
CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h
FROM xian_meteorology
WHERE datetime BETWEEN %s AND %s
AND is_delete = 0
ORDER BY area_code, lon, lat, datetime DESC
"""
rows = db_helper.execute_query(sql, (start_time, end_time))
if not rows:
return []
# 按站点分组,计算 accum_rain + duration_hours(复用现有算法)
from itertools import groupby
station_stats: Dict[tuple, Dict[str, Any]] = {} # (area_code, lon, lat) → stats
for (area_code, lon, lat), group in groupby(rows,
key=lambda r: (r['area_code'], r['lon'], r['lat'])):
accum_rain = 0.0
duration_hours = 0
consecutive_no_rain = 0
for row in group:
rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0
if rainfall > 0:
accum_rain += rainfall
duration_hours += 1
consecutive_no_rain = 0
else:
consecutive_no_rain += 1
if consecutive_no_rain >= 3:
break
if accum_rain > 0:
duration_hours += 1
key = (area_code, lon, lat)
station_stats[key] = {
'area_code': area_code,
'accum_rain': accum_rain,
'duration_hours': duration_hours
}
# 按 area_code 聚合:取区内最大降雨站点
district_max: Dict[str, Dict[str, Any]] = {}
for key, stats in station_stats.items():
code = stats['area_code']
if region_code and code != region_code:
continue
if code not in district_max or stats['accum_rain'] > district_max[code]['rainfall']:
district_max[code] = {
'district_code': code,
'rainfall': round(stats['accum_rain'], 1),
'duration_hours': stats['duration_hours']
}
if not district_max:
return []
# 查 xian_district 获取名称
codes = list(district_max.keys())
placeholders = ', '.join(['%s'] * len(codes))
sql = f"SELECT code, name FROM xian_district WHERE code IN ({placeholders}) AND is_delete = 0"
district_rows = db_helper.execute_query(sql, tuple(codes))
code_to_name = {r['code']: r['name'] for r in district_rows}
result = []
for code, info in district_max.items():
name = code_to_name.get(code)
if name is None:
continue # 跳过 xian_district 中不存在的代码
result.append({
'district_name': name,
'district_code': code,
'rainfall': info['rainfall'],
'duration_hours': info['duration_hours']
})
# 按名称排序
result.sort(key=lambda x: x['district_name'])
return result
# 创建全局实例 # 创建全局实例
rainfall_repository = RainfallRepository() rainfall_repository = RainfallRepository()
+47 -7
View File
@@ -3,7 +3,7 @@ API 请求/响应数据模型
""" """
from datetime import datetime from datetime import datetime
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
# ============================================================ # ============================================================
@@ -11,19 +11,35 @@ from pydantic import BaseModel, Field
# ============================================================ # ============================================================
class RainfallPredictRequest(BaseModel): class RainfallPredictRequest(BaseModel):
"""暴雨灾害链预测请求""" """
暴雨灾害链预测请求
参数规则(二选一):
1. 自动推演模式:rainfall、duration、region_code 全部不传 → 从气象表自动获取,不限区域
2. 指定条件模式:rainfall、duration、region_code 全部传入 → 按指定条件预测
"""
disaster_name: str = Field(min_length=1, max_length=255) disaster_name: str = Field(min_length=1, max_length=255)
point_ids: Optional[List[int]] = Field(None, max_length=500, point_ids: Optional[List[int]] = Field(None, max_length=500,
description="点位ID列表,不传则查询所有点") description="点位ID列表,不传则查询所有点")
region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104',不传则不限区域") region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104'")
rainfall: Optional[float] = Field(None, ge=0, rainfall: Optional[float] = Field(None, ge=0, description="累计降雨量(mm)")
description="累计降雨量(mm),不传则从气象表自动获取") duration: Optional[float] = Field(None, ge=0, description="降雨持续时间(h)")
duration: Optional[float] = Field(None, ge=0,
description="降雨持续时间(h),不传则从气象表自动获取")
occurred_time: Optional[datetime] = Field(None, description="事件发生时间,不传则为当前时间") occurred_time: Optional[datetime] = Field(None, description="事件发生时间,不传则为当前时间")
operation_type: str = Field("模拟", min_length=1, max_length=50, operation_type: str = Field("模拟", min_length=1, max_length=50,
description="操作类型(如 '模拟', '实时监测', '应急评估'") description="操作类型(如 '模拟', '实时监测', '应急评估'")
@model_validator(mode='after')
def validate_mode_exclusivity(self):
"""校验 rainfall / duration / region_code 必须要么全不传,要么全传"""
trio = (self.rainfall, self.duration, self.region_code)
none_count = sum(1 for v in trio if v is None)
if none_count not in (0, 3):
raise ValueError(
"rainfall、duration、region_code 必须全部传入或全部不传,"
"不允许只传部分参数"
)
return self
# ============================================================ # ============================================================
# 地震预测 # 地震预测
@@ -86,3 +102,27 @@ class HealthResponse(BaseModel):
status: str = "ok" status: str = "ok"
rainfall_model_loaded: bool = False rainfall_model_loaded: bool = False
earthquake_model_loaded: bool = False earthquake_model_loaded: bool = False
# ============================================================
# 各区降雨概况
# ============================================================
class DistrictSummaryRequest(BaseModel):
"""各区降雨概况请求"""
inference_id: int = Field(..., ge=1, description="推理结果IDxian_inference_result.id")
class DistrictSummaryItem(BaseModel):
"""单个区的降雨概况"""
district_name: str = Field(..., description="行政区名称(如 '碑林区'")
district_code: str = Field(..., description="行政区划代码(如 '610103'")
rainfall: float = Field(..., description="累计降雨量(mm)")
duration_hours: float = Field(..., description="持续降雨时间(h)")
class DistrictSummaryResponse(BaseModel):
"""各区降雨概况响应"""
code: int = Field(200, description="状态码")
message: str = Field("success", description="提示信息")
data: Optional[List[DistrictSummaryItem]] = Field(None, description="各区降雨概况列表")
+115
View File
@@ -0,0 +1,115 @@
"""
计算所有隐患点的灾害影响范围并填充 xian_hazard_zones 表
仅计算隐患点 (xian_hidden_danger_spots),不计算风险点。
灾害类型包括: 滑坡、泥石流、山洪、内涝、崩塌
"""
import sys
import os
from datetime import datetime
# 确保项目根目录在 sys.path 中
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from app.utils.logger import LoggerManager
from app.repositories.hazard_repository import hazard_repository
from app.models.hazard.hazard_zone_calculator import hazard_zone_calculator
_BATCH_SIZE = 200 # 每处理200条输出一次进度
logger = LoggerManager.get_logger("hazard_zones",
os.path.join(_project_root, "logs"))
def main():
logger.info("=" * 60)
logger.info("开始计算隐患点灾害影响范围")
logger.info(f"启动时间: {datetime.now().isoformat()}")
logger.info("=" * 60)
# 1. 获取所有隐患点
spots = hazard_repository.get_all_hidden_danger_spots()
total = len(spots)
logger.info(f"查询到 {total} 个隐患点")
if total == 0:
logger.warning("没有隐患点数据,退出")
return
# 按灾害类型统计
type_counts = {}
for s in spots:
dt = s['disaster_type']
type_counts[dt] = type_counts.get(dt, 0) + 1
logger.info(f"灾害类型分布: {type_counts}")
# 2. 逐点计算
records = []
success_count = 0
skip_count = 0
error_count = 0
for i, spot in enumerate(spots):
sid = spot['source_id']
dtype = spot['disaster_type']
dname = spot['disaster_name']
scale = spot.get('scale_grade')
try:
zone_wkt, path_wkt = hazard_zone_calculator.calculate(
source_id=sid,
disaster_type=dtype,
scale_grade=scale,
lon=spot['lon'],
lat=spot['lat'],
static_factors=spot['static_factors']
)
if zone_wkt:
records.append({
'source_id': sid,
'disaster_type': dtype,
'zone_wkt': zone_wkt,
'path_wkt': path_wkt
})
success_count += 1
else:
skip_count += 1
logger.debug(f"跳过 [{sid}] {dname}({dtype}): 无法计算影响范围")
except Exception as e:
error_count += 1
logger.error(f"计算失败 [{sid}] {dname}({dtype}): {e}")
# 进度输出
if (i + 1) % _BATCH_SIZE == 0 or (i + 1) == total:
logger.info(f"进度: {i + 1}/{total} "
f"(成功:{success_count} 跳过:{skip_count} 失败:{error_count})")
# 3. 写入数据库
logger.info(f"计算完成。开始写入数据库,共 {len(records)} 条记录")
# 先逻辑删除旧数据
deleted = hazard_repository.delete_existing_zones()
logger.info(f"已标记 {deleted} 条旧记录为删除")
# 批量写入新数据
inserted = hazard_repository.batch_insert_zones(records)
logger.info(f"成功写入 {inserted} 条新记录")
# 4. 汇总
logger.info("=" * 60)
logger.info("执行完成")
logger.info(f" 隐患点总数: {total}")
logger.info(f" 计算成功: {success_count}")
logger.info(f" 跳过: {skip_count}")
logger.info(f" 计算失败: {error_count}")
logger.info(f" 写入记录: {inserted}")
logger.info(f"完成时间: {datetime.now().isoformat()}")
logger.info("=" * 60)
if __name__ == '__main__':
main()
+159
View File
@@ -0,0 +1,159 @@
"""
将项目 fonts/ 目录下的中文字体安装到 Docker QGIS 容器。
QGIS 官方 Docker 镜像不包含中文字体,模板中的 SimHei/SimSun/YaHei 字体会显示为方块。
本脚本将 fonts/ 目录下的字体文件复制到容器内并刷新字体缓存。
用法:
python app/script/install_fonts_to_container.py [--container qgis-server] [--dry-run]
前置条件:
- Docker 容器已启动(docker start qgis-server
- 项目根目录下 fonts/ 目录包含所需的 .ttf/.ttc 字体文件
"""
import argparse
import subprocess
import sys
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
FONTS_DIR = PROJECT_ROOT / "fonts"
# 容器内字体目录
CONTAINER_FONT_DIR = "/usr/share/fonts/truetype/winfonts"
def _run(cmd, timeout=10, **kwargs):
"""subprocess.run 封装,统一 UTF-8 编码"""
return subprocess.run(
cmd, capture_output=True, timeout=timeout,
encoding="utf-8", errors="replace", **kwargs,
)
def _load_config():
"""从 settings.toml 读取配置"""
try:
from config import settings
return {
"container": getattr(settings, "QGIS_DOCKER_CONTAINER", "qgis-server"),
}
except ImportError:
pass
cfg = {"container": "qgis-server"}
toml_path = PROJECT_ROOT / "settings.toml"
if toml_path.exists():
for line in toml_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line.startswith("QGIS_DOCKER_CONTAINER") and "=" in line:
cfg["container"] = line.split("=", 1)[1].strip().strip('"').strip("'")
return cfg
def _check_container(container: str):
"""检查容器是否运行"""
result = _run(["docker", "inspect", "--format={{.State.Running}}", container], timeout=5)
if (result.stdout or "").strip() != "true":
raise RuntimeError(f"容器 {container} 未运行,请先 docker start {container}")
def install_fonts(container: str = None, dry_run: bool = False):
"""将 fonts/ 目录下的字体安装到容器"""
cfg = _load_config()
if container is None:
container = cfg["container"]
_check_container(container)
# 扫描字体文件
if not FONTS_DIR.is_dir():
print(f"字体目录不存在: {FONTS_DIR}")
print(f"请先在项目根目录下创建 fonts/ 目录,并放入 .ttf/.ttc 字体文件。")
print(f"Windows 字体路径: C:\\Windows\\Fonts\\")
print(f" simhei.ttf — 黑体(模板默认字体)")
print(f" simsun.ttc — 宋体")
print(f" msyh.ttc — 微软雅黑")
print(f" msyhbd.ttc — 微软雅黑粗体")
sys.exit(1)
font_files = [f for f in FONTS_DIR.iterdir()
if f.is_file() and f.suffix.lower() in (".ttf", ".ttc", ".otf")]
if not font_files:
print(f"字体目录为空或无字体文件: {FONTS_DIR}")
print(f"请放入 .ttf/.ttc/.otf 字体文件后重试。")
sys.exit(1)
print(f"=== 安装中文字体到 Docker 容器 {container} ===\n")
print(f" 字体目录: {FONTS_DIR}")
print(f" 字体文件: {len(font_files)}")
for f in sorted(font_files):
size_kb = f.stat().st_size / 1024
print(f" - {f.name} ({size_kb:.0f} KB)")
print(f" 容器目标: {container}:{CONTAINER_FONT_DIR}")
print()
if dry_run:
print(" [dry-run] 跳过安装")
return
# 1. 在容器内创建字体目录
t0 = time.time()
_run(["docker", "exec", container, "mkdir", "-p", CONTAINER_FONT_DIR])
# 2. 逐个复制字体文件到容器
print(" [1/3] 复制字体文件...")
for f in sorted(font_files):
result = _run(
["docker", "cp", str(f), f"{container}:{CONTAINER_FONT_DIR}/{f.name}"],
timeout=30,
)
if result.returncode != 0:
print(f" FAIL {f.name}: {result.stderr.strip()}")
else:
print(f" OK {f.name}")
# 3. 刷新字体缓存
print("\n [2/3] 刷新字体缓存...")
result = _run(["docker", "exec", container, "fc-cache", "-fv"], timeout=30)
if result.returncode != 0:
print(f" WARN fc-cache 输出: {result.stderr.strip()}")
else:
print(" OK")
# 4. 验证字体
print("\n [3/3] 验证字体...")
verify_script = (
"from PyQt5.QtGui import QFontDatabase; "
"db = QFontDatabase(); "
"zh = [f for f in db.families() if any(k in f for k in "
"['SimHei','YaHei','SimSun','WenQuanYi','Noto Sans CJK'])]; "
"print('中文字体:', zh if zh else '未安装!')"
)
result = _run(
["docker", "exec", container, "python3", "-c", verify_script],
timeout=10,
)
print(f" {(result.stdout or '').strip()}")
elapsed = time.time() - t0
print(f"\n=== 完成,耗时 {elapsed:.1f}s ===")
def main():
parser = argparse.ArgumentParser(description="将中文字体安装到 Docker QGIS 容器")
parser.add_argument("--container", default=None, help="Docker 容器名称")
parser.add_argument("--dry-run", action="store_true", help="仅显示信息,不实际安装")
args = parser.parse_args()
try:
install_fonts(container=args.container, dry_run=args.dry_run)
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+12 -5
View File
@@ -172,6 +172,8 @@ def map_template_to_container(host_path: str) -> str:
主机: F:/project/xian/xian_algorithm_new/app/data/template/rainfall/xxx.qgz 主机: F:/project/xian/xian_algorithm_new/app/data/template/rainfall/xxx.qgz
容器: /data/template/rainfall/xxx.qgz 容器: /data/template/rainfall/xxx.qgz
已为容器路径时直接返回(幂等)。
""" """
try: try:
from config import settings from config import settings
@@ -179,16 +181,21 @@ def map_template_to_container(host_path: str) -> str:
except Exception: except Exception:
container_tpl = "" container_tpl = ""
# 已经是容器路径,直接返回
normalized = host_path.replace("\\", "/")
if container_tpl and normalized.startswith(container_tpl.rstrip("/") + "/"):
return normalized
if not container_tpl: if not container_tpl:
# fallback: 用通用映射(走 9P,慢) # fallback: 用通用映射(走 9P,慢)
return map_host_to_container(host_path) return map_host_to_container(host_path)
normalized = host_path.replace("\\", "/").lower() normalized_lower = normalized.lower()
# 找 "app/data/template/" 后面的部分 # 找模板子目录后面的相对路径(主机端 marker 从配置读取)
marker = "app/data/template/" marker = getattr(settings, "QGIS_HOST_TEMPLATE_SUBDIR", "app/data/template").rstrip("/") + "/"
idx = normalized.find(marker) idx = normalized_lower.find(marker.lower())
if idx >= 0: if idx >= 0:
relative = host_path.replace("\\", "/")[idx + len(marker):] relative = normalized[idx + len(marker):]
return f"{container_tpl.rstrip('/')}/{relative}" return f"{container_tpl.rstrip('/')}/{relative}"
return map_host_to_container(host_path) return map_host_to_container(host_path)
+1 -1
View File
@@ -139,7 +139,7 @@ class QgisPool:
for line in worker.proc.stderr: for line in worker.proc.stderr:
line = line.strip() line = line.strip()
if line: if line:
logger.debug(f"[Worker-{worker.worker_id}] stderr: {line}") logger.info(f"[Worker-{worker.worker_id}] {line}")
except Exception: except Exception:
pass pass
+7
View File
@@ -188,4 +188,11 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
try:
main() main()
except Exception as e:
# 捕获 main() 未处理的异常,确保输出到 stderr
import traceback
print(f"[worker] 未捕获异常: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
sys.exit(1)
+20
View File
@@ -23,6 +23,19 @@ class RedisHelper:
'socket_timeout': 5, # 读写超时时间(秒) 'socket_timeout': 5, # 读写超时时间(秒)
} }
self._client: Optional[redis.Redis] = None self._client: Optional[redis.Redis] = None
self._logged_config = False # 避免重复打印配置
def _log_config_once(self):
"""首次连接失败时打印配置信息,便于排查"""
if not self._logged_config:
from app.utils.logger import get_logger
_logger = get_logger("redis")
_logger.warning(
f"Redis 连接配置: host={self.redis_config['host']}, "
f"port={self.redis_config['port']}, db={self.redis_config['db']}, "
f"password={'***' if self.redis_config['password'] else 'None'}"
)
self._logged_config = True
@property @property
def client(self) -> redis.Redis: def client(self) -> redis.Redis:
@@ -32,8 +45,15 @@ class RedisHelper:
self._client = redis.Redis(**self.redis_config) self._client = redis.Redis(**self.redis_config)
# 测试连接 # 测试连接
self._client.ping() self._client.ping()
except redis.AuthenticationError as e:
self._log_config_once()
raise ConnectionError(f"Redis 认证失败(密码错误): {e}")
except redis.ConnectionError as e: except redis.ConnectionError as e:
self._log_config_once()
raise ConnectionError(f"无法连接到 Redis 服务器: {e}") raise ConnectionError(f"无法连接到 Redis 服务器: {e}")
except Exception as e:
self._log_config_once()
raise ConnectionError(f"Redis 连接异常: {e}")
return self._client return self._client
def close(self): def close(self):
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
+2
View File
@@ -12,6 +12,8 @@ PREDICT_PROBABILITY_THRESHOLD = 50
QGIS_GPKG_DIR = "app/data/gpkg" QGIS_GPKG_DIR = "app/data/gpkg"
# 容器内 GPKG 本地路径 # 容器内 GPKG 本地路径
QGIS_DOCKER_GPKG_DIR = "/data/gpkg" QGIS_DOCKER_GPKG_DIR = "/data/gpkg"
# 模板目录(相对于项目根,用于路径映射时的 marker 匹配)
QGIS_HOST_TEMPLATE_SUBDIR = "app/data/template"
# 容器内模板本地路径 # 容器内模板本地路径
QGIS_DOCKER_TEMPLATE_DIR = "/data/template" QGIS_DOCKER_TEMPLATE_DIR = "/data/template"
# 专题图输出子目录 # 专题图输出子目录
Binary file not shown.