""" 离散化模块 负责将连续值转换为离散状态 """ import os import yaml from typing import Optional, List, Dict, Any, Tuple from app.config.paths import DBN_CONFIG_DIR, get_logger logger = get_logger("dbn") class Discretizer: """离散化工具类""" def __init__(self, config_dir: Optional[str] = None): """ 初始化离散化器 Args: config_dir: 配置文件目录,默认为 app/config/dbn """ if config_dir is None: config_dir = str(DBN_CONFIG_DIR) self.config_dir = config_dir self.config = self._load_config() def _load_config(self) -> Dict[str, Any]: """加载离散化配置""" config_path = os.path.join(self.config_dir, 'discretization.yaml') if not os.path.exists(config_path): logger.warning(f"离散化配置文件不存在: {config_path}") return {} with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) return config def discretize(self, factor_name: str, value: float, region_code: Optional[str] = None) -> str: """ 将连续值离散化 Args: factor_name: 因子名称 value: 连续值 region_code: 行政区划代码,用于区域覆盖 Returns: 离散状态标签 """ if factor_name not in self.config: logger.warning(f"因子 {factor_name} 没有离散化配置") return "unknown" factor_config = self.config[factor_name] # 检查是否是分类变量(有mapping字段) if 'mapping' in factor_config: return self._discretize_categorical(factor_config, value) # 检查是否有区域覆盖 if region_code and 'region_overrides' in factor_config: if region_code in factor_config['region_overrides']: override_config = factor_config['region_overrides'][region_code] return self._discretize_continuous(override_config, value) # 使用默认配置 if 'default' in factor_config: return self._discretize_continuous(factor_config['default'], value) elif 'bins' in factor_config: return self._discretize_continuous(factor_config, value) logger.warning(f"因子 {factor_name} 的配置格式不正确") return "unknown" def _discretize_categorical(self, config: Dict[str, Any], value: float) -> str: """ 离散化分类变量 Args: config: 配置 value: 值 Returns: 离散状态标签 """ mapping = config.get('mapping', {}) default = config.get('default', 'unknown') # 将值转换为整数 int_value = int(value) return mapping.get(int_value, default) def _discretize_continuous(self, config: Dict[str, Any], value: float) -> str: """ 离散化连续变量 Args: config: 配置 value: 值 Returns: 离散状态标签 """ bins = config.get('bins', []) labels = config.get('labels', []) if not bins or not labels: logger.warning("离散化配置缺少bins或labels") return "unknown" # 确保bins和labels长度匹配 if len(bins) != len(labels) + 1: logger.warning(f"bins长度({len(bins)})应该比labels长度({len(labels)})多1") return "unknown" # 进行分箱 for i in range(len(bins) - 1): if bins[i] <= value < bins[i + 1]: return labels[i] # 如果值超出范围,返回最后一个标签 if value >= bins[-1]: return labels[-1] return labels[0] def discretize_rain_intensity(self, rainfall_mm_h: float) -> str: """ 离散化降雨强度 Args: rainfall_mm_h: 降雨强度(mm/h) Returns: 离散状态标签 """ return self.discretize('rain_intensity', rainfall_mm_h) def discretize_duration(self, duration_hours: float) -> str: """ 离散化持续时间 Args: duration_hours: 持续时间(小时) Returns: 离散状态标签 """ return self.discretize('duration', duration_hours) def discretize_accum_rain(self, accum_rain_mm: float) -> str: """ 离散化累计降雨量 Args: accum_rain_mm: 累计降雨量(mm) Returns: 离散状态标签 """ return self.discretize('accum_rain', accum_rain_mm) # ---- 地震触发层离散化 ---- def discretize_magnitude(self, magnitude: float) -> str: """ 离散化地震震级 Args: magnitude: 震级(Richter) Returns: 离散状态标签 """ return self.discretize('magnitude', magnitude) def discretize_epicenter_distance(self, distance_km: float) -> str: """ 离散化震中距 Args: distance_km: 震中距(km) Returns: 离散状态标签 """ return self.discretize('epicenter_distance', distance_km) def discretize_seismic_intensity(self, intensity: float) -> str: """ 离散化地震烈度 Args: intensity: 地震烈度(中国烈度表数值) Returns: 离散状态标签 """ return self.discretize('seismic_intensity', intensity) def discretize_elevation(self, elevation_m: float, region_code: Optional[str] = None) -> str: """ 离散化高程 Args: elevation_m: 高程(米) region_code: 行政区划代码 Returns: 离散状态标签 """ return self.discretize('elevation', elevation_m, region_code) def discretize_slope(self, slope_deg: float) -> str: """ 离散化坡度 Args: slope_deg: 坡度(度) Returns: 离散状态标签 """ return self.discretize('slope', slope_deg) def discretize_aspect(self, aspect_deg: float) -> str: """ 离散化坡向 Args: aspect_deg: 坡向(度) Returns: 离散状态标签 """ return self.discretize('aspect', aspect_deg) def discretize_soil_type(self, soil_type_code: int) -> str: """ 离散化土壤类型 Args: soil_type_code: 土壤类型代码 Returns: 离散状态标签 """ return self.discretize('soil_type', soil_type_code) def discretize_lithology(self, lithology_code: int) -> str: """ 离散化岩性 Args: lithology_code: 岩性代码 Returns: 离散状态标签 """ return self.discretize('lithology', lithology_code) def discretize_landuse(self, landuse_code: int) -> str: """ 离散化土地利用类型 Args: landuse_code: 土地利用类型代码 Returns: 离散状态标签 """ return self.discretize('landuse', landuse_code) def discretize_terrain(self, terrain_code: int) -> str: """ 离散化地形分类 Args: terrain_code: 地形分类代码 Returns: 离散状态标签 """ return self.discretize('terrain', terrain_code) def discretize_impervious(self, impervious_ratio: float) -> str: """ 离散化不透水面 Args: impervious_ratio: 不透水面比例 Returns: 离散状态标签 """ return self.discretize('impervious', impervious_ratio) def discretize_ndvi(self, ndvi_value: float) -> str: """ 离散化植被指数 Args: ndvi_value: NDVI值 Returns: 离散状态标签 """ return self.discretize('ndvi', ndvi_value) def discretize_sand_content(self, sand_percent: float) -> str: """ 离散化土壤含沙量 Args: sand_percent: 含沙量百分比 Returns: 离散状态标签 """ return self.discretize('sand_content', sand_percent) def discretize_ph(self, ph_value: float) -> str: """ 离散化土壤PH值 Args: ph_value: PH值 Returns: 离散状态标签 """ return self.discretize('ph', ph_value) def discretize_soil_moisture(self, moisture_percent: float) -> str: """ 离散化土壤湿度 Args: moisture_percent: 湿度百分比 Returns: 离散状态标签 """ return self.discretize('soil_moisture', moisture_percent) def discretize_organic_carbon(self, carbon_percent: float) -> str: """ 离散化有机碳 Args: carbon_percent: 有机碳百分比 Returns: 离散状态标签 """ return self.discretize('organic_carbon', carbon_percent) def discretize_dist_to_river(self, distance_m: float) -> str: """ 离散化距离河道距离 Args: distance_m: 距离(米) Returns: 离散状态标签 """ return self.discretize('dist_to_river', distance_m) def discretize_dist_to_fault(self, distance_m: float) -> str: """ 离散化距离断裂带距离 Args: distance_m: 距离(米) Returns: 离散状态标签 """ return self.discretize('dist_to_fault', distance_m) def discretize_pipe_density(self, density: float, region_code: Optional[str] = None) -> str: """ 离散化供水管网密度 Args: density: 管网密度(m/m²) region_code: 行政区划代码 Returns: 离散状态标签 """ return self.discretize('pipe_density', density, region_code) def discretize_all_factors(self, factors: Dict[str, Any], region_code: Optional[str] = None) -> Dict[str, str]: """ 离散化所有因子 Args: factors: 因子字典,key为因子名称,value为连续值 region_code: 行政区划代码 Returns: 离散化后的因子字典 """ result = {} # 暴雨触发层 if 'rain_intensity' in factors: result['rain_intensity'] = self.discretize_rain_intensity(factors['rain_intensity']) if 'duration' in factors: result['duration'] = self.discretize_duration(factors['duration']) if 'accum_rain' in factors: result['accum_rain'] = self.discretize_accum_rain(factors['accum_rain']) # 地震触发层 if 'magnitude' in factors: result['magnitude'] = self.discretize_magnitude(factors['magnitude']) if 'epicenter_distance' in factors: result['epicenter_distance'] = self.discretize_epicenter_distance(factors['epicenter_distance']) if 'seismic_intensity' in factors: result['seismic_intensity'] = self.discretize_seismic_intensity(factors['seismic_intensity']) # 环境层 if 'elevation' in factors: result['elevation'] = self.discretize_elevation(factors['elevation'], region_code) if 'slope' in factors: result['slope'] = self.discretize_slope(factors['slope']) if 'aspect' in factors: result['aspect'] = self.discretize_aspect(factors['aspect']) if 'soil_type' in factors: result['soil_type'] = self.discretize_soil_type(factors['soil_type']) if 'lithology' in factors: result['lithology'] = self.discretize_lithology(factors['lithology']) if 'landuse' in factors: result['landuse'] = self.discretize_landuse(factors['landuse']) if 'terrain' in factors: result['terrain'] = self.discretize_terrain(factors['terrain']) if 'impervious' in factors: result['impervious'] = self.discretize_impervious(factors['impervious']) if 'ndvi' in factors: result['ndvi'] = self.discretize_ndvi(factors['ndvi']) if 'sand_content' in factors: result['sand_content'] = self.discretize_sand_content(factors['sand_content']) if 'ph' in factors: result['ph'] = self.discretize_ph(factors['ph']) if 'soil_moisture' in factors: result['soil_moisture'] = self.discretize_soil_moisture(factors['soil_moisture']) if 'organic_carbon' in factors: result['organic_carbon'] = self.discretize_organic_carbon(factors['organic_carbon']) if 'dist_to_river' in factors: result['dist_to_river'] = self.discretize_dist_to_river(factors['dist_to_river']) if 'dist_to_fault' in factors: result['dist_to_fault'] = self.discretize_dist_to_fault(factors['dist_to_fault']) if 'pipe_density' in factors: result['pipe_density'] = self.discretize_pipe_density(factors['pipe_density'], region_code) return result # 创建全局实例 discretizer = Discretizer()