From 1f01ceb062f48b12be317119e0ee0fa366b0e44a Mon Sep 17 00:00:00 2001 From: wzy-warehouse <18135009705@163.com> Date: Sun, 28 Jun 2026 09:26:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=99=8D=E9=9B=A8=E9=87=8F?= =?UTF-8?q?=E4=B8=8E=E6=8C=81=E7=BB=AD=E6=97=B6=E9=97=B4=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/earthquake.py | 2 +- app/api/rainfall.py | 80 ++++++++++++++++++-- app/repositories/dbn_repository.py | 21 +++++- app/repositories/qgis_repository.py | 2 +- app/repositories/rainfall_repository.py | 97 +++++++++++++++++++++++++ app/schemas/api_schemas.py | 54 ++++++++++++-- 6 files changed, 240 insertions(+), 16 deletions(-) diff --git a/app/api/earthquake.py b/app/api/earthquake.py index 691cb9d..a60e79a 100644 --- a/app/api/earthquake.py +++ b/app/api/earthquake.py @@ -134,7 +134,7 @@ async def predict_earthquake(req: EarthquakePredictRequest): "point_ids": req.point_ids, "region_code": req.region_code, "magnitude": req.magnitude, - "depth": req.depth, # 已有默认值 10.0 + "depth": req.depth, "epicenter_lon": req.epicenter_lon, "epicenter_lat": req.epicenter_lat, "occurred_time": occurred_time.isoformat() if hasattr(occurred_time, 'isoformat') else str(occurred_time) diff --git a/app/api/rainfall.py b/app/api/rainfall.py index d399a33..179c46f 100644 --- a/app/api/rainfall.py +++ b/app/api/rainfall.py @@ -7,11 +7,16 @@ from typing import List, Dict, Any, Optional from fastapi import APIRouter, HTTPException -from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictData, UpdateMonitoringTimeRequest +from app.schemas.api_schemas import ( + RainfallPredictRequest, PredictResponse, PredictData, + UpdateMonitoringTimeRequest, DistrictSummaryRequest, DistrictSummaryItem +) from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore from app.repositories.dbn_repository import dbn_repository +from app.repositories.rainfall_repository import rainfall_repository as rain_repo from app.core.rainfall_manager import rainfall_manager from app.config.paths import get_logger +from app.utils.db_helper import db_helper from app.utils.time_converter import TimeConverter router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"]) @@ -141,13 +146,17 @@ async def update_monitoring_time(req: UpdateMonitoringTimeRequest): @router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测") async def predict_rainfall(req: RainfallPredictRequest): """ - 根据降雨量和持续时间,批量预测隐患点/风险点的灾害概率。 + 批量预测隐患点/风险点的灾害概率。两种模式(二选一): + + **自动推演模式**:rainfall、duration、region_code 全部不传 + → 从气象表自动获取各点最近站降雨数据,不限区域 + + **指定条件模式**:rainfall、duration、region_code 全部传入 + → 按指定降雨条件和区域预测 - **disaster_name**: 灾害名称 - - **point_ids**: 点位ID列表(可选,不传则查询所有点) - - **region_code**: 行政区划代码(可选,不传则不限区域) - - **rainfall**: 累计降雨量(mm),不传则从气象表自动获取 - - **duration**: 降雨持续时间(h),不传则从气象表自动获取 + - **point_ids**: 点位ID列表(可选) + - **occurred_time**: 事件发生时间(可选,不传则为当前时间) - **operation_type**: 操作类型(如 '实时监测', '情景模拟', '应急评估') """ semaphore = get_prediction_semaphore() @@ -180,3 +189,62 @@ async def predict_rainfall(req: RainfallPredictRequest): logger.error(f"保存推理结果失败: {e}", exc_info=True) return PredictResponse(code=200, message="success", data=PredictData(record_id=record_id, list=result_map_with_location)) + + +@router.post("/district-summary", summary="获取各区降雨概况") +async def district_summary(req: DistrictSummaryRequest): + """ + 根据推理结果ID获取各行政区的降雨概况(用于报告生成)。 + + 逻辑: + - 如果预测时传了 rainfall → 直接用 condition 中的值 + - 如果预测时没传 rainfall → 从 xian_meteorology 按区聚合实际气象数据 + + - **inference_id**: 推理结果ID + """ + # 1. 查推理结果 + record = dbn_repository.get_inference_result(req.inference_id) + if not record: + raise HTTPException(status_code=404, detail=f"推理结果不存在: {req.inference_id}") + + condition = record.get('condition') or {} + occurred_time = record.get('occurred_time') + + # 2. 如果传了 rainfall → 直接用 condition 中的值 + if condition.get('rainfall') is not None: + region_code = condition.get('region_code') + rainfall_val = float(condition['rainfall']) + duration_val = float(condition.get('duration', 0)) + + if region_code: + # 查 district 名称 + district_row = db_helper.execute_query_one( + "SELECT name FROM xian_district WHERE code = %s AND is_delete = 0", (region_code,) + ) + district_name = district_row['name'] if district_row else region_code + items = [{ + 'district_name': district_name, + 'district_code': region_code, + 'rainfall': rainfall_val, + 'duration_hours': duration_val + }] + else: + items = [{ + 'district_name': '全市', + 'district_code': '', + 'rainfall': rainfall_val, + 'duration_hours': duration_val + }] + else: + # 3. 自动推演模式 → 从气象表按区聚合 + items = rain_repo.get_district_rainfall_summary(occurred_time) + + if not items: + return {"code": 200, "message": "success", "data": []} + + # 转换为响应格式 + result_items = [ + DistrictSummaryItem(**item).model_dump() + for item in items + ] + return {"code": 200, "message": "success", "data": result_items} diff --git a/app/repositories/dbn_repository.py b/app/repositories/dbn_repository.py index 9535045..a93e18f 100644 --- a/app/repositories/dbn_repository.py +++ b/app/repositories/dbn_repository.py @@ -195,6 +195,7 @@ class DbnRepository: COUNT(*) as record_count FROM xian_meteorology WHERE datetime BETWEEN %s AND %s + AND is_delete = 0 AND rainfall_1h IS NOT NULL AND CAST(rainfall_1h AS DOUBLE PRECISION) > 0 GROUP BY lon, lat @@ -259,6 +260,7 @@ class DbnRepository: ST_SetSRID(ST_MakePoint(%s, %s), 4326) ) as dist FROM xian_meteorology + WHERE is_delete = 0 ) t WHERE dist < 50000 ORDER BY dist @@ -283,6 +285,7 @@ class DbnRepository: CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall FROM xian_meteorology WHERE lon = %s AND lat = %s + AND is_delete = 0 AND datetime BETWEEN %s AND %s ORDER BY datetime DESC """ @@ -326,7 +329,7 @@ class DbnRepository: """一次性加载所有气象站点坐标到内存(188个站点,约2KB)""" if cls._cached_stations is not None: return cls._cached_stations - sql = "SELECT DISTINCT lon, lat FROM xian_meteorology" + sql = "SELECT DISTINCT lon, lat FROM xian_meteorology WHERE is_delete = 0" cls._cached_stations = db_helper.execute_query(sql) logger.info(f"已缓存 {len(cls._cached_stations)} 个气象站点坐标") return cls._cached_stations @@ -407,6 +410,7 @@ class DbnRepository: SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall FROM xian_meteorology WHERE (lon, lat) IN ({placeholders}) + AND is_delete = 0 AND datetime BETWEEN %s AND %s ORDER BY lon, lat, datetime DESC """ @@ -688,6 +692,21 @@ class DbnRepository: return float(result['aspect']) return None + @staticmethod + def get_inference_result(inference_id: int) -> Optional[Dict[str, Any]]: + """ + 根据ID获取推理结果 + + Returns: + {id, name, event_type, occurred_time, operation_type, condition, result} + """ + sql = """ + SELECT id, name, event_type, occurred_time, operation_type, condition, result + FROM xian_inference_result + WHERE id = %s AND is_delete = 0 + """ + return db_helper.execute_query_one(sql, (inference_id,)) + @staticmethod def save_inference_result(disaster_name: str, event_type: str, occurred_time, operation_type: str, condition: dict, result: list) -> int: diff --git a/app/repositories/qgis_repository.py b/app/repositories/qgis_repository.py index 53af6ca..57ae1da 100644 --- a/app/repositories/qgis_repository.py +++ b/app/repositories/qgis_repository.py @@ -12,7 +12,7 @@ class QgisRepository: sql = """ SELECT id, name, event_type, occurred_time, condition FROM xian_inference_result - WHERE id = %s + WHERE id = %s AND is_delete = 0 """ rows = db_helper.execute_query(sql, (inference_id,)) if not rows: diff --git a/app/repositories/rainfall_repository.py b/app/repositories/rainfall_repository.py index 486934e..9e309bc 100644 --- a/app/repositories/rainfall_repository.py +++ b/app/repositories/rainfall_repository.py @@ -28,6 +28,7 @@ class RainfallRepository: SELECT max(id) as max_id FROM xian_meteorology WHERE datetime BETWEEN %s AND %s + AND is_delete = 0 """ result = db_helper.execute_query_one(sql, (start_time, end_time)) @@ -58,6 +59,7 @@ class RainfallRepository: CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h FROM xian_meteorology WHERE datetime BETWEEN %s AND %s + AND is_delete = 0 ORDER BY lon, lat, datetime DESC """ @@ -100,5 +102,100 @@ class RainfallRepository: return station_data + def get_district_rainfall_summary(self, query_time, region_code: Optional[str] = None) -> List[Dict[str, Any]]: + """ + 按行政区聚合降雨统计(取区内最大站点值) + + Args: + query_time: 查询时间 + region_code: 行政区划代码,不传则返回所有区 + + Returns: + [{district_name, district_code, rainfall, duration_hours}, ...] + """ + start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) + + # 一次查询:area_code + lon + lat + rainfall_1h + sql = """ + SELECT area_code, lon, lat, datetime, + CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h + FROM xian_meteorology + WHERE datetime BETWEEN %s AND %s + AND is_delete = 0 + ORDER BY area_code, lon, lat, datetime DESC + """ + rows = db_helper.execute_query(sql, (start_time, end_time)) + if not rows: + return [] + + # 按站点分组,计算 accum_rain + duration_hours(复用现有算法) + from itertools import groupby + + station_stats: Dict[tuple, Dict[str, Any]] = {} # (area_code, lon, lat) → stats + for (area_code, lon, lat), group in groupby(rows, + key=lambda r: (r['area_code'], r['lon'], r['lat'])): + accum_rain = 0.0 + duration_hours = 0 + consecutive_no_rain = 0 + for row in group: + rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0 + if rainfall > 0: + accum_rain += rainfall + duration_hours += 1 + consecutive_no_rain = 0 + else: + consecutive_no_rain += 1 + if consecutive_no_rain >= 3: + break + if accum_rain > 0: + duration_hours += 1 + + key = (area_code, lon, lat) + station_stats[key] = { + 'area_code': area_code, + 'accum_rain': accum_rain, + 'duration_hours': duration_hours + } + + # 按 area_code 聚合:取区内最大降雨站点 + district_max: Dict[str, Dict[str, Any]] = {} + for key, stats in station_stats.items(): + code = stats['area_code'] + if region_code and code != region_code: + continue + if code not in district_max or stats['accum_rain'] > district_max[code]['rainfall']: + district_max[code] = { + 'district_code': code, + 'rainfall': round(stats['accum_rain'], 1), + 'duration_hours': stats['duration_hours'] + } + + if not district_max: + return [] + + # 查 xian_district 获取名称 + codes = list(district_max.keys()) + placeholders = ', '.join(['%s'] * len(codes)) + sql = f"SELECT code, name FROM xian_district WHERE code IN ({placeholders}) AND is_delete = 0" + district_rows = db_helper.execute_query(sql, tuple(codes)) + code_to_name = {r['code']: r['name'] for r in district_rows} + + result = [] + for code, info in district_max.items(): + name = code_to_name.get(code) + if name is None: + continue # 跳过 xian_district 中不存在的代码 + result.append({ + 'district_name': name, + 'district_code': code, + 'rainfall': info['rainfall'], + 'duration_hours': info['duration_hours'] + }) + + # 按名称排序 + result.sort(key=lambda x: x['district_name']) + return result + + # 创建全局实例 rainfall_repository = RainfallRepository() diff --git a/app/schemas/api_schemas.py b/app/schemas/api_schemas.py index d175739..307c47a 100644 --- a/app/schemas/api_schemas.py +++ b/app/schemas/api_schemas.py @@ -3,7 +3,7 @@ API 请求/响应数据模型 """ from datetime import datetime from typing import List, Optional, Dict, Any -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator # ============================================================ @@ -11,19 +11,35 @@ from pydantic import BaseModel, Field # ============================================================ class RainfallPredictRequest(BaseModel): - """暴雨灾害链预测请求""" + """ + 暴雨灾害链预测请求 + + 参数规则(二选一): + 1. 自动推演模式:rainfall、duration、region_code 全部不传 → 从气象表自动获取,不限区域 + 2. 指定条件模式:rainfall、duration、region_code 全部传入 → 按指定条件预测 + """ disaster_name: str = Field(min_length=1, max_length=255) point_ids: Optional[List[int]] = Field(None, max_length=500, description="点位ID列表,不传则查询所有点") - region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104'),不传则不限区域") - rainfall: Optional[float] = Field(None, ge=0, - description="累计降雨量(mm),不传则从气象表自动获取") - duration: Optional[float] = Field(None, ge=0, - description="降雨持续时间(h),不传则从气象表自动获取") + region_code: Optional[str] = Field(None, description="行政区划代码(如 '610104')") + rainfall: Optional[float] = Field(None, ge=0, description="累计降雨量(mm)") + duration: Optional[float] = Field(None, ge=0, description="降雨持续时间(h)") occurred_time: Optional[datetime] = Field(None, description="事件发生时间,不传则为当前时间") operation_type: str = Field("模拟", min_length=1, max_length=50, description="操作类型(如 '模拟', '实时监测', '应急评估')") + @model_validator(mode='after') + def validate_mode_exclusivity(self): + """校验 rainfall / duration / region_code 必须要么全不传,要么全传""" + trio = (self.rainfall, self.duration, self.region_code) + none_count = sum(1 for v in trio if v is None) + if none_count not in (0, 3): + raise ValueError( + "rainfall、duration、region_code 必须全部传入或全部不传," + "不允许只传部分参数" + ) + return self + # ============================================================ # 地震预测 @@ -86,3 +102,27 @@ class HealthResponse(BaseModel): status: str = "ok" rainfall_model_loaded: bool = False earthquake_model_loaded: bool = False + + +# ============================================================ +# 各区降雨概况 +# ============================================================ + +class DistrictSummaryRequest(BaseModel): + """各区降雨概况请求""" + inference_id: int = Field(..., ge=1, description="推理结果ID(xian_inference_result.id)") + + +class DistrictSummaryItem(BaseModel): + """单个区的降雨概况""" + district_name: str = Field(..., description="行政区名称(如 '碑林区')") + district_code: str = Field(..., description="行政区划代码(如 '610103')") + rainfall: float = Field(..., description="累计降雨量(mm)") + duration_hours: float = Field(..., description="持续降雨时间(h)") + + +class DistrictSummaryResponse(BaseModel): + """各区降雨概况响应""" + code: int = Field(200, description="状态码") + message: str = Field("success", description="提示信息") + data: Optional[List[DistrictSummaryItem]] = Field(None, description="各区降雨概况列表")