507 lines
20 KiB
Python
507 lines
20 KiB
Python
"""
|
|
QGIS 专题图导出接口
|
|
- 查询该记录的 occurred_time,格式化为时间戳(如 20260619143000)作为文件夹名
|
|
- 同一 occurred_time 视为同一场灾害,共享文件夹,只产图一次
|
|
- 线程池并发产图(默认 4 worker)
|
|
- 输出:FILE_STORE_DIR/xian/qgis/map/{disasterTime}/{模板名称}.jpg
|
|
- 标题格式:陕西西安{区县名称}{震级/降雨量}{模板名称}
|
|
"""
|
|
import asyncio
|
|
import concurrent.futures
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.config.paths import get_logger
|
|
from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir
|
|
from app.repositories import qgis_repository
|
|
from app.schemas.api_schemas import QgisMapExportResponse, QgisMapExportRequest
|
|
from app.utils.api_deps import get_prediction_semaphore
|
|
from app.utils.db_helper import db_helper
|
|
from config import settings
|
|
|
|
# Module-level logger for the QGIS export API.
logger = get_logger("api.qgis")

# Router holding the thematic-map export endpoints; every route is served
# under the "/qgis" prefix.
router = APIRouter(prefix="/qgis", tags=["专题图导出"])

# De-duplication locks: one asyncio.Lock per batch key (the endpoint uses the
# stringified inference id). ``_locks_lock`` guards the dict itself while a
# per-batch lock is looked up or created.
# NOTE(review): entries are never removed from ``_in_progress_locks`` in this
# module, so the dict grows with the number of distinct batch keys.
_in_progress_locks: dict[str, asyncio.Lock] = {}
_locks_lock = asyncio.Lock()

# Xi'an city center as [longitude, latitude]; used as the default map center.
xian_center = getattr( settings, "XIAN_CENTER", [108.948024, 34.263161])

# ============================================================
# region_code -> district name mapping
# ============================================================

REGION_CODE_MAP = settings.area.to_dict()
|
|
|
|
|
|
def _resolve_district(condition: dict) -> str:
|
|
code = condition.get("region_code")
|
|
if code and str(code) in REGION_CODE_MAP:
|
|
return REGION_CODE_MAP[str(code)]
|
|
return ""
|
|
|
|
|
|
def _build_map_title(event_type: str, condition: dict, template_name: str) -> str:
    """Build the map title ``陕西西安{district}{measure}{template_name}``.

    Args:
        event_type: ``"earthquake"`` or ``"rainfall"``; any other value
            yields a title without a measure.
        condition: Event condition dict. ``region_code`` selects the district;
            ``magnitude`` (earthquake) or ``rainfall`` (rainfall) supplies the
            measure.
        template_name: Template name appended at the end of the title.

    Returns:
        The title string. The measure is rendered as ``float(value)`` followed
        by ``级`` (earthquake) or ``mm`` (rainfall) and is omitted when the
        value is ``None`` or an empty string.

    Raises:
        ValueError: If a non-empty measure cannot be converted to ``float``.
    """
    # An unknown district resolves to "", which leaves just the city prefix.
    prefix = f"陕西西安{_resolve_district(condition)}"

    # event type -> (condition key holding the measure, unit suffix)
    measure_by_event = {
        "earthquake": ("magnitude", "级"),
        "rainfall": ("rainfall", "mm"),
    }
    measure = measure_by_event.get(event_type)
    if measure is not None:
        key, unit = measure
        value = condition.get(key)
        # Treat "" like a missing value for BOTH event types. Previously only
        # the rainfall branch did, so an empty magnitude hit float("") and
        # raised ValueError.
        if value is not None and value != "":
            return f"{prefix}{float(value)}{unit}{template_name}"

    return f"{prefix}{template_name}"
