Compare commits

...

10 Commits

Author SHA1 Message Date
wzy-warehouse d9adc06a9f 修改时间 2026-06-14 16:56:09 +08:00
wzy-warehouse 615a563369 增加概率阈值 2026-06-14 16:50:03 +08:00
wzy-warehouse 75046c99c8 修改API接口返回值——添加经纬度 2026-06-14 16:41:31 +08:00
wzy-warehouse b2b5c00f33 修改API接口返回值 2026-06-14 16:37:04 +08:00
wzy-warehouse de118dc57b 修改API接口 2026-06-14 16:29:01 +08:00
wzy-warehouse 028b7989ef 修改API返回值 2026-06-14 16:00:33 +08:00
wzy-warehouse d4eee07890 更新条件概率表 2026-06-14 15:52:56 +08:00
wzy-warehouse c375cd33d9 修改条件存储 2026-06-14 15:52:33 +08:00
wzy-warehouse 5e1dc585d4 修改接口返回值 2026-06-14 15:31:42 +08:00
wzy-warehouse 9a49764b35 添加灾害名称 2026-06-14 14:38:20 +08:00
6 changed files with 151 additions and 104 deletions
+55 -37
View File
@@ -6,7 +6,7 @@ from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException
from app.schemas.api_schemas import EarthquakePredictRequest, PredictResponse, PredictionItem
from app.schemas.api_schemas import EarthquakePredictRequest, PredictResponse, PredictData
from app.utils.api_deps import get_earthquake_model, get_prediction_semaphore
from app.repositories.dbn_repository import dbn_repository
from app.config.paths import get_logger
@@ -18,24 +18,46 @@ SOURCE_TYPE_MAP = {1: "隐患点", 2: "风险点"}
LEVEL_MAP = {"": "", "": "", "较高": "较高", "": ""}
def _build_prediction_items(results: List[Dict[str, Any]]) -> List[PredictionItem]:
"""将模型原始结果转换为接口返回格式"""
items = []
def _build_prediction_map(results: List[Dict[str, Any]]) -> Dict[str, float]:
"""将模型原始结果转换为存储格式: {id_type: 概率百分比}"""
result_map = {}
for r in results:
probs = r.get("disaster_probabilities", {})
levels = r.get("disaster_levels", {})
if not probs:
continue
source_id = r["source_id"]
source_type = r.get("source_type")
max_hazard = max(probs, key=probs.get)
items.append(PredictionItem(
id=r["source_id"], # 使用 source_id(隐患点/风险点ID)而非 xian_risk_factors.id
type=SOURCE_TYPE_MAP.get(r.get("source_type"), "未知"),
probability=round(probs[max_hazard], 4),
level=LEVEL_MAP.get(levels.get(max_hazard, "none"), ""),
))
return items
key = f"{source_id}_{source_type}"
result_map[key] = round(probs[max_hazard] * 100, 2)
return result_map
def _build_prediction_map_with_location(results: List[Dict[str, Any]], threshold: float = 50.0) -> Dict[str, Dict[str, Any]]:
"""将模型原始结果转换为返回格式: {id_type: {probability, lon, lat}}"""
from config import settings
threshold = getattr(settings, 'PREDICT_PROBABILITY_THRESHOLD', threshold)
result_map = {}
for r in results:
probs = r.get("disaster_probabilities", {})
if not probs:
continue
source_id = r["source_id"]
source_type = r.get("source_type")
max_hazard = max(probs, key=probs.get)
prob_value = round(probs[max_hazard] * 100, 2)
# 低于阈值不返回
if prob_value < threshold:
continue
key = f"{source_id}_{source_type}"
result_map[key] = {
"probability": prob_value,
"lon": r.get("lon"),
"lat": r.get("lat")
}
return result_map
def _fetch_points(point_ids: Optional[List[int]], region_code: Optional[str]) -> List[Dict[str, Any]]:
@@ -52,11 +74,11 @@ def _predict_sync(point_ids: Optional[List[int]], region_code: Optional[str],
同步执行地震预测(在线程池中运行)
Returns:
(预测结果列表, 原始结果)
(存储用result_map, 返回用result_map_with_location)
"""
points = _fetch_points(point_ids, region_code)
if not points:
return [], []
return {}, {}
model = get_earthquake_model()
raw_results = model.predict_multiple_points(
@@ -66,28 +88,18 @@ def _predict_sync(point_ids: Optional[List[int]], region_code: Optional[str],
epicenter_lon=epicenter_lon,
epicenter_lat=epicenter_lat,
)
items = _build_prediction_items(raw_results)
result_map = _build_prediction_map(raw_results) # 用于存储
result_map_with_location = _build_prediction_map_with_location(raw_results) # 用于返回
save_results = [
{
"point_id": r.get("source_id"), # 使用 source_id(隐患点/风险点ID)而非 xian_risk_factors.id
"source_type": r.get("source_type"),
"lon": r.get("lon"),
"lat": r.get("lat"),
"disaster_probabilities": r.get("disaster_probabilities", {}),
"disaster_levels": r.get("disaster_levels", {})
}
for r in raw_results
]
return items, save_results
return result_map, result_map_with_location
@router.post("/predict", response_model=PredictResponse, summary="地震灾害链预测")
async def predict_earthquake(req: EarthquakePredictRequest):
"""
根据震级、震源深度和震中位置,批量预测隐患点/风险点的次生灾害概率和等级
根据震级、震源深度和震中位置,批量预测隐患点/风险点的次生灾害概率。
- **disaster_name**: 灾害名称
- **point_ids**: 点位ID列表(可选,不传则查询所有点)
- **region_code**: 行政区划代码(可选,不传则不限区域)
- **magnitude**: 震级(Richter)
@@ -102,7 +114,7 @@ async def predict_earthquake(req: EarthquakePredictRequest):
async with semaphore:
loop = asyncio.get_event_loop()
try:
items, save_results = await loop.run_in_executor(
result_map, result_map_with_location = await loop.run_in_executor(
None, _predict_sync, req.point_ids, req.region_code,
req.magnitude, req.depth, req.epicenter_lon, req.epicenter_lat
)
@@ -112,25 +124,31 @@ async def predict_earthquake(req: EarthquakePredictRequest):
# 保存推理结果
record_id = None
if save_results:
if result_map:
try:
# 使用传入的 occurred_time,如果未传则使用当前时间
from datetime import datetime
occurred_time = req.occurred_time if req.occurred_time else datetime.now()
# 存储经过默认值处理的条件
condition = {
"point_ids": req.point_ids,
"region_code": req.region_code,
"magnitude": req.magnitude,
"depth": req.depth,
"depth": req.depth, # 已有默认值 10.0
"epicenter_lon": req.epicenter_lon,
"epicenter_lat": req.epicenter_lat
"epicenter_lat": req.epicenter_lat,
"occurred_time": occurred_time.isoformat() if hasattr(occurred_time, 'isoformat') else str(occurred_time)
}
record_id = dbn_repository.save_inference_result(
disaster_name=req.disaster_name,
event_type="earthquake",
occurred_time=req.occurred_time,
occurred_time=occurred_time,
operation_type=req.operation_type,
condition=condition,
result=save_results
result=result_map
)
logger.info(f"推理结果已保存,record_id={record_id}")
except Exception as e:
logger.error(f"保存推理结果失败: {e}", exc_info=True)
return PredictResponse(code=200, message="success", data=items, record_id=record_id)
return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location))
+64 -40
View File
@@ -7,7 +7,7 @@ from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictionItem, UpdateMonitoringTimeRequest
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictData, UpdateMonitoringTimeRequest
from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore
from app.repositories.dbn_repository import dbn_repository
from app.core.rainfall_manager import rainfall_manager
@@ -21,24 +21,46 @@ SOURCE_TYPE_MAP = {1: "隐患点", 2: "风险点"}
LEVEL_MAP = {"": "", "": "", "较高": "较高", "": ""}
def _build_prediction_items(results: List[Dict[str, Any]]) -> List[PredictionItem]:
"""将模型原始结果转换为接口返回格式"""
items = []
def _build_prediction_map(results: List[Dict[str, Any]]) -> Dict[str, float]:
"""将模型原始结果转换为存储格式: {id_type: 概率百分比}"""
result_map = {}
for r in results:
probs = r.get("disaster_probabilities", {})
levels = r.get("disaster_levels", {})
if not probs:
continue
source_id = r["source_id"]
source_type = r.get("source_type")
max_hazard = max(probs, key=probs.get)
items.append(PredictionItem(
id=r["source_id"], # 使用 source_id(隐患点/风险点ID)而非 xian_risk_factors.id
type=SOURCE_TYPE_MAP.get(r.get("source_type"), "未知"),
probability=round(probs[max_hazard], 4),
level=LEVEL_MAP.get(levels.get(max_hazard, "none"), ""),
))
return items
key = f"{source_id}_{source_type}"
result_map[key] = round(probs[max_hazard] * 100, 2)
return result_map
def _build_prediction_map_with_location(results: List[Dict[str, Any]], threshold: float = 50.0) -> Dict[str, Dict[str, Any]]:
"""将模型原始结果转换为返回格式: {id_type: {probability, lon, lat}}"""
from config import settings
threshold = getattr(settings, 'PREDICT_PROBABILITY_THRESHOLD', threshold)
result_map = {}
for r in results:
probs = r.get("disaster_probabilities", {})
if not probs:
continue
source_id = r["source_id"]
source_type = r.get("source_type")
max_hazard = max(probs, key=probs.get)
prob_value = round(probs[max_hazard] * 100, 2)
# 低于阈值不返回
if prob_value < threshold:
continue
key = f"{source_id}_{source_type}"
result_map[key] = {
"probability": prob_value,
"lon": r.get("lon"),
"lat": r.get("lat")
}
return result_map
def _fetch_points(point_ids: Optional[List[int]], region_code: Optional[str]) -> List[Dict[str, Any]]:
@@ -50,42 +72,42 @@ def _fetch_points(point_ids: Optional[List[int]], region_code: Optional[str]) ->
def _predict_sync(point_ids: Optional[List[int]], region_code: Optional[str],
rainfall: Optional[float], duration: Optional[float],
operation_type: str) -> tuple:
operation_type: str, occurred_time: Optional[datetime] = None) -> tuple:
"""
同步执行暴雨预测(在线程池中运行)
Args:
occurred_time: 事件发生时间,用于查询降雨数据和DBN推理
Returns:
(预测结果列表, 原始结果, 输入条件, 当前时间)
(存储用result_map, 返回用result_map_with_location, 传入的条件, 实际使用的事件时间)
"""
points = _fetch_points(point_ids, region_code)
if not points:
return [], [], {}, datetime.now()
return {}, {}, {}, occurred_time or datetime.now()
# 使用传入的时间,如果没有传则使用 rainfall_manager 中的全局查询时间,最后才用当前时间
if occurred_time:
query_time = occurred_time
else:
from app.core.rainfall_manager import rainfall_manager
query_time = rainfall_manager.get_current_query_time() or datetime.now()
model = get_rainfall_model()
raw_results = model.predict_multiple_points(points, rainfall=rainfall, duration=duration)
items = _build_prediction_items(raw_results)
raw_results = model.predict_multiple_points(points, rainfall=rainfall, duration=duration, query_time=query_time)
result_map = _build_prediction_map(raw_results) # 用于存储
result_map_with_location = _build_prediction_map_with_location(raw_results) # 用于返回
# 构建条件和结果用于保存
now = datetime.now()
# 存储传入的原始条件(降雨量和持续时间可能每个点不同,所以存储传入值)
condition = {
"point_ids": point_ids,
"region_code": region_code,
"rainfall": rainfall,
"duration": duration
"duration": duration,
"occurred_time": query_time.isoformat() if hasattr(query_time, 'isoformat') else str(query_time)
}
save_results = [
{
"point_id": r.get("source_id"), # 使用 source_id(隐患点/风险点ID)而非 xian_risk_factors.id
"source_type": r.get("source_type"),
"lon": r.get("lon"),
"lat": r.get("lat"),
"disaster_probabilities": r.get("disaster_probabilities", {}),
"disaster_levels": r.get("disaster_levels", {})
}
for r in raw_results
]
return items, save_results, condition, now
return result_map, result_map_with_location, condition, query_time
@router.post("/update-monitoring-time", summary="更新降雨监测查询时间")
@@ -119,8 +141,9 @@ async def update_monitoring_time(req: UpdateMonitoringTimeRequest):
@router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测")
async def predict_rainfall(req: RainfallPredictRequest):
"""
根据降雨量和持续时间,批量预测隐患点/风险点的灾害概率和等级
根据降雨量和持续时间,批量预测隐患点/风险点的灾害概率。
- **disaster_name**: 灾害名称
- **point_ids**: 点位ID列表(可选,不传则查询所有点)
- **region_code**: 行政区划代码(可选,不传则不限区域)
- **rainfall**: 累计降雨量(mm),不传则从气象表自动获取
@@ -132,9 +155,9 @@ async def predict_rainfall(req: RainfallPredictRequest):
async with semaphore:
loop = asyncio.get_event_loop()
try:
items, save_results, condition, now = await loop.run_in_executor(
result_map, result_map_with_location, condition, occurred_time = await loop.run_in_executor(
None, _predict_sync, req.point_ids, req.region_code,
req.rainfall, req.duration, req.operation_type
req.rainfall, req.duration, req.operation_type, req.occurred_time
)
except Exception as e:
logger.error(f"暴雨预测失败: {e}", exc_info=True)
@@ -142,17 +165,18 @@ async def predict_rainfall(req: RainfallPredictRequest):
# 保存推理结果
record_id = None
if save_results:
if result_map:
try:
record_id = dbn_repository.save_inference_result(
disaster_name=req.disaster_name,
event_type="rainfall",
occurred_time=now,
occurred_time=occurred_time,
operation_type=req.operation_type,
condition=condition,
result=save_results
result=result_map
)
logger.info(f"推理结果已保存,record_id={record_id}")
except Exception as e:
logger.error(f"保存推理结果失败: {e}", exc_info=True)
return PredictResponse(code=200, message="success", data=items, record_id=record_id)
return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location))
+13 -12
View File
@@ -86,11 +86,11 @@ landslide:
# 暴雨+缓坡+松散堆积物:塬面黄土
- condition: {rain_intensity: [storm, downpour, extreme], slope: [low, medium], lithology: [unconsolidated]}
probability: 0.30
# 断裂带+陡坡(无强降雨也有基岩蠕变风险
- condition: {dist_to_fault: [very_close], slope: [high, very_high]}
# 断裂带+陡坡+降雨(无强降雨基岩蠕变速率极低,需降雨触发失稳
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], dist_to_fault: [very_close], slope: [high, very_high]}
probability: 0.25
# 城区+中坡:工程扰动
- condition: {landuse: [urban], slope: [medium, high]}
# 城区+中坡+降雨:工程扰动需降雨触发
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], landuse: [urban], slope: [medium, high]}
probability: 0.20
# === 低风险兜底(≤0.05===
@@ -151,8 +151,8 @@ debris_flow:
# 暴雨+缓坡+松散堆积物
- condition: {rain_intensity: [storm, downpour, extreme], slope: [low, medium], lithology: [unconsolidated]}
probability: 0.25
# 断裂带+陡坡(无降雨崩塌堆积物也可启动)
- condition: {dist_to_fault: [very_close], slope: [high, very_high]}
# 断裂带+陡坡+降雨(无降雨崩塌堆积物缺乏运移介质,泥石流无法启动)
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], dist_to_fault: [very_close], slope: [high, very_high]}
probability: 0.22
# === 低风险兜底(≤0.05===
@@ -320,8 +320,9 @@ collapse:
# 暴雨+陡坡+松散岩性(黄土/碎屑岩):黄土塬边崩塌
- condition: {rain_intensity: [storm, downpour, extreme], slope: [high, very_high], lithology: [terrigenous, unconsolidated, mixed_clastic]}
probability: 0.72
# 近河道+松散堆积物+陡坡:河流侧蚀致崩塌(渭河/灞河/浐河)
- condition: {dist_to_river: [very_close, close], lithology: [unconsolidated], slope: [high, very_high]}
# 近河道+松散堆积物+陡坡+降雨:河流侧蚀致崩塌(渭河/灞河/浐河)
# 降雨是崩塌的直接触发因素,无降雨时河流侧蚀是缓慢累积过程,不构成即时风险
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], dist_to_river: [very_close, close], lithology: [unconsolidated], slope: [high, very_high]}
probability: 0.70
# === 高风险(0.50-0.65===
@@ -342,14 +343,14 @@ collapse:
probability: 0.55
# === 中风险(0.20-0.40===
# 近河道+缓坡+松散堆积物:侧蚀累积效应
- condition: {dist_to_river: [very_close, close], lithology: [unconsolidated], slope: [medium]}
# 近河道+缓坡+松散堆积物+降雨:侧蚀累积效应需降雨触发
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], dist_to_river: [very_close, close], lithology: [unconsolidated], slope: [medium]}
probability: 0.35
# 大雨+中坡
- condition: {rain_intensity: [heavy], slope: [medium, high]}
probability: 0.30
# 断裂带+陡坡(无强降雨也有蠕变崩塌风险
- condition: {dist_to_fault: [very_close], slope: [high, very_high]}
# 断裂带+陡坡+降雨(无强降雨蠕变崩塌概率极低,需降雨触发
- condition: {rain_intensity: [light, moderate, heavy, storm, downpour, extreme], dist_to_fault: [very_close], slope: [high, very_high]}
probability: 0.25
# 暴雨+缓坡+松散岩性
- condition: {rain_intensity: [storm, downpour, extreme], slope: [low, medium], lithology: [unconsolidated]}
+5 -3
View File
@@ -689,12 +689,13 @@ class DbnRepository:
return None
@staticmethod
def save_inference_result(event_type: str, occurred_time, operation_type: str,
def save_inference_result(disaster_name: str, event_type: str, occurred_time, operation_type: str,
condition: dict, result: list) -> int:
"""
保存推理结果到 inference_result 表
Args:
disaster_name: 灾害名称
event_type: 事件类型('rainfall''earthquake'
occurred_time: 事件发生时间
operation_type: 操作类型
@@ -706,11 +707,12 @@ class DbnRepository:
"""
import json
sql = """
INSERT INTO xian_inference_result (event_type, occurred_time, operation_type, condition, result)
VALUES (%s, %s, %s, %s::jsonb, %s::jsonb)
INSERT INTO xian_inference_result (name, event_type, occurred_time, operation_type, condition, result)
VALUES (%s, %s, %s, %s, %s::jsonb, %s::jsonb)
RETURNING id
"""
row = db_helper.execute_query_one(sql, (
disaster_name,
event_type,
occurred_time,
operation_type,
+10 -10
View File
@@ -2,7 +2,7 @@
API 请求/响应数据模型
"""
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
@@ -12,6 +12,7 @@ from pydantic import BaseModel, Field
class RainfallPredictRequest(BaseModel):
"""暴雨灾害链预测请求"""
disaster_name: str = Field(min_length=1, max_length=255)
point_ids: Optional[List[int]] = Field(None, max_length=500,
description="点位ID列表,不传则查询所有点")
region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104'),不传则不限区域")
@@ -19,6 +20,7 @@ class RainfallPredictRequest(BaseModel):
description="累计降雨量(mm),不传则从气象表自动获取")
duration: Optional[float] = Field(None, ge=0,
description="降雨持续时间(h),不传则从气象表自动获取")
occurred_time: Optional[datetime] = Field(None, description="事件发生时间,不传则为当前时间")
operation_type: str = Field("模拟", min_length=1, max_length=50,
description="操作类型(如 '模拟', '实时监测', '应急评估'")
@@ -29,6 +31,7 @@ class RainfallPredictRequest(BaseModel):
class EarthquakePredictRequest(BaseModel):
"""地震灾害链预测请求"""
disaster_name: str = Field(min_length=1, max_length=255)
point_ids: Optional[List[int]] = Field(None, max_length=500,
description="点位ID列表,不传则查询所有点")
region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104'),不传则不限区域")
@@ -36,7 +39,7 @@ class EarthquakePredictRequest(BaseModel):
depth: float = Field(10.0, gt=0, le=700, description="震源深度(km),默认10km")
epicenter_lon: float = Field(..., ge=-180, le=180, description="震中经度")
epicenter_lat: float = Field(..., ge=-90, le=90, description="震中纬度")
occurred_time: datetime = Field(..., description="地震发生时间")
occurred_time: Optional[datetime] = Field(None, description="地震发生时间,不传则为当前时间")
operation_type: str = Field("模拟", min_length=1, max_length=50,
description="操作类型(如 '模拟', '实时监测', '应急评估'")
@@ -45,20 +48,17 @@ class EarthquakePredictRequest(BaseModel):
# 通用响应
# ============================================================
class PredictionItem(BaseModel):
"""单个点位预测结果"""
id: int = Field(..., description="点位ID")
type: str = Field(..., description="类型: 隐患点 / 风险点")
probability: float = Field(..., description="最大灾害概率")
level: str = Field(..., description="灾害等级: 低/中/较高/高")
class PredictData(BaseModel):
"""预测数据"""
record_id: Optional[int] = Field(None, description="推理结果记录ID")
list: Dict[str, Dict[str, Any]] = Field(default_factory=dict, description="预测结果列表,包含概率和经纬度")
class PredictResponse(BaseModel):
"""预测响应"""
code: int = Field(200, description="状态码")
message: str = Field("success", description="提示信息")
data: List[PredictionItem] = Field(default_factory=list, description="预测结果列表")
record_id: Optional[int] = Field(None, description="推理结果记录ID")
data: Optional[PredictData] = Field(None, description="预测数据")
class UpdateMonitoringTimeRequest(BaseModel):
+2
View File
@@ -8,6 +8,8 @@ RAIN_STATION_GRID_DIR = "/xian/rainfall/grid/images/:id"
REDIS_RAIN_STATION_GRID_KEY = "xian:rainfall:rain_station_grid"
# 雨量站存储标识符的redis的key
REDIS_RAIN_STATION_IDENTIFIER_KEY = "xian:rainfall:rain_station_identifier"
# 预测结果概率阈值(低于此值不返回给前端)
PREDICT_PROBABILITY_THRESHOLD = 50
# 开发环境
[development]