Files

77 lines
2.3 KiB
Python
Raw Permalink Normal View History

2026-06-06 08:38:19 +08:00
"""
FastAPI 服务创建与配置
"""
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from app.utils.api_deps import get_rainfall_model, get_earthquake_model, is_model_loaded
from app.schemas.api_schemas import HealthResponse
from app.config.paths import get_logger
2026-06-19 17:04:03 +08:00
from config import settings
2026-06-06 08:38:19 +08:00
logger = get_logger("api")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期:启动时预加载模型"""
logger.info("正在预加载DBN模型...")
get_rainfall_model()
get_earthquake_model()
logger.info("DBN模型预加载完成")
2026-06-19 17:04:03 +08:00
2026-06-20 15:50:24 +08:00
# 检测 QGIS 子进程环境
2026-06-19 17:04:03 +08:00
qgis_root = getattr(settings, "QGIS_ROOT", None)
if qgis_root:
try:
2026-06-20 15:50:24 +08:00
from app.services.qgis.qgis_env import is_qgis_available
if is_qgis_available(qgis_root):
logger.info("QGIS 环境检测通过(子进程模式)")
else:
logger.warning("QGIS 环境不可用(未找到 Python 3.12 解释器),专题图功能降级")
2026-06-19 17:04:03 +08:00
except Exception as e:
2026-06-20 15:50:24 +08:00
logger.error(f"QGIS 环境检测失败: {e}")
2026-06-19 17:04:03 +08:00
2026-06-06 08:38:19 +08:00
yield
2026-06-20 15:50:24 +08:00
# 清理资源
2026-06-19 17:04:03 +08:00
try:
2026-06-20 15:50:24 +08:00
from app.api.qgis_map_export import shutdown_thread_pool
shutdown_thread_pool()
2026-06-19 17:04:03 +08:00
except Exception:
pass
logger.info("应用关闭")
2026-06-06 08:38:19 +08:00
def create_app() -> FastAPI:
"""创建 FastAPI 应用实例"""
application = FastAPI(
title="西安灾害链预测服务",
description="基于动态贝叶斯网络的暴雨/地震灾害链预测API",
version="1.0.0",
lifespan=lifespan,
)
@application.middleware("http")
async def log_requests(request: Request, call_next):
"""请求日志中间件"""
start = time.time()
response = await call_next(request)
elapsed = time.time() - start
logger.info(f"{request.method} {request.url.path} -> {response.status_code} ({elapsed:.3f}s)")
return response
# 注册路由
from app.api import register_routers
register_routers(application)
@application.get("/health", response_model=HealthResponse, tags=["系统"])
async def health_check():
"""健康检查"""
status = is_model_loaded()
return HealthResponse(status="ok", **status)
return application