进行路径计算

This commit is contained in:
wzy-warehouse
2026-06-29 11:37:04 +08:00
parent 1f01ceb062
commit e9169dfd13
6 changed files with 745 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
from app.models.hazard.hazard_zone_calculator import HazardZoneCalculator, hazard_zone_calculator
__all__ = ['HazardZoneCalculator', 'hazard_zone_calculator']
+413
View File
@@ -0,0 +1,413 @@
"""
灾害影响范围计算器
根据灾害类型和静态因子计算隐患点的影响范围(zone)和影响路径(path)
论文支撑:
- 滑坡: He et al. (2023) Landslides — 经验到达角法
- 泥石流: Baggio et al. (2021) NHESS — 河流关联缓冲区
- 山洪: Costache et al. (2022) STOTEN — 多级缓冲区
- 内涝: Wang et al. (2022) Water Res. Mgmt — TWI简化法
- 崩塌: Guerin et al. (2022) Eng. Geology — 锥体传播模型
"""
import math
import os
from typing import Optional, Tuple, Dict, Any
import yaml
from app.config.paths import CONFIG_DIR
# 配置文件路径
_PARAMS_PATH = os.path.join(CONFIG_DIR, 'hazard', 'hazard_zone_params.yaml')
# 地球半径 (WGS84)
_EARTH_RADIUS_M = 6371000.0
def _load_params() -> Dict[str, Any]:
"""加载灾害影响范围计算参数"""
with open(_PARAMS_PATH, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _to_wkt_polygon(coords: list) -> str:
"""
将坐标列表转换为 WKT POLYGON 字符串
Args:
coords: [(lon1, lat1), (lon2, lat2), ...] 首尾需闭合
"""
points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords])
return f'POLYGON(({points}))'
def _to_wkt_linestring(coords: list) -> str:
"""将坐标列表转换为 WKT LINESTRING 字符串"""
points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords])
return f'LINESTRING({points})'
def _offset_to_lonlat(lon: float, lat: float, dx_m: float, dy_m: float) -> Tuple[float, float]:
"""
将米级偏移转换回经纬度
Args:
lon, lat: 中心点经纬度
dx_m: 东向偏移(m)
dy_m: 北向偏移(m)
Returns:
(new_lon, new_lat)
"""
lat_rad = math.radians(lat)
dlon = math.degrees(dx_m / (_EARTH_RADIUS_M * math.cos(lat_rad)))
dlat = math.degrees(dy_m / _EARTH_RADIUS_M)
return lon + dlon, lat + dlat
def _make_fan_wkt(lon: float, lat: float, direction_deg: float,
spread_deg: float, radius_m: float,
segments: int = 24) -> str:
"""
创建扇形/锥形影响范围 WKT
Args:
lon, lat: 隐患点坐标
direction_deg: 扇形主方向(坡向, 0=北, 90=东)
spread_deg: 扇形总开角(度)
radius_m: 半径(m)
segments: 弧线段数
Returns:
POLYGON WKT 字符串
"""
half_spread = spread_deg / 2.0
coords = [(lon, lat)] # 中心点
for i in range(segments + 1):
angle_deg = direction_deg - half_spread + (spread_deg * i / segments)
angle_rad = math.radians(angle_deg)
dx = radius_m * math.sin(angle_rad)
dy = radius_m * math.cos(angle_rad)
coords.append(_offset_to_lonlat(lon, lat, dx, dy))
coords.append((lon, lat)) # 闭合回中心
return _to_wkt_polygon(coords)
def _make_circle_wkt(lon: float, lat: float, radius_m: float,
segments: int = 48) -> str:
"""
创建圆形缓冲区 WKT(用正多边形近似圆)
Args:
lon, lat: 中心点
radius_m: 半径(m)
segments: 边数
"""
coords = []
for i in range(segments):
angle_rad = 2 * math.pi * i / segments
dx = radius_m * math.sin(angle_rad)
dy = radius_m * math.cos(angle_rad)
coords.append(_offset_to_lonlat(lon, lat, dx, dy))
coords.append(coords[0]) # 闭合
return _to_wkt_polygon(coords)
class HazardZoneCalculator:
"""
灾害影响范围计算器
根据隐患点的灾害类型和静态因子,计算影响范围(WKT多边形)和影响路径(WKT线)。
每种灾害类型使用不同的经验公式,参数从 hazard_zone_params.yaml 读取。
"""
def __init__(self):
self.params = _load_params()
# ==================== 公开接口 ====================
def calculate(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
lon: float, lat: float,
static_factors: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
"""
计算单个隐患点的影响范围
Args:
source_id: 隐患点ID
disaster_type: 灾害类型 (滑坡/泥石流/山洪/内涝/崩塌)
scale_grade: 规模等级 (小型/中型/大型)
lon: 经度
lat: 纬度
static_factors: 静态因子字典,从 xian_risk_factors.static_factors 获取
Returns:
(zone_wkt, path_wkt) — zone为POLYGON WKT, path为LINESTRING WKT或None
"""
# 选择对应灾害类型的计算方法
method_map = {
'滑坡': self._calc_landslide,
'泥石流': self._calc_debris_flow,
'山洪': self._calc_flash_flood,
'内涝': self._calc_waterlogging,
'崩塌': self._calc_rockfall,
}
method = method_map.get(disaster_type, self._calc_default)
return method(source_id, disaster_type, scale_grade, static_factors, lon, lat)
# ==================== 滑坡 ====================
def _calc_landslide(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""滑坡: 经验到达角法 + 扇形展开"""
p = self.params['landslide']
dem = float(factors.get('dem_value', 0))
aspect = float(factors.get('aspect_value', 0))
river_dist = float(factors.get('river_distance', 9999))
fault_dist = float(factors.get('fault_distance', 9999))
# 到达角
angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型'
reach_angle = p['reach_angle'][angle_key]
# 半径: H / tan(到达角)
H = p['height_drop_m']
radius = H / math.tan(math.radians(reach_angle))
# 修正
if river_dist < p['river_distance_threshold']:
radius *= p['river_erosion_enhance']
if fault_dist < p['fault_distance_threshold']:
radius *= p['fault_enhance']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
# 扇形 — 沿坡向展开
if aspect and aspect > 0:
zone_wkt = _make_fan_wkt(lon, lat, aspect, p['fan_angle'], radius)
# 滑坡路径: 从隐患点沿坡向延伸到堆积区边缘
end_lon, end_lat = _offset_to_lonlat(
lon, lat,
radius * math.sin(math.radians(aspect)),
radius * math.cos(math.radians(aspect))
)
path_wkt = _to_wkt_linestring([(lon, lat), (end_lon, end_lat)])
else:
zone_wkt = _make_circle_wkt(lon, lat, radius)
path_wkt = None
return zone_wkt, path_wkt
# ==================== 泥石流 ====================
def _calc_debris_flow(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""
泥石流: 沿最近河流方向建立影响区
path_geom 为隐患点到最近河流的连线,代表潜在流动路径
"""
p = self.params['debris_flow']
slope = float(factors.get('slope_value', 0))
radius = p['base_radius_m'] + slope * p['slope_factor']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
zone_wkt = _make_circle_wkt(lon, lat, radius)
# 查询最近河流段 + 构建完整流动路径
river_geom_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m'])
if river_geom_wkt:
# 取河流上最近点
river_point = self._get_nearest_river_point(lon, lat, p['max_river_search_m'])
if river_point:
path_wkt = _to_wkt_linestring([
(lon, lat),
(river_point[0], river_point[1])
])
else:
path_wkt = river_geom_wkt # 降级:直接展示河流段
else:
path_wkt = None
return zone_wkt, path_wkt
# ==================== 山洪 ====================
def _calc_flash_flood(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""
山洪: 根据距河流距离 + 河流等级确定缓冲区
path_geom 为关联的河流段
"""
p = self.params['flash_flood']
river_dist = float(factors.get('river_distance', 9999))
thresholds = p['river_dist_thresholds']
buffer_map = p['buffer_by_level']
# 按距离选缓冲半径
if river_dist < thresholds[0]:
radius = buffer_map[1]
elif river_dist < thresholds[1]:
radius = buffer_map[2]
elif river_dist < thresholds[2]:
radius = buffer_map[3]
else:
radius = buffer_map.get(4, 80)
zone_wkt = _make_circle_wkt(lon, lat, radius)
# 山洪路径: 关联的最近河流段(淹没路径沿河展开)
path_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m'])
return zone_wkt, path_wkt
# ==================== 内涝 ====================
def _calc_waterlogging(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""内涝: TWI简化 + 不透水率 + 管网修正"""
p = self.params['waterlogging']
slope = float(factors.get('slope_value', 1))
impervious = float(factors.get('impervious_surface', 0))
pipe_density = float(factors.get('pipe_density', 0))
slope_rad = max(math.radians(slope), 0.001)
# TWI_proxy = ln(1 / tanβ)
twi = math.log(1.0 / math.tan(slope_rad))
radius = (p['base_radius_m'] * twi
* (1.0 + impervious * p['impervious_factor'])
* max(p['pipe_lower_bound'], 1.0 - pipe_density * p['pipe_density_factor']))
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
zone_wkt = _make_circle_wkt(lon, lat, radius)
return zone_wkt, None
# ==================== 崩塌 ====================
def _calc_rockfall(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""崩塌: 锥体传播模型 (CONEFALL简化)"""
p = self.params['rockfall']
aspect = float(factors.get('aspect_value', 0))
fault_dist = float(factors.get('fault_distance', 9999))
# 到达角
angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型'
reach_angle = p['reach_angle'][angle_key]
# 断裂带惩罚:减少到达角 = 更平缓 = 影响更大
if fault_dist < p['fault_penalty_threshold']:
reach_angle -= p['fault_angle_penalty']
# 半径
H = p['height_drop_m']
radius = H / math.tan(math.radians(max(reach_angle, 10)))
# 断裂带增强
if fault_dist < p['fault_enhance_threshold']:
radius *= p['fault_radius_enhance']
radius = max(p['min_radius_m'], min(radius, p['max_radius_m']))
# 锥形 — 沿坡向,窄扇形
if aspect and aspect > 0:
zone_wkt = _make_fan_wkt(lon, lat, aspect, 45, radius)
else:
zone_wkt = _make_circle_wkt(lon, lat, radius)
return zone_wkt, None
# ==================== 默认 ====================
def _calc_default(self, source_id: int, disaster_type: str,
scale_grade: Optional[str],
factors: Dict[str, Any],
lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]:
"""未匹配灾害类型的兜底:固定半径圆形"""
p = self.params['default']
zone_wkt = _make_circle_wkt(lon, lat, p['radius_m'])
return zone_wkt, None
# ==================== 数据库查询辅助 ====================
@staticmethod
def _get_nearest_river_point(lon: float, lat: float,
max_dist_m: float) -> Optional[Tuple[float, float]]:
"""
查询最近河流上距离隐患点最近的点的坐标
"""
from app.utils.db_helper import db_helper
sql = """
SELECT ST_X(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS rx,
ST_Y(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS ry
FROM xian_rivers r
WHERE r.is_delete = 0
AND ST_DWithin(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography,
%s)
ORDER BY ST_Distance(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography)
LIMIT 1
"""
row = db_helper.execute_query_one(
sql, (lon, lat, lon, lat, lon, lat, max_dist_m, lon, lat)
)
if row and row['rx'] and row['ry']:
return float(row['rx']), float(row['ry'])
return None
@staticmethod
def _get_nearest_river_geom(lon: float, lat: float,
max_dist_m: float) -> Optional[str]:
"""
查询最近河流段的 WKT
Returns:
LINESTRING WKT 字符串或 None
"""
from app.utils.db_helper import db_helper
sql = """
SELECT ST_AsText(r.geom) AS wkt
FROM xian_rivers r
WHERE r.is_delete = 0
AND ST_DWithin(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography,
%s)
ORDER BY ST_Distance(r.geom::geography,
ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography)
LIMIT 1
"""
row = db_helper.execute_query_one(sql, (lon, lat, max_dist_m, lon, lat))
if row and row['wkt']:
return row['wkt']
return None
# 全局实例
hazard_zone_calculator = HazardZoneCalculator()