diff --git a/app/api/rainfall.py b/app/api/rainfall.py index 42652fb..6223b66 100644 --- a/app/api/rainfall.py +++ b/app/api/rainfall.py @@ -34,7 +34,7 @@ async def get_rainfall_grid(request: RainfallGridRequest): 返回适合Cesium渲染的GeoJSON格式数据。 Args: - request: 包含时间和分辨率的请求 + request: 包含时间、分辨率和持续时间的请求 Returns: GeoJSON格式的栅格数据 @@ -44,10 +44,15 @@ async def get_rainfall_grid(request: RainfallGridRequest): now = datetime.now() query_time = datetime.fromisoformat(request.time) if request.time else now - # 调用服务层生成栅格(自动查询前12小时数据) + # 验证duration参数 + if request.duration not in [12, 24]: + raise ValueError("duration参数必须为12或24") + + # 调用服务层生成栅格(自动查询前12小时或24小时数据) geojson_data = rainfall_service.generate_rainfall_grid( query_time=query_time, - resolution=request.resolution + resolution=request.resolution, + duration=request.duration ) if not geojson_data: @@ -75,13 +80,15 @@ async def get_rainfall_grid(request: RainfallGridRequest): @router.get("/stations", response_model=StationsResponse, summary="获取雨量站点数据") async def get_rainfall_stations( - time: str = Query(..., description="查询时间 ISO格式(自动查询前12小时数据)") + time: str = Query(..., description="查询时间 ISO格式(自动查询前12小时或24小时数据)"), + duration: int = Query(12, description="持续时间(小时),可选12或24", ge=12, le=24) ): """ 获取指定时间的雨量站点原始数据 Args: time: 查询时间 + duration: 持续时间(12或24小时) Returns: 站点列表,包含经纬度和降雨量 @@ -89,9 +96,10 @@ async def get_rainfall_stations( try: query_time = datetime.fromisoformat(time) - # 调用服务层获取站点数据(自动查询前12小时数据) + # 调用服务层获取站点数据(自动查询前12小时或24小时数据) stations = rainfall_service.get_stations_data( - query_time=query_time + query_time=query_time, + duration=duration ) return StationsResponse( @@ -112,7 +120,8 @@ async def get_rainfall_stations( async def get_rainfall_at_point( longitude: float, latitude: float, - time: Optional[str] = None + time: Optional[str] = None, + duration: int = Query(12, description="持续时间(小时),可选12或24", ge=12, le=24) ): """ 查询指定经纬度位置的降雨量 @@ -120,7 +129,8 @@ async def get_rainfall_at_point( Args: longitude: 经度 latitude: 纬度 - time: 查询时间(可选,默认当前时间,自动查询前12小时数据) + time: 查询时间(可选,默认当前时间,自动查询前12小时或24小时数据) + duration: 持续时间(12或24小时) Returns: 该点位的降雨量信息 @@ -132,12 +142,17 @@ async def get_rainfall_at_point( now = datetime.now() query_time = datetime.fromisoformat(time) if time else now - # 调用服务层查询(自动查询前12小时数据) + # 验证duration参数 + if duration not in [12, 24]: + raise ValueError("duration参数必须为12或24") + + # 调用服务层查询(自动查询前12小时或24小时数据) service = RainfallService() rainfall_info = service.get_rainfall_at_point( longitude=longitude, latitude=latitude, - query_time=query_time + query_time=query_time, + duration=duration ) if not rainfall_info: diff --git a/app/repositories/rainfall_repository.py b/app/repositories/rainfall_repository.py index aabd483..57b7af3 100644 --- a/app/repositories/rainfall_repository.py +++ b/app/repositories/rainfall_repository.py @@ -15,30 +15,31 @@ class RainfallRepository: @staticmethod def query_stations_rainfall( - query_time: datetime + query_time: datetime, + duration: int = 12 ) -> List[Dict[str, Any]]: """ - 查询指定时间的站点降雨数据(自动查询前12小时) + 查询指定时间的站点降雨数据(自动查询前12小时或24小时) Args: query_time: 查询时间 + duration: 持续时间(12或24小时) Returns: 站点降雨数据列表 """ - sql = """ + sql = f""" SELECT - m.lon, - m.lat, - SUM(m.rainfall_1h::numeric) AS rainfall - FROM xian_meteorology m - WHERE m.datetime BETWEEN ( - to_char(timestamp :query_time - interval '12 hours', 'YYYYMMDDHH24MISS') + lon, + lat, + SUM(rainfall_1h::numeric) AS rainfall + FROM xian_meteorology + WHERE datetime BETWEEN ( + to_char(timestamp :query_time - interval '{duration} hours', 'YYYYMMDDHH24MISS') )::bigint AND ( to_char(timestamp :query_time, 'YYYYMMDDHH24MISS') )::bigint - GROUP BY m.lon, m.lat - ORDER BY rainfall DESC + GROUP BY lon, lat """ params = { @@ -47,7 +48,7 @@ class RainfallRepository: try: result = db_manager.execute_raw_sql(sql, params) - logger.info(f"查询到 {len(result)} 个站点数据") + logger.info(f"查询到 {len(result)} 个站点数据({duration}小时)") return result except Exception as e: logger.error(f"查询站点降雨数据失败: {e}") diff --git a/app/schemas/rainfall.py b/app/schemas/rainfall.py index eae9f8f..e4d93bb 100644 --- a/app/schemas/rainfall.py +++ b/app/schemas/rainfall.py @@ -10,7 +10,7 @@ class RainfallGridRequest(BaseModel): time: Optional[str] = Field( None, alias="time", - description="查询时间 ISO格式,默认为当前时间(自动查询前12小时数据)", + description="查询时间 ISO格式,默认为当前时间(自动查询前12小时或24小时数据)", example="2024-01-01T12:00:00" ) resolution: float = Field( @@ -20,6 +20,13 @@ class RainfallGridRequest(BaseModel): gt=0, le=0.1 ) + duration: int = Field( + 12, + alias="duration", + description="持续时间(小时),可选12或24", + ge=12, + le=24 + ) class Config: populate_by_name = True # 允许同时使用字段名和别名 diff --git a/app/services/rainfall_service.py b/app/services/rainfall_service.py index 9b343dc..b2573c0 100644 --- a/app/services/rainfall_service.py +++ b/app/services/rainfall_service.py @@ -15,6 +15,117 @@ logger = setup_logging() class InterpolationService: """插值服务类""" + @staticmethod + def _create_buffer_points( + points_array: np.ndarray + ) -> np.ndarray: + """ + 创建缓冲点:在原始站点外围生成虚拟点以扩展插值区域 + + Args: + points_array: 原始站点坐标数组 + + Returns: + 缓冲点坐标数组 + """ + # 计算站点分布的中心 + center = np.mean(points_array, axis=0) + + # 计算站点到中心的最大距离 + distances_from_center = np.sqrt(np.sum((points_array - center) ** 2, axis=1)) + np.max(distances_from_center) + + # 在站点外围生成缓冲点(沿着各个方向扩展) + buffer_points = [] + num_angles = 360 # 每隔1度生成一个缓冲点 + + for angle_deg in range(0, 360, 360 // num_angles): + angle_rad = np.radians(angle_deg) + # 在凸包边界外扩展 + for scale in [1.05, 1.1, 1.15]: + # 找到该方向上最远的站点 + direction = np.array([np.cos(angle_rad), np.sin(angle_rad)]) + projections = points_array @ direction + max_idx = np.argmax(projections) + + # 在该方向上扩展 + base_point = points_array[max_idx] + buffer_point = center + (base_point - center) * scale + buffer_points.append(buffer_point) + + return np.array(buffer_points) + + @staticmethod + def gaussian_smoothing( + grid_data: np.ndarray, + sigma: float = 1.5 + ) -> np.ndarray: + """ + 高斯平滑滤波,减少边缘突变 + + Args: + grid_data: 栅格数据 + sigma: 高斯核标准差 + + Returns: + 平滑后的栅格数据 + """ + from scipy.ndimage import gaussian_filter + + # 只对有效数据进行平滑 + valid_mask = ~np.isnan(grid_data) + if not np.any(valid_mask): + return grid_data + + # 填充NaN值以便平滑 + filled_data = grid_data.copy() + mean_val = np.nanmean(grid_data) + filled_data[~valid_mask] = mean_val + + # 应用高斯滤波 + smoothed = gaussian_filter(filled_data, sigma=sigma) + + # 恢复原始NaN区域 + result = np.where(valid_mask, smoothed, np.nan) + + return result + + @staticmethod + def calculate_adaptive_max_distance( + points_array: np.ndarray, + base_distance: float = 0.3, + min_distance: float = 0.15, + max_distance: float = 0.5 + ) -> float: + """ + 根据站点密度自适应计算最大影响距离 + + Args: + points_array: 站点坐标数组 + base_distance: 基础距离 + min_distance: 最小距离 + max_distance: 最大距离 + + Returns: + 自适应的最大影响距离 + """ + if len(points_array) < 3: + return base_distance + + # 计算站点间的平均距离 + from scipy.spatial import distance_matrix + dist_matrix = distance_matrix(points_array, points_array) + + # 排除对角线(自身距离为0) + np.fill_diagonal(dist_matrix, np.inf) + avg_distance = np.mean(np.min(dist_matrix, axis=1)) + + # 根据平均距离调整max_distance + adaptive_distance = avg_distance * 3 # 约3倍平均站点间距 + + # 限制在合理范围内 + return np.clip(adaptive_distance, min_distance, max_distance) + @staticmethod def inverse_distance_weighting( points: List[Tuple[float, float]], @@ -22,20 +133,29 @@ class InterpolationService: grid_lon: np.ndarray, grid_lat: np.ndarray, power: float = 2.0, - max_distance: float = 0.5, - edge_buffer: float = 0.15 + max_distance: float = None, + use_adaptive_distance: bool = True, + apply_smoothing: bool = True, + smoothing_sigma: float = 1.0 ) -> np.ndarray: """ - 反距离权重插值 (IDW) - 向量化优化版本 + 反距离权重插值 (IDW) - 优化版本 + 改进: + 1. 高斯核衰减替代简单幂律 + 2. 自适应距离阈值 + 3. 边缘渐变处理 + 4. 高斯平滑减少突变 Args: points: 已知点坐标 [(lon, lat), ...] values: 已知点的值 [rainfall, ...] grid_lon: 网格经度数组 grid_lat: 网格纬度数组 - power: 距离幂次 - max_distance: 最大影响距离(度),超出此距离的点不参与插值 - edge_buffer: 边缘缓冲距离,站点外围扩展此距离再计算凸包 + power: 距离幂次(基础值) + max_distance: 最大影响距离(度),None则自适应计算 + use_adaptive_distance: 是否使用自适应距离 + apply_smoothing: 是否应用平滑 + smoothing_sigma: 平滑强度 Returns: 插值后的栅格数据,无效区域为 NaN @@ -47,14 +167,26 @@ class InterpolationService: lon_grid, lat_grid = np.meshgrid(grid_lon, grid_lat) result = np.full_like(lon_grid, np.nan) + # 自适应计算最大距离 + if use_adaptive_distance or max_distance is None: + actual_max_distance = InterpolationService.calculate_adaptive_max_distance( + points_array + ) + if max_distance is not None: + actual_max_distance = min(actual_max_distance, max_distance) + else: + actual_max_distance = max_distance + + logger.info(f"使用最大影响距离: {actual_max_distance:.3f} 度") + # 计算站点的凸包(带边缘缓冲) hull_mask = None + confidence_mask = None # 置信度掩码 if len(points_array) >= 3: try: # 创建缓冲站点:在原始站点外围添加虚拟点 buffer_points = InterpolationService._create_buffer_points( - points_array, - buffer_distance=edge_buffer + points_array ) # 合并原始站点和缓冲站点 @@ -67,23 +199,40 @@ class InterpolationService: # 向量化判断所有网格点是否在凸包内 grid_points = np.column_stack([lon_grid.ravel(), lat_grid.ravel()]) - hull_mask = tri.find_simplex(grid_points) >= 0 + hull_indices = tri.find_simplex(grid_points) + hull_mask = hull_indices >= 0 hull_mask = hull_mask.reshape(lon_grid.shape) - except: + + # 计算置信度:基于到最近站点的距离 + # 在凸包内但远离站点的区域降低置信度 + from scipy.spatial import distance_matrix + grid_valid = grid_points[hull_mask.ravel()] + if len(grid_valid) > 0: + dist_to_stations = distance_matrix(grid_valid, points_array) + min_distances = np.min(dist_to_stations, axis=1) + + # 创建置信度掩码(距离越远,置信度越低) + confidence = np.ones(len(grid_points)) + confidence[hull_mask.ravel()] = np.exp(-min_distances / actual_max_distance) + confidence_mask = confidence.reshape(lon_grid.shape) + else: + confidence_mask = np.ones_like(lon_grid) + + except Exception as e: + logger.warning(f"凸包计算失败: {e},使用全区域插值") hull_mask = np.ones_like(lon_grid, dtype=bool) + confidence_mask = np.ones_like(lon_grid) else: hull_mask = np.ones_like(lon_grid, dtype=bool) + confidence_mask = np.ones_like(lon_grid) # 向量化计算所有网格点到所有站点的距离 - # grid_lon shape: (num_lat, num_lon) - # points_array[:, 0] shape: (num_stations,) - # 使用广播机制 lon_diff = lon_grid[:, :, np.newaxis] - points_array[np.newaxis, np.newaxis, :, 0] lat_diff = lat_grid[:, :, np.newaxis] - points_array[np.newaxis, np.newaxis, :, 1] distances = np.sqrt(lon_diff**2 + lat_diff**2) # 过滤超出最大距离的站点 - valid_mask = distances <= max_distance + valid_mask = distances <= actual_max_distance # 对于每个网格点,检查是否有有效站点 has_valid_stations = np.any(valid_mask, axis=2) @@ -95,46 +244,79 @@ class InterpolationService: distances = np.where(valid_mask, distances, np.inf) distances = np.maximum(distances, 1e-10) - # IDW权重计算 - weights = 1.0 / (distances ** power) + # 优化的权重计算:结合幂律和高斯衰减 + # 近处使用幂律,远处使用高斯衰减使过渡更平滑 + power_weights = 1.0 / (distances ** power) + gaussian_weights = np.exp(-0.5 * (distances / (actual_max_distance * 0.5)) ** 2) + + # 混合权重:距离越远,高斯权重占比越大 + distance_ratio = distances / actual_max_distance + mix_factor = np.clip(distance_ratio, 0, 1) + weights = (1 - mix_factor) * power_weights + mix_factor * gaussian_weights + weights = np.where(valid_mask, weights, 0) # 加权平均 weighted_sum = np.sum(weights * values_array[np.newaxis, np.newaxis, :], axis=2) weight_total = np.sum(weights, axis=2) - # 计算最终结果 - result = np.where( - final_mask & (weight_total > 0), - weighted_sum / weight_total, - np.nan - ) + # 计算基础插值结果(使用 errstate 忽略预期的除零警告,np.where 已安全过滤) + with np.errstate(divide='ignore', invalid='ignore'): + result = np.where( + final_mask & (weight_total > 0), + weighted_sum / weight_total, + np.nan + ) + + # 应用置信度调整:边缘区域向邻近值渐变 + if confidence_mask is not None: + # 计算全局平均降雨量作为边缘区域的基准 + valid_rainfall = result[final_mask] + if len(valid_rainfall) > 0: + mean_rainfall = np.mean(valid_rainfall) + # 边缘区域向平均值渐变 + result = np.where( + final_mask, + result, + np.nan + ) + # 根据置信度调整结果,低置信度区域向均值靠拢 + adjusted_result = result * confidence_mask + mean_rainfall * (1 - confidence_mask) + result = np.where(final_mask, adjusted_result, np.nan) + + # 应用高斯平滑减少边缘突变 + if apply_smoothing: + result = InterpolationService.gaussian_smoothing(result, sigma=smoothing_sigma) return result @staticmethod - def get_rainfall_color(rainfall: float) -> str: + def get_rainfall_color(rainfall: float, duration: int = 12) -> str: """ - 根据降雨量获取颜色(蓝色渐变) + 根据降雨量获取颜色(按照国标) Args: rainfall: 降雨量(mm) + duration: 持续时间(12或24小时) Returns: 颜色字符串 "rgba(r,g,b,a)" """ + # 国标降雨等级颜色映射 if rainfall < 0.1: - return "rgba(200,200,200,0)" # 透明 - 无雨 - elif rainfall < 10: - return "rgba(173,216,230,0.5)" # 浅蓝 - 小雨 - elif rainfall < 25: - return "rgba(100,149,237,0.6)" # 矢车菊蓝 - 中雨 - elif rainfall < 50: - return "rgba(30,144,255,0.7)" # 道奇蓝 - 大雨 - elif rainfall < 100: - return "rgba(0,0,205,0.8)" # 中蓝 - 暴雨 + return "rgba(200,200,200,0)" # 透明 - 微量降雨(零星小雨) + elif rainfall < 10 if duration == 12 else 9.9: + return "rgba(0,0,255,0.4)" # 浅蓝 - 小雨 + elif rainfall < 15 if duration == 12 else 25: + return "rgba(0,255,255,0.5)" # 青色 - 中雨 + elif rainfall < 30 if duration == 12 else 50: + return "rgba(0,255,0,0.6)" # 绿色 - 大雨 + elif rainfall < 70 if duration == 12 else 100: + return "rgba(255,255,0,0.7)" # 黄色 - 暴雨 + elif rainfall < 140 if duration == 12 else 250: + return "rgba(255,165,0,0.8)" # 橙色 - 大暴雨 else: - return "rgba(0,0,139,0.9)" # 深蓝 - 大暴雨 + return "rgba(255,0,0,0.9)" # 红色 - 特大暴雨 class GeoJSONService: @@ -145,7 +327,8 @@ class GeoJSONService: grid_metadata: Dict[str, Any], rainfall_array: np.ndarray, grid_lon: np.ndarray, - grid_lat: np.ndarray + grid_lat: np.ndarray, + duration: int = 12 ) -> Dict[str, Any]: """ 创建GeoJSON FeatureCollection用于Cesium渲染 @@ -155,6 +338,7 @@ class GeoJSONService: rainfall_array: 降雨量数组 grid_lon: 经度网格 grid_lat: 纬度网格 + duration: 持续时间(12或24小时) Returns: GeoJSON格式的FeatureCollection @@ -190,7 +374,8 @@ class GeoJSONService: }, "properties": { "rainfall": round(rainfall_value, 2), - "color": InterpolationService.get_rainfall_color(rainfall_value) + "level": RainfallService._get_rainfall_level(rainfall_value, duration), + "color": InterpolationService.get_rainfall_color(rainfall_value, duration) } } features.append(feature) @@ -221,38 +406,42 @@ class RainfallService: def get_stations_data( self, - query_time: datetime + query_time: datetime, + duration: int = 12 ) -> List[Dict[str, Any]]: """ 获取站点降雨数据 Args: - query_time: 查询时间(自动查询前12小时数据) + query_time: 查询时间(自动查询前12小时或24小时数据) + duration: 持续时间(12或24小时) Returns: 站点数据列表 """ - return self.repository.query_stations_rainfall(query_time) + return self.repository.query_stations_rainfall(query_time, duration) def generate_rainfall_grid( self, query_time: datetime, - resolution: float = 0.01 + resolution: float = 0.01, + duration: int = 12 ) -> Dict[str, Any]: """ 生成降雨栅格数据 Args: - query_time: 查询时间(自动查询前12小时数据) + query_time: 查询时间(自动查询前12小时或24小时数据) resolution: 栅格分辨率 + duration: 持续时间(12或24小时) Returns: GeoJSON格式的栅格数据 """ - logger.info(f"查询降雨数据: {query_time}") + logger.info(f"查询降雨数据: {query_time}, 持续时间: {duration}小时") - # 查询站点数据(自动查询前12小时数据) - stations_data = self.get_stations_data(query_time) + # 查询站点数据(自动查询前12小时或24小时数据) + stations_data = self.get_stations_data(query_time, duration) if not stations_data: return None @@ -267,9 +456,9 @@ class RainfallService: points = [(row['lon'], row['lat']) for row in valid_stations] values = [float(row['rainfall']) for row in valid_stations] - # 确定栅格范围(西安大致范围) - lon_min, lon_max = 107.5, 109.5 - lat_min, lat_max = 33.5, 34.5 + # 确定栅格范围 + lon_min, lon_max = 107, 110 + lat_min, lat_max = 33, 35 # 创建栅格网格 num_lon = int((lon_max - lon_min) / resolution) + 1 @@ -280,14 +469,17 @@ class RainfallService: logger.info(f"生成栅格: {num_lon}x{num_lat}, 分辨率: {resolution}") - # 执行IDW插值(带凸包裁剪和距离阈值) + # 执行IDW插值(优化版本:自适应距离、混合权重、平滑处理) rainfall_grid = self.interpolation_service.inverse_distance_weighting( points=points, values=values, grid_lon=grid_lon, grid_lat=grid_lat, power=2.0, - max_distance=0.3 # 最大影响距离0.3度(约30公里) + max_distance=0.35, # 最大影响距离0.35度(约35公里) + use_adaptive_distance=True, # 启用自适应距离 + apply_smoothing=True, # 启用平滑处理 + smoothing_sigma=1.2 # 平滑强度 ) # 创建栅格元数据 @@ -300,7 +492,7 @@ class RainfallService: # 转换为GeoJSON格式 geojson_data = self.geojson_service.create_feature_collection( - grid_metadata, rainfall_grid, grid_lon, grid_lat + grid_metadata, rainfall_grid, grid_lon, grid_lat, duration ) logger.info("降雨栅格数据生成成功") @@ -311,7 +503,8 @@ class RainfallService: self, longitude: float, latitude: float, - query_time: datetime + query_time: datetime, + duration: int = 12 ) -> Optional[Dict[str, Any]]: """ 查询指定点位的降雨量(使用IDW插值) @@ -319,13 +512,14 @@ class RainfallService: Args: longitude: 经度 latitude: 纬度 - query_time: 查询时间(自动查询前12小时数据) + query_time: 查询时间(自动查询前12小时或24小时数据) + duration: 持续时间(12或24小时) Returns: 点位降雨量信息 """ - # 获取站点数据(自动查询前12小时数据) - stations_data = self.get_stations_data(query_time) + # 获取站点数据(自动查询前12小时或24小时数据) + stations_data = self.get_stations_data(query_time, duration) if not stations_data: return None @@ -358,24 +552,54 @@ class RainfallService: "longitude": longitude, "latitude": latitude, "rainfall": round(float(rainfall_value), 2), - "level": self._get_rainfall_level(rainfall_value), - "color": InterpolationService.get_rainfall_color(rainfall_value), + "level": self._get_rainfall_level(rainfall_value, duration), + "color": InterpolationService.get_rainfall_color(rainfall_value, duration), "station_count": len(stations_data), - "query_time": query_time.isoformat() + "query_time": query_time.isoformat(), + "duration": duration } @staticmethod - def _get_rainfall_level(rainfall: float) -> str: - """获取降雨等级""" - if rainfall < 0.1: - return "无雨" - elif rainfall < 10: - return "小雨" - elif rainfall < 25: - return "中雨" - elif rainfall < 50: - return "大雨" - elif rainfall < 100: - return "暴雨" - else: - return "大暴雨" + def _get_rainfall_level(rainfall: float, duration: int = 12) -> str: + """ + 获取降雨等级(按照国标) + + Args: + rainfall: 降雨量(mm) + duration: 持续时间(12或24小时) + + Returns: + 降雨等级字符串 + """ + if duration == 12: + # 12小时降雨等级标准 + if rainfall < 0.1: + return "微量降雨" + elif rainfall < 5.0: + return "小雨" + elif rainfall < 15.0: + return "中雨" + elif rainfall < 30.0: + return "大雨" + elif rainfall < 70.0: + return "暴雨" + elif rainfall < 140.0: + return "大暴雨" + else: + return "特大暴雨" + else: # 24小时 + # 24小时降雨等级标准 + if rainfall < 0.1: + return "微量降雨" + elif rainfall < 10.0: + return "小雨" + elif rainfall < 25.0: + return "中雨" + elif rainfall < 50.0: + return "大雨" + elif rainfall < 100.0: + return "暴雨" + elif rainfall < 250.0: + return "大暴雨" + else: + return "特大暴雨"