|
|
|
|
|
|
def _build_info_text(event_type: str, condition: dict, occurred_time) -> str:
|
|
lines = []
|
|
if isinstance(occurred_time, datetime):
|
|
time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分"
|
|
elif occurred_time:
|
|
time_str = str(occurred_time)
|
|
else:
|
|
time_str = datetime.now().strftime("%Y年%m月%d日%H时%M分")
|
|
lines.append(f"时间:{time_str}")
|
|
if event_type == "rainfall":
|
|
rainfall = condition.get("rainfall")
|
|
if rainfall is not None and rainfall != "":
|
|
lines.append(f"累计降雨量:{float(rainfall)}mm")
|
|
duration = condition.get("duration")
|
|
if duration is not None and duration != "":
|
|
lines.append(f"已持续:{duration}")
|
|
elif event_type == "earthquake":
|
|
magnitude = condition.get("magnitude")
|
|
if magnitude is not None and magnitude != "":
|
|
lines.append(f"震级:{float(magnitude)}级")
|
|
lon = condition.get("epicenter_lon")
|
|
lat = condition.get("epicenter_lat")
|
|
if lon is not None and lat is not None:
|
|
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _derive_model_params(
    inference: dict, batch_folder: str, template_path: str
) -> dict:
    """Assemble the parameter dict used to render one template.

    Combines the inference record (``event_type``, ``condition``,
    ``occurred_time``, ``id``), layout defaults read from ``settings`` and
    values derived from the template file name into the dict handed to the
    map generator.
    """
    kind = inference["event_type"]
    cond = inference["condition"]
    happened_at = inference["occurred_time"]

    # Layout defaults, each overridable through settings.
    unit_label = getattr(settings, "QGIS_DEFAULTS_MAP_UNIT", "西安市应急管理局")
    layout = getattr(settings, "QGIS_DEFAULTS_MAP_LAYOUT", "A3")
    rule = getattr(settings, "QGIS_DEFAULTS_ZOOM_RULE", "11")
    zoom = getattr(settings, "QGIS_DEFAULTS_ZOOM_VALUE", "50")

    # The template file name without its extension drives both the title and
    # the output file name.
    stem, _extension = os.path.splitext(os.path.basename(template_path))
    title = _build_map_title(kind, cond, stem)

    # Replace characters that are not allowed in file names.
    jpg_name = re.sub(r'[\\/:*?"<>|]', '_', stem) + ".jpg"
    target = os.path.join(batch_folder, jpg_name).replace("\\", "/")

    # The map date is always today's date.
    today = datetime.now().strftime("%Y-%m-%d")

    lon, lat = _extract_center_from_condition(kind, cond)
    info = _build_info_text(kind, cond, happened_at)

    params = {
        "name": f"inference_{inference['id']}",
        "path": template_path,
        "outFile": target,
        "mapLayout": layout,
        "mapTitle": title,
        "mapTime": today,
        "mapUint": unit_label,
        "info": info,
        "centerX": lon,
        "centerY": lat,
    }
    params["event"] = str(inference["id"])
    params["queueId"] = str(inference["id"])
    params["zoomRule"] = rule
    params["zoomValue"] = zoom
    return params
|
|
|
|
|
|
def _extract_center_from_condition(event_type: str, condition: dict) -> tuple:
|
|
"""从 condition 提取地图中心坐标"""
|
|
if event_type == "earthquake":
|
|
lon = condition.get("epicenter_lon", xian_center[0])
|
|
lat = condition.get("epicenter_lat", xian_center[1])
|
|
return float(lon), float(lat)
|
|
else:
|
|
return xian_center[0], xian_center[1]
|
|
|
|
|
|
# ============================================================
|
|
# 构建 QGIS 服务配置字典
|
|
# ============================================================
|
|
|
|
def _build_qgis_config(batch_folder: str) -> dict:
    """Build the QGIS service configuration for one batch.

    The result carries the database connection settings, the export DPI
    (``QGIS_EXPORT_DPI``, default 300), the static layer configuration built
    from the GeoPackage directory, and the batch output folder.
    """
    gpkg_dir = get_gpkg_dir()

    db_section = dict(
        host=settings.DB_HOST,
        port=settings.DB_PORT,
        database=settings.DB_NAME,
        username=settings.DB_USER,
        password=settings.DB_PASSWORD,
    )
    render_section = {"exportDpi": getattr(settings, "QGIS_EXPORT_DPI", 300)}
    static_layers = build_static_layers_config(gpkg_dir)

    return {
        "db": db_section,
        "qgis": render_section,
        "static_layers": static_layers,
        "batch_folder": batch_folder,
    }
