From 316177c2ba98f4f8ab8562e2a3e631593b652ae0 Mon Sep 17 00:00:00 2001 From: wzy-warehouse <18135009705@163.com> Date: Fri, 12 Jun 2026 14:53:35 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E8=AE=A1=E7=AE=97=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BB=A5=E5=8F=8A=E6=97=B6=E9=97=B4=E8=BD=AC=E6=8D=A2?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/launcher.py | 4 +- app/repositories/dbn_repository.py | 42 ++++---- app/repositories/rainfall_repository.py | 81 +++++++++------ app/services/rainfall_grid_service.py | 10 +- app/utils/time_converter.py | 127 ++++++++++++++++++++++++ 5 files changed, 215 insertions(+), 49 deletions(-) create mode 100644 app/utils/time_converter.py diff --git a/app/core/launcher.py b/app/core/launcher.py index 6e308ed..cefe795 100644 --- a/app/core/launcher.py +++ b/app/core/launcher.py @@ -3,6 +3,7 @@ """ import sys from pathlib import Path +from datetime import datetime from app.core.env_checker import check_environment from app.core.venv_manager import check_virtualenv @@ -61,7 +62,7 @@ class AppLauncher: return # 以下代码仅在虚拟环境中执行 - # 检查安装依赖(只执行一次) + # 检查安装依赖 check_dependencies(self.project_root) # 启动应用 @@ -104,6 +105,7 @@ def start(): # 启动降雨站点监测 logger.info("启动降雨站点监测服务...") + rainfall_manager.monitoring_rainfall_station_id('2025-09-16 20:00:00') # 阻塞主线程,防止程序立即退出 diff --git a/app/repositories/dbn_repository.py b/app/repositories/dbn_repository.py index 7439364..8215212 100644 --- a/app/repositories/dbn_repository.py +++ b/app/repositories/dbn_repository.py @@ -3,9 +3,10 @@ 负责从 xian_risk_factors、xian_meteorology 等表获取数据 """ import math -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Union from datetime import datetime from app.utils.db_helper import db_helper +from app.utils.time_converter import time_converter from app.config.paths import get_logger logger = get_logger("dbn") @@ -166,20 +167,23 @@ class DbnRepository: @staticmethod def get_nearest_station_rainfall(lon: float, lat: float, - query_time: Optional[datetime] = None) -> Dict[str, Any]: + query_time: Optional[Union[datetime, str]] = None) -> Dict[str, Any]: """ 获取最近雨量站的降雨数据 Args: lon: 经度 lat: 纬度 - query_time: 查询时间,若未提供则取当前时间 + query_time: 查询时间,若未提供则取当前时间。支持datetime对象或标准时间字符串(如'2025-07-04 20:00:00') Returns: 降雨数据 """ if query_time is None: query_time = datetime.now() + + # 获取时间范围(24小时窗口) + start_time, end_time = time_converter.to_db_time_range(query_time, hours=24) # noinspection SqlNoDataSourceInspection sql = """ @@ -190,9 +194,7 @@ class DbnRepository: SUM(CAST(rainfall_1h AS DOUBLE PRECISION)) as total_rainfall, COUNT(*) as record_count FROM xian_meteorology - WHERE datetime BETWEEN - CAST(EXTRACT(EPOCH FROM (%s::timestamp - INTERVAL '24 hours')) AS BIGINT) - AND CAST(EXTRACT(EPOCH FROM %s::timestamp) AS BIGINT) + WHERE datetime BETWEEN %s AND %s AND rainfall_1h IS NOT NULL AND CAST(rainfall_1h AS DOUBLE PRECISION) > 0 GROUP BY lon, lat @@ -210,7 +212,7 @@ class DbnRepository: ORDER BY distance LIMIT 1 """ - result = db_helper.execute_query_one(sql, (query_time, query_time, lon, lat)) + result = db_helper.execute_query_one(sql, (start_time, end_time, lon, lat)) if result: return { @@ -231,14 +233,14 @@ class DbnRepository: @staticmethod def get_rainfall_data_with_duration(lon: float, lat: float, - query_time: Optional[datetime] = None) -> Dict[str, Any]: + query_time: Optional[Union[datetime, str]] = None) -> Dict[str, Any]: """ 获取降雨数据,包括累计降雨量和持续时间 Args: lon: 经度 lat: 纬度 - query_time: 查询时间,若未提供则取当前时间 + query_time: 查询时间,若未提供则取当前时间。支持datetime对象或标准时间字符串(如'2025-07-04 20:00:00') Returns: 降雨数据:{accum_rain, duration_hours, rain_intensity} @@ -270,6 +272,9 @@ class DbnRepository: station_lon = station['lon'] station_lat = station['lat'] + # 获取时间范围(72小时窗口) + start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) + # 查询该站点的降雨时序数据 # noinspection SqlNoDataSourceInspection sql = """ @@ -278,12 +283,10 @@ class DbnRepository: CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall FROM xian_meteorology WHERE lon = %s AND lat = %s - AND datetime BETWEEN - CAST(EXTRACT(EPOCH FROM (%s::timestamp - INTERVAL '72 hours')) AS BIGINT) - AND CAST(EXTRACT(EPOCH FROM %s::timestamp) AS BIGINT) + AND datetime BETWEEN %s AND %s ORDER BY datetime DESC """ - results = db_helper.execute_query(sql, (station_lon, station_lat, query_time, query_time)) + results = db_helper.execute_query(sql, (station_lon, station_lat, start_time, end_time)) if not results: return {'accum_rain': 0.0, 'duration_hours': 0, 'rain_intensity': 0.0} @@ -357,13 +360,13 @@ class DbnRepository: @classmethod def get_rainfall_data_batch(cls, points: List[Dict[str, Any]], - query_time: Optional[datetime] = None) -> Dict[str, Dict[str, Any]]: + query_time: Optional[Union[datetime, str]] = None) -> Dict[str, Dict[str, Any]]: """ 批量获取多个点的降雨数据(2次DB查询替代 N×2次) Args: points: 预测点列表,每个含 {'id': str, 'lon': float, 'lat': float} - query_time: 查询时间 + query_time: 查询时间,支持datetime对象或标准时间字符串(如'2025-07-04 20:00:00') Returns: {point_id: {accum_rain, duration_hours, rain_intensity}} @@ -394,16 +397,17 @@ class DbnRepository: params: List[Any] = [] for slon, slat in station_keys: params.extend([slon, slat]) - params.extend([query_time, query_time]) + + # 获取时间范围(72小时窗口) + start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) + params.extend([start_time, end_time]) # noinspection SqlNoDataSourceInspection sql = f""" SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall FROM xian_meteorology WHERE (lon, lat) IN ({placeholders}) - AND datetime BETWEEN - CAST(EXTRACT(EPOCH FROM (%s::timestamp - INTERVAL '72 hours')) AS BIGINT) - AND CAST(EXTRACT(EPOCH FROM %s::timestamp) AS BIGINT) + AND datetime BETWEEN %s AND %s ORDER BY lon, lat, datetime DESC """ rows = db_helper.execute_query(sql, tuple(params)) diff --git a/app/repositories/rainfall_repository.py b/app/repositories/rainfall_repository.py index 06b57d4..486934e 100644 --- a/app/repositories/rainfall_repository.py +++ b/app/repositories/rainfall_repository.py @@ -2,74 +2,99 @@ 降雨数据仓库 负责数据库查询操作 """ -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Union from datetime import datetime from app.utils.db_helper import db_helper +from app.utils.time_converter import time_converter class RainfallRepository: """降雨数据仓库""" - def get_max_rainfall_id(self, query_time: datetime) -> Optional[int]: + def get_max_rainfall_id(self, query_time: Union[datetime, str]) -> Optional[int]: """ - 查询数据库中指定时间窗口内的最大ID + 查询数据库中指定时间窗口内的最大ID(72小时窗口) Args: - query_time: 查询时间 + query_time: 查询时间(datetime对象或标准时间字符串,如'2025-07-04 20:00:00') Returns: 最大ID,如果没有数据则返回None """ + # 获取时间范围 + start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) + sql = """ SELECT max(id) as max_id FROM xian_meteorology - WHERE datetime BETWEEN ( - to_char(%s::timestamp - interval '12 hours', 'YYYYMMDDHH24MISS') - )::bigint AND ( - to_char(%s::timestamp, 'YYYYMMDDHH24MISS') - )::bigint + WHERE datetime BETWEEN %s AND %s """ - result = db_helper.execute_query_one(sql, (query_time, query_time)) - + result = db_helper.execute_query_one(sql, (start_time, end_time)) + if result and result.get('max_id'): return int(result['max_id']) return None - def get_rainfall_stations_data(self, query_time: datetime) -> List[Dict[str, Any]]: + def get_rainfall_stations_data(self, query_time: Union[datetime, str]) -> List[Dict[str, Any]]: """ 查询雨量站点降雨数据 Args: - query_time: 查询时间 + query_time: 查询时间(datetime对象或标准时间字符串,如'2025-07-04 20:00:00') Returns: - 站点数据列表,每个元素包含lon, lat, rainfall + 站点数据列表,每个元素包含lon, lat, rainfall, duration_hours """ + # 获取时间范围 + start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) + + # 查询72小时内的降雨时序数据 sql = """ SELECT lon, lat, - SUM(rainfall_1h::numeric) AS rainfall + datetime, + CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h FROM xian_meteorology - WHERE datetime BETWEEN ( - to_char(%s::timestamp - interval '12 hours', 'YYYYMMDDHH24MISS') - )::bigint AND ( - to_char(%s::timestamp, 'YYYYMMDDHH24MISS') - )::bigint - GROUP BY lon, lat + WHERE datetime BETWEEN %s AND %s + ORDER BY lon, lat, datetime DESC """ + + results = db_helper.execute_query(sql, (start_time, end_time)) - results = db_helper.execute_query(sql, (query_time, query_time)) + if not results: + return [] + + # 按站点分组处理 + from itertools import groupby - # 转换数据格式 station_data = [] - for row in results: - if row.get('lon') and row.get('lat'): + for (lon, lat), group in groupby(results, key=lambda r: (r['lon'], r['lat'])): + accum_rain = 0.0 + duration_hours = 0 + consecutive_no_rain = 0 + + # 应用"连续3小时无雨截断"规则 + for row in group: + rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0 + if rainfall > 0: + accum_rain += rainfall + duration_hours += 1 + consecutive_no_rain = 0 + else: + consecutive_no_rain += 1 + if consecutive_no_rain >= 3: + break # 连续3小时无雨,停止累加 + if accum_rain > 0: + duration_hours += 1 + + if accum_rain > 0 or duration_hours > 0: station_data.append({ - 'lon': float(row['lon']), - 'lat': float(row['lat']), - 'rainfall': float(row['rainfall']) if row.get('rainfall') else 0.0 + 'lon': float(lon), + 'lat': float(lat), + 'rainfall': accum_rain, # 累计降雨量 + 'duration_hours': duration_hours # 持续时间 }) return station_data diff --git a/app/services/rainfall_grid_service.py b/app/services/rainfall_grid_service.py index d935edf..016ecc3 100644 --- a/app/services/rainfall_grid_service.py +++ b/app/services/rainfall_grid_service.py @@ -120,6 +120,10 @@ class RainfallGridService: def interpolate_rainfall(self, station_data: List[Dict[str, Any]]) -> Dict[str, Any]: """ 使用优化的反距离权重法(IDW)进行降雨插值 + + 注意:station_data 现在包含 'rainfall'(累计降雨量)和 'duration_hours'(持续时间) + 与DBN推演使用相同的降雨量计算逻辑(72小时回溯 + 3小时无雨截断) + 改进: 1. 高斯核衰减替代简单幂律 2. 自适应距离阈值 @@ -127,7 +131,11 @@ class RainfallGridService: 4. 高斯平滑减少突变 Args: - station_data: 站点数据列表 + station_data: 站点数据列表,格式: + [ + {'lon': x, 'lat': y, 'rainfall': z, 'duration_hours': h}, + ... + ] Returns: 插值结果字典 diff --git a/app/utils/time_converter.py b/app/utils/time_converter.py new file mode 100644 index 0000000..84c6034 --- /dev/null +++ b/app/utils/time_converter.py @@ -0,0 +1,127 @@ +""" +时间转换工具 +统一处理标准时间输入与数据库时间格式的转换 +""" +from datetime import datetime, timedelta +from typing import Union + + +class TimeConverter: + """时间转换器""" + + # 数据库时间格式:YYYYMMDDHHmmss(如 20250907070000) + DB_FORMAT = '%Y%m%d%H%M%S' + + # 标准输入格式:2025-07-04 20:00:00 或 2025-07-04T20:00:00 + INPUT_FORMATS = [ + '%Y-%m-%d %H:%M:%S', # 2025-07-04 20:00:00 + '%Y-%m-%dT%H:%M:%S', # 2025-07-04T20:00:00 + '%Y-%m-%d %H:%M', # 2025-07-04 20:00 + '%Y-%m-%dT%H:%M', # 2025-07-04T20:00 + ] + + @staticmethod + def parse_input_time(time_str: str) -> datetime: + """ + 解析标准时间输入字符串 + + 支持格式: + - 2025-07-04 20:00:00 + - 2025-07-04T20:00:00 + - 2025-07-04 20:00 + - 2025-07-04T20:00 + + Args: + time_str: 时间字符串 + + Returns: + datetime对象 + + Raises: + ValueError: 时间格式无法识别 + """ + if not time_str: + raise ValueError("时间字符串不能为空") + + # 尝试所有支持的格式 + for fmt in TimeConverter.INPUT_FORMATS: + try: + return datetime.strptime(time_str, fmt) + except ValueError: + continue + + raise ValueError(f"无法识别的时间格式: {time_str},支持格式: {TimeConverter.INPUT_FORMATS}") + + @staticmethod + def to_db_format(dt: datetime) -> int: + """ + 将datetime对象转换为数据库时间格式(YYYYMMDDHHmmss整数) + + Args: + dt: datetime对象 + + Returns: + YYYYMMDDHHmmss格式的整数 + """ + return int(dt.strftime(TimeConverter.DB_FORMAT)) + + @staticmethod + def from_db_format(db_time: int) -> datetime: + """ + 将数据库时间格式(YYYYMMDDHHmmss整数)转换为datetime对象 + + Args: + db_time: YYYYMMDDHHmmss格式的整数 + + Returns: + datetime对象 + """ + return datetime.strptime(str(db_time), TimeConverter.DB_FORMAT) + + @staticmethod + def to_db_time_range(query_time: Union[datetime, str], hours: int = 72) -> tuple: + """ + 将查询时间转换为数据库时间范围 + + Args: + query_time: 查询时间(datetime对象或标准时间字符串) + hours: 时间窗口(小时),默认72小时 + + Returns: + (start_time, end_time) 元组,均为YYYYMMDDHHmmss格式的整数 + """ + # 如果是字符串,先解析为datetime + if isinstance(query_time, str): + query_time = TimeConverter.parse_input_time(query_time) + + # 计算时间范围 + end_time = query_time + start_time = query_time - timedelta(hours=hours) + + # 转换为数据库格式 + return (TimeConverter.to_db_format(start_time), TimeConverter.to_db_format(end_time)) + + @staticmethod + def get_sql_time_condition(column: str = 'datetime', + query_time: Union[datetime, str] = None, + hours: int = 72) -> str: + """ + 生成SQL时间条件片段 + + Args: + column: 时间列名,默认'datetime' + query_time: 查询时间 + hours: 时间窗口(小时) + + Returns: + SQL条件字符串,如 "datetime BETWEEN 20250904070000 AND 20250907070000" + """ + if query_time is None: + query_time = datetime.now() + + start, end = TimeConverter.to_db_time_range(query_time, hours) + return f"{column} BETWEEN {start} AND {end}" + + +# 创建全局实例 +time_converter = TimeConverter() \ No newline at end of file