QGIS完成初步重构

This commit is contained in:
wzy-warehouse
2026-06-20 15:50:24 +08:00
parent d20b5744bb
commit 18d8bcb1a3
45 changed files with 1688 additions and 454 deletions
+42
View File
@@ -0,0 +1,42 @@
"""
在所有 qgis 动态表中添加 tid 自增主键列
QGIS 的 datasource key='tid' 需要此列
"""
import psycopg2
TABLES = [
'hazard_hydrops',
'lifeline_outfall',
'lifeline_pipe',
'risk_census_population',
'sx_xa_towns',
]
c = psycopg2.connect(
host='47.92.216.173', port=7654,
user='postgres', password='zhangsan',
database='xian_new'
)
c.autocommit = True
cur = c.cursor()
for t in TABLES:
try:
# 检查 tid 列是否存在
cur.execute(f"""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema='qgis' AND table_name='{t}' AND column_name='tid'
)
""")
exists = cur.fetchone()[0]
if exists:
print(f"qgis.{t}: tid 列已存在, 跳过")
else:
cur.execute(f'ALTER TABLE qgis."{t}" ADD COLUMN tid SERIAL')
print(f"qgis.{t}: tid 列添加成功")
except Exception as e:
print(f"qgis.{t}: 失败 - {e}")
c.close()
+72
View File
@@ -0,0 +1,72 @@
"""
批量检查所有 rainfall 模板的图层,看哪些动态图层需要显示、表是否存在
"""
import zipfile, re, os, psycopg2
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
# 收集所有非GPKG动态图层的表名
all_dynamic = set()
for fname in sorted(os.listdir(TEMPLATE_DIR)):
if not fname.endswith('.qgz') or fname.startswith('tmp'):
continue
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
for m in maplayer_re.finditer(content):
block = m.group(1)
name_m = re.search(r'<layername>([^<]+)</layername>', block)
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
provider = provider_m.group(1) if provider_m else '?'
if provider != 'postgres':
continue
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
layer_name = name_m.group(1) if name_m else '?'
if table_m:
schema = table_m.group(1)
table = table_m.group(2)
key = f"{schema}.{table}"
all_dynamic.add((layer_name, key))
# 已知的GPKG静态层(会被替换为ogr)
static_tables = {
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
}
# DB检查
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
c.autocommit = True
cur = c.cursor()
print(f"{'图层名':20s} {'原表':35s} {'qgis中存在':12s} {'行数':>8s}")
print("-" * 80)
for layer_name, table_key in sorted(all_dynamic):
if table_key in static_tables:
continue # 会被GPKG替换,跳过
schema, table = table_key.split('.', 1)
try:
cur.execute(f'SELECT count(*) FROM qgis.{table}')
count = cur.fetchone()[0]
exists = "YES"
except:
exists = "NO"
count = 0
marker = " !!!" if exists == "NO" else ""
print(f"{layer_name:20s} {table_key:35s} {exists:12s} {count:>8,d}{marker}")
c.close()
+54
View File
@@ -0,0 +1,54 @@
"""检查模板中所有图层的渲染顺序(z-order)"""
import zipfile, re, os
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取所有 maplayer 块(保持顺序 — 这就是渲染顺序)
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
print(f"模板: {fname}")
print(f"{'#':>3s} {'图层名':20s} {'类型':10s} {'可见':4s} {'表名'}")
print("-" * 80)
for i, m in enumerate(maplayer_re.finditer(content)):
block = m.group(1)
# 图层名
name_m = re.search(r'<layername>([^<]+)</layername>', block)
name = name_m.group(1) if name_m else '?'
# provider
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
provider = provider_m.group(1) if provider_m else '?'
# datasource table
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
table = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
# 可见性
visible = 'Y' if 'visible="1"' in m.group(0) or 'visible="1"' in block else 'N'
# 判断图层类型
is_static = table in [
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
]
layer_type = "底图" if is_static else "动态"
marker = " <<<" if not is_static and provider == 'postgres' else ""
print(f"{i+1:>3d} {name:20s} {provider:10s} {visible:4s} {table:35s} [{layer_type}]{marker}")
print()
print("提示: 图层按从上到下的顺序渲染(序号小的在底层,序号大的在顶层)")
print("动态图层如果在底图下方,会被底图完全遮盖")
+65
View File
@@ -0,0 +1,65 @@
"""检查布局 Map 项的图层配置"""
import zipfile, re, os
TEMPLATE_DIR = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
fname = "暴雨内涝潜在隐患点及人口分布图.qgz"
path = os.path.join(TEMPLATE_DIR, fname)
z = zipfile.ZipFile(path)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取 Layout 元素(找 Map 项)
# QGIS 有两种布局格式:<Composer> (QGIS 2.x style) 和 <Layout> (QGIS 3.x style)
layout_re = re.compile(r'<(?:Composer|Layout)[^>]*name="([^"]*)"[^>]*>(.*?)</(?:Composer|Layout)>', re.DOTALL)
for m in layout_re.finditer(content):
layout_name = m.group(1)
layout_content = m.group(2)
print(f"=== 布局: {layout_name} ===")
# 检查 Map 项
map_item_re = re.compile(r'<ComposerMap[^>]*>(.*?)</ComposerMap>', re.DOTALL)
for mm in map_item_re.finditer(layout_content):
map_content = mm.group(1)
# 检查是否有 lockedLayers
locked = re.findall(r'<lockedLayers>(.*?)</lockedLayers>', map_content, re.DOTALL)
if locked:
print(f" lockedLayers: {locked[0][:500]}")
# 检查 keepLayerSet
keep_set = re.findall(r'keepLayerSet="([^"]*)"', mm.group(0))
if keep_set:
print(f" keepLayerSet: {keep_set[0]}")
# 检查 followPreset
preset = re.findall(r'followPreset="([^"]*)"', mm.group(0))
if preset:
print(f" followPreset: {preset[0]}")
# 检查 followPresetName
preset_name = re.findall(r'followPresetName="([^"]*)"', mm.group(0))
if preset_name:
print(f" followPresetName: {preset_name[0]}")
# 检查 <layerSet>
layer_set = re.findall(r'<layerSet>(.*?)</layerSet>', map_content, re.DOTALL)
if layer_set:
print(f" layerSet: {layer_set[0][:500]}")
# 检查 <ComposerMapGrid>
grids = re.findall(r'<ComposerMapGrid[^>]*/>', map_content)
print(f" grids: {len(grids)}")
# 也检查 <Layout> 格式 (QGIS 3.x)
map_item2_re = re.compile(r'<LayoutItem[^>]*type="[^"]*map[^"]*"[^>]*>(.*?)</LayoutItem>', re.DOTALL | re.IGNORECASE)
for mm in map_item2_re.finditer(layout_content):
map_content = mm.group(1)
print(f" [LayoutItem Map] attributes: {mm.group(0)[:300]}")
# 打印模板中所有的 map theme / visibility preset
presets = re.findall(r'<(?:visibility-presets|map-theme-collection).*?</(?:visibility-presets|map-theme-collection)>', content, re.DOTALL)
if presets:
print("\n=== 可见性预设 ===")
print(presets[0][:500])
+71
View File
@@ -0,0 +1,71 @@
"""检查暴雨避难场所分布图的所有动态图层是否有数据"""
import zipfile, re, os, psycopg2
TEMPLATE_PATH = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz"
z = zipfile.ZipFile(TEMPLATE_PATH)
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
# 收集所有 PostgreSQL 动态图层
layers = []
for m in maplayer_re.finditer(content):
block = m.group(1)
name_m = re.search(r'<layername>([^<]+)</layername>', block)
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
provider = provider_m.group(1) if provider_m else '?'
if provider != 'postgres':
continue
name = name_m.group(1) if name_m else '?'
ds = ds_m.group(1).strip() if ds_m else ''
table_m = re.search(r'table="(\w+)"\."(\w+)"', ds)
table_key = f"{table_m.group(1)}.{table_m.group(2)}" if table_m else '?'
layers.append((name, table_key))
# 已知 GPKG 静态层
static_tables = {
'base.rivers', 'base.river', 'base.sx', 'base.sx_capital',
'base.sx_street', 'base.sx_xa_county', 'base.sx_xa_county_boundary',
'base.sx_zb_county_boundary', 'base.sx_zb_city', 'base.sx_zb_county',
'base.active_fault', 'base.traffic_expressway', 'base.traffic_provincial',
'base.traffic_railway', 'base.traffic_township', 'base.traffic_trunk_line',
}
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
c.autocommit = True
cur = c.cursor()
print(f"模板: 暴雨避难场所分布图")
print(f"{'图层名':20s} {'原表':40s} {'qgis有数据':10s} {'行数':>6s}")
print("-" * 85)
for name, table_key in layers:
if table_key in static_tables:
continue
schema, table = table_key.split('.', 1)
mapped_table = 'hazard_hydrops' if table == 'hazard_waterlogging' else table
try:
cur.execute(f'SELECT count(*) FROM qgis.{mapped_table}')
count = cur.fetchone()[0]
exists = "YES" if count > 0 else "EMPTY"
except:
try:
cur.execute(f'SELECT count(*) FROM {schema}.{table}')
count = cur.fetchone()[0]
exists = f"{schema}"
except:
exists = "NO"
count = 0
marker = " <-- 无数据!" if count == 0 else ""
print(f"{name:20s} {table_key:40s} {exists:10s} {count:>6,d}{marker}")
c.close()
+38
View File
@@ -0,0 +1,38 @@
import psycopg2
c = psycopg2.connect(host='47.92.216.173', port=7654, user='postgres', password='zhangsan', database='xian_new')
cur = c.cursor()
tables = ['lifeline_outfall', 'lifeline_pipe', 'risk_census_population', 'hazard_waterlogging', 'sx_street', 'sx_xa_county']
print("=== qgis schema ===")
for t in tables:
try:
cur.execute(f'SELECT COUNT(*) FROM qgis."{t}"')
count = cur.fetchone()[0]
cur.execute(f"SELECT GeometryType(geom) FROM qgis.\"{t}\" LIMIT 1")
g = cur.fetchone()
print(f" qgis.{t}: {count} 行, geom={g[0] if g else 'N/A'}")
except Exception as e:
print(f" qgis.{t}: 不存在")
print("\n=== base schema (检查积水点原始表) ===")
for t in ['hazard_waterlogging']:
try:
cur.execute(f'SELECT COUNT(*) FROM base."{t}"')
print(f" base.{t}: {cur.fetchone()[0]}")
except:
print(f" base.{t}: 不存在")
# 查找所有含 water 或 hazard 的表
print("\n=== 搜索含 water/hazard 的表 ===")
cur.execute("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE (table_name LIKE '%water%' OR table_name LIKE '%hazard%')
AND table_schema IN ('qgis', 'base', 'public')
ORDER BY table_schema, table_name
""")
for r in cur.fetchall():
print(f" {r[0]}.{r[1]}")
c.close()
+42
View File
@@ -0,0 +1,42 @@
"""提取模板 .qgs 中所有图层名称、类型、数据源"""
import zipfile, re, os, sys
template_dir = r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall"
for fname in sorted(os.listdir(template_dir)):
if not fname.endswith('.qgz') or fname.startswith('tmp'):
continue
path = os.path.join(template_dir, fname)
z = zipfile.ZipFile(path)
# 找到 .qgs 文件
qgs_name = [n for n in z.namelist() if n.endswith('.qgs')][0]
content = z.read(qgs_name).decode('utf-8')
# 提取 maplayer 块
maplayer_re = re.compile(r'<maplayer[^>]*>(.*?)</maplayer>', re.DOTALL)
layers = []
for m in maplayer_re.finditer(content):
block = m.group(1)
# 图层名
name_m = re.search(r'<layername>([^<]+)</layername>', block)
# provider
provider_m = re.search(r'<provider[^>]*>(\w+)</provider>', block)
# datasource (截取前120字符)
ds_m = re.search(r'<datasource>(.*?)</datasource>', block, re.DOTALL)
name = name_m.group(1) if name_m else '?'
provider = provider_m.group(1) if provider_m else '?'
ds = ds_m.group(1).strip()[:120] if ds_m else '?'
layers.append((name, provider, ds))
print(f"\n{'='*60}")
print(f"模板: {fname}")
print(f"{'='*60}")
for name, provider, ds in layers:
print(f" [{provider:10s}] {name}")
if ds:
print(f" ds: {ds}")
# 只分析第一个模板(所有 rainfall 模板结构相同)
if fname == '暴雨内涝潜在隐患点及人口分布图.qgz':
break
+16
View File
@@ -0,0 +1,16 @@
import zipfile, re
z = zipfile.ZipFile(r"F:\project\xian\xian_algorithm_new\app\data\template\rainfall\暴雨避难场所分布图.qgz")
c = z.read([n for n in z.namelist() if n.endswith('.qgs')][0]).decode()
layers = re.findall(r'<maplayer[^>]*>(.*?)</maplayer>', c, re.DOTALL)
for i, l in enumerate(layers):
nm = re.search(r'<layername>([^<]+)</layername>', l)
pv = re.search(r'<provider[^>]*>(\w+)</provider>', l)
ds = re.search(r'<datasource>(.*?)</datasource>', l, re.DOTALL)
name = nm.group(1) if nm else '?'
prov = pv.group(1) if pv else '?'
ds_text = ds.group(1).strip()[:120] if ds else ''
print(f"{i:2d}. [{prov:10s}] {name}")
if ds_text:
print(f" ds: {ds_text}")
+295
View File
@@ -0,0 +1,295 @@
"""
单张专题图测试脚本 - 独立版
直接构建 model/config 并通过 subprocess 调用 qgis_runner.py
"""
import json, os, sys, tempfile, subprocess, re
from datetime import datetime
# ============================================================
# 配置(和 settings.toml 一致)
# ============================================================
DB_CONFIG = {
"host": "47.92.216.173",
"port": 7654,
"username": "postgres",
"password": "zhangsan",
"database": "xian_new",
}
QGIS_ROOT = "D:/QGIS"
XIAN_CENTER = [108.948024, 34.263161]
GPKG_DIR = r"F:\project\xian\xian_algorithm_new\app\data\gpkg".replace("\\", "/")
TEMPLATE_BASE = r"F:\project\xian\xian_algorithm_new\app\data\template"
# ============================================================
# 模拟推理结果(inference_id=50
# ============================================================
# 从实际数据库查询得知:
# event_type='rainfall', condition={'region_code':'610116','rainfall':'120'},
# occurred_time='2025-09-16 20:00:00'
inference = {
"id": 50,
"event_type": "rainfall",
"condition": {"region_code": "610116", "rainfall": "120"},
"occurred_time": datetime(2025, 9, 16, 20, 0, 0),
}
# ============================================================
# 区域映射(从 settings.toml
# ============================================================
AREA_MAP = {
"610102": "新城区", "610103": "碑林区", "610104": "莲湖区",
"610111": "灞桥区", "610112": "未央区", "610113": "雁塔区",
"610114": "阎良区", "610115": "临潼区", "610116": "长安区",
"610117": "高陵区", "610118": "鄠邑区", "610122": "蓝田县",
"610124": "周至县",
}
STATIC_LAYERS = {
"水库": ("base.rivers", "rivers.gpkg"),
"市州驻地": ("base.sx_capital", "sx_capital.gpkg"),
"河流": ("base.river", "river.gpkg"),
"active_fault": ("base.active_fault", "active_fault.gpkg"),
"陕西省": ("base.sx", "sx.gpkg"),
"乡镇驻地": ("base.sx_street", "sx_street.gpkg"),
"区县驻地": ("base.sx_xa_county", "sx_xa_county.gpkg"),
"县界": ("base.sx_xa_county_boundary", "sx_xa_county_boundary.gpkg"),
"周边区县": ("base.sx_zb_county_boundary", "sx_zb_county_boundary.gpkg"),
"周边市州": ("base.sx_zb_city", "sx_zb_city.gpkg"),
"周边县区": ("base.sx_zb_county", "sx_zb_county.gpkg"),
"traffic_expressway": ("base.traffic_expressway", "traffic_expressway.gpkg"),
"traffic_provincial": ("base.traffic_provincial", "traffic_provincial.gpkg"),
"traffic_railway": ("base.traffic_railway", "traffic_railway.gpkg"),
"traffic_township": ("base.traffic_township", "traffic_township.gpkg"),
"traffic_trunk_line": ("base.traffic_trunk_line", "traffic_trunk_line.gpkg"),
}
# ============================================================
# 辅助函数(从 qgis_map_export.py 直接复制)
# ============================================================
def format_disaster_time(occurred_time):
if isinstance(occurred_time, datetime):
return occurred_time.strftime("%Y%m%d%H%M%S")
return str(occurred_time).replace("-", "").replace(":", "").replace(" ", "")[:14]
def resolve_district(condition):
code = condition.get("region_code")
if code and str(code) in AREA_MAP:
return AREA_MAP[str(code)]
return ""
def build_map_title(event_type, condition, template_name):
district = resolve_district(condition)
prefix = f"陕西西安{district}" if district else "陕西西安"
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
return f"{prefix}{float(rainfall)}mm{template_name}"
return f"{prefix}{template_name}"
return f"{prefix}{template_name}"
def build_info_text(event_type, condition, occurred_time):
"""构建信息面板文本(不显示灾害类型标签)"""
lines = []
if isinstance(occurred_time, datetime):
time_str = f"{occurred_time.year}{occurred_time.month:02d}{occurred_time.day:02d}{occurred_time.hour:02d}{occurred_time.minute:02d}"
elif occurred_time:
time_str = str(occurred_time)
else:
time_str = ""
lines.append(f"时间:{time_str}")
if event_type == "rainfall":
rainfall = condition.get("rainfall")
if rainfall is not None and rainfall != "":
lines.append(f"累计降雨量:{float(rainfall)}mm")
duration = condition.get("duration")
if duration is not None and duration != "":
lines.append(f"已持续:{duration}")
elif event_type == "earthquake":
magnitude = condition.get("magnitude")
if magnitude is not None and magnitude != "":
lines.append(f"震级:{float(magnitude)}")
lon = condition.get("epicenter_lon")
lat = condition.get("epicenter_lat")
if lon is not None and lat is not None:
lines.append(f"位置:经度{float(lon)}°, 纬度{float(lat)}°")
return "\n".join(lines)
def derive_model_params(inference, batch_folder, template_path):
event_type = inference["event_type"]
condition = inference["condition"]
occurred_time = inference["occurred_time"]
template_name = os.path.splitext(os.path.basename(template_path))[0]
map_title = build_map_title(event_type, condition, template_name)
safe_name = re.sub(r'[\\/:*?"<>|]', '_', template_name)
out_file = os.path.join(batch_folder, f"{safe_name}.jpg").replace("\\", "/")
if isinstance(occurred_time, datetime):
map_time = occurred_time.strftime("%Y-%m-%d %H:%M")
else:
map_time = str(occurred_time)
info_text = build_info_text(event_type, condition, occurred_time)
center_x, center_y = XIAN_CENTER
if event_type == "earthquake":
lon = condition.get("epicenter_lon", XIAN_CENTER[0])
lat = condition.get("epicenter_lat", XIAN_CENTER[1])
center_x, center_y = float(lon), float(lat)
return {
"name": f"test_{inference['id']}_{safe_name}",
"path": template_path,
"outFile": out_file,
"mapLayout": "A3",
"mapTitle": map_title,
"mapTime": map_time,
"mapUnit": "西安市应急管理局",
"info": info_text,
"centerX": center_x,
"centerY": center_y,
"event": str(inference["id"]),
"queueId": str(inference["id"]),
"zoomRule": "11",
"zoomValue": "50",
}
def build_qgis_config(batch_folder):
static_layers_config = {}
for name, (table, gpkg_file) in STATIC_LAYERS.items():
static_layers_config[name] = {"file": gpkg_file, "table": table}
return {
"db": DB_CONFIG,
"qgis": {"exportDpi": 300},
"template_override": {
"enabled": True,
"original": {
"host": "localhost",
"port": 5432,
"dbname": "yjzyk_xian",
"schema": "base",
},
"actual": {
"host": DB_CONFIG["host"],
"port": DB_CONFIG["port"],
"dbname": DB_CONFIG["database"],
"schema": "qgis",
"username": DB_CONFIG["username"],
"password": DB_CONFIG["password"],
},
},
"static_layers": {
"enabled": True,
"gpkg_dir": GPKG_DIR,
"layers": static_layers_config,
},
"batch_folder": batch_folder,
}
# ============================================================
# 主逻辑
# ============================================================
event_type = inference["event_type"]
disaster_time = format_disaster_time(inference["occurred_time"])
batch_folder = os.path.join("G:/files", "xian/qgis/map", disaster_time).replace("\\", "/")
os.makedirs(batch_folder, exist_ok=True)
template_dir = os.path.join(TEMPLATE_BASE, event_type)
template_path = os.path.join(template_dir, "暴雨内涝潜在隐患点及人口分布图.qgz").replace("\\", "/")
model = derive_model_params(inference, batch_folder, template_path)
config = build_qgis_config(batch_folder)
print("=" * 60)
print(f"模板: {template_path}")
print(f"输出: {model['outFile']}")
print(f"标题: {model['mapTitle']}")
print(f"时间: {model['mapTime']}")
print(f"单位: {model['mapUnit']}")
print(f"info:\n{model['info']}")
print(f"中心: ({model['centerX']}, {model['centerY']})")
print(f"event: {model['event']}, queueId: {model['queueId']}")
print("=" * 60)
request_data = json.dumps(
{"config": config, "model": model},
ensure_ascii=False,
)
tmp_json = tempfile.NamedTemporaryFile(
suffix=".json", delete=False, mode="w", encoding="utf-8"
)
tmp_json.write(request_data)
tmp_json.close()
runner_script = os.path.join(
os.path.dirname(__file__),
r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
).replace("\\", "/")
# Fix: use absolute path
runner_script = r"F:\project\xian\xian_algorithm_new\app\services\qgis\qgis_runner.py"
bat_dir = os.path.join(tempfile.gettempdir(), "qgis_runner")
os.makedirs(bat_dir, exist_ok=True)
bat_path = os.path.join(bat_dir, "run_qgis_test.bat")
qgis_app_dir = os.path.join(QGIS_ROOT, "apps", "qgis-ltr").replace("/", "\\")
python_dir = os.path.join(QGIS_ROOT, "apps", "Python312").replace("/", "\\")
qt5_plugins = os.path.join(QGIS_ROOT, "apps", "Qt5", "plugins").replace("/", "\\")
qtplugins = os.path.join(qgis_app_dir, "qtplugins").replace("/", "\\")
gdal_data = os.path.join(QGIS_ROOT, "apps", "gdal", "share", "gdal").replace("/", "\\")
qgis_python_dir = os.path.join(qgis_app_dir, "python").replace("/", "\\")
qgis_bin = os.path.join(qgis_app_dir, "bin").replace("/", "\\")
qt5_bin = os.path.join(QGIS_ROOT, "apps", "Qt5", "bin").replace("/", "\\")
gdal_lib = os.path.join(QGIS_ROOT, "apps", "gdal", "lib").replace("/", "\\")
python_exe = os.path.join(python_dir, "python3.exe").replace("/", "\\")
bat_content = f"""@echo off
set "PYTHONHOME={python_dir}"
set "PYTHONPATH={qgis_python_dir}"
set "QGIS_PREFIX_PATH={qgis_app_dir}"
set "QT_PLUGIN_PATH={qtplugins};{qt5_plugins}"
set "GDAL_DATA={gdal_data}"
set "PYTHONUTF8=1"
set "GDAL_FILENAME_IS_UTF8=YES"
set "VSI_CACHE=TRUE"
set "VSI_CACHE_SIZE=1000000"
set "PATH={qgis_bin};{qt5_bin};{gdal_lib};%PATH%"
"{python_exe}" "{runner_script}" "{tmp_json.name}"
"""
with open(bat_path, "w", encoding="utf-8") as f:
f.write(bat_content)
cmd = ["cmd.exe", "/c", bat_path]
print(f"执行: {' '.join(cmd[:3])} ...")
print()
try:
result = subprocess.run(cmd, capture_output=True, timeout=300)
stderr_text = result.stderr.decode("utf-8", errors="replace")
stdout_text = result.stdout.decode("utf-8", errors="replace")
print("=== stderr ===")
for line in stderr_text.split("\n"):
print(f" {line}")
print()
print("=== stdout ===")
print(stdout_text[:500] if stdout_text else "(empty)")
if result.returncode != 0:
print(f"\n!!! FAIL: exit={result.returncode}")
else:
print(f"\n=== SUCCESS ===")
if os.path.isfile(model["outFile"]):
print(f"文件: {model['outFile']} ({os.path.getsize(model['outFile'])/1024:.1f} KB)")
else:
print(f"文件不存在: {model['outFile']}")
finally:
try:
os.remove(tmp_json.name)
except OSError:
pass