Files
xian_algorithm_new/app/services/rainfall_service.py
T
2026-05-06 13:07:58 +08:00

606 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
降雨数据Service - 业务逻辑层
"""
import numpy as np
from scipy.spatial import Delaunay, ConvexHull
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime
from app.repositories.rainfall_repository import RainfallRepository
from app.utils.logger import setup_logging
logger = setup_logging()
class InterpolationService:
"""插值服务类"""
@staticmethod
def _create_buffer_points(
points_array: np.ndarray
) -> np.ndarray:
"""
创建缓冲点:在原始站点外围生成虚拟点以扩展插值区域
Args:
points_array: 原始站点坐标数组
Returns:
缓冲点坐标数组
"""
# 计算站点分布的中心
center = np.mean(points_array, axis=0)
# 计算站点到中心的最大距离
distances_from_center = np.sqrt(np.sum((points_array - center) ** 2, axis=1))
np.max(distances_from_center)
# 在站点外围生成缓冲点(沿着各个方向扩展)
buffer_points = []
num_angles = 360 # 每隔1度生成一个缓冲点
for angle_deg in range(0, 360, 360 // num_angles):
angle_rad = np.radians(angle_deg)
# 在凸包边界外扩展
for scale in [1.05, 1.1, 1.15]:
# 找到该方向上最远的站点
direction = np.array([np.cos(angle_rad), np.sin(angle_rad)])
projections = points_array @ direction
max_idx = np.argmax(projections)
# 在该方向上扩展
base_point = points_array[max_idx]
buffer_point = center + (base_point - center) * scale
buffer_points.append(buffer_point)
return np.array(buffer_points)
@staticmethod
def gaussian_smoothing(
grid_data: np.ndarray,
sigma: float = 1.5
) -> np.ndarray:
"""
高斯平滑滤波,减少边缘突变
Args:
grid_data: 栅格数据
sigma: 高斯核标准差
Returns:
平滑后的栅格数据
"""
from scipy.ndimage import gaussian_filter
# 只对有效数据进行平滑
valid_mask = ~np.isnan(grid_data)
if not np.any(valid_mask):
return grid_data
# 填充NaN值以便平滑
filled_data = grid_data.copy()
mean_val = np.nanmean(grid_data)
filled_data[~valid_mask] = mean_val
# 应用高斯滤波
smoothed = gaussian_filter(filled_data, sigma=sigma)
# 恢复原始NaN区域
result = np.where(valid_mask, smoothed, np.nan)
return result
@staticmethod
def calculate_adaptive_max_distance(
points_array: np.ndarray,
base_distance: float = 0.3,
min_distance: float = 0.15,
max_distance: float = 0.5
) -> float:
"""
根据站点密度自适应计算最大影响距离
Args:
points_array: 站点坐标数组
base_distance: 基础距离
min_distance: 最小距离
max_distance: 最大距离
Returns:
自适应的最大影响距离
"""
if len(points_array) < 3:
return base_distance
# 计算站点间的平均距离
from scipy.spatial import distance_matrix
dist_matrix = distance_matrix(points_array, points_array)
# 排除对角线(自身距离为0
np.fill_diagonal(dist_matrix, np.inf)
avg_distance = np.mean(np.min(dist_matrix, axis=1))
# 根据平均距离调整max_distance
adaptive_distance = avg_distance * 3 # 约3倍平均站点间距
# 限制在合理范围内
return np.clip(adaptive_distance, min_distance, max_distance)
@staticmethod
def inverse_distance_weighting(
points: List[Tuple[float, float]],
values: List[float],
grid_lon: np.ndarray,
grid_lat: np.ndarray,
power: float = 2.0,
max_distance: float = None,
use_adaptive_distance: bool = True,
apply_smoothing: bool = True,
smoothing_sigma: float = 1.0
) -> np.ndarray:
"""
反距离权重插值 (IDW) - 优化版本
改进:
1. 高斯核衰减替代简单幂律
2. 自适应距离阈值
3. 边缘渐变处理
4. 高斯平滑减少突变
Args:
points: 已知点坐标 [(lon, lat), ...]
values: 已知点的值 [rainfall, ...]
grid_lon: 网格经度数组
grid_lat: 网格纬度数组
power: 距离幂次(基础值)
max_distance: 最大影响距离(度),None则自适应计算
use_adaptive_distance: 是否使用自适应距离
apply_smoothing: 是否应用平滑
smoothing_sigma: 平滑强度
Returns:
插值后的栅格数据,无效区域为 NaN
"""
points_array = np.array(points)
values_array = np.array(values)
# 创建网格
lon_grid, lat_grid = np.meshgrid(grid_lon, grid_lat)
result = np.full_like(lon_grid, np.nan)
# 自适应计算最大距离
if use_adaptive_distance or max_distance is None:
actual_max_distance = InterpolationService.calculate_adaptive_max_distance(
points_array
)
if max_distance is not None:
actual_max_distance = min(actual_max_distance, max_distance)
else:
actual_max_distance = max_distance
logger.info(f"使用最大影响距离: {actual_max_distance:.3f}")
# 计算站点的凸包(带边缘缓冲)
hull_mask = None
confidence_mask = None # 置信度掩码
if len(points_array) >= 3:
try:
# 创建缓冲站点:在原始站点外围添加虚拟点
buffer_points = InterpolationService._create_buffer_points(
points_array
)
# 合并原始站点和缓冲站点
all_points = np.vstack([points_array, buffer_points])
# 计算凸包
hull = ConvexHull(all_points)
hull_points = all_points[hull.vertices]
tri = Delaunay(hull_points)
# 向量化判断所有网格点是否在凸包内
grid_points = np.column_stack([lon_grid.ravel(), lat_grid.ravel()])
hull_indices = tri.find_simplex(grid_points)
hull_mask = hull_indices >= 0
hull_mask = hull_mask.reshape(lon_grid.shape)
# 计算置信度:基于到最近站点的距离
# 在凸包内但远离站点的区域降低置信度
from scipy.spatial import distance_matrix
grid_valid = grid_points[hull_mask.ravel()]
if len(grid_valid) > 0:
dist_to_stations = distance_matrix(grid_valid, points_array)
min_distances = np.min(dist_to_stations, axis=1)
# 创建置信度掩码(距离越远,置信度越低)
confidence = np.ones(len(grid_points))
confidence[hull_mask.ravel()] = np.exp(-min_distances / actual_max_distance)
confidence_mask = confidence.reshape(lon_grid.shape)
else:
confidence_mask = np.ones_like(lon_grid)
except Exception as e:
logger.warning(f"凸包计算失败: {e},使用全区域插值")
hull_mask = np.ones_like(lon_grid, dtype=bool)
confidence_mask = np.ones_like(lon_grid)
else:
hull_mask = np.ones_like(lon_grid, dtype=bool)
confidence_mask = np.ones_like(lon_grid)
# 向量化计算所有网格点到所有站点的距离
lon_diff = lon_grid[:, :, np.newaxis] - points_array[np.newaxis, np.newaxis, :, 0]
lat_diff = lat_grid[:, :, np.newaxis] - points_array[np.newaxis, np.newaxis, :, 1]
distances = np.sqrt(lon_diff**2 + lat_diff**2)
# 过滤超出最大距离的站点
valid_mask = distances <= actual_max_distance
# 对于每个网格点,检查是否有有效站点
has_valid_stations = np.any(valid_mask, axis=2)
# 合并凸包掩码和有效站点掩码
final_mask = hull_mask & has_valid_stations
# 避免除零
distances = np.where(valid_mask, distances, np.inf)
distances = np.maximum(distances, 1e-10)
# 优化的权重计算:结合幂律和高斯衰减
# 近处使用幂律,远处使用高斯衰减使过渡更平滑
power_weights = 1.0 / (distances ** power)
gaussian_weights = np.exp(-0.5 * (distances / (actual_max_distance * 0.5)) ** 2)
# 混合权重:距离越远,高斯权重占比越大
distance_ratio = distances / actual_max_distance
mix_factor = np.clip(distance_ratio, 0, 1)
weights = (1 - mix_factor) * power_weights + mix_factor * gaussian_weights
weights = np.where(valid_mask, weights, 0)
# 加权平均
weighted_sum = np.sum(weights * values_array[np.newaxis, np.newaxis, :], axis=2)
weight_total = np.sum(weights, axis=2)
# 计算基础插值结果(使用 errstate 忽略预期的除零警告,np.where 已安全过滤)
with np.errstate(divide='ignore', invalid='ignore'):
result = np.where(
final_mask & (weight_total > 0),
weighted_sum / weight_total,
np.nan
)
# 应用置信度调整:边缘区域向邻近值渐变
if confidence_mask is not None:
# 计算全局平均降雨量作为边缘区域的基准
valid_rainfall = result[final_mask]
if len(valid_rainfall) > 0:
mean_rainfall = np.mean(valid_rainfall)
# 边缘区域向平均值渐变
result = np.where(
final_mask,
result,
np.nan
)
# 根据置信度调整结果,低置信度区域向均值靠拢
adjusted_result = result * confidence_mask + mean_rainfall * (1 - confidence_mask)
result = np.where(final_mask, adjusted_result, np.nan)
# 应用高斯平滑减少边缘突变
if apply_smoothing:
result = InterpolationService.gaussian_smoothing(result, sigma=smoothing_sigma)
return result
@staticmethod
def get_rainfall_color(rainfall: float, duration: int = 12) -> str:
"""
根据降雨量获取颜色(按照国标)
Args:
rainfall: 降雨量(mm)
duration: 持续时间(12或24小时)
Returns:
颜色字符串 "rgba(r,g,b,a)"
"""
# 国标降雨等级颜色映射
if rainfall < 0.1:
return "rgba(200,200,200,0)" # 透明 - 微量降雨(零星小雨)
elif rainfall < 10 if duration == 12 else 9.9:
return "rgba(0,0,255,0.4)" # 浅蓝 - 小雨
elif rainfall < 15 if duration == 12 else 25:
return "rgba(0,255,255,0.5)" # 青色 - 中雨
elif rainfall < 30 if duration == 12 else 50:
return "rgba(0,255,0,0.6)" # 绿色 - 大雨
elif rainfall < 70 if duration == 12 else 100:
return "rgba(255,255,0,0.7)" # 黄色 - 暴雨
elif rainfall < 140 if duration == 12 else 250:
return "rgba(255,165,0,0.8)" # 橙色 - 大暴雨
else:
return "rgba(255,0,0,0.9)" # 红色 - 特大暴雨
class GeoJSONService:
"""GeoJSON生成服务"""
@staticmethod
def create_feature_collection(
grid_metadata: Dict[str, Any],
rainfall_array: np.ndarray,
grid_lon: np.ndarray,
grid_lat: np.ndarray,
duration: int = 12
) -> Dict[str, Any]:
"""
创建GeoJSON FeatureCollection用于Cesium渲染
Args:
grid_metadata: 栅格元数据
rainfall_array: 降雨量数组
grid_lon: 经度网格
grid_lat: 纬度网格
duration: 持续时间(12或24小时)
Returns:
GeoJSON格式的FeatureCollection
"""
features = []
# 将栅格数据转换为矩形要素
for i in range(len(grid_lat) - 1):
for j in range(len(grid_lon) - 1):
rainfall_value = float(rainfall_array[i, j])
# 跳过无数据的区域
if np.isnan(rainfall_value) or rainfall_value < 0:
continue
# 创建矩形多边形
lon_min = float(grid_lon[j])
lon_max = float(grid_lon[j + 1])
lat_min = float(grid_lat[i])
lat_max = float(grid_lat[i + 1])
feature = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[lon_min, lat_min],
[lon_max, lat_min],
[lon_max, lat_max],
[lon_min, lat_max],
[lon_min, lat_min]
]]
},
"properties": {
"rainfall": round(rainfall_value, 2),
"level": RainfallService._get_rainfall_level(rainfall_value, duration),
"color": InterpolationService.get_rainfall_color(rainfall_value, duration)
}
}
features.append(feature)
return {
"type": "FeatureCollection",
"features": features,
"metadata": {
"resolution": grid_metadata['resolution'],
"grid_size": [len(grid_lon), len(grid_lat)],
"bounds": {
"min_lon": float(grid_lon.min()),
"max_lon": float(grid_lon.max()),
"min_lat": float(grid_lat.min()),
"max_lat": float(grid_lat.max())
}
}
}
class RainfallService:
"""降雨数据业务服务类"""
def __init__(self):
self.repository = RainfallRepository()
self.interpolation_service = InterpolationService()
self.geojson_service = GeoJSONService()
def get_stations_data(
self,
query_time: datetime,
duration: int = 12
) -> List[Dict[str, Any]]:
"""
获取站点降雨数据
Args:
query_time: 查询时间(自动查询前12小时或24小时数据)
duration: 持续时间(12或24小时)
Returns:
站点数据列表
"""
return self.repository.query_stations_rainfall(query_time, duration)
def generate_rainfall_grid(
self,
query_time: datetime,
resolution: float = 0.01,
duration: int = 12
) -> Dict[str, Any]:
"""
生成降雨栅格数据
Args:
query_time: 查询时间(自动查询前12小时或24小时数据)
resolution: 栅格分辨率
duration: 持续时间(12或24小时)
Returns:
GeoJSON格式的栅格数据
"""
logger.info(f"查询降雨数据: {query_time}, 持续时间: {duration}小时")
# 查询站点数据(自动查询前12小时或24小时数据)
stations_data = self.get_stations_data(query_time, duration)
if not stations_data:
return None
# 提取站点坐标和降雨量(过滤空值)
valid_stations = [row for row in stations_data if row['rainfall'] is not None]
if not valid_stations:
logger.warning("所有站点的降雨量数据均为空")
return None
points = [(row['lon'], row['lat']) for row in valid_stations]
values = [float(row['rainfall']) for row in valid_stations]
# 确定栅格范围
lon_min, lon_max = 107, 110
lat_min, lat_max = 33, 35
# 创建栅格网格
num_lon = int((lon_max - lon_min) / resolution) + 1
num_lat = int((lat_max - lat_min) / resolution) + 1
grid_lon = np.linspace(lon_min, lon_max, num_lon)
grid_lat = np.linspace(lat_min, lat_max, num_lat)
logger.info(f"生成栅格: {num_lon}x{num_lat}, 分辨率: {resolution}")
# 执行IDW插值(优化版本:自适应距离、混合权重、平滑处理)
rainfall_grid = self.interpolation_service.inverse_distance_weighting(
points=points,
values=values,
grid_lon=grid_lon,
grid_lat=grid_lat,
power=2.0,
max_distance=0.35, # 最大影响距离0.35度(约35公里)
use_adaptive_distance=True, # 启用自适应距离
apply_smoothing=True, # 启用平滑处理
smoothing_sigma=1.2 # 平滑强度
)
# 创建栅格元数据
grid_metadata = {
"query_time": query_time.isoformat(),
"resolution": resolution,
"station_count": len(stations_data),
"grid_size": [num_lon, num_lat]
}
# 转换为GeoJSON格式
geojson_data = self.geojson_service.create_feature_collection(
grid_metadata, rainfall_grid, grid_lon, grid_lat, duration
)
logger.info("降雨栅格数据生成成功")
return geojson_data
def get_rainfall_at_point(
self,
longitude: float,
latitude: float,
query_time: datetime,
duration: int = 12
) -> Optional[Dict[str, Any]]:
"""
查询指定点位的降雨量(使用IDW插值)
Args:
longitude: 经度
latitude: 纬度
query_time: 查询时间(自动查询前12小时或24小时数据)
duration: 持续时间(12或24小时)
Returns:
点位降雨量信息
"""
# 获取站点数据(自动查询前12小时或24小时数据)
stations_data = self.get_stations_data(query_time, duration)
if not stations_data:
return None
# 提取站点坐标和降雨量
points = [(row['lon'], row['lat']) for row in stations_data]
values = [float(row['rainfall']) for row in stations_data]
# 使用IDW插值计算该点的降雨量
target_point = np.array([[longitude, latitude]])
points_array = np.array(points)
# 计算距离
distances = np.sqrt(
(points_array[:, 0] - longitude) ** 2 +
(points_array[:, 1] - latitude) ** 2
)
# 避免除零
min_dist = 1e-10
distances = np.maximum(distances, min_dist)
# IDW公式
power = 2.0
weights = 1.0 / (distances ** power)
rainfall_value = np.sum(weights * values) / np.sum(weights)
# 返回结果
return {
"longitude": longitude,
"latitude": latitude,
"rainfall": round(float(rainfall_value), 2),
"level": self._get_rainfall_level(rainfall_value, duration),
"color": InterpolationService.get_rainfall_color(rainfall_value, duration),
"station_count": len(stations_data),
"query_time": query_time.isoformat(),
"duration": duration
}
@staticmethod
def _get_rainfall_level(rainfall: float, duration: int = 12) -> str:
"""
获取降雨等级(按照国标)
Args:
rainfall: 降雨量(mm)
duration: 持续时间(12或24小时)
Returns:
降雨等级字符串
"""
if duration == 12:
# 12小时降雨等级标准
if rainfall < 0.1:
return "微量降雨"
elif rainfall < 5.0:
return "小雨"
elif rainfall < 15.0:
return "中雨"
elif rainfall < 30.0:
return "大雨"
elif rainfall < 70.0:
return "暴雨"
elif rainfall < 140.0:
return "大暴雨"
else:
return "特大暴雨"
else: # 24小时
# 24小时降雨等级标准
if rainfall < 0.1:
return "微量降雨"
elif rainfall < 10.0:
return "小雨"
elif rainfall < 25.0:
return "中雨"
elif rainfall < 50.0:
return "大雨"
elif rainfall < 100.0:
return "暴雨"
elif rainfall < 250.0:
return "大暴雨"
else:
return "特大暴雨"