""" 降雨数据仓库 负责数据库查询操作 """ from typing import Optional, List, Dict, Any, Union from datetime import datetime from app.utils.db_helper import db_helper from app.utils.time_converter import time_converter class RainfallRepository: """降雨数据仓库""" def get_max_rainfall_id(self, query_time: Union[datetime, str]) -> Optional[int]: """ 查询数据库中指定时间窗口内的最大ID(72小时窗口) Args: query_time: 查询时间(datetime对象或标准时间字符串,如'2025-07-04 20:00:00') Returns: 最大ID,如果没有数据则返回None """ # 获取时间范围 start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) sql = """ SELECT max(id) as max_id FROM xian_meteorology WHERE datetime BETWEEN %s AND %s AND is_delete = 0 """ result = db_helper.execute_query_one(sql, (start_time, end_time)) if result and result.get('max_id'): return int(result['max_id']) return None def get_rainfall_stations_data(self, query_time: Union[datetime, str]) -> List[Dict[str, Any]]: """ 查询雨量站点降雨数据 Args: query_time: 查询时间(datetime对象或标准时间字符串,如'2025-07-04 20:00:00') Returns: 站点数据列表,每个元素包含lon, lat, rainfall, duration_hours """ # 获取时间范围 start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) # 查询72小时内的降雨时序数据 sql = """ SELECT lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h FROM xian_meteorology WHERE datetime BETWEEN %s AND %s AND is_delete = 0 ORDER BY lon, lat, datetime DESC """ results = db_helper.execute_query(sql, (start_time, end_time)) if not results: return [] # 按站点分组处理 from itertools import groupby station_data = [] for (lon, lat), group in groupby(results, key=lambda r: (r['lon'], r['lat'])): accum_rain = 0.0 duration_hours = 0 consecutive_no_rain = 0 # 应用"连续3小时无雨截断"规则 for row in group: rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0 if rainfall > 0: accum_rain += rainfall duration_hours += 1 consecutive_no_rain = 0 else: consecutive_no_rain += 1 if consecutive_no_rain >= 3: break # 连续3小时无雨,停止累加 if accum_rain > 0: duration_hours += 1 if accum_rain > 0 or duration_hours > 0: station_data.append({ 'lon': float(lon), 'lat': float(lat), 'rainfall': accum_rain, # 累计降雨量 'duration_hours': duration_hours # 持续时间 }) return station_data def get_district_rainfall_summary(self, query_time, region_code: Optional[str] = None) -> List[Dict[str, Any]]: """ 按行政区聚合降雨统计(取区内最大站点值) Args: query_time: 查询时间 region_code: 行政区划代码,不传则返回所有区 Returns: [{district_name, district_code, rainfall, duration_hours}, ...] """ start_time, end_time = time_converter.to_db_time_range(query_time, hours=72) # 一次查询:area_code + lon + lat + rainfall_1h sql = """ SELECT area_code, lon, lat, datetime, CAST(rainfall_1h AS DOUBLE PRECISION) as rainfall_1h FROM xian_meteorology WHERE datetime BETWEEN %s AND %s AND is_delete = 0 ORDER BY area_code, lon, lat, datetime DESC """ rows = db_helper.execute_query(sql, (start_time, end_time)) if not rows: return [] # 按站点分组,计算 accum_rain + duration_hours(复用现有算法) from itertools import groupby station_stats: Dict[tuple, Dict[str, Any]] = {} # (area_code, lon, lat) → stats for (area_code, lon, lat), group in groupby(rows, key=lambda r: (r['area_code'], r['lon'], r['lat'])): accum_rain = 0.0 duration_hours = 0 consecutive_no_rain = 0 for row in group: rainfall = float(row['rainfall_1h']) if row['rainfall_1h'] else 0.0 if rainfall > 0: accum_rain += rainfall duration_hours += 1 consecutive_no_rain = 0 else: consecutive_no_rain += 1 if consecutive_no_rain >= 3: break if accum_rain > 0: duration_hours += 1 key = (area_code, lon, lat) station_stats[key] = { 'area_code': area_code, 'accum_rain': accum_rain, 'duration_hours': duration_hours } # 按 area_code 聚合:取区内最大降雨站点 district_max: Dict[str, Dict[str, Any]] = {} for key, stats in station_stats.items(): code = stats['area_code'] if region_code and code != region_code: continue if code not in district_max or stats['accum_rain'] > district_max[code]['rainfall']: district_max[code] = { 'district_code': code, 'rainfall': round(stats['accum_rain'], 1), 'duration_hours': stats['duration_hours'] } if not district_max: return [] # 查 xian_district 获取名称 codes = list(district_max.keys()) placeholders = ', '.join(['%s'] * len(codes)) sql = f"SELECT code, name FROM xian_district WHERE code IN ({placeholders}) AND is_delete = 0" district_rows = db_helper.execute_query(sql, tuple(codes)) code_to_name = {r['code']: r['name'] for r in district_rows} result = [] for code, info in district_max.items(): name = code_to_name.get(code) if name is None: continue # 跳过 xian_district 中不存在的代码 result.append({ 'district_name': name, 'district_code': code, 'rainfall': info['rainfall'], 'duration_hours': info['duration_hours'] }) # 按名称排序 result.sort(key=lambda x: x['district_name']) return result # 创建全局实例 rainfall_repository = RainfallRepository()