296 lines
11 KiB
Python
296 lines
11 KiB
Python
|
|
"""
|
|||
|
|
单张专题图测试脚本 - 独立版
|
|||
|
|
直接构建 model/config 并通过 subprocess 调用 qgis_runner.py
|
|||
|
|
"""
|
|||
|
|
import json, os, sys, tempfile, subprocess, re
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 配置(和 settings.toml 一致)
|
|||
|
|
# ============================================================
|
|||
|
|
DB_CONFIG = {
|
|||
|
|
"host": "47.92.216.173",
|
|||
|
|
"port": 7654,
|
|||
|
|
"username": "postgres",
|
|||
|
|
"password": "zhangsan",
|
|||
|
|
"database": "xian_new",
|
|||
|
|
}
|
|||
|
|
QGIS_ROOT = "D:/QGIS"
|
|||
|
|
XIAN_CENTER = [108.948024, 34.263161]
|
|||
|
|
GPKG_DIR = r"F:\project\xian\xian_algorithm_new\app\data\gpkg".replace("\\", "/")
|
|||
|
|
TEMPLATE_BASE = r"F:\project\xian\xian_algorithm_new\app\data\template"
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 模拟推理结果(inference_id=50)
|
|||
|
|
# ============================================================
|
|||
|
|
# 从实际数据库查询得知:
|
|||
|
|
# event_type='rainfall', condition={'region_code':'610116','rainfall':'120'},
|
|||
|
|
# occurred_time='2025-09-16 20:00:00'
|
|||
|
|
inference = {
|
|||
|
|
"id": 50,
|
|||
|
|
"event_type": "rainfall",
|
|||
|
|
"condition": {"region_code": "610116", "rainfall": "120"},
|
|||
|
|
"occurred_time": datetime(2025, 9, 16, 20, 0, 0),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 区域映射(从 settings.toml)
|
|||
|
|
# ============================================================
|
|||
|
|
AREA_MAP = {
|
|||
|
|
"610102": "新城区", "610103": "碑林区", "610104": "莲湖区",
|
|||
|
|
"610111": "灞桥区", "610112": "未央区", "610113": "雁塔区",
|
|||
|
|
"610114": "阎良区", "610115": "临潼区", "610116": "长安区",
|
|||
|
|
"610117": "高陵区", "610118": "鄠邑区", "610122": "蓝田县",
|
|||
|
|
"610124": "周至县",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
STATIC_LAYERS = {
|
|||
|
|
"水库": ("base.rivers", "rivers.gpkg"),
|
|||
|
|
"市州驻地": ("base.sx_capital", "sx_capital.gpkg"),
|
|||
|
|
"河流": ("base.river", "river.gpkg"),
|
|||
|
|
"active_fault": ("base.active_fault", "active_fault.gpkg"),
|
|||
|
|
"陕西省": ("base.sx", "sx.gpkg"),
|
|||
|
|
"乡镇驻地": ("base.sx_street", "sx_street.gpkg"),
|
|||
|
|
"区县驻地": ("base.sx_xa_county", "sx_xa_county.gpkg"),
|
|||
|
|
"县界": ("base.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
|
|||
|
|
"周边区县": ("base.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
|
|||
|
|
"周边市州": ("base.sx_zb_city", "sx_zb_city.gpkg"),
|
|||
|
|
"周边县区": ("base.sx_zb_county", "sx_zb_county.gpkg"),
|
|||
|
|
"traffic_expressway": ("base.traffic_expressway", "traffic_expressway.gpkg"),
|
|||
|
|
"traffic_provincial": ("base.traffic_provincial", "traffic_provincial.gpkg"),
|
|||
|
|
"traffic_railway": ("base.traffic_railway", "traffic_railway.gpkg"),
|
|||
|
|
"traffic_township": ("base.traffic_township", "traffic_township.gpkg"),
|
|||
|
|
"traffic_trunk_line": ("base.traffic_trunk_line", "traffic_trunk_line.gpkg"),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 辅助函数(从 qgis_map_export.py 直接复制)
|
|||
|
|
# ============================================================
|
|||
|
|
def format_disaster_time(occurred_time):
|
|||
|
|
if isinstance(occurred_time, datetime):
|
|||
|
|
return occurred_time.strftime("%Y%m%d%H%M%S")
|
|||
|
|
return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
|
|||
|
|
|
|||
|
|
def resolve_district(condition):
|
|||
|
|
code = condition.get("region_code")
|
|||
|
|
if code and str(code) in AREA_MAP:
|
|||
|
|
return AREA_MAP[str(code)]
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
def build_map_title(event_type, condition, template_name):
|
|||
|
|
district = resolve_district(condition)
|
|||
|
|
prefix = f"陕西西安{district}" if district else "陕西西安"
|
|||
|
|
if event_type == "rainfall":
|
|||
|
|
rainfall = condition.get("rainfall")
|
|||
|
|
if rainfall is not None and rainfall != "":
|
|||
|
|
return f"{prefix}{float(rainfall)}mm{template_name}"
|
|||
|
|
return f"{prefix}{template_name}"
|
|||
|
|
return f"{prefix}{template_name}"
|
|||
|
|
|
|||
|
|
def build_info_text(event_type, condition, occurred_time):
|
|||
|
|
"""构建信息面板文本(不显示灾害类型标签)"""
|
|||
|
|
lines = []
|
|||
|
|
if isinstance(occurred_time, datetime):
|
|||
|
|
time_str = f"{occurred_time.year}年{occurred_time.month:02d}月{occurred_time.day:02d}日{occurred_time.hour:02d}时{occurred_time.minute:02d}分"
|
|||
|
|
elif occurred_time:
|
|||
|
|
time_str = str(occurred_time)
|
|||
|
|
else:
|
|||
|
|
time_str = ""
|
|||
|
|
lines.append(f"时间:{time_str}")
|
|||
|
|
|
|||
|
|
if event_type == "rainfall":
|
|||
|
|
rainfall = condition.get("rainfall")
|
|||
|
|
if rainfall is not None and rainfall != "":
|
|||
|
|
lines.append(f"累计降雨量:{float(rainfall)}mm")
|
|||
|
|
duration = condition.get("duration")
|
|||
|
|
if duration is not None and duration != "":
|
|||
|
|
lines.append(f"已持续:{duration}")
|
|||
|
|
elif event_type == "earthquake":
|
|||
|
|
magnitude = condition.get("magnitude")
|
|||
|
|
if magnitude is not None and magnitude != "":
|
|||
|
|
lines.append(f"震级:{float(magnitude)}级")
|
|||
|
|
lon = condition.get("epicenter_lon")
|
|||
|
|
lat = condition.get("epicenter_lat")
|
|||
|
|
if lon is not None and lat is not None:
|
|||
|
|
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
|
|||
|
|
|
|||
|
|
return "\n".join(lines)
|
|||
|
|
|
|||
|
|
def derive_model_params(inference, batch_folder, template_path):
|
|||
|
|
event_type = inference["event_type"]
|
|||
|
|
condition = inference["condition"]
|
|||
|
|
occurred_time = inference["occurred_time"]
|
|||
|
|
|
|||
|
|
template_name = os.path.splitext(os.path.basename(template_path))[0]
|
|||
|
|
map_title = build_map_title(event_type, condition, template_name)
|
|||
|
|
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
|
|||
|
|
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
|
|||
|
|
|
|||
|
|
if isinstance(occurred_time, datetime):
|
|||
|
|
map_time = occurred_time.strftime("%Y-%m-%d %H:%M")
|
|||
|
|
else:
|
|||
|
|
map_time = str(occurred_time)
|
|||
|
|
|
|||
|
|
info_text = build_info_text(event_type, condition, occurred_time)
|
|||
|
|
|
|||
|
|
center_x, center_y = XIAN_CENTER
|
|||
|
|
if event_type == "earthquake":
|
|||
|
|
lon = condition.get("epicenter_lon", XIAN_CENTER[0])
|
|||
|
|
lat = condition.get("epicenter_lat", XIAN_CENTER[1])
|
|||
|
|
center_x, center_y = float(lon), float(lat)
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"name": f"test_{inference['id']}_{safe_name}",
|
|||
|
|
"path": template_path,
|
|||
|
|
"outFile": out_file,
|
|||
|
|
"mapLayout": "A3",
|
|||
|
|
"mapTitle": map_title,
|
|||
|
|
"mapTime": map_time,
|
|||
|
|
"mapUnit": "西安市应急管理局",
|
|||
|
|
"info": info_text,
|
|||
|
|
"centerX": center_x,
|
|||
|
|
"centerY": center_y,
|
|||
|
|
"event": str(inference["id"]),
|
|||
|
|
"queueId": str(inference["id"]),
|
|||
|
|
"zoomRule": "11",
|
|||
|
|
"zoomValue": "50",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def build_qgis_config(batch_folder):
|
|||
|
|
static_layers_config = {}
|
|||
|
|
for name, (table, gpkg_file) in STATIC_LAYERS.items():
|
|||
|
|
static_layers_config[name] = {"file": gpkg_file, "table": table}
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"db": DB_CONFIG,
|
|||
|
|
"qgis": {"exportDpi": 300},
|
|||
|
|
"template_override": {
|
|||
|
|
"enabled": True,
|
|||
|
|
"original": {
|
|||
|
|
"host": "localhost",
|
|||
|
|
"port": 5432,
|
|||
|
|
"dbname": "yjzyk_xian",
|
|||
|
|
"schema": "base",
|
|||
|
|
},
|
|||
|
|
"actual": {
|
|||
|
|
"host": DB_CONFIG["host"],
|
|||
|
|
"port": DB_CONFIG["port"],
|
|||
|
|
"dbname": DB_CONFIG["database"],
|
|||
|
|
"schema": "qgis",
|
|||
|
|
"username": DB_CONFIG["username"],
|
|||
|
|
"password": DB_CONFIG["password"],
|
|||
|
|
},
|
|||
|
|
},
|
|||
|
|
"static_layers": {
|
|||
|
|
"enabled": True,
|
|||
|
|
"gpkg_dir": GPKG_DIR,
|
|||
|
|
"layers": static_layers_config,
|
|||
|
|
},
|
|||
|
|
"batch_folder": batch_folder,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 主逻辑
|
|||
|
|
# ============================================================
|
|||
|
|
event_type = inference["event_type"]
|
|||
|
|
disaster_time = format_disaster_time(inference["occurred_time"])
|
|||
|
|
batch_folder = os.path.join("G:/files", "xian/qgis/map", disaster_time).replace("\\", "/")
|
|||
|
|
os.makedirs(batch_folder, exist_ok=True)
|
|||
|
|
|
|||
|
|
template_dir = os.path.join(TEMPLATE_BASE, event_type)
|
|||
|
|
template_path = os.path.join(template_dir, "暴雨内涝潜在隐患点及人口分布图.qgz").replace("\\", "/")
|
|||
|
|
|
|||
|
|
model = derive_model_params(inference, batch_folder, template_path)
|
|||
|
|
config = build_qgis_config(batch_folder)
|
|||
|
|
|
|||
|
|
print("=" * 60)
|
|||
|
|
print(f"模板: {template_path}")
|
|||
|
|
print(f"输出: {model['outFile']}")
|
|||
|
|
print(f"标题: {model['mapTitle']}")
|
|||
|
|
print(f"时间: {model['mapTime']}")
|
|||
|
|
print(f"单位: {model['mapUnit']}")
|
|||
|
|
print(f"info:\n{model['info']}")
|
|||
|
|
print(f"中心: ({model['centerX']}, {model['centerY']})")
|
|||
|
|
print(f"event: {model['event']}, queueId: {model['queueId']}")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
request_data = json.dumps(
|
|||
|
|
{"config": config, "model": model},
|
|||
|
|
ensure_ascii=False,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
tmp_json = tempfile.NamedTemporaryFile(
|
|||
|
|
suffix=".json", delete=False, mode="w", encoding="utf-8"
|
|||
|
|
)
|
|||
|
|
tmp_json.write(request_data)
|
|||
|
|
tmp_json.close()
|
|||
|
|
|
|||
|
|
runner_script = os.path.join(
|
|||
|
|
os.path.dirname(__file__),
|
|||
|
|
r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
|
|||
|
|
).replace("\\", "/")
|
|||
|
|
# Fix: use absolute path
|
|||
|
|
runner_script = r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
|
|||
|
|
|
|||
|
|
bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
|
|||
|
|
os.makedirs(bat_dir, exist_ok=True)
|
|||
|
|
bat_path = os.path.join(bat_dir, "run_qgis_test.bat")
|
|||
|
|
|
|||
|
|
qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr").replace("/", "\\")
|
|||
|
|
python_dir = os.path.join(QGIS_ROOT, "apps", "Python312").replace("/", "\\")
|
|||
|
|
qt5_plugins = os.path.join(QGIS_ROOT, "apps", "Qt5", "plugins").replace("/", "\\")
|
|||
|
|
qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
|
|||
|
|
gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal").replace("/", "\\")
|
|||
|
|
qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
|
|||
|
|
qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
|
|||
|
|
qt5_bin = os.path.join(QGIS_ROOT, "apps", "Qt5", "bin").replace("/", "\\")
|
|||
|
|
gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib").replace("/", "\\")
|
|||
|
|
python_exe = os.path.join(python_dir, "python3.exe").replace("/", "\\")
|
|||
|
|
|
|||
|
|
bat_content = f"""@echo off
|
|||
|
|
set "PYTHONHOME={python_dir}"
|
|||
|
|
set "PYTHONPATH={qgis_python_dir}"
|
|||
|
|
set "QGIS_PREFIX_PATH={qgis_app_dir}"
|
|||
|
|
set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
|
|||
|
|
set "GDAL_DATA={gdal_data}"
|
|||
|
|
set "PYTHONUTF8=1"
|
|||
|
|
set "GDAL_FILENAME_IS_UTF8=YES"
|
|||
|
|
set "VSI_CACHE=TRUE"
|
|||
|
|
set "VSI_CACHE_SIZE=1000000"
|
|||
|
|
set "PATH={qgis_bin};{qt5_bin};{gdal_lib};%PATH%"
|
|||
|
|
"{python_exe}" "{runner_script}" "{tmp_json.name}"
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
with open(bat_path, "w", encoding="utf-8") as f:
|
|||
|
|
f.write(bat_content)
|
|||
|
|
|
|||
|
|
cmd = ["cmd.exe", "/c", bat_path]
|
|||
|
|
print(f"执行: {' '.join(cmd[:3])} ...")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
result = subprocess.run(cmd, capture_output=True, timeout=300)
|
|||
|
|
stderr_text = result.stderr.decode("utf-8", errors="replace")
|
|||
|
|
stdout_text = result.stdout.decode("utf-8", errors="replace")
|
|||
|
|
|
|||
|
|
print("=== stderr ===")
|
|||
|
|
for line in stderr_text.split("\n"):
|
|||
|
|
print(f" {line}")
|
|||
|
|
print()
|
|||
|
|
print("=== stdout ===")
|
|||
|
|
print(stdout_text[:500] if stdout_text else "(empty)")
|
|||
|
|
|
|||
|
|
if result.returncode != 0:
|
|||
|
|
print(f"\n!!! FAIL: exit={result.returncode}")
|
|||
|
|
else:
|
|||
|
|
print(f"\n=== SUCCESS ===")
|
|||
|
|
if os.path.isfile(model["outFile"]):
|
|||
|
|
print(f"文件: {model['outFile']} ({os.path.getsize(model['outFile'])/1024:.1f} KB)")
|
|||
|
|
else:
|
|||
|
|
print(f"文件不存在: {model['outFile']}")
|
|||
|
|
finally:
|
|||
|
|
try:
|
|||
|
|
os.remove(tmp_json.name)
|
|||
|
|
except OSError:
|
|||
|
|
pass
|