Files
xian_algorithm_new/app/services/qgis/qgis_daemon.py
T
2026-06-21 15:24:09 +08:00

197 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
QGIS 常驻 Daemon 进程。
由主进程在 FastAPI lifespan 中启动,通过 stdin/stdout JSON 行协议通信。
启动时预加载所有模板到内存缓存,后续请求秒级响应。
协议(每行一个完整 JSON):
输入(stdin): {"config": {...}, "models": [...], "result_file": "path/to/result.json"}
输出(result_file): {"results": [...]}
控制: {"action": "shutdown"}
生命周期:
1. FastAPI 启动 → 启动 daemon 子进程
2. daemon 初始化 QGIS,加载磁盘缓存
3. 收到请求 → 处理模板 → 结果写入 result_file → stdout 输出完成标记
4. FastAPI 关闭 → 发送 shutdown → daemon 退出
"""
import json
import os
import sys
import time
# ============================================================
# 环境初始化(必须在 QGIS import 之前)
# ============================================================
# Root directory of the QGIS installation: taken from the QGIS_ROOT
# environment variable when set, otherwise the built-in default below.
QGIS_ROOT = os.getenv("QGIS_ROOT", "D:/QGIS")
def _detect_qgis_app_dir():
    """Return the QGIS application directory found under ``QGIS_ROOT``.

    ``<QGIS_ROOT>/apps/qgis-ltr`` is probed first and ``<QGIS_ROOT>/apps/qgis``
    second; the first one that exists as a directory is returned.

    Raises:
        RuntimeError: If neither directory exists.
    """
    apps_dir = os.path.join(QGIS_ROOT, "apps")
    candidates = (
        os.path.join(apps_dir, flavour) for flavour in ("qgis-ltr", "qgis")
    )
    found = next((path for path in candidates if os.path.isdir(path)), None)
    if found is None:
        raise RuntimeError(f"未找到 QGIS 应用目录: {QGIS_ROOT}\\apps\\qgis-ltr 或 qgis")
    return found
def _setup_environment():
    """Prepare environment variables and, on Windows, the DLL search path.

    Always sets ``PYTHONUTF8``, ``GDAL_FILENAME_IS_UTF8``, ``VSI_CACHE`` and
    ``VSI_CACHE_SIZE`` in ``os.environ``.

    On Windows only, every existing QGIS / Qt5 / GDAL library directory is
    registered with ``os.add_dll_directory`` and a fixed list of QGIS and Qt5
    DLLs is loaded eagerly through ``ctypes.WinDLL``. Preloading is
    best-effort: a DLL that fails to load is reported on stderr and skipped.

    Raises:
        RuntimeError: On Windows, if no QGIS application directory is found
            (propagated from ``_detect_qgis_app_dir``).
    """
    # NOTE(review): PYTHONUTF8 is consulted at interpreter startup, so setting
    # it here presumably only affects child processes -- confirm the intent.
    os.environ["PYTHONUTF8"] = "1"
    os.environ["GDAL_FILENAME_IS_UTF8"] = "YES"
    os.environ["VSI_CACHE"] = "TRUE"
    os.environ["VSI_CACHE_SIZE"] = "1000000"

    if sys.platform != "win32":
        return

    import ctypes

    qgis_app_dir = _detect_qgis_app_dir()
    # Build the candidate list once; both steps below share it. Directories
    # that do not exist can hold no DLLs, so dropping them here is equivalent
    # to the per-file existence check on a missing directory.
    dll_dirs = [
        candidate
        for candidate in (
            os.path.join(qgis_app_dir, "bin"),
            os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
            os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
        )
        if os.path.isdir(candidate)
    ]
    for dll_dir in dll_dirs:
        os.add_dll_directory(dll_dir)

    preload_dlls = (
        "qgis_core.dll", "qgispython.dll",
        "Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
        "Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
        "Qt5Concurrent.dll", "Qt5PrintSupport.dll",
    )
    for dll_dir in dll_dirs:
        for dll_name in preload_dlls:
            dll_path = os.path.join(dll_dir, dll_name)
            if not os.path.isfile(dll_path):
                continue
            try:
                ctypes.WinDLL(dll_path)
            except OSError as exc:
                # Still best-effort, but leave a trace instead of hiding it.
                print(f"[qgis_daemon] 预加载 DLL 失败: {dll_path}: {exc}",
                      file=sys.stderr, flush=True)
def _setup_python_path():
    """Make the project root and the QGIS ``python`` directory importable.

    The project root is the directory four levels above this file. Both
    entries are inserted at the front of ``sys.path`` only when not already
    present; the QGIS ``python`` directory is additionally required to exist.
    It is inserted last and therefore ends up ahead of the project root.

    Raises:
        RuntimeError: If no QGIS application directory is found (propagated
            from ``_detect_qgis_app_dir``).
    """
    # Climb four directory levels from this file to reach the project root.
    root = os.path.abspath(__file__)
    for _ in range(4):
        root = os.path.dirname(root)
    if root not in sys.path:
        sys.path.insert(0, root)

    python_dir = os.path.join(_detect_qgis_app_dir(), "python")
    if not os.path.isdir(python_dir):
        return
    if python_dir not in sys.path:
        sys.path.insert(0, python_dir)
def _scan_templates(
    *,
    template_base: "str | None" = None,
    event_types: tuple[str, ...] = ("rainfall", "earthquake"),
) -> list[str]:
    """Collect the ``.qgz`` template paths found under the template directory.

    Args:
        template_base: Directory holding one sub-directory per event type.
            When ``None`` (the default) it is ``app/data/template`` under the
            project root, i.e. the directory four levels above this file.
        event_types: Sub-directory names to scan, in order. Missing
            sub-directories are skipped.

    Returns:
        Paths of entries ending in ``.qgz`` whose name does not start with
        ``tmp``, sorted by name within each event type, with backslashes
        replaced by forward slashes. Paths are absolute when
        ``template_base`` is absolute (always the case for the default).
    """
    if template_base is None:
        project_root = os.path.abspath(__file__)
        for _ in range(4):
            project_root = os.path.dirname(project_root)
        template_base = os.path.join(project_root, "app", "data", "template")

    templates: list[str] = []
    for event_type in event_types:
        event_dir = os.path.join(template_base, event_type)
        if not os.path.isdir(event_dir):
            continue
        templates.extend(
            os.path.join(event_dir, entry).replace("\\", "/")
            for entry in sorted(os.listdir(event_dir))
            if entry.endswith(".qgz") and not entry.startswith("tmp")
        )
    return templates
# ============================================================
# 主逻辑
# ============================================================
def _log(message):
    """Write one ``[qgis_daemon]``-prefixed diagnostic line to stderr.

    stdout carries the reply protocol, so diagnostics never go there.
    """
    print(f"[qgis_daemon] {message}", file=sys.stderr, flush=True)


def _reply(line):
    """Send one protocol line to the parent on stdout and flush at once."""
    sys.stdout.write(line + "\n")
    sys.stdout.flush()


def _reply_error(message):
    """Send a ``{"status": "error", "message": ...}`` JSON line on stdout."""
    _reply(json.dumps({"status": "error", "message": message}))


def _error_entry(model, exc):
    """Build the result entry for a model that could not be rendered."""
    # A model that is not a dict has no usable name; do not fail while
    # reporting the failure.
    label = model.get("name", "") if isinstance(model, dict) else ""
    return {"name": label, "output": "", "error": str(exc)}


def _render_models(service, models):
    """Call ``service.generate`` for every model and collect the results.

    A failure of one model is recorded as an error entry and does not stop
    the remaining models.
    """
    results = []
    total = len(models)
    for index, model in enumerate(models, start=1):
        started = time.time()
        try:
            name = service.generate(model)
            results.append({"name": name, "output": model["outFile"]})
        except Exception as exc:
            results.append(_error_entry(model, exc))
            _log(f"[{index}/{total}] 失败: {exc}")
        else:
            elapsed = time.time() - started
            _log(f"[{index}/{total}] {name} ({elapsed:.1f}s)")
    return results


def _write_results(result_file, results):
    """Write ``{"results": [...]}`` as JSON to ``result_file`` when one is given.

    Write failures are logged to stderr and not raised.
    """
    if not result_file:
        return
    try:
        with open(result_file, "w", encoding="utf-8") as f:
            json.dump({"results": results}, f, ensure_ascii=False)
    except Exception as exc:
        _log(f"写结果文件失败: {exc}")


def main():
    """Initialise QGIS once, then serve JSON-line requests read from stdin.

    Every non-empty stdin line must be one JSON object:

    * ``{"action": "shutdown"}`` ends the loop.
    * A map request (``action`` absent or ``"map"``) carries ``config``
      (object), ``models`` (array) and optionally ``result_file``. Each model
      is rendered through ``MapService.generate``; the collected results are
      written to ``result_file`` and ``OK`` is printed on stdout.
    * Any other action is logged to stderr and ignored.

    A line that is not valid JSON, is not a JSON object, or is a map request
    without a usable ``config`` / ``models`` is answered with a
    ``{"status": "error", ...}`` JSON line on stdout; the daemon keeps
    running. If the per-request service cannot be created, every model of
    that request is reported as failed through the normal result file.

    On exit (shutdown, end of stdin, or an unexpected exception) the current
    QGIS project is cleared and ``exitQgis()`` is called.
    """
    _setup_environment()
    _setup_python_path()

    # QGIS may only be imported after the environment has been prepared.
    from qgis.core import QgsApplication, QgsProject

    # Initialise QGIS exactly once for the lifetime of the process.
    qgis_app_dir = _detect_qgis_app_dir()
    QgsApplication.setPrefixPath(qgis_app_dir, True)
    qgs_app = QgsApplication([], False)
    qgs_app.initQgis()

    from app.services.qgis.map_service import MapService
    from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir

    _log("QGIS 已启动")
    _log("等待请求...")

    try:
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError as exc:
                _reply_error(f"JSON 解析失败: {exc}")
                continue
            if not isinstance(request, dict):
                # Valid JSON but not an object (e.g. a list or a number).
                _reply_error("请求必须是 JSON 对象")
                continue

            action = request.get("action", "map")
            if action == "shutdown":
                _log("收到 shutdown,退出")
                break
            if action != "map":
                # No stdout reply: unknown actions never produced one.
                _log(f"忽略未知 action: {action!r}")
                continue

            config = request.get("config")
            models = request.get("models")
            if not isinstance(config, dict) or not isinstance(models, list):
                _reply_error("请求缺少有效的 config 或 models")
                continue

            batch_started = time.time()
            try:
                # Fill in the static layer config when the request has none.
                if "static_layers" not in config or not config["static_layers"].get("gpkg_dir"):
                    config["static_layers"] = build_static_layers_config(get_gpkg_dir())
                service = MapService(config)
            except Exception as exc:
                # Without a service nothing can be rendered: fail every model
                # of this request instead of taking the daemon down.
                _log(f"初始化 MapService 失败: {exc}")
                results = [_error_entry(model, exc) for model in models]
            else:
                results = _render_models(service, models)

            succeeded = sum(1 for entry in results if "error" not in entry)
            total_elapsed = time.time() - batch_started
            _log(f"完成: {succeeded}/{len(models)}, {total_elapsed:.1f}s")

            # Results travel through the file; stdout only signals completion.
            # NOTE(review): "OK" is sent even when writing the file failed --
            # confirm the parent copes with a missing or stale result file.
            _write_results(request.get("result_file", ""), results)
            _reply("OK")
    finally:
        # Release QGIS state even when the loop ends with an exception.
        QgsProject.instance().clear()
        qgs_app.exitQgis()
        _log("已退出")
# Script entry point: start the daemon only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()