diff --git a/app/api/rainfall.py b/app/api/rainfall.py index e163a95..13e6ba5 100644 --- a/app/api/rainfall.py +++ b/app/api/rainfall.py @@ -7,10 +7,12 @@ from typing import List, Dict, Any, Optional from fastapi import APIRouter, HTTPException -from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictionItem +from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictionItem, UpdateMonitoringTimeRequest from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore from app.repositories.dbn_repository import dbn_repository +from app.core.rainfall_manager import rainfall_manager from app.config.paths import get_logger +from app.utils.time_converter import TimeConverter router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"]) logger = get_logger("api.rainfall") @@ -86,6 +88,34 @@ def _predict_sync(point_ids: Optional[List[int]], region_code: Optional[str], return items, save_results, condition, now +@router.post("/update-monitoring-time", summary="更新降雨监测查询时间") +async def update_monitoring_time(req: UpdateMonitoringTimeRequest): + """ + 更新降雨站点监测的查询时间,触发重新计算 + + - **query_time**: 新的查询时间,格式: YYYY-MM-DD HH:mm:ss + """ + try: + # 将字符串时间解析为 datetime 对象 + new_time = TimeConverter.parse_input_time(req.query_time) + + # 更新监测时间,触发重新计算 + result = rainfall_manager.update_query_time(new_time) + + logger.info(f"更新监测时间成功: {result}") + return { + "code": 200, + "message": "success", + "data": result + } + except ValueError as e: + logger.error(f"时间格式错误: {e}") + raise HTTPException(status_code=400, detail=f"时间格式错误: {e}") + except Exception as e: + logger.error(f"更新监测时间失败: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"更新监测时间失败: {e}") + + @router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测") async def predict_rainfall(req: RainfallPredictRequest): """ diff --git a/app/core/rainfall_manager.py b/app/core/rainfall_manager.py index 8cbc364..51454a9 100644 --- a/app/core/rainfall_manager.py +++ b/app/core/rainfall_manager.py @@ -2,6 +2,7 @@ 降雨管理器 负责降雨监测调度和任务编排 """ +import threading import time from datetime import datetime from typing import Optional @@ -16,6 +17,8 @@ class RainfallManager: """初始化降雨管理器""" self.logger = get_logger() self.last_max_id = None # 记录上次查询的最大ID + self._query_time: Optional[datetime] = None # 可动态更新的查询时间(全局变量) + self._lock = threading.Lock() # 线程锁,保证时间更新的线程安全 def monitoring_rainfall_station_id(self, query_time: Optional[datetime] = None): """ @@ -29,28 +32,61 @@ class RainfallManager: if query_time is None: query_time = datetime.now() + # 存储查询时间到实例变量(线程安全) + with self._lock: + self._query_time = query_time + self.logger.info(f"启动降雨站点监测,查询时间: {query_time}") - # 提交监测任务到线程池 + # 提交监测任务到线程池(不再传时间参数,循环内从 self._query_time 读取) thread_pool_manager.submit_task( self._monitoring_loop, - query_time, task_name="降雨站点ID监测" ) - def _monitoring_loop(self, initial_query_time: datetime): + def update_query_time(self, new_time: datetime) -> dict: + """ + 更新查询时间并重置监测状态,触发重新计算 + + Args: + new_time: 新的查询时间 + + Returns: + 包含操作结果的字典 + """ + with self._lock: + old_time = self._query_time + self._query_time = new_time + # 重置 last_max_id 为 None,强制触发重新计算 + self.last_max_id = None + + self.logger.info(f"查询时间已更新: {old_time} -> {new_time},将触发重新计算") + return { + "old_time": str(old_time) if old_time else None, + "new_time": str(new_time), + "message": "查询时间已更新,将在下一轮监测循环中触发重新计算" + } + + def get_current_query_time(self) -> Optional[datetime]: + """获取当前查询时间""" + with self._lock: + return self._query_time + + def _monitoring_loop(self): """ 监测循环,定期检查最大ID是否改变 - - Args: - initial_query_time: 初始查询时间 + 每次循环从 self._query_time 读取最新的查询时间 """ from app.repositories.rainfall_repository import rainfall_repository - query_time = initial_query_time - while True: try: + # 每次循环从全局变量读取最新的查询时间 + with self._lock: + query_time = self._query_time + if query_time is None: + query_time = datetime.now() + # 查询当前时间窗口内的最大ID max_id = rainfall_repository.get_max_rainfall_id(query_time) @@ -58,7 +94,7 @@ class RainfallManager: if self.last_max_id is None or max_id != self.last_max_id: from app.utils.thread_pool_manager import thread_pool_manager - self.logger.info(f"检测到数据更新,旧ID: {self.last_max_id}, 新ID: {max_id}") + self.logger.info(f"检测到数据更新,旧ID: {self.last_max_id}, 新ID: {max_id}, 查询时间: {query_time}") # 提交栅格生成任务 thread_pool_manager.submit_task( diff --git a/app/schemas/api_schemas.py b/app/schemas/api_schemas.py index 77f108c..af178fe 100644 --- a/app/schemas/api_schemas.py +++ b/app/schemas/api_schemas.py @@ -61,6 +61,11 @@ class PredictResponse(BaseModel): record_id: Optional[int] = Field(None, description="推理结果记录ID") +class UpdateMonitoringTimeRequest(BaseModel): + """更新监测时间请求""" + query_time: str = Field(..., description="查询时间,格式: YYYY-MM-DD HH:mm:ss,如 '2025-09-16 20:00:00'") + + class HealthResponse(BaseModel): """健康检查响应""" status: str = "ok"