添加修改时间触发逻辑
This commit is contained in:
+31
-1
@@ -7,10 +7,12 @@ from typing import List, Dict, Any, Optional
|
|||||||
|
|
||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
|
|
||||||
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictionItem
|
from app.schemas.api_schemas import RainfallPredictRequest, PredictResponse, PredictionItem, UpdateMonitoringTimeRequest
|
||||||
from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore
|
from app.utils.api_deps import get_rainfall_model, get_prediction_semaphore
|
||||||
from app.repositories.dbn_repository import dbn_repository
|
from app.repositories.dbn_repository import dbn_repository
|
||||||
|
from app.core.rainfall_manager import rainfall_manager
|
||||||
from app.config.paths import get_logger
|
from app.config.paths import get_logger
|
||||||
|
from app.utils.time_converter import TimeConverter
|
||||||
|
|
||||||
router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"])
|
router = APIRouter(prefix="/rainfall", tags=["暴雨灾害链"])
|
||||||
logger = get_logger("api.rainfall")
|
logger = get_logger("api.rainfall")
|
||||||
@@ -86,6 +88,34 @@ def _predict_sync(point_ids: Optional[List[int]], region_code: Optional[str],
|
|||||||
return items, save_results, condition, now
|
return items, save_results, condition, now
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/update-monitoring-time", summary="更新降雨监测查询时间")
|
||||||
|
async def update_monitoring_time(req: UpdateMonitoringTimeRequest):
|
||||||
|
"""
|
||||||
|
更新降雨站点监测的查询时间,触发重新计算
|
||||||
|
|
||||||
|
- **query_time**: 新的查询时间,格式: YYYY-MM-DD HH:mm:ss
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 将字符串时间解析为 datetime 对象
|
||||||
|
new_time = TimeConverter.parse_input_time(req.query_time)
|
||||||
|
|
||||||
|
# 更新监测时间,触发重新计算
|
||||||
|
result = rainfall_manager.update_query_time(new_time)
|
||||||
|
|
||||||
|
logger.info(f"更新监测时间成功: {result}")
|
||||||
|
return {
|
||||||
|
"code": 200,
|
||||||
|
"message": "success",
|
||||||
|
"data": result
|
||||||
|
}
|
||||||
|
except ValueError as e:
|
||||||
|
logger.error(f"时间格式错误: {e}")
|
||||||
|
raise HTTPException(status_code=400, detail=f"时间格式错误: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"更新监测时间失败: {e}", exc_info=True)
|
||||||
|
raise HTTPException(status_code=500, detail=f"更新监测时间失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
@router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测")
|
@router.post("/predict", response_model=PredictResponse, summary="暴雨灾害链预测")
|
||||||
async def predict_rainfall(req: RainfallPredictRequest):
|
async def predict_rainfall(req: RainfallPredictRequest):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
降雨管理器
|
降雨管理器
|
||||||
负责降雨监测调度和任务编排
|
负责降雨监测调度和任务编排
|
||||||
"""
|
"""
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
@@ -16,6 +17,8 @@ class RainfallManager:
|
|||||||
"""初始化降雨管理器"""
|
"""初始化降雨管理器"""
|
||||||
self.logger = get_logger()
|
self.logger = get_logger()
|
||||||
self.last_max_id = None # 记录上次查询的最大ID
|
self.last_max_id = None # 记录上次查询的最大ID
|
||||||
|
self._query_time: Optional[datetime] = None # 可动态更新的查询时间(全局变量)
|
||||||
|
self._lock = threading.Lock() # 线程锁,保证时间更新的线程安全
|
||||||
|
|
||||||
def monitoring_rainfall_station_id(self, query_time: Optional[datetime] = None):
|
def monitoring_rainfall_station_id(self, query_time: Optional[datetime] = None):
|
||||||
"""
|
"""
|
||||||
@@ -29,28 +32,61 @@ class RainfallManager:
|
|||||||
if query_time is None:
|
if query_time is None:
|
||||||
query_time = datetime.now()
|
query_time = datetime.now()
|
||||||
|
|
||||||
|
# 存储查询时间到实例变量(线程安全)
|
||||||
|
with self._lock:
|
||||||
|
self._query_time = query_time
|
||||||
|
|
||||||
self.logger.info(f"启动降雨站点监测,查询时间: {query_time}")
|
self.logger.info(f"启动降雨站点监测,查询时间: {query_time}")
|
||||||
|
|
||||||
# 提交监测任务到线程池
|
# 提交监测任务到线程池(不再传时间参数,循环内从 self._query_time 读取)
|
||||||
thread_pool_manager.submit_task(
|
thread_pool_manager.submit_task(
|
||||||
self._monitoring_loop,
|
self._monitoring_loop,
|
||||||
query_time,
|
|
||||||
task_name="降雨站点ID监测"
|
task_name="降雨站点ID监测"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _monitoring_loop(self, initial_query_time: datetime):
|
def update_query_time(self, new_time: datetime) -> dict:
|
||||||
"""
|
"""
|
||||||
监测循环,定期检查最大ID是否改变
|
更新查询时间并重置监测状态,触发重新计算
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
initial_query_time: 初始查询时间
|
new_time: 新的查询时间
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含操作结果的字典
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
old_time = self._query_time
|
||||||
|
self._query_time = new_time
|
||||||
|
# 重置 last_max_id 为 None,强制触发重新计算
|
||||||
|
self.last_max_id = None
|
||||||
|
|
||||||
|
self.logger.info(f"查询时间已更新: {old_time} -> {new_time},将触发重新计算")
|
||||||
|
return {
|
||||||
|
"old_time": str(old_time) if old_time else None,
|
||||||
|
"new_time": str(new_time),
|
||||||
|
"message": "查询时间已更新,将在下一轮监测循环中触发重新计算"
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_current_query_time(self) -> Optional[datetime]:
|
||||||
|
"""获取当前查询时间"""
|
||||||
|
with self._lock:
|
||||||
|
return self._query_time
|
||||||
|
|
||||||
|
def _monitoring_loop(self):
|
||||||
|
"""
|
||||||
|
监测循环,定期检查最大ID是否改变
|
||||||
|
每次循环从 self._query_time 读取最新的查询时间
|
||||||
"""
|
"""
|
||||||
from app.repositories.rainfall_repository import rainfall_repository
|
from app.repositories.rainfall_repository import rainfall_repository
|
||||||
|
|
||||||
query_time = initial_query_time
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
# 每次循环从全局变量读取最新的查询时间
|
||||||
|
with self._lock:
|
||||||
|
query_time = self._query_time
|
||||||
|
if query_time is None:
|
||||||
|
query_time = datetime.now()
|
||||||
|
|
||||||
# 查询当前时间窗口内的最大ID
|
# 查询当前时间窗口内的最大ID
|
||||||
max_id = rainfall_repository.get_max_rainfall_id(query_time)
|
max_id = rainfall_repository.get_max_rainfall_id(query_time)
|
||||||
|
|
||||||
@@ -58,7 +94,7 @@ class RainfallManager:
|
|||||||
if self.last_max_id is None or max_id != self.last_max_id:
|
if self.last_max_id is None or max_id != self.last_max_id:
|
||||||
from app.utils.thread_pool_manager import thread_pool_manager
|
from app.utils.thread_pool_manager import thread_pool_manager
|
||||||
|
|
||||||
self.logger.info(f"检测到数据更新,旧ID: {self.last_max_id}, 新ID: {max_id}")
|
self.logger.info(f"检测到数据更新,旧ID: {self.last_max_id}, 新ID: {max_id}, 查询时间: {query_time}")
|
||||||
|
|
||||||
# 提交栅格生成任务
|
# 提交栅格生成任务
|
||||||
thread_pool_manager.submit_task(
|
thread_pool_manager.submit_task(
|
||||||
|
|||||||
@@ -61,6 +61,11 @@ class PredictResponse(BaseModel):
|
|||||||
record_id: Optional[int] = Field(None, description="推理结果记录ID")
|
record_id: Optional[int] = Field(None, description="推理结果记录ID")
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateMonitoringTimeRequest(BaseModel):
|
||||||
|
"""更新监测时间请求"""
|
||||||
|
query_time: str = Field(..., description="查询时间,格式: YYYY-MM-DD HH:mm:ss,如 '2025-09-16 20:00:00'")
|
||||||
|
|
||||||
|
|
||||||
class HealthResponse(BaseModel):
|
class HealthResponse(BaseModel):
|
||||||
"""健康检查响应"""
|
"""健康检查响应"""
|
||||||
status: str = "ok"
|
status: str = "ok"
|
||||||
|
|||||||
Reference in New Issue
Block a user