修改启动前检查方式

This commit is contained in:
wzy-warehouse
2026-05-30 15:03:39 +08:00
parent f4ee8fa795
commit d2ffaad7a3
4 changed files with 54 additions and 42 deletions
+4 -3
View File
@@ -9,7 +9,6 @@ from app.core.venv_manager import check_virtualenv
from app.core.dependency_manager import check_dependencies
from app.utils.logger import get_logger
from app.utils.thread_pool_manager import block_main_thread, thread_pool_manager
from app.core.rainfall_manager import rainfall_manager
class AppLauncher:
@@ -52,11 +51,13 @@ class AppLauncher:
def start():
"""启动应用服务"""
from app.core.rainfall_manager import rainfall_manager
logger = get_logger()
# 启动降雨站点监测
logger.info("启动降雨站点监测服务...")
rainfall_manager.monitoring_rainfall_station_id('2025-09-16 20:00:00')
# 阻塞主线程,防止程序立即退出
block_main_thread()
+9 -6
View File
@@ -8,8 +8,6 @@ from typing import Optional
from app.utils import thread_pool_manager
from app.utils.logger import get_logger
from app.repositories.rainfall_repository import rainfall_repository
from app.services.rainfall_grid_service import rainfall_grid_service
class RainfallManager:
@@ -42,12 +40,14 @@ class RainfallManager:
def _monitoring_loop(self, initial_query_time: datetime):
"""
监测循环,定期检查最大ID是否改变
Args:
initial_query_time: 初始查询时间
"""
from app.repositories.rainfall_repository import rainfall_repository
query_time = initial_query_time
while True:
try:
# 查询当前时间窗口内的最大ID
@@ -78,14 +78,17 @@ class RainfallManager:
def _generate_rainfall_grid_task(self, query_time: datetime, max_id: int):
"""
生成降雨栅格的任务函数
Args:
query_time: 查询时间
max_id: 最大ID
"""
from app.repositories.rainfall_repository import rainfall_repository
from app.services.rainfall_grid_service import rainfall_grid_service
try:
self.logger.info(f"开始生成降雨栅格,查询时间: {query_time}, ID: {max_id}")
# 1. 查询雨量站点数据
station_data = rainfall_repository.get_rainfall_stations_data(query_time)
+38 -32
View File
@@ -8,17 +8,7 @@ from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from io import BytesIO
import numpy as np
from scipy.spatial import Delaunay, ConvexHull
from scipy.interpolate import griddata
from scipy.ndimage import gaussian_filter
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from config import settings
from app.utils.logger import get_logger
from app.utils.redis_helper import redis_helper
class RainfallGridService:
@@ -54,16 +44,18 @@ class RainfallGridService:
# 栅格分辨率(度)
self.grid_resolution = 0.01 # 约1km
def _create_buffer_points(self, points_array: np.ndarray) -> np.ndarray:
def _create_buffer_points(self, points_array) -> 'np.ndarray':
"""
创建缓冲点:在原始站点外围生成虚拟点以扩展插值区域
Args:
points_array: 原始站点坐标数组
Returns:
缓冲点坐标数组
"""
import numpy as np
# 计算站点分布的中心
center = np.mean(points_array, axis=0)
@@ -89,37 +81,39 @@ class RainfallGridService:
def _calculate_adaptive_max_distance(
self,
points_array: np.ndarray,
points_array,
base_distance: float = 0.3,
min_distance: float = 0.15,
max_distance: float = 0.5
) -> float:
"""
根据站点密度自适应计算最大影响距离
Args:
points_array: 站点坐标数组
base_distance: 基础距离
min_distance: 最小距离
max_distance: 最大距离
Returns:
自适应的最大影响距离
"""
import numpy as np
from scipy.spatial import distance_matrix
if len(points_array) < 3:
return base_distance
# 计算站点间的平均距离
from scipy.spatial import distance_matrix
dist_matrix = distance_matrix(points_array, points_array)
# 排除对角线(自身距离为0
np.fill_diagonal(dist_matrix, np.inf)
avg_distance = np.mean(np.min(dist_matrix, axis=1))
# 根据平均距离调整max_distance
adaptive_distance = avg_distance * 3 # 约3倍平均站点间距
# 限制在合理范围内
return float(np.clip(adaptive_distance, min_distance, max_distance))
@@ -131,13 +125,17 @@ class RainfallGridService:
2. 自适应距离阈值
3. 边缘渐变处理
4. 高斯平滑减少突变
Args:
station_data: 站点数据列表
Returns:
插值结果字典
"""
import numpy as np
from scipy.spatial import Delaunay, ConvexHull, distance_matrix
from scipy.ndimage import gaussian_filter
# 提取站点坐标和降雨量
points_array = np.array([[s['lon'], s['lat']] for s in station_data])
values_array = np.array([s['rainfall'] for s in station_data])
@@ -184,7 +182,6 @@ class RainfallGridService:
hull_mask = hull_mask.reshape(grid_lon.shape)
# 计算置信度:基于到最近站点的距离
from scipy.spatial import distance_matrix
grid_valid = grid_points[hull_mask.ravel()]
if len(grid_valid) > 0:
dist_to_stations = distance_matrix(grid_valid, points_array)
@@ -289,29 +286,35 @@ class RainfallGridService:
def save_rainfall_grid_png(self, grid_data: Dict[str, Any], max_id: int) -> Optional[str]:
"""
将降雨栅格保存为PNG图片(背景透明)
Args:
grid_data: 栅格数据
max_id: 最大ID
Returns:
PNG文件相对路径,失败返回None
"""
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from PIL import Image
from config import settings
try:
grid_values = grid_data['grid_values']
lon_range = grid_data['lon_range']
lat_range = grid_data['lat_range']
# 创建自定义颜色映射
levels = self.rainfall_levels['levels']
colors = self.rainfall_levels['colors']
cmap = ListedColormap([
tuple(c / 255.0 for c in color) for color in colors
])
norm = BoundaryNorm(levels, cmap.N)
# 创建图形(设置dpi确保不拉伸)
fig, ax = plt.subplots(1, 1, figsize=(10, 10), dpi=100)
@@ -373,17 +376,20 @@ class RainfallGridService:
self.logger.error(f"保存PNG图片失败: {e}", exc_info=True)
return None
def store_to_redis(self, png_path: str, max_id: int,
def store_to_redis(self, png_path: str, max_id: int,
query_time, station_data: List[Dict[str, Any]]):
"""
将栅格信息存储到Redis
Args:
png_path: PNG文件相对路径
max_id: 最大ID
query_time: 查询时间(datetime对象或字符串)
station_data: 站点数据
"""
from config import settings
from app.utils.redis_helper import redis_helper
try:
redis_key = settings.REDIS_RAIN_STATION_GRID_KEY
redis_identifier_key = settings.REDIS_RAIN_STATION_IDENTIFIER_KEY