Files
2026-06-21 22:30:04 +08:00

267 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
QGIS 常驻 Daemon 进程。
由主进程在 FastAPI lifespan 中启动,通过 stdin/stdout JSON 行协议通信。
启动时预加载所有模板到内存缓存,后续请求秒级响应。
协议(每行一个完整 JSON):
输入(stdin): {"config": {...}, "models": [...], "result_file": "path/to/result.json"}
输出(result_file): {"results": [...]}
控制: {"action": "shutdown"}
生命周期:
1. FastAPI 启动 → 启动 daemon 子进程
2. daemon 初始化 QGIS,加载磁盘缓存
3. 收到请求 → 处理模板 → 结果写入 result_file → stdout 输出完成标记
4. FastAPI 关闭 → 发送 shutdown → daemon 退出
"""
import json
import os
import sys
import time
# ============================================================
# Environment bootstrap (must happen before any QGIS import)
# ============================================================
# True when the interpreter reports the "win32" platform; the setup helpers
# in this file branch on it to pick Windows or non-Windows behaviour.
IS_WINDOWS = sys.platform == "win32"
def _detect_qgis_root():
    """Find the QGIS installation root directory.

    The ``QGIS_ROOT`` environment variable wins when it names an existing
    directory. Otherwise a fixed, platform-specific list of candidate
    locations is probed in order.

    Returns:
        The first acceptable directory path, or ``None`` when nothing matches.
    """
    configured = os.environ.get("QGIS_ROOT")
    if configured and os.path.isdir(configured):
        return configured

    # Windows candidates only need to exist; other platforms must also look
    # like a QGIS installation.
    if IS_WINDOWS:
        candidates = ("D:/QGIS", "C:/OSGeo4W", "C:/QGIS")
        is_acceptable = os.path.isdir
    else:
        candidates = ("/usr", "/opt/QGIS", "/home/QGIS", "/usr/local")
        is_acceptable = _is_valid_qgis_root_linux

    return next((path for path in candidates if is_acceptable(path)), None)
def _is_valid_qgis_root_linux(path: str) -> bool:
    """Report whether *path* looks like a QGIS installation root.

    A directory qualifies when it contains ``share/qgis``, or
    ``apps/qgis-ltr`` / ``apps/qgis``, or a ``bin/qgis`` file.

    Args:
        path: Directory to inspect.

    Returns:
        ``True`` if any of the markers above is present, ``False`` otherwise
        (including when *path* is not a directory).
    """
    if not os.path.isdir(path):
        return False
    if os.path.isdir(os.path.join(path, "share", "qgis")):
        return True
    for app_name in ("qgis-ltr", "qgis"):
        if os.path.isdir(os.path.join(path, "apps", app_name)):
            return True
    return os.path.isfile(os.path.join(path, "bin", "qgis"))
# Resolved once at import time; None when no QGIS installation was detected.
QGIS_ROOT = _detect_qgis_root()
def _detect_qgis_app_dir():
    """Locate the QGIS application directory under the detected root.

    Lookup order:
      1. ``QGIS_ROOT/apps/qgis-ltr``, then ``QGIS_ROOT/apps/qgis``
         (all platforms).
      2. Non-Windows only: the first prefix containing ``share/qgis``,
         trying ``QGIS_ROOT`` itself before the conventional ``/usr`` and
         ``/opt/QGIS`` locations.

    Returns:
        The application directory (step 1) or the install prefix (step 2).

    Raises:
        RuntimeError: If ``QGIS_ROOT`` is ``None`` or no matching directory
            exists.
    """
    if QGIS_ROOT is None:
        raise RuntimeError("未检测到 QGIS 安装目录,请设置 QGIS_ROOT 环境变量")

    # The apps/ layout is probed the same way on every platform.
    for name in ("qgis-ltr", "qgis"):
        app_dir = os.path.join(QGIS_ROOT, "apps", name)
        if os.path.isdir(app_dir):
            return app_dir

    if IS_WINDOWS:
        raise RuntimeError(
            f"未找到 QGIS 应用目录: {QGIS_ROOT}\\apps\\qgis-ltr 或 qgis"
        )

    # Prefix-style install: honour the detected/configured root first so that
    # a root such as /usr/local is not ignored or silently replaced by a
    # different installation under /usr.
    for prefix in (QGIS_ROOT, "/usr", "/opt/QGIS"):
        if os.path.isdir(os.path.join(prefix, "share", "qgis")):
            return prefix

    raise RuntimeError(
        f"未找到 QGIS 应用目录: {QGIS_ROOT} 下无 apps/qgis-ltr 或 share/qgis"
    )
def _setup_environment():
    """Export the environment variables QGIS needs and, on Windows, preload DLLs.

    Four variables (Python UTF-8 mode plus GDAL filename/VSI cache settings)
    are always written to ``os.environ``. On non-Windows platforms nothing
    else happens. On Windows the QGIS, Qt5 and GDAL library directories are
    registered as DLL search paths and a fixed set of core DLLs is loaded
    from them on a best-effort basis.
    """
    os.environ.update(
        {
            "PYTHONUTF8": "1",
            "GDAL_FILENAME_IS_UTF8": "YES",
            "VSI_CACHE": "TRUE",
            "VSI_CACHE_SIZE": "1000000",
        }
    )
    if not IS_WINDOWS:
        return

    # -- Windows only: DLL preloading --
    import ctypes

    app_dir = _detect_qgis_app_dir()
    search_dirs = (
        os.path.join(app_dir, "bin"),
        os.path.join(QGIS_ROOT, "apps", "Qt5", "bin"),
        os.path.join(QGIS_ROOT, "apps", "gdal", "lib"),
    )
    for directory in search_dirs:
        if os.path.isdir(directory):
            os.add_dll_directory(directory)

    dll_names = (
        "qgis_core.dll", "qgispython.dll",
        "Qt5Core.dll", "Qt5Gui.dll", "Qt5Widgets.dll",
        "Qt5Network.dll", "Qt5Svg.dll", "Qt5Xml.dll",
        "Qt5Concurrent.dll", "Qt5PrintSupport.dll",
    )
    for directory in search_dirs:
        candidates = (os.path.join(directory, dll) for dll in dll_names)
        for dll_path in filter(os.path.isfile, candidates):
            try:
                ctypes.WinDLL(dll_path)
            except OSError:
                # Best effort: a DLL that cannot be loaded here is skipped.
                pass
def _setup_python_path():
    """Prepend the project root and the QGIS Python locations to ``sys.path``.

    The project root is the directory four levels above this file. On
    Windows the ``python`` folder of the QGIS application directory is
    added; elsewhere a fixed list of system ``dist-packages`` directories is
    added. Only existing directories not already on ``sys.path`` are inserted.
    """
    project_root = os.path.abspath(__file__)
    for _ in range(4):
        project_root = os.path.dirname(project_root)
    if project_root not in sys.path:
        sys.path.insert(0, project_root)

    # Resolved on every platform, so a missing installation raises here too.
    qgis_app_dir = _detect_qgis_app_dir()
    if IS_WINDOWS:
        extra_paths = [os.path.join(qgis_app_dir, "python")]
    else:
        extra_paths = [
            "/usr/lib/python3/dist-packages",
            "/usr/lib/python3.10/dist-packages",
            "/usr/lib/python3.11/dist-packages",
            "/usr/lib/python3.12/dist-packages",
        ]

    for entry in extra_paths:
        if os.path.isdir(entry) and entry not in sys.path:
            sys.path.insert(0, entry)
def _scan_templates() -> list[str]:
    """Collect the template files shipped with the project.

    Looks in ``app/data/template/rainfall`` and
    ``app/data/template/earthquake`` under the project root (four directory
    levels above this file). Event directories that do not exist are skipped.

    Returns:
        Paths of every ``*.qgz`` file whose name does not start with ``tmp``,
        sorted by file name within each event type, with backslashes
        replaced by forward slashes.
    """
    project_root = os.path.abspath(__file__)
    for _ in range(4):
        project_root = os.path.dirname(project_root)
    template_base = os.path.join(project_root, "app", "data", "template")

    found = []
    for event_type in ("rainfall", "earthquake"):
        event_dir = os.path.join(template_base, event_type)
        if os.path.isdir(event_dir):
            found.extend(
                os.path.join(event_dir, file_name).replace("\\", "/")
                for file_name in sorted(os.listdir(event_dir))
                if file_name.endswith(".qgz") and not file_name.startswith("tmp")
            )
    return found
# ============================================================
# 主逻辑
# ============================================================
def _daemon_log(message):
    """Write a prefixed diagnostic line to stderr (stdout is reserved for the protocol)."""
    print(f"[qgis_daemon] {message}", file=sys.stderr, flush=True)


def _send_line(text):
    """Write one protocol line to stdout and flush it immediately."""
    sys.stdout.write(text + "\n")
    sys.stdout.flush()


def _send_error(message):
    """Send an error reply using the same JSON shape as the parse-error reply."""
    _send_line(json.dumps({"status": "error", "message": message}))


def _run_map_batch(request, map_service_cls, build_static_layers_config, get_gpkg_dir):
    """Render every model of one "map" request and return the per-model results.

    Args:
        request: Decoded request object; must contain "config" and "models".
        map_service_cls: Callable that builds the map service from the config.
        build_static_layers_config: Builds the default "static_layers" config.
        get_gpkg_dir: Supplies the argument for ``build_static_layers_config``.

    Returns:
        A list with one dict per model: ``{"name", "output"}`` on success,
        plus an ``"error"`` key (and empty ``"output"``) on failure.

    Raises:
        ValueError: If a required request field is missing.
    """
    missing = [key for key in ("config", "models") if key not in request]
    if missing:
        raise ValueError(f"请求缺少必填字段: {', '.join(missing)}")

    config = request["config"]
    models = request["models"]
    batch_started = time.time()

    # Fill in the static layer configuration when the caller did not supply
    # a usable one.
    if "static_layers" not in config or not config["static_layers"].get("gpkg_dir"):
        config["static_layers"] = build_static_layers_config(get_gpkg_dir())
    service = map_service_cls(config)

    results = []
    total = len(models)
    for index, model in enumerate(models, start=1):
        model_started = time.time()
        try:
            name = service.generate(model)
            results.append({"name": name, "output": model["outFile"]})
            elapsed = time.time() - model_started
            _daemon_log(f"[{index}/{total}] {name} ({elapsed:.1f}s)")
        except Exception as e:
            # One failing model must not abort the rest of the batch.
            fallback_name = model.get("name", "") if isinstance(model, dict) else ""
            results.append({"name": fallback_name, "output": "", "error": str(e)})
            _daemon_log(f"[{index}/{total}] 失败: {e}")

    ok = sum(1 for r in results if "error" not in r)
    _daemon_log(f"完成: {ok}/{total}, {time.time() - batch_started:.1f}s")
    return results


def main():
    """Run the daemon: initialise QGIS once, then serve JSON-line requests from stdin.

    Every non-blank input line receives exactly one reply line on stdout,
    except ``{"action": "shutdown"}``, which ends the loop without a reply:

    * ``OK`` after a "map" request was processed (results are written to the
      request's ``result_file`` when one is given);
    * ``{"status": "error", "message": ...}`` for undecodable JSON, a request
      that is not a JSON object, an unknown action, a request that cannot be
      processed at all, or a result file that cannot be written.

    Diagnostics go to stderr. QGIS is shut down when the loop ends, whether
    normally or through an exception.
    """
    _setup_environment()
    _setup_python_path()

    from qgis.core import QgsApplication, QgsProject

    # Initialise QGIS exactly once for the lifetime of the process.
    qgis_app_dir = _detect_qgis_app_dir()
    QgsApplication.setPrefixPath(qgis_app_dir, True)
    qgs_app = QgsApplication([], False)
    qgs_app.initQgis()

    try:
        from app.services.qgis.map_service import MapService
        from app.config.qgis_mappings import build_static_layers_config, get_gpkg_dir

        _daemon_log("QGIS 已启动")

        # -- Request loop --
        _daemon_log("等待请求...")
        for raw_line in sys.stdin:
            raw_line = raw_line.strip()
            if not raw_line:
                continue

            try:
                request = json.loads(raw_line)
            except json.JSONDecodeError as e:
                _send_error(f"JSON 解析失败: {e}")
                continue

            if not isinstance(request, dict):
                _send_error("请求必须是 JSON 对象")
                continue

            action = request.get("action", "map")
            if action == "shutdown":
                _daemon_log("收到 shutdown,退出")
                break
            if action != "map":
                # Always answer, otherwise the parent would wait forever.
                _send_error(f"未知 action: {action}")
                continue

            try:
                results = _run_map_batch(
                    request, MapService, build_static_layers_config, get_gpkg_dir
                )
            except Exception as e:
                # A malformed request must not take the whole daemon down.
                _daemon_log(f"请求处理失败: {e}")
                _send_error(f"请求处理失败: {e}")
                continue

            # Persist the results first, then signal completion on stdout.
            result_file = request.get("result_file", "")
            if result_file:
                try:
                    with open(result_file, "w", encoding="utf-8") as f:
                        json.dump({"results": results}, f, ensure_ascii=False)
                except Exception as e:
                    # Do not report success when the results were not stored.
                    _daemon_log(f"写结果文件失败: {e}")
                    _send_error(f"写结果文件失败: {e}")
                    continue

            _send_line("OK")
    finally:
        # Cleanup: drop the project contents and shut QGIS down.
        QgsProject.instance().clear()
        qgs_app.exitQgis()

    _daemon_log("已退出")
# Start the daemon only when this file is executed as a script.
if __name__ == "__main__":
    main()