|
|
|
|
|
|
# ============================================================
|
|
# 接口实现
|
|
# ============================================================
|
|
|
|
@router.post(
    "/export/map",
    response_model=QgisMapExportResponse,
    summary="QGIS 批量专题图导出",
    # Pins the OpenAPI description that FastAPI would otherwise derive from the
    # docstring, so the published API text stays exactly as before.
    description="根据模拟ID批量导出专题图。同一 occurred_time 视为同一场灾害,共享文件夹",
)
async def export_map(req: QgisMapExportRequest):
    """Export every thematic map for one inference record.

    Steps:
      1. Load the inference record for ``req.inferenceId``.
      2. Resolve the batch folder from ``FILE_STORE_DIR`` and the
         ``QGIS_OUTPUT_DIR`` template (``:eventType`` / ``:inferenceId``).
      3. Under a per-inference lock, list the ``.qgz`` templates for the event
         type and render only those whose output ``.jpg`` is not in the batch
         folder yet.

    Rendering blocks for as long as the QGIS subprocesses run, so it is handed
    to the loop's default executor instead of being called directly from the
    coroutine (which would stall every other request on the event loop).

    Returns:
        ``QgisMapExportResponse`` with ``code=200`` and ``data`` set to the
        stringified inference id.

    Raises:
        HTTPException: 503 when QGIS is unavailable, 400 on ``ValueError``,
            404 on ``FileNotFoundError`` (missing or empty template folder),
            500 on any other failure.
    """
    from app.services.qgis.qgis_env import is_qgis_available

    if not is_qgis_available():
        raise HTTPException(status_code=503, detail="QGIS 环境不可用(未找到 QGIS Python 3.12 解释器)")

    async with get_prediction_semaphore():
        inference_id = req.inferenceId
        loop = asyncio.get_running_loop()

        try:
            inference = await loop.run_in_executor(
                None, qgis_repository.query_inference_result, inference_id
            )

            event_type = inference["event_type"]
            batch_key = str(inference_id)

            # Batch folder path; the inference id makes the folder unique.
            file_store = getattr(settings, "FILE_STORE_DIR", "G:/files").replace("\\", "/")
            output_tmpl = getattr(settings, "QGIS_OUTPUT_DIR", "xian/qgis/map/:eventType/:inferenceId")
            output_dir = output_tmpl.replace(":eventType", event_type).replace(":inferenceId", batch_key)
            batch_folder = os.path.join(file_store, output_dir).replace("\\", "/")

            # De-duplication: concurrent requests for the same inference share
            # one lock, so the second waits and then finds the maps on disk.
            async with _locks_lock:
                task_lock = _in_progress_locks.setdefault(batch_key, asyncio.Lock())

            async with task_lock:
                logger.info(
                    f"推理结果查询成功: id={inference['id']}, "
                    f"type={event_type}, "
                    f"occurred_time={inference['occurred_time']}"
                )

                os.makedirs(batch_folder, exist_ok=True)
                config = _build_qgis_config(batch_folder)

                template_dir, template_files = _list_event_templates(event_type)
                if not template_files:
                    raise FileNotFoundError(f"模板文件夹为空: {template_dir}")

                # Images already produced; only the missing ones are rendered.
                existing = {f for f in os.listdir(batch_folder) if f.endswith(".jpg")}

                # Decide per template, using the exact output name the renderer
                # will write (model["outFile"]). Comparing file counts, as was
                # done before, mis-fires when the folder holds unrelated .jpg
                # files and could hand an empty model list to the generator.
                models = []
                for tpl_file in template_files:
                    tpl_path = os.path.join(template_dir, tpl_file).replace("\\", "/")
                    model = _derive_model_params(inference, batch_folder, tpl_path)
                    if os.path.basename(model["outFile"]) in existing:
                        continue
                    logger.info(
                        f"模板: {tpl_path}, "
                        f"输出: {model['outFile']}, "
                        f"标题: {model['mapTitle']}, "
                        f"中心: ({model['centerX']}, {model['centerY']})"
                    )
                    models.append(model)

                produced = len(template_files) - len(models)
                if not models:
                    logger.info(f"[跳过] {produced}/{len(template_files)} 张已全部产出")
                    return QgisMapExportResponse(
                        code=200,
                        message=f"已存在,共{produced}张",
                        data=batch_key,
                    )
                if produced:
                    logger.info(f"[增量] 已有{produced}张, 缺{len(models)}张")

                # Submit all models at once. The call blocks until every QGIS
                # subprocess has finished, hence the executor.
                await loop.run_in_executor(
                    None,
                    _generate_batch_maps,
                    models, config, batch_key, inference_id, file_store,
                )

                return QgisMapExportResponse(
                    code=200,
                    message=f"任务已完成,共{len(models)}张专题图",
                    data=batch_key,
                )

        except ValueError as e:
            logger.warning(f"参数错误: {e}")
            raise HTTPException(status_code=400, detail=str(e)) from e
        except FileNotFoundError as e:
            logger.error(f"模板文件不存在: {e}")
            raise HTTPException(status_code=404, detail=f"模板文件不存在: {e}") from e
        except Exception as e:
            logger.error(f"专题图导出失败: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"导出失败: {e}") from e


def _list_event_templates(event_type: str) -> tuple:
    """Return ``(template_dir, template_files)`` for *event_type*.

    Templates are the ``.qgz`` files (excluding names starting with ``tmp``)
    in ``app/data/template/<event_type>`` under the directory three levels
    above this file. Files are sorted by name; when
    ``QGIS_PRIORITY_TEMPLATES`` is configured, names containing one of its
    keywords are moved to the front in keyword order.

    Raises:
        FileNotFoundError: If the template directory does not exist.
    """
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    template_dir = os.path.join(base_dir, "app", "data", "template", event_type)
    template_files = sorted(
        f for f in os.listdir(template_dir)
        if f.endswith(".qgz") and not f.startswith("tmp")
    )

    priority_keywords = getattr(settings, "QGIS_PRIORITY_TEMPLATES", [])
    if priority_keywords:
        def _priority_order(name: str) -> tuple:
            for i, kw in enumerate(priority_keywords):
                if kw in name:
                    return (0, i)  # priority group, in configured order
            return (1, name)  # everything else, by file name

        template_files.sort(key=_priority_order)
        priority_count = sum(
            1 for f in template_files
            if any(kw in f for kw in priority_keywords)
        )
        logger.info(f"优先模板: {priority_count} 张排前面")

    return template_dir, template_files
|
|
|
|
|
|
def _generate_batch_maps(models: list, config: dict, batch_key: str,
|
|
inference_id: int = None, file_store: str = None) -> None:
|
|
"""并行启动多个 QGIS 子进程,实时读取每张图进度并写 DB"""
|
|
import json, math, concurrent.futures, subprocess, tempfile, threading
|
|
from app.services.qgis.qgis_env import (
|
|
get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
|
|
)
|
|
|
|
max_workers = getattr(settings, "QGIS_PARALLEL_WORKERS", 4)
|
|
workers = min(max_workers, len(models))
|
|
chunk_size = math.ceil(len(models) / workers)
|
|
|
|
chunks = []
|
|
for i in range(0, len(models), chunk_size):
|
|
chunks.append(models[i:i + chunk_size])
|
|
|
|
logger.info(
|
|
f"[批量产图] {len(models)} 张图 → {len(chunks)} 个并行子进程 "
|
|
f"(每进程 {chunk_size} 张)"
|
|
)
|
|
|
|
errors = []
|
|
all_results = []
|
|
db_lock = threading.Lock() # 保护 DB 写入
|
|
|
|
def _run_chunk(chunk_models: list, chunk_idx: int):
|
|
"""单个子进程,逐张读取进度并实时写 DB"""
|
|
request = json.dumps({"config": config, "models": chunk_models}, ensure_ascii=False)
|
|
tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w", encoding="utf-8")
|
|
tmp.write(request)
|
|
tmp.close()
|
|
|
|
qgis_root = getattr(settings, "QGIS_ROOT", "D:/QGIS")
|
|
python_path = get_qgis_python_path(qgis_root)
|
|
runner = get_runner_script()
|
|
cmd = [python_path, runner, tmp.name]
|
|
|
|
proc = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
|
|
env=build_qgis_subprocess_env(qgis_root),
|
|
text=True, encoding="utf-8", errors="replace",
|
|
)
|
|
|
|
chunk_results = []
|
|
for line in proc.stdout:
|
|
line = line.strip()
|
|
if line.startswith("PROGRESS:"):
|
|
try:
|
|
r = json.loads(line[len("PROGRESS:"):])
|
|
chunk_results.append(r)
|
|
# ★ 每张图产出后立即写 DB
|
|
if inference_id and file_store and "error" not in r:
|
|
_write_single_path(inference_id, r.get("output", ""), file_store, db_lock)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
proc.wait()
|
|
try:
|
|
os.remove(tmp.name)
|
|
except OSError:
|
|
pass
|
|
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(f"[子进程{chunk_idx}] 失败 (exit={proc.returncode})")
|
|
|
|
return chunk_results
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
|
|
futures = {executor.submit(_run_chunk, c, i): i for i, c in enumerate(chunks)}
|
|
for future in concurrent.futures.as_completed(futures):
|
|
chunk_idx = futures[future]
|
|
try:
|
|
results = future.result()
|
|
all_results.extend(results)
|
|
except Exception as e:
|
|
logger.error(f"[批量产图] 子进程 {chunk_idx} 失败: {e}")
|
|
errors.append(str(e))
|
|
|
|
if errors and not all_results:
|
|
raise RuntimeError(f"所有子进程均失败: {'; '.join(errors[:2])}")
|
|
|
|
success_count = sum(1 for r in all_results if "error" not in r)
|
|
fail_count = len(all_results) - success_count
|
|
logger.info(f"[批量产图] 完成: 成功={success_count}, 失败={fail_count}")
|
|
for r in all_results:
|
|
if "error" not in r:
|
|
logger.info(f" OK {r.get('output', 'N/A')}")
|
|
else:
|
|
logger.error(f" FAIL {r['name']}: {r.get('error', 'unknown')}")
|
|
if success_count == 0 and fail_count > 0:
|
|
first_err = all_results[0].get("error", "unknown")
|
|
raise RuntimeError(f"所有模型均失败 ({fail_count}张): {first_err}")
|
|
|
|
|
|
def _generate_maps_subprocess(chunk_models: list, config: dict, chunk_idx: int) -> list:
    """Run one QGIS subprocess for a chunk of templates and return its results.

    The request (``config`` + ``chunk_models``) is passed through a temporary
    JSON file that is removed once the subprocess has finished. The results
    are the ``"results"`` list of the last stdout line that parses as a JSON
    object. Without such a line, a non-zero exit code raises ``RuntimeError``
    and a zero exit code yields an empty list.
    """
    import json
    import subprocess
    import tempfile
    from app.services.qgis.qgis_env import (
        get_qgis_python_path, get_runner_script, build_qgis_subprocess_env,
    )

    payload = json.dumps({"config": config, "models": chunk_models}, ensure_ascii=False)
    request_file = tempfile.NamedTemporaryFile(
        suffix=".json", delete=False, mode="w", encoding="utf-8"
    )
    request_file.write(payload)
    request_file.close()

    try:
        from config import settings
        qgis_home = getattr(settings, "QGIS_ROOT", "D:/QGIS")
        interpreter = get_qgis_python_path(qgis_home)
        if not interpreter:
            raise RuntimeError("未找到 QGIS Python 3.12 解释器")
        argv = [interpreter, get_runner_script(), request_file.name]

        logger.info(f"[子进程{chunk_idx}] 启动: {len(chunk_models)} 张图")

        completed = subprocess.run(
            argv,
            capture_output=True,
            timeout=600,
            env=build_qgis_subprocess_env(qgis_home),
        )
    finally:
        # The request file is no longer needed whether or not the run succeeded.
        try:
            os.remove(request_file.name)
        except OSError:
            pass

    # Scan stdout bottom-up for the last line holding a JSON object.
    text = completed.stdout.decode("utf-8", errors="replace").strip()
    report = None
    for raw_line in (reversed(text.split("\n")) if text else ()):
        candidate = raw_line.strip()
        if not candidate.startswith("{"):
            continue
        try:
            report = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        break

    if report is not None:
        results = report.get("results", [])
        succeeded = len([r for r in results if "error" not in r])
        logger.info(f"[子进程{chunk_idx}] 完成: {succeeded}/{len(results)}")
        return results

    if completed.returncode != 0:
        stderr_text = completed.stderr.decode("utf-8", errors="replace").strip()
        raise RuntimeError(f"[子进程{chunk_idx}] 失败: {stderr_text[:200]}")

    logger.warning(f"[子进程{chunk_idx}] 无输出")
    return []
|
|
|
|
|
|
# ============================================================
|
|
# file_path 数据库记录
|
|
# ============================================================
|
|
|
|
def _relative_path(absolute_path: str, file_store: str) -> str:
|
|
"""绝对路径 → 相对于 FILE_STORE_DIR 的路径"""
|
|
fs = file_store.replace("\\", "/").rstrip("/")
|
|
ap = absolute_path.replace("\\", "/")
|
|
if ap.startswith(fs + "/"):
|
|
return ap[len(fs) + 1:]
|
|
return ap
|
|
|
|
|
|
def _write_single_path(inference_id: int, out_path: str, file_store: str, lock) -> None:
|
|
"""单张图产出后立即写入进度表"""
|
|
if not out_path or not os.path.isfile(out_path):
|
|
return
|
|
rel = _relative_path(out_path, file_store)
|
|
if not rel:
|
|
return
|
|
try:
|
|
from app.repositories.qgis_repository import qgis_repository
|
|
with lock:
|
|
qgis_repository.insert_file_paths(inference_id, [rel])
|
|
except Exception as e:
|
|
logger.error(f"[实时写入] 失败: {e}")
|