""" 灾害影响范围数据仓库 负责 xian_hazard_zones 表的数据读取和写入 """ from typing import List, Dict, Any, Optional from app.utils.db_helper import db_helper class HazardRepository: """灾害影响范围数据访问层""" # ==================== 隐患点查询 ==================== @staticmethod def get_all_hidden_danger_spots() -> List[Dict[str, Any]]: """ 获取所有隐患点(含静态因子) Returns: 隐患点列表,每个元素含: source_id, disaster_name, disaster_type, scale_grade, county, lon, lat, static_factors """ sql = """ SELECT h.id AS source_id, h.disaster_name, h.disaster_type, h.scale_grade, h.county, rf.lon, rf.lat, rf.static_factors FROM xian_hidden_danger_spots h JOIN xian_risk_factors rf ON rf.source_id = h.id AND rf.source_type = 1 WHERE h.is_delete = 0 AND rf.is_delete = 0 AND h.disaster_type IN ('滑坡', '泥石流', '山洪', '内涝', '崩塌') AND rf.lon IS NOT NULL AND rf.lat IS NOT NULL ORDER BY h.id """ rows = db_helper.execute_query(sql) return [{ 'source_id': r['source_id'], 'disaster_name': r['disaster_name'], 'disaster_type': r['disaster_type'], 'scale_grade': r['scale_grade'], 'county': r['county'], 'lon': float(r['lon']), 'lat': float(r['lat']), 'static_factors': r.get('static_factors') or {} } for r in rows] # ==================== 影响范围写入 ==================== @staticmethod def delete_existing_zones() -> int: """逻辑删除已有影响范围记录""" sql = "UPDATE xian_hazard_zones SET is_delete = 1 WHERE is_delete = 0" return db_helper.execute_update(sql) @staticmethod def insert_hazard_zone(source_id: int, disaster_type: str, zone_wkt: str, path_wkt: Optional[str]) -> int: """ 插入单条影响范围记录 Args: source_id: 隐患点ID disaster_type: 灾害类型 zone_wkt: 影响范围 POLYGON WKT path_wkt: 影响路径 LINESTRING WKT (可为None) Returns: 新插入记录的ID """ if path_wkt: sql = """ INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom, path_geom) VALUES (%s, %s, ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326)) RETURNING id """ row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt, path_wkt)) else: sql = """ INSERT INTO xian_hazard_zones (source_id, disaster_type, zone_geom) VALUES (%s, %s, ST_GeomFromText(%s, 4326)) RETURNING id """ row = db_helper.execute_query_one(sql, (source_id, disaster_type, zone_wkt)) return row['id'] if row else 0 @staticmethod def batch_insert_zones(records: List[Dict[str, Any]]) -> int: """ 批量插入影响范围记录 Args: records: [{source_id, disaster_type, zone_wkt, path_wkt}, ...] Returns: 成功插入的记录数 """ count = 0 for rec in records: hid = HazardRepository.insert_hazard_zone( rec['source_id'], rec['disaster_type'], rec['zone_wkt'], rec.get('path_wkt') ) if hid > 0: count += 1 return count # 全局实例 hazard_repository = HazardRepository()