diff --git a/app/config/hazard/__init__.py b/app/config/hazard/__init__.py new file mode 100644 index 0000000..4f96d12 --- /dev/null +++ b/app/config/hazard/__init__.py @@ -0,0 +1 @@ +# hazard 配置模块 diff --git a/app/config/hazard/hazard_zone_params.yaml b/app/config/hazard/hazard_zone_params.yaml new file mode 100644 index 0000000..c9ac5c7 --- /dev/null +++ b/app/config/hazard/hazard_zone_params.yaml @@ -0,0 +1,93 @@ +# 灾害影响范围计算参数配置 +# 2026-06-29 初始版本 +# +# 五种灾害类型各有独立参数,未匹配到灾害类型时使用 default +# 所有半径单位为米(m) +# +# 论文支撑: +# - 滑坡/崩塌: He et al. (2023) Landslides, Corominas (1996) CGJ +# - 泥石流: Baggio et al. (2021) NHESS, Cicoira et al. (2022) ESurf +# - 山洪: Costache et al. (2022) STOTEN, Zhao et al. (2023) J.Hydrology +# - 内涝: Wang et al. (2022) Water Res. Mgmt, Jamali et al. (2020) Water + +# ============================================ +# 滑坡 — 经验到达角法 + 地形修正 +# ============================================ + +landslide: + height_drop_m: 50 # 假设崩滑高差(m) + reach_angle: # 到达角经验值(度) — He et al.(2023) 表2 + 小型: 31 + 中型: 28 + 大型: 25 + 特大型: 23 + fan_angle: 45 # 侧向扇形展开角(度) + river_erosion_enhance: 1.3 # 河流距离<200m时的增强系数 + river_distance_threshold: 200 # 河流侵蚀增强触发距离(m) + fault_enhance: 1.2 # 断裂带距离<500m时的增强系数 + fault_distance_threshold: 500 # 断裂带增强触发距离(m) + min_radius_m: 100 # 最小影响半径 + max_radius_m: 2000 # 最大影响半径 + +# ============================================ +# 泥石流 — 河流关联缓冲区 +# ============================================ + +debris_flow: + base_radius_m: 200 # 基础影响半径 + slope_factor: 10 # 坡度影响系数 (半径 = base + slope * factor) + river_buffer_m: 80 # 河道缓冲区宽度 + max_river_search_m: 2000 # 最近河流搜索范围 + min_radius_m: 100 + max_radius_m: 2000 + +# ============================================ +# 山洪 — 多级河流缓冲区 +# ============================================ + +flash_flood: + river_dist_thresholds: [100, 300, 500] # 距河流距离阈值(m) + buffer_by_level: # 对应各级缓冲区(m) + 1: 500 # 干流 + 2: 300 # 一级支流 + 3: 150 # 二级支流 + 4: 80 # 三级以下 + 5: 80 + max_river_search_m: 3000 # 最近河流搜索范围 + +# ============================================ +# 内涝 — TWI简化 + 不透水率 + 管网修正 +# ============================================ + +waterlogging: + base_radius_m: 100 # 基础积水半径 + impervious_factor: 3 # 不透水率修正系数 + pipe_density_factor: 400 # 管网密度修正系数 + pipe_lower_bound: 0.3 # 管网修正下限 + min_radius_m: 50 + max_radius_m: 800 + +# ============================================ +# 崩塌 — 锥体传播模型 (CONEFALL简化) +# ============================================ + +rockfall: + height_drop_m: 50 # 假设崩落高差(m) + reach_angle: # 到达角经验值(度) — Guerin et al.(2022) + 小型: 33 + 中型: 30 + 大型: 27 + 特大型: 25 + fault_angle_penalty: 5 # 断裂带<300m时到达角减少量(度) + fault_penalty_threshold: 300 # 断裂带惩罚触发距离(m) + fault_radius_enhance: 1.5 # 断裂带<200m时半径增强系数 + fault_enhance_threshold: 200 # 半径增强触发距离(m) + min_radius_m: 50 + max_radius_m: 1000 + +# ============================================ +# 默认参数 — 未匹配灾害类型时的兜底 +# ============================================ + +default: + radius_m: 200 diff --git a/app/models/hazard/__init__.py b/app/models/hazard/__init__.py new file mode 100644 index 0000000..653063a --- /dev/null +++ b/app/models/hazard/__init__.py @@ -0,0 +1,3 @@ +from app.models.hazard.hazard_zone_calculator import HazardZoneCalculator, hazard_zone_calculator + +__all__ = ['HazardZoneCalculator', 'hazard_zone_calculator'] diff --git a/app/models/hazard/hazard_zone_calculator.py b/app/models/hazard/hazard_zone_calculator.py new file mode 100644 index 0000000..48ec26f --- /dev/null +++ b/app/models/hazard/hazard_zone_calculator.py @@ -0,0 +1,413 @@ +""" +灾害影响范围计算器 +根据灾害类型和静态因子计算隐患点的影响范围(zone)和影响路径(path) + +论文支撑: +- 滑坡: He et al. (2023) Landslides — 经验到达角法 +- 泥石流: Baggio et al. (2021) NHESS — 河流关联缓冲区 +- 山洪: Costache et al. (2022) STOTEN — 多级缓冲区 +- 内涝: Wang et al. (2022) Water Res. Mgmt — TWI简化法 +- 崩塌: Guerin et al. (2022) Eng. Geology — 锥体传播模型 +""" +import math +import os +from typing import Optional, Tuple, Dict, Any + +import yaml + +from app.config.paths import CONFIG_DIR + +# 配置文件路径 +_PARAMS_PATH = os.path.join(CONFIG_DIR, 'hazard', 'hazard_zone_params.yaml') + +# 地球半径 (WGS84) +_EARTH_RADIUS_M = 6371000.0 + + +def _load_params() -> Dict[str, Any]: + """加载灾害影响范围计算参数""" + with open(_PARAMS_PATH, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + + +def _to_wkt_polygon(coords: list) -> str: + """ + 将坐标列表转换为 WKT POLYGON 字符串 + + Args: + coords: [(lon1, lat1), (lon2, lat2), ...] 首尾需闭合 + """ + points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords]) + return f'POLYGON(({points}))' + + +def _to_wkt_linestring(coords: list) -> str: + """将坐标列表转换为 WKT LINESTRING 字符串""" + points = ', '.join([f'{c[0]:.8f} {c[1]:.8f}' for c in coords]) + return f'LINESTRING({points})' + + +def _offset_to_lonlat(lon: float, lat: float, dx_m: float, dy_m: float) -> Tuple[float, float]: + """ + 将米级偏移转换回经纬度 + + Args: + lon, lat: 中心点经纬度 + dx_m: 东向偏移(m) + dy_m: 北向偏移(m) + + Returns: + (new_lon, new_lat) + """ + lat_rad = math.radians(lat) + dlon = math.degrees(dx_m / (_EARTH_RADIUS_M * math.cos(lat_rad))) + dlat = math.degrees(dy_m / _EARTH_RADIUS_M) + return lon + dlon, lat + dlat + + +def _make_fan_wkt(lon: float, lat: float, direction_deg: float, + spread_deg: float, radius_m: float, + segments: int = 24) -> str: + """ + 创建扇形/锥形影响范围 WKT + + Args: + lon, lat: 隐患点坐标 + direction_deg: 扇形主方向(坡向, 0=北, 90=东) + spread_deg: 扇形总开角(度) + radius_m: 半径(m) + segments: 弧线段数 + + Returns: + POLYGON WKT 字符串 + """ + half_spread = spread_deg / 2.0 + coords = [(lon, lat)] # 中心点 + + for i in range(segments + 1): + angle_deg = direction_deg - half_spread + (spread_deg * i / segments) + angle_rad = math.radians(angle_deg) + dx = radius_m * math.sin(angle_rad) + dy = radius_m * math.cos(angle_rad) + coords.append(_offset_to_lonlat(lon, lat, dx, dy)) + + coords.append((lon, lat)) # 闭合回中心 + return _to_wkt_polygon(coords) + + +def _make_circle_wkt(lon: float, lat: float, radius_m: float, + segments: int = 48) -> str: + """ + 创建圆形缓冲区 WKT(用正多边形近似圆) + + Args: + lon, lat: 中心点 + radius_m: 半径(m) + segments: 边数 + """ + coords = [] + for i in range(segments): + angle_rad = 2 * math.pi * i / segments + dx = radius_m * math.sin(angle_rad) + dy = radius_m * math.cos(angle_rad) + coords.append(_offset_to_lonlat(lon, lat, dx, dy)) + coords.append(coords[0]) # 闭合 + return _to_wkt_polygon(coords) + + +class HazardZoneCalculator: + """ + 灾害影响范围计算器 + + 根据隐患点的灾害类型和静态因子,计算影响范围(WKT多边形)和影响路径(WKT线)。 + 每种灾害类型使用不同的经验公式,参数从 hazard_zone_params.yaml 读取。 + """ + + def __init__(self): + self.params = _load_params() + + # ==================== 公开接口 ==================== + + def calculate(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + lon: float, lat: float, + static_factors: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: + """ + 计算单个隐患点的影响范围 + + Args: + source_id: 隐患点ID + disaster_type: 灾害类型 (滑坡/泥石流/山洪/内涝/崩塌) + scale_grade: 规模等级 (小型/中型/大型) + lon: 经度 + lat: 纬度 + static_factors: 静态因子字典,从 xian_risk_factors.static_factors 获取 + + Returns: + (zone_wkt, path_wkt) — zone为POLYGON WKT, path为LINESTRING WKT或None + """ + # 选择对应灾害类型的计算方法 + method_map = { + '滑坡': self._calc_landslide, + '泥石流': self._calc_debris_flow, + '山洪': self._calc_flash_flood, + '内涝': self._calc_waterlogging, + '崩塌': self._calc_rockfall, + } + method = method_map.get(disaster_type, self._calc_default) + return method(source_id, disaster_type, scale_grade, static_factors, lon, lat) + + # ==================== 滑坡 ==================== + + def _calc_landslide(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """滑坡: 经验到达角法 + 扇形展开""" + p = self.params['landslide'] + + dem = float(factors.get('dem_value', 0)) + aspect = float(factors.get('aspect_value', 0)) + river_dist = float(factors.get('river_distance', 9999)) + fault_dist = float(factors.get('fault_distance', 9999)) + + # 到达角 + angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型' + reach_angle = p['reach_angle'][angle_key] + + # 半径: H / tan(到达角) + H = p['height_drop_m'] + radius = H / math.tan(math.radians(reach_angle)) + + # 修正 + if river_dist < p['river_distance_threshold']: + radius *= p['river_erosion_enhance'] + if fault_dist < p['fault_distance_threshold']: + radius *= p['fault_enhance'] + + radius = max(p['min_radius_m'], min(radius, p['max_radius_m'])) + + # 扇形 — 沿坡向展开 + if aspect and aspect > 0: + zone_wkt = _make_fan_wkt(lon, lat, aspect, p['fan_angle'], radius) + # 滑坡路径: 从隐患点沿坡向延伸到堆积区边缘 + end_lon, end_lat = _offset_to_lonlat( + lon, lat, + radius * math.sin(math.radians(aspect)), + radius * math.cos(math.radians(aspect)) + ) + path_wkt = _to_wkt_linestring([(lon, lat), (end_lon, end_lat)]) + else: + zone_wkt = _make_circle_wkt(lon, lat, radius) + path_wkt = None + + return zone_wkt, path_wkt + + # ==================== 泥石流 ==================== + + def _calc_debris_flow(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """ + 泥石流: 沿最近河流方向建立影响区 + + path_geom 为隐患点到最近河流的连线,代表潜在流动路径 + """ + p = self.params['debris_flow'] + + slope = float(factors.get('slope_value', 0)) + + radius = p['base_radius_m'] + slope * p['slope_factor'] + radius = max(p['min_radius_m'], min(radius, p['max_radius_m'])) + + zone_wkt = _make_circle_wkt(lon, lat, radius) + + # 查询最近河流段 + 构建完整流动路径 + river_geom_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m']) + if river_geom_wkt: + # 取河流上最近点 + river_point = self._get_nearest_river_point(lon, lat, p['max_river_search_m']) + if river_point: + path_wkt = _to_wkt_linestring([ + (lon, lat), + (river_point[0], river_point[1]) + ]) + else: + path_wkt = river_geom_wkt # 降级:直接展示河流段 + else: + path_wkt = None + + return zone_wkt, path_wkt + + # ==================== 山洪 ==================== + + def _calc_flash_flood(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """ + 山洪: 根据距河流距离 + 河流等级确定缓冲区 + + path_geom 为关联的河流段 + """ + p = self.params['flash_flood'] + + river_dist = float(factors.get('river_distance', 9999)) + + thresholds = p['river_dist_thresholds'] + buffer_map = p['buffer_by_level'] + + # 按距离选缓冲半径 + if river_dist < thresholds[0]: + radius = buffer_map[1] + elif river_dist < thresholds[1]: + radius = buffer_map[2] + elif river_dist < thresholds[2]: + radius = buffer_map[3] + else: + radius = buffer_map.get(4, 80) + + zone_wkt = _make_circle_wkt(lon, lat, radius) + + # 山洪路径: 关联的最近河流段(淹没路径沿河展开) + path_wkt = self._get_nearest_river_geom(lon, lat, p['max_river_search_m']) + + return zone_wkt, path_wkt + + # ==================== 内涝 ==================== + + def _calc_waterlogging(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """内涝: TWI简化 + 不透水率 + 管网修正""" + p = self.params['waterlogging'] + + slope = float(factors.get('slope_value', 1)) + impervious = float(factors.get('impervious_surface', 0)) + pipe_density = float(factors.get('pipe_density', 0)) + + slope_rad = max(math.radians(slope), 0.001) + + # TWI_proxy = ln(1 / tanβ) + twi = math.log(1.0 / math.tan(slope_rad)) + + radius = (p['base_radius_m'] * twi + * (1.0 + impervious * p['impervious_factor']) + * max(p['pipe_lower_bound'], 1.0 - pipe_density * p['pipe_density_factor'])) + + radius = max(p['min_radius_m'], min(radius, p['max_radius_m'])) + + zone_wkt = _make_circle_wkt(lon, lat, radius) + + return zone_wkt, None + + # ==================== 崩塌 ==================== + + def _calc_rockfall(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """崩塌: 锥体传播模型 (CONEFALL简化)""" + p = self.params['rockfall'] + + aspect = float(factors.get('aspect_value', 0)) + fault_dist = float(factors.get('fault_distance', 9999)) + + # 到达角 + angle_key = scale_grade if scale_grade in p['reach_angle'] else '中型' + reach_angle = p['reach_angle'][angle_key] + + # 断裂带惩罚:减少到达角 = 更平缓 = 影响更大 + if fault_dist < p['fault_penalty_threshold']: + reach_angle -= p['fault_angle_penalty'] + + # 半径 + H = p['height_drop_m'] + radius = H / math.tan(math.radians(max(reach_angle, 10))) + + # 断裂带增强 + if fault_dist < p['fault_enhance_threshold']: + radius *= p['fault_radius_enhance'] + + radius = max(p['min_radius_m'], min(radius, p['max_radius_m'])) + + # 锥形 — 沿坡向,窄扇形 + if aspect and aspect > 0: + zone_wkt = _make_fan_wkt(lon, lat, aspect, 45, radius) + else: + zone_wkt = _make_circle_wkt(lon, lat, radius) + + return zone_wkt, None + + # ==================== 默认 ==================== + + def _calc_default(self, source_id: int, disaster_type: str, + scale_grade: Optional[str], + factors: Dict[str, Any], + lon: float, lat: float) -> Tuple[Optional[str], Optional[str]]: + """未匹配灾害类型的兜底:固定半径圆形""" + p = self.params['default'] + zone_wkt = _make_circle_wkt(lon, lat, p['radius_m']) + return zone_wkt, None + + # ==================== 数据库查询辅助 ==================== + + @staticmethod + def _get_nearest_river_point(lon: float, lat: float, + max_dist_m: float) -> Optional[Tuple[float, float]]: + """ + 查询最近河流上距离隐患点最近的点的坐标 + """ + from app.utils.db_helper import db_helper + + sql = """ + SELECT ST_X(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS rx, + ST_Y(ST_ClosestPoint(r.geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))) AS ry + FROM xian_rivers r + WHERE r.is_delete = 0 + AND ST_DWithin(r.geom::geography, + ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography, + %s) + ORDER BY ST_Distance(r.geom::geography, + ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography) + LIMIT 1 + """ + row = db_helper.execute_query_one( + sql, (lon, lat, lon, lat, lon, lat, max_dist_m, lon, lat) + ) + if row and row['rx'] and row['ry']: + return float(row['rx']), float(row['ry']) + return None + + @staticmethod + def _get_nearest_river_geom(lon: float, lat: float, + max_dist_m: float) -> Optional[str]: + """ + 查询最近河流段的 WKT + + Returns: + LINESTRING WKT 字符串或 None + """ + from app.utils.db_helper import db_helper + + sql = """ + SELECT ST_AsText(r.geom) AS wkt + FROM xian_rivers r + WHERE r.is_delete = 0 + AND ST_DWithin(r.geom::geography, + ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography, + %s) + ORDER BY ST_Distance(r.geom::geography, + ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography) + LIMIT 1 + """ + row = db_helper.execute_query_one(sql, (lon, lat, max_dist_m, lon, lat)) + if row and row['wkt']: + return row['wkt'] + return None + + +# 全局实例 +hazard_zone_calculator = HazardZoneCalculator() diff --git a/app/repositories/hazard_repository.py b/app/repositories/hazard_repository.py new file mode 100644 index 0000000..cd40017 --- /dev/null +++ b/app/repositories/hazard_repository.py @@ -0,0 +1,120 @@ +""" +灾害影响范围数据仓库 +负责 xian_hazard_zones 表的数据读取和写入 +""" +from typing import List, Dict, Any, Optional + +from app.utils.db_helper import db_helper + + +class HazardRepository: + """灾害影响范围数据访问层""" + + # ==================== 隐患点查询 ==================== + + @staticmethod + def get_all_hidden_danger_spots() -> List[Dict[str, Any]]: + """ + 获取所有隐患点(含静态因子) + + Returns: + 隐患点列表,每个元素含: source_id, disaster_name, disaster_type, + scale_grade, county, lon, lat, static_factors + """ + sql = """ + SELECT + h.id AS source_id, + h.disaster_name, + h.disaster_type, + h.scale_grade, + h.county, + rf.lon, + rf.lat, + rf.static_factors + FROM xian_hidden_danger_spots h + JOIN xian_risk_factors rf + ON rf.source_id = h.id AND rf.source_type = 1 + WHERE h.is_delete = 0 + AND rf.is_delete = 0 + AND h.disaster_type IN ('滑坡', '泥石流', '山洪', '内涝', '崩塌') + AND rf.lon IS NOT NULL + AND rf.lat IS NOT NULL + ORDER BY h.id + """ + rows = db_helper.execute_query(sql) + return [{ + 'source_id': r['source_id'], + 'disaster_name': r['disaster_name'], + 'disaster_type': r['disaster_type'], + 'scale_grade': r['scale_grade'], + 'county': r['county'], + 'lon': float(r['lon']), + 'lat': float(r['lat']), + 'static_factors': r.get('static_factors') or {} + } for r in rows] + + # ==================== 影响范围写入 ==================== + + @staticmethod + def delete_existing_zones() -> int: + """逻辑删除已有影响范围记录""" + sql = "UPDATE xian_hazard_zones SET is_delete = 1 WHERE is_delete = 0" + return db_helper.execute_update(sql) + + @staticmethod + def insert_hazard_zone(source_id: int, disaster_type: str, + zone_wkt: str, path_wkt: Optional[str]) -> int: + """ + 插入单条影响范围记录 + + Args: + source_id: 隐患点ID + disaster_type: 灾害类型 + zone_wkt: 影响范围 POLYGON WKT + path_wkt: 影响路径 LINESTRING WKT (可为None) + + Returns: + 新插入记录的ID + """ + if path_wkt: + sql = """ + INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom, path_geom) + VALUES (%s, %s, ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326)) + RETURNING id + """ + row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt, path_wkt)) + else: + sql = """ + INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom) + VALUES (%s, %s, ST_GeomFromText(%s, 4326)) + RETURNING id + """ + row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt)) + return row['id'] if row else 0 + + @staticmethod + def batch_insert_zones(records: List[Dict[str, Any]]) -> int: + """ + 批量插入影响范围记录 + + Args: + records: [{source_id, disaster_type, zone_wkt, path_wkt}, ...] + + Returns: + 成功插入的记录数 + """ + count = 0 + for rec in records: + hid = HazardRepository.insert_hazard_zone( + rec['source_id'], + rec['disaster_type'], + rec['zone_wkt'], + rec.get('path_wkt') + ) + if hid > 0: + count += 1 + return count + + +# 全局实例 +hazard_repository = HazardRepository() diff --git a/app/script/calculate_hazard_zones.py b/app/script/calculate_hazard_zones.py new file mode 100644 index 0000000..4688a5d --- /dev/null +++ b/app/script/calculate_hazard_zones.py @@ -0,0 +1,115 @@ +""" +计算所有隐患点的灾害影响范围并填充 xian_hazard_zones 表 + +仅计算隐患点 (xian_hidden_danger_spots),不计算风险点。 +灾害类型包括: 滑坡、泥石流、山洪、内涝、崩塌 + +""" +import sys +import os +from datetime import datetime + +# 确保项目根目录在 sys.path 中 +_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +if _project_root not in sys.path: + sys.path.insert(0, _project_root) + +from app.utils.logger import LoggerManager +from app.repositories.hazard_repository import hazard_repository +from app.models.hazard.hazard_zone_calculator import hazard_zone_calculator + +_BATCH_SIZE = 200 # 每处理200条输出一次进度 + +logger = LoggerManager.get_logger("hazard_zones", + os.path.join(_project_root, "logs")) + + +def main(): + logger.info("=" * 60) + logger.info("开始计算隐患点灾害影响范围") + logger.info(f"启动时间: {datetime.now().isoformat()}") + logger.info("=" * 60) + + # 1. 获取所有隐患点 + spots = hazard_repository.get_all_hidden_danger_spots() + total = len(spots) + logger.info(f"查询到 {total} 个隐患点") + if total == 0: + logger.warning("没有隐患点数据,退出") + return + + # 按灾害类型统计 + type_counts = {} + for s in spots: + dt = s['disaster_type'] + type_counts[dt] = type_counts.get(dt, 0) + 1 + logger.info(f"灾害类型分布: {type_counts}") + + # 2. 逐点计算 + records = [] + success_count = 0 + skip_count = 0 + error_count = 0 + + for i, spot in enumerate(spots): + sid = spot['source_id'] + dtype = spot['disaster_type'] + dname = spot['disaster_name'] + scale = spot.get('scale_grade') + + try: + zone_wkt, path_wkt = hazard_zone_calculator.calculate( + source_id=sid, + disaster_type=dtype, + scale_grade=scale, + lon=spot['lon'], + lat=spot['lat'], + static_factors=spot['static_factors'] + ) + + if zone_wkt: + records.append({ + 'source_id': sid, + 'disaster_type': dtype, + 'zone_wkt': zone_wkt, + 'path_wkt': path_wkt + }) + success_count += 1 + else: + skip_count += 1 + logger.debug(f"跳过 [{sid}] {dname}({dtype}): 无法计算影响范围") + + except Exception as e: + error_count += 1 + logger.error(f"计算失败 [{sid}] {dname}({dtype}): {e}") + + # 进度输出 + if (i + 1) % _BATCH_SIZE == 0 or (i + 1) == total: + logger.info(f"进度: {i + 1}/{total} " + f"(成功:{success_count} 跳过:{skip_count} 失败:{error_count})") + + # 3. 写入数据库 + logger.info(f"计算完成。开始写入数据库,共 {len(records)} 条记录") + + # 先逻辑删除旧数据 + deleted = hazard_repository.delete_existing_zones() + logger.info(f"已标记 {deleted} 条旧记录为删除") + + # 批量写入新数据 + inserted = hazard_repository.batch_insert_zones(records) + logger.info(f"成功写入 {inserted} 条新记录") + + # 4. 汇总 + logger.info("=" * 60) + logger.info("执行完成") + logger.info(f" 隐患点总数: {total}") + logger.info(f" 计算成功: {success_count}") + logger.info(f" 跳过: {skip_count}") + logger.info(f" 计算失败: {error_count}") + logger.info(f" 写入记录: {inserted}") + logger.info(f"完成时间: {datetime.now().isoformat()}") + logger.info("=" * 60) + + +if __name__ == '__main__': + main()