From dc2d2acc82eac5e781373e5c985aa1e149885af8 Mon Sep 17 00:00:00 2001 From: huty Date: Thu, 21 May 2026 11:01:43 +0900 Subject: [PATCH] feat: initial framework for AI-powered ops terminal Scaffold an MVP of the natural-language ops terminal: inventory + intent template registry, SSH/WinRM/local connectors, risk-gated executor with SQLite audit log, Claude-driven agent layer using Function Calling, plus a Typer CLI and FastAPI surface. Includes 10 cross-OS intents (disk/system/service) and example inventory. Verified end-to-end on the local Windows host: hosts/intents listing, check_disk_usage execution, and WRITE-class confirmation gating. Co-Authored-By: Claude Opus 4 --- .env.example | 25 ++++ .gitignore | 30 +++++ apps/.gitkeep | 2 + config/inventory.example.yaml | 49 ++++++++ config/settings.yaml | 22 ++++ playbooks/.gitkeep | 2 + pyproject.toml | 46 +++++++ src/ops_tools/__init__.py | 3 + src/ops_tools/ai/__init__.py | 3 + src/ops_tools/ai/agent.py | 169 ++++++++++++++++++++++++++ src/ops_tools/ai/tools.py | 82 +++++++++++++ src/ops_tools/api/__init__.py | 0 src/ops_tools/api/routes.py | 130 ++++++++++++++++++++ src/ops_tools/audit/__init__.py | 3 + src/ops_tools/audit/store.py | 88 ++++++++++++++ src/ops_tools/cli.py | 144 ++++++++++++++++++++++ src/ops_tools/config.py | 64 ++++++++++ src/ops_tools/connectors/__init__.py | 4 + src/ops_tools/connectors/base.py | 40 ++++++ src/ops_tools/connectors/factory.py | 35 ++++++ src/ops_tools/connectors/local.py | 53 ++++++++ src/ops_tools/connectors/ssh.py | 60 +++++++++ src/ops_tools/connectors/winrm.py | 80 ++++++++++++ src/ops_tools/executor/__init__.py | 10 ++ src/ops_tools/executor/risk.py | 64 ++++++++++ src/ops_tools/executor/runner.py | 175 +++++++++++++++++++++++++++ src/ops_tools/intents/__init__.py | 11 ++ src/ops_tools/intents/models.py | 45 +++++++ src/ops_tools/intents/registry.py | 46 +++++++ src/ops_tools/inventory/__init__.py | 4 + src/ops_tools/inventory/facts.py | 63 ++++++++++ src/ops_tools/inventory/loader.py | 47 +++++++ src/ops_tools/inventory/models.py | 61 ++++++++++ src/ops_tools/main.py | 43 +++++++ templates/disk.yaml | 27 +++++ templates/service.yaml | 53 ++++++++ templates/system.yaml | 53 ++++++++ tests/test_smoke.py | 46 +++++++ 38 files changed, 1882 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 apps/.gitkeep create mode 100644 config/inventory.example.yaml create mode 100644 config/settings.yaml create mode 100644 playbooks/.gitkeep create mode 100644 pyproject.toml create mode 100644 src/ops_tools/__init__.py create mode 100644 src/ops_tools/ai/__init__.py create mode 100644 src/ops_tools/ai/agent.py create mode 100644 src/ops_tools/ai/tools.py create mode 100644 src/ops_tools/api/__init__.py create mode 100644 src/ops_tools/api/routes.py create mode 100644 src/ops_tools/audit/__init__.py create mode 100644 src/ops_tools/audit/store.py create mode 100644 src/ops_tools/cli.py create mode 100644 src/ops_tools/config.py create mode 100644 src/ops_tools/connectors/__init__.py create mode 100644 src/ops_tools/connectors/base.py create mode 100644 src/ops_tools/connectors/factory.py create mode 100644 src/ops_tools/connectors/local.py create mode 100644 src/ops_tools/connectors/ssh.py create mode 100644 src/ops_tools/connectors/winrm.py create mode 100644 src/ops_tools/executor/__init__.py create mode 100644 src/ops_tools/executor/risk.py create mode 100644 src/ops_tools/executor/runner.py create mode 100644 src/ops_tools/intents/__init__.py create mode 100644 src/ops_tools/intents/models.py create mode 100644 src/ops_tools/intents/registry.py create mode 100644 src/ops_tools/inventory/__init__.py create mode 100644 src/ops_tools/inventory/facts.py create mode 100644 src/ops_tools/inventory/loader.py create mode 100644 src/ops_tools/inventory/models.py create mode 100644 src/ops_tools/main.py create mode 100644 templates/disk.yaml create mode 100644 templates/service.yaml create mode 100644 templates/system.yaml create mode 100644 tests/test_smoke.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..d8bfa92 --- /dev/null +++ b/.env.example @@ -0,0 +1,25 @@ +# Anthropic API +ANTHROPIC_API_KEY=sk-ant-xxx + +# Model selection +OPS_MODEL_MAIN=claude-sonnet-4-6 +OPS_MODEL_FAST=claude-haiku-4-5-20251001 + +# Storage +OPS_DB_URL=sqlite+aiosqlite:///./data/ops.db + +# Paths (relative to project root) +OPS_INVENTORY_PATH=config/inventory.yaml +OPS_TEMPLATES_DIR=templates +OPS_APPS_DIR=apps +OPS_PLAYBOOKS_DIR=playbooks + +# Safety +# Whether WRITE-class intents auto-execute (default false: require confirm) +OPS_AUTO_EXECUTE_WRITE=false +# Whether DESTRUCTIVE intents are allowed at all +OPS_ALLOW_DESTRUCTIVE=false + +# Server +OPS_HOST=127.0.0.1 +OPS_PORT=8000 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d1a096 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ +.python-version + +# Environment +.env +.env.local + +# Editors / OS / Tools +.vscode/ +.idea/ +.DS_Store +Thumbs.db +.claude/ + +# Project data +data/ +*.db +*.sqlite +*.sqlite3 +audit.log +logs/ + +# Inventory secrets (only ship the example) +config/inventory.yaml +!config/inventory.example.yaml diff --git a/apps/.gitkeep b/apps/.gitkeep new file mode 100644 index 0000000..1527015 --- /dev/null +++ b/apps/.gitkeep @@ -0,0 +1,2 @@ +# 预留目录:未来用于存放应用台账 (L3 应用层) +# 例如 apps/order-service.yaml 描述某个应用的部署元数据 diff --git a/config/inventory.example.yaml b/config/inventory.example.yaml new file mode 100644 index 0000000..c02f4a2 --- /dev/null +++ b/config/inventory.example.yaml @@ -0,0 +1,49 @@ +# 主机清单示例 —— 复制为 inventory.yaml 后填入真实信息 +# Inventory.yaml is gitignored to avoid leaking credentials. + +hosts: + # ───── Linux 示例 ───── + app-prod-01: + address: 10.0.1.10 + port: 22 + os_family: linux # 可省略,首次连接会自动探测 + os_distribution: ubuntu # 可选 + connection: ssh + user: ops + auth: + type: key # key | password + key_path: ~/.ssh/id_rsa + tags: [prod, app] + + db-prod-01: + address: 10.0.1.20 + connection: ssh + user: ops + auth: + type: password + password: "${SECRET_DB_PROD_01}" # 支持环境变量插值 + tags: [prod, db] + + # ───── Windows 示例 ───── + win-app-01: + address: 10.0.2.10 + port: 5985 + os_family: windows + connection: winrm + user: Administrator + auth: + type: password + password: "${SECRET_WIN_APP_01}" + tags: [prod, app] + + # ───── 本地测试 ───── + localhost: + address: 127.0.0.1 + connection: local # 直接调用本机 shell,方便联调 + tags: [dev] + +groups: + prod: [app-prod-01, db-prod-01, win-app-01] + linux: [app-prod-01, db-prod-01] + windows: [win-app-01] + dev: [localhost] diff --git a/config/settings.yaml b/config/settings.yaml new file mode 100644 index 0000000..c430687 --- /dev/null +++ b/config/settings.yaml @@ -0,0 +1,22 @@ +# 全局运行时设置 +default_timeout_seconds: 30 +ssh: + known_hosts: null # 设为 null 表示不校验 known_hosts(实验环境);生产建议指定路径 + connect_timeout: 10 +winrm: + transport: ntlm # ntlm | basic | kerberos + server_cert_validation: ignore # 实验环境;生产请用 validate + +risk_policy: + read_auto: true # READ 类是否自动执行 + write_require_confirm: true + destructive_default_block: true + destructive_keywords: # 命令文本中出现这些词时升级为 DESTRUCTIVE + - "rm -rf" + - "mkfs" + - "format " + - "dd if=" + - "shutdown" + - "reboot" + - "Remove-Item -Recurse" + - "Format-Volume" diff --git a/playbooks/.gitkeep b/playbooks/.gitkeep new file mode 100644 index 0000000..71ccf32 --- /dev/null +++ b/playbooks/.gitkeep @@ -0,0 +1,2 @@ +# 预留目录:未来用于存放部署/编排剧本 (L3-L4) +# 例如 playbooks/deploy_java_jar.yaml 描述一次部署的有序步骤 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9bf8620 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,46 @@ +[project] +name = "ai-app-ops-tools" +version = "0.1.0" +description = "AI-powered natural language ops terminal for heterogeneous servers" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.110", + "uvicorn[standard]>=0.27", + "pydantic>=2.6", + "pydantic-settings>=2.2", + "PyYAML>=6.0", + "Jinja2>=3.1", + "asyncssh>=2.14", + "pywinrm>=0.4.3", + "sqlalchemy[asyncio]>=2.0", + "aiosqlite>=0.19", + "anthropic>=0.40", + "typer>=0.12", + "python-dotenv>=1.0", + "rich>=13.7", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "ruff>=0.3", +] + +[project.scripts] +ops = "ops_tools.cli:app" + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.ruff] +line-length = 100 +target-version = "py311" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/src/ops_tools/__init__.py b/src/ops_tools/__init__.py new file mode 100644 index 0000000..66ff92c --- /dev/null +++ b/src/ops_tools/__init__.py @@ -0,0 +1,3 @@ +"""ai-app-ops-tools — natural-language ops terminal for heterogeneous servers.""" + +__version__ = "0.1.0" diff --git a/src/ops_tools/ai/__init__.py b/src/ops_tools/ai/__init__.py new file mode 100644 index 0000000..b536ffd --- /dev/null +++ b/src/ops_tools/ai/__init__.py @@ -0,0 +1,3 @@ +from .agent import OpsAgent, AgentTurn + +__all__ = ["OpsAgent", "AgentTurn"] diff --git a/src/ops_tools/ai/agent.py b/src/ops_tools/ai/agent.py new file mode 100644 index 0000000..193fe4d --- /dev/null +++ b/src/ops_tools/ai/agent.py @@ -0,0 +1,169 @@ +"""Claude-powered conversation loop that dispatches to IntentRunner.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from anthropic import AsyncAnthropic + +from ops_tools.config import get_settings +from ops_tools.executor.runner import ConfirmationRequired, IntentRunner +from ops_tools.inventory.models import Inventory + +from .tools import SYSTEM_PROMPT, build_tools + + +@dataclass +class AgentTurn: + """One round-trip with the model, including any tool calls + final text.""" + + user_message: str + assistant_text: str = "" + tool_calls: list[dict] = field(default_factory=list) + + +class OpsAgent: + def __init__(self, inventory: Inventory, runner: IntentRunner): + self.inventory = inventory + self.runner = runner + self.settings = get_settings() + self.client = AsyncAnthropic(api_key=self.settings.anthropic_api_key) + self.tools = build_tools(runner.registry) + # The full message history (multi-turn aware). + self.messages: list[dict] = [] + + async def chat(self, user_message: str, max_tool_loops: int = 6) -> AgentTurn: + turn = AgentTurn(user_message=user_message) + self.messages.append({"role": "user", "content": user_message}) + + for _ in range(max_tool_loops): + resp = await self.client.messages.create( + model=self.settings.model_main, + max_tokens=2048, + system=SYSTEM_PROMPT, + tools=self.tools, + messages=self.messages, + ) + + # Collect assistant message exactly as returned so future tool_use + # blocks can be referenced by id. + assistant_blocks: list[dict] = [] + text_parts: list[str] = [] + tool_uses: list[Any] = [] + for block in resp.content: + if block.type == "text": + assistant_blocks.append({"type": "text", "text": block.text}) + text_parts.append(block.text) + elif block.type == "tool_use": + assistant_blocks.append( + { + "type": "tool_use", + "id": block.id, + "name": block.name, + "input": block.input, + } + ) + tool_uses.append(block) + + self.messages.append({"role": "assistant", "content": assistant_blocks}) + turn.assistant_text = "\n".join(text_parts) + + if not tool_uses: + return turn + + # Execute each tool, attach result to messages. + tool_results: list[dict] = [] + for tu in tool_uses: + turn.tool_calls.append({"name": tu.name, "input": tu.input}) + result = await self._dispatch_tool(tu.name, tu.input or {}) + tool_results.append( + { + "type": "tool_result", + "tool_use_id": tu.id, + "content": result, + } + ) + self.messages.append({"role": "user", "content": tool_results}) + + return turn + + async def _dispatch_tool(self, name: str, args: dict) -> str: + try: + if name == "list_hosts": + return _format_hosts(self.inventory, args.get("selector")) + if name == "list_intents": + return _format_intents(self.runner.registry) + if name == "run_intent": + return await self._run_intent(args) + return f"ERROR: unknown tool '{name}'" + except Exception as e: # noqa: BLE001 — surface to model, don't crash loop + return f"ERROR: {type(e).__name__}: {e}" + + async def _run_intent(self, args: dict) -> str: + intent = args["intent"] + host_selectors = args["hosts"] + params = args.get("params") or {} + confirmed = bool(args.get("confirmed")) + + hosts = [] + seen = set() + for sel in host_selectors: + for h in self.inventory.resolve(sel): + if h.name not in seen: + seen.add(h.name) + hosts.append(h) + + try: + result = await self.runner.run( + intent, hosts, params, confirmed=confirmed, nl_query=None, user="ai-agent" + ) + except ConfirmationRequired as e: + preview = "\n".join( + f" - [{p.host}|{p.os_family}|{p.risk_level.value}] {p.command}" + for p in e.preview + ) + return ( + f"CONFIRM_REQUIRED: {e}\n" + f"将要执行的命令预览:\n{preview}\n" + f"如用户确认,请用 confirmed=true 再次调用 run_intent。" + ) + + lines = [f"intent={result.intent} risk={result.risk_level.value}"] + for ex in result.executions: + if ex.error: + lines.append(f" [{ex.host}] ERROR: {ex.error}") + else: + r = ex.result + lines.append( + f" [{ex.host}|{ex.os_family}] exit={r.exit_code} " + f"duration={r.duration_ms}ms" + ) + if r.stdout: + lines.append(f" stdout:\n{_indent(r.stdout, 6)}") + if r.stderr.strip(): + lines.append(f" stderr:\n{_indent(r.stderr, 6)}") + return "\n".join(lines) + + +def _format_hosts(inv: Inventory, selector: str | None) -> str: + hosts = list(inv.hosts.values()) if not selector else inv.resolve(selector) + if not hosts: + return "(no hosts)" + return "\n".join( + f"- {h.name} address={h.address} os={h.os_family.value} " + f"conn={h.connection.value} tags={h.tags}" + for h in hosts + ) + + +def _format_intents(reg) -> str: + return "\n".join( + f"- {t.intent} [{t.risk_level.value}] {t.description} " + f"(params: {[p.name for p in t.params]})" + for t in reg.all() + ) + + +def _indent(text: str, n: int) -> str: + pad = " " * n + return "\n".join(pad + line for line in text.rstrip().splitlines()) diff --git a/src/ops_tools/ai/tools.py b/src/ops_tools/ai/tools.py new file mode 100644 index 0000000..3dd2742 --- /dev/null +++ b/src/ops_tools/ai/tools.py @@ -0,0 +1,82 @@ +"""Tool (function-call) definitions exposed to Claude. + +We intentionally do NOT give the model a generic "run shell command" tool. +All execution flows through pre-defined intents in the registry. This is the +foundation of safety: the LLM can only compose what we've vetted. +""" +from __future__ import annotations + +from ops_tools.intents.registry import IntentRegistry + +SYSTEM_PROMPT = """你是一个运维助手,帮助用户用自然语言操作异构服务器(Linux/Windows 等)。 + +工作方式: +1. 用户描述需求时,先用 list_hosts / list_intents 探查可用资源 +2. 然后调用 run_intent 执行已知意图,不要凭空构造命令 +3. 如果某个需求没有对应意图,明确告诉用户"暂无该意图",并建议在 templates/ 中新增 +4. 执行结果中如果有 risk_level=WRITE 而被拒绝,要把命令预览展示给用户,请用户确认后用 confirmed=true 再次调用 +5. 输出结果时,把原始 stdout 翻译成人类易读的摘要 + +不要: +- 编造主机名、意图名或命令 +- 在没有用户明确确认前执行 WRITE/DESTRUCTIVE 类意图 +""" + + +def build_tools(registry: IntentRegistry) -> list[dict]: + """Return Anthropic-format tool schemas. Intent names are enumerated so the + model gets autocomplete-style guidance instead of free-form strings.""" + intent_names = registry.names() + + return [ + { + "name": "list_hosts", + "description": "列出 inventory 中所有主机,可按 tag/group 过滤。", + "input_schema": { + "type": "object", + "properties": { + "selector": { + "type": "string", + "description": "可选:主机名/组名/tag。不填则返回全部。", + } + }, + }, + }, + { + "name": "list_intents", + "description": "列出已注册的意图(命令模板)及其用途。", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "run_intent", + "description": ( + "在指定主机上执行一个意图。READ 类自动执行;WRITE/DESTRUCTIVE " + "类首次调用会被拒绝并返回命令预览,得到用户确认后用 confirmed=true 重试。" + ), + "input_schema": { + "type": "object", + "properties": { + "intent": { + "type": "string", + "enum": intent_names or ["__none__"], + "description": "意图名(必须来自 list_intents)", + }, + "hosts": { + "type": "array", + "items": {"type": "string"}, + "description": "主机名/组名/tag 列表", + }, + "params": { + "type": "object", + "description": "意图参数,键值对", + }, + "confirmed": { + "type": "boolean", + "description": "用户已确认风险?默认 false", + "default": False, + }, + }, + "required": ["intent", "hosts"], + }, + }, + ] diff --git a/src/ops_tools/api/__init__.py b/src/ops_tools/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ops_tools/api/routes.py b/src/ops_tools/api/routes.py new file mode 100644 index 0000000..3eeabad --- /dev/null +++ b/src/ops_tools/api/routes.py @@ -0,0 +1,130 @@ +"""HTTP routes — kept minimal: list hosts/intents, run an intent, chat.""" +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from ops_tools.ai.agent import OpsAgent +from ops_tools.audit.store import AuditStore +from ops_tools.executor.runner import ConfirmationRequired, IntentRunner +from ops_tools.intents.registry import IntentRegistry +from ops_tools.inventory.models import Inventory + + +def build_router( + inventory: Inventory, + registry: IntentRegistry, + runner: IntentRunner, +) -> APIRouter: + router = APIRouter() + + @router.get("/hosts") + def hosts(): + return [ + { + "name": h.name, + "address": h.address, + "os_family": h.os_family.value, + "connection": h.connection.value, + "tags": h.tags, + } + for h in inventory.hosts.values() + ] + + @router.get("/intents") + def intents(): + return [ + { + "intent": t.intent, + "description": t.description, + "risk_level": t.risk_level.value, + "params": [p.model_dump() for p in t.params], + "os_families": list(t.implementations.keys()), + } + for t in registry.all() + ] + + class RunBody(BaseModel): + intent: str + hosts: list[str] + params: dict[str, Any] = Field(default_factory=dict) + confirmed: bool = False + user: str = "api" + + @router.post("/run") + async def run(body: RunBody): + host_objs = [] + seen = set() + for sel in body.hosts: + try: + for h in inventory.resolve(sel): + if h.name not in seen: + seen.add(h.name) + host_objs.append(h) + except KeyError as e: + raise HTTPException(status_code=404, detail=str(e)) + try: + result = await runner.run( + body.intent, + host_objs, + body.params, + confirmed=body.confirmed, + user=body.user, + ) + except ConfirmationRequired as e: + return { + "status": "confirmation_required", + "message": str(e), + "preview": [ + { + "host": p.host, + "os_family": p.os_family, + "command": p.command, + "risk_level": p.risk_level.value, + } + for p in e.preview + ], + } + except KeyError as e: + raise HTTPException(status_code=404, detail=str(e)) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + return { + "status": "ok", + "intent": result.intent, + "risk_level": result.risk_level.value, + "executions": [ + { + "host": ex.host, + "os_family": ex.os_family, + "command": ex.command, + "exit_code": ex.result.exit_code if ex.result else None, + "stdout": ex.result.stdout if ex.result else None, + "stderr": ex.result.stderr if ex.result else None, + "duration_ms": ex.result.duration_ms if ex.result else None, + "error": ex.error, + } + for ex in result.executions + ], + } + + class ChatBody(BaseModel): + message: str + + # Naive single-agent-per-process state. Real impl: session_id -> agent. + _agent: dict = {"instance": None} + + @router.post("/chat") + async def chat(body: ChatBody): + agent = _agent["instance"] or OpsAgent(inventory, runner) + _agent["instance"] = agent + turn = await agent.chat(body.message) + return { + "assistant": turn.assistant_text, + "tool_calls": turn.tool_calls, + } + + return router diff --git a/src/ops_tools/audit/__init__.py b/src/ops_tools/audit/__init__.py new file mode 100644 index 0000000..ea02362 --- /dev/null +++ b/src/ops_tools/audit/__init__.py @@ -0,0 +1,3 @@ +from .store import AuditStore, init_db + +__all__ = ["AuditStore", "init_db"] diff --git a/src/ops_tools/audit/store.py b/src/ops_tools/audit/store.py new file mode 100644 index 0000000..bf45955 --- /dev/null +++ b/src/ops_tools/audit/store.py @@ -0,0 +1,88 @@ +"""Persistent audit log. + +A single executions table is enough for L1/L2. When L3 playbooks land, add a +`playbook_runs` table and let `executions` reference it via a nullable FK. +""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +from sqlalchemy import JSON, DateTime, Integer, String, Text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + +from ops_tools.config import get_settings + + +class Base(DeclarativeBase): + pass + + +class AuditRecord(Base): + __tablename__ = "audit_records" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True)) + user: Mapped[str] = mapped_column(String(64)) + intent: Mapped[str] = mapped_column(String(128)) + risk_level: Mapped[str] = mapped_column(String(16)) + hosts: Mapped[str] = mapped_column(Text) # JSON array + params: Mapped[str] = mapped_column(Text) # JSON object + nl_query: Mapped[str | None] = mapped_column(Text, nullable=True) + executions: Mapped[dict] = mapped_column(JSON) # list[dict] + + +_engine = None +_session_maker: async_sessionmaker[AsyncSession] | None = None + + +async def init_db() -> None: + global _engine, _session_maker + if _engine is not None: + return + + settings = get_settings() + # Make sure the data/ directory exists for SQLite URLs. + if settings.db_url.startswith("sqlite"): + db_file = settings.db_url.split("///")[-1] + Path(db_file).parent.mkdir(parents=True, exist_ok=True) + + _engine = create_async_engine(settings.db_url, echo=False, future=True) + _session_maker = async_sessionmaker(_engine, expire_on_commit=False) + + async with _engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + +class AuditStore: + async def record( + self, + *, + user: str, + intent: str, + risk_level: str, + hosts: list[str], + params: dict, + executions: list[dict], + nl_query: str | None = None, + ) -> None: + if _session_maker is None: + await init_db() + assert _session_maker is not None + + async with _session_maker() as s: + s.add( + AuditRecord( + timestamp=datetime.now(timezone.utc), + user=user, + intent=intent, + risk_level=risk_level, + hosts=json.dumps(hosts), + params=json.dumps(params, default=str), + nl_query=nl_query, + executions=executions, + ) + ) + await s.commit() diff --git a/src/ops_tools/cli.py b/src/ops_tools/cli.py new file mode 100644 index 0000000..5b2c26c --- /dev/null +++ b/src/ops_tools/cli.py @@ -0,0 +1,144 @@ +"""Typer-based CLI — fastest way to drive the framework end-to-end.""" +from __future__ import annotations + +import asyncio +import json + +import typer +from rich.console import Console +from rich.table import Table + +from ops_tools.ai.agent import OpsAgent +from ops_tools.audit.store import AuditStore, init_db +from ops_tools.config import get_settings +from ops_tools.executor.runner import ConfirmationRequired, IntentRunner +from ops_tools.inventory.loader import load_inventory +from ops_tools.intents.registry import load_intent_registry + +app = typer.Typer(help="ai-app-ops-tools CLI") +console = Console() + + +def _bootstrap(): + settings = get_settings() + inventory = load_inventory(settings.resolve(settings.inventory_path)) + registry = load_intent_registry(settings.resolve(settings.templates_dir)) + return settings, inventory, registry + + +@app.command() +def hosts(): + """List hosts in inventory.""" + _, inv, _ = _bootstrap() + table = Table(title="Inventory") + for col in ("name", "address", "os_family", "connection", "tags"): + table.add_column(col) + for h in inv.hosts.values(): + table.add_row( + h.name, h.address, h.os_family.value, h.connection.value, ",".join(h.tags) + ) + console.print(table) + + +@app.command() +def intents(): + """List registered intents.""" + _, _, reg = _bootstrap() + table = Table(title="Intents") + for col in ("intent", "risk", "description", "params"): + table.add_column(col) + for t in reg.all(): + table.add_row( + t.intent, + t.risk_level.value, + t.description, + ",".join(p.name for p in t.params), + ) + console.print(table) + + +@app.command() +def run( + intent: str = typer.Argument(..., help="Intent name"), + target: str = typer.Argument(..., help="Host / group / tag selector"), + params: str = typer.Option("{}", "--params", "-p", help="JSON object of params"), + confirm: bool = typer.Option(False, "--confirm", "-y", help="Approve WRITE-class run"), +): + """Execute an intent directly without invoking the AI agent.""" + asyncio.run(_run(intent, target, params, confirm)) + + +async def _run(intent: str, target: str, params_json: str, confirmed: bool): + settings, inv, reg = _bootstrap() + await init_db() + runner = IntentRunner(reg, AuditStore()) + hosts_resolved = inv.resolve(target) + try: + result = await runner.run( + intent, + hosts_resolved, + json.loads(params_json), + confirmed=confirmed, + user="cli", + ) + except ConfirmationRequired as e: + console.print(f"[yellow]{e}[/yellow]") + for p in e.preview: + console.print( + f" [{p.host} | {p.os_family} | {p.risk_level.value}] {p.command}" + ) + console.print("\nRe-run with --confirm to execute.") + raise typer.Exit(code=2) + + for ex in result.executions: + console.rule(f"[bold]{ex.host}[/bold] ({ex.os_family})") + if ex.error: + console.print(f"[red]ERROR:[/red] {ex.error}") + continue + r = ex.result + console.print(f"exit={r.exit_code} duration={r.duration_ms}ms") + if r.stdout: + console.print(r.stdout) + if r.stderr.strip(): + console.print(f"[yellow]stderr:[/yellow]\n{r.stderr}") + + +@app.command() +def chat(message: str = typer.Argument(..., help="Natural language request")): + """Single-turn chat with the AI agent.""" + asyncio.run(_chat(message)) + + +async def _chat(message: str): + settings, inv, reg = _bootstrap() + if not settings.anthropic_api_key: + console.print("[red]ANTHROPIC_API_KEY 未设置(参考 .env.example)[/red]") + raise typer.Exit(code=1) + await init_db() + runner = IntentRunner(reg, AuditStore()) + agent = OpsAgent(inv, runner) + turn = await agent.chat(message) + console.rule("[bold]Assistant[/bold]") + console.print(turn.assistant_text) + if turn.tool_calls: + console.rule("[dim]Tool calls[/dim]") + for tc in turn.tool_calls: + console.print(f" • {tc['name']}({tc['input']})") + + +@app.command() +def serve(): + """Start the FastAPI server.""" + import uvicorn + + settings = get_settings() + uvicorn.run( + "ops_tools.main:app", + host=settings.host, + port=settings.port, + reload=False, + ) + + +if __name__ == "__main__": + app() diff --git a/src/ops_tools/config.py b/src/ops_tools/config.py new file mode 100644 index 0000000..61fb2e8 --- /dev/null +++ b/src/ops_tools/config.py @@ -0,0 +1,64 @@ +"""Global settings loaded from .env + config/settings.yaml.""" +from __future__ import annotations + +from functools import lru_cache +from pathlib import Path +from typing import Any + +import yaml +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + +PROJECT_ROOT = Path(__file__).resolve().parents[2] + + +class Settings(BaseSettings): + """Environment-driven settings (see .env.example).""" + + model_config = SettingsConfigDict( + env_file=PROJECT_ROOT / ".env", + env_file_encoding="utf-8", + extra="ignore", + env_prefix="OPS_", + ) + + # Anthropic + anthropic_api_key: str = Field(default="", alias="ANTHROPIC_API_KEY") + model_main: str = "claude-sonnet-4-6" + model_fast: str = "claude-haiku-4-5-20251001" + + # Storage + db_url: str = "sqlite+aiosqlite:///./data/ops.db" + + # Paths + inventory_path: str = "config/inventory.yaml" + templates_dir: str = "templates" + apps_dir: str = "apps" + playbooks_dir: str = "playbooks" + settings_yaml: str = "config/settings.yaml" + + # Safety toggles + auto_execute_write: bool = False + allow_destructive: bool = False + + # Server + host: str = "127.0.0.1" + port: int = 8000 + + def resolve(self, relative: str) -> Path: + """Resolve a path relative to project root.""" + p = Path(relative) + return p if p.is_absolute() else PROJECT_ROOT / p + + def load_runtime_yaml(self) -> dict[str, Any]: + """Load config/settings.yaml (non-secret runtime tuning).""" + path = self.resolve(self.settings_yaml) + if not path.exists(): + return {} + with path.open("r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + + +@lru_cache(maxsize=1) +def get_settings() -> Settings: + return Settings() diff --git a/src/ops_tools/connectors/__init__.py b/src/ops_tools/connectors/__init__.py new file mode 100644 index 0000000..f29fbc0 --- /dev/null +++ b/src/ops_tools/connectors/__init__.py @@ -0,0 +1,4 @@ +from .base import BaseConnector, ExecResult, ConnectorError +from .factory import get_connector + +__all__ = ["BaseConnector", "ExecResult", "ConnectorError", "get_connector"] diff --git a/src/ops_tools/connectors/base.py b/src/ops_tools/connectors/base.py new file mode 100644 index 0000000..9c894fd --- /dev/null +++ b/src/ops_tools/connectors/base.py @@ -0,0 +1,40 @@ +"""Abstract connector contract. + +All transport-specific connectors (SSH, WinRM, local) implement BaseConnector +and return a uniform ExecResult, so upper layers don't care about transport. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +from ops_tools.inventory.models import Host + + +class ConnectorError(Exception): + """Raised for connection/transport failures (auth, network, timeout).""" + + +@dataclass +class ExecResult: + exit_code: int + stdout: str + stderr: str + duration_ms: int + + +class BaseConnector(ABC): + """One connector instance is bound to one host. Use as an async context manager.""" + + def __init__(self, host: Host): + self.host = host + + @abstractmethod + async def __aenter__(self) -> "BaseConnector": ... + + @abstractmethod + async def __aexit__(self, exc_type, exc, tb) -> None: ... + + @abstractmethod + async def run(self, command: str, timeout: int = 30) -> ExecResult: + """Execute a single command and capture stdout/stderr.""" diff --git a/src/ops_tools/connectors/factory.py b/src/ops_tools/connectors/factory.py new file mode 100644 index 0000000..fdc4af0 --- /dev/null +++ b/src/ops_tools/connectors/factory.py @@ -0,0 +1,35 @@ +"""Pick a connector implementation based on the host's connection type.""" +from __future__ import annotations + +from ops_tools.config import get_settings +from ops_tools.inventory.models import ConnectionType, Host + +from .base import BaseConnector +from .local import LocalConnector +from .ssh import SSHConnector +from .winrm import WinRMConnector + + +def get_connector(host: Host) -> BaseConnector: + """Return a (not yet opened) connector instance for the given host.""" + runtime = get_settings().load_runtime_yaml() + ssh_cfg = runtime.get("ssh", {}) or {} + winrm_cfg = runtime.get("winrm", {}) or {} + + match host.connection: + case ConnectionType.SSH: + return SSHConnector( + host, + known_hosts=ssh_cfg.get("known_hosts"), + connect_timeout=ssh_cfg.get("connect_timeout", 10), + ) + case ConnectionType.WINRM: + return WinRMConnector( + host, + transport=winrm_cfg.get("transport", "ntlm"), + server_cert_validation=winrm_cfg.get("server_cert_validation", "ignore"), + ) + case ConnectionType.LOCAL: + return LocalConnector(host) + case _: + raise ValueError(f"Unsupported connection type: {host.connection}") diff --git a/src/ops_tools/connectors/local.py b/src/ops_tools/connectors/local.py new file mode 100644 index 0000000..298565d --- /dev/null +++ b/src/ops_tools/connectors/local.py @@ -0,0 +1,53 @@ +"""Local connector — executes commands on the controller machine. + +Useful for development, testing, and managing the controller itself. Picks the +shell based on the controller's OS rather than the Host record so that an +inventory entry of `connection: local` always works. +""" +from __future__ import annotations + +import asyncio +import platform +import time + +from .base import BaseConnector, ExecResult + + +class LocalConnector(BaseConnector): + async def __aenter__(self) -> "LocalConnector": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + return None + + async def run(self, command: str, timeout: int = 30) -> ExecResult: + start = time.monotonic() + is_windows = platform.system().lower() == "windows" + + if is_windows: + # Use PowerShell on Windows so the same intent templates work locally. + proc = await asyncio.create_subprocess_exec( + "powershell.exe", "-NoProfile", "-Command", command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + else: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise TimeoutError(f"Local command timed out after {timeout}s") + + return ExecResult( + exit_code=proc.returncode or 0, + stdout=stdout.decode("utf-8", errors="replace"), + stderr=stderr.decode("utf-8", errors="replace"), + duration_ms=int((time.monotonic() - start) * 1000), + ) diff --git a/src/ops_tools/connectors/ssh.py b/src/ops_tools/connectors/ssh.py new file mode 100644 index 0000000..f895390 --- /dev/null +++ b/src/ops_tools/connectors/ssh.py @@ -0,0 +1,60 @@ +"""SSH connector for Linux/Unix hosts, built on asyncssh.""" +from __future__ import annotations + +import time + +import asyncssh + +from ops_tools.inventory.models import Host + +from .base import BaseConnector, ConnectorError, ExecResult + + +class SSHConnector(BaseConnector): + def __init__(self, host: Host, *, known_hosts=None, connect_timeout: int = 10): + super().__init__(host) + self._known_hosts = known_hosts + self._connect_timeout = connect_timeout + self._conn: asyncssh.SSHClientConnection | None = None + + async def __aenter__(self) -> "SSHConnector": + h = self.host + auth = h.auth + opts: dict = { + "username": h.user, + "known_hosts": self._known_hosts, # None disables host-key check (dev only) + "connect_timeout": self._connect_timeout, + } + if h.port: + opts["port"] = h.port + if auth and auth.type == "key" and auth.key_path: + opts["client_keys"] = [auth.key_path] + elif auth and auth.type == "password" and auth.password: + opts["password"] = auth.password + + try: + self._conn = await asyncssh.connect(h.address, **opts) + except (OSError, asyncssh.Error) as e: + raise ConnectorError(f"SSH connect failed to {h.name}: {e}") from e + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + if self._conn is not None: + self._conn.close() + await self._conn.wait_closed() + self._conn = None + + async def run(self, command: str, timeout: int = 30) -> ExecResult: + if self._conn is None: + raise ConnectorError("SSH session not open — use 'async with' context") + start = time.monotonic() + try: + res = await self._conn.run(command, check=False, timeout=timeout) + except asyncssh.Error as e: + raise ConnectorError(f"SSH exec failed: {e}") from e + return ExecResult( + exit_code=res.exit_status if res.exit_status is not None else -1, + stdout=res.stdout or "", + stderr=res.stderr or "", + duration_ms=int((time.monotonic() - start) * 1000), + ) diff --git a/src/ops_tools/connectors/winrm.py b/src/ops_tools/connectors/winrm.py new file mode 100644 index 0000000..37f82b0 --- /dev/null +++ b/src/ops_tools/connectors/winrm.py @@ -0,0 +1,80 @@ +"""WinRM connector for Windows hosts. + +pywinrm is synchronous; we run blocking calls in a thread executor so the rest +of the system stays async. +""" +from __future__ import annotations + +import asyncio +import time + +from ops_tools.inventory.models import Host + +from .base import BaseConnector, ConnectorError, ExecResult + + +class WinRMConnector(BaseConnector): + def __init__( + self, + host: Host, + *, + transport: str = "ntlm", + server_cert_validation: str = "ignore", + ): + super().__init__(host) + self._transport = transport + self._server_cert_validation = server_cert_validation + self._session = None + + async def __aenter__(self) -> "WinRMConnector": + try: + import winrm # imported lazily so the package is optional in dev + except ImportError as e: + raise ConnectorError( + "pywinrm not installed — `pip install pywinrm` to use WinRM" + ) from e + + h = self.host + if not h.user or not h.auth or not h.auth.password: + raise ConnectorError(f"WinRM host {h.name} requires user + password auth") + + endpoint = f"http://{h.address}:{h.port or 5985}/wsman" + loop = asyncio.get_running_loop() + try: + self._session = await loop.run_in_executor( + None, + lambda: winrm.Session( + endpoint, + auth=(h.user, h.auth.password), + transport=self._transport, + server_cert_validation=self._server_cert_validation, + ), + ) + except Exception as e: # noqa: BLE001 — bubble to connector layer + raise ConnectorError(f"WinRM session init failed for {h.name}: {e}") from e + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + self._session = None # pywinrm Session has no explicit close + + async def run(self, command: str, timeout: int = 30) -> ExecResult: + if self._session is None: + raise ConnectorError("WinRM session not open") + start = time.monotonic() + loop = asyncio.get_running_loop() + try: + res = await asyncio.wait_for( + loop.run_in_executor(None, self._session.run_ps, command), + timeout=timeout, + ) + except asyncio.TimeoutError: + raise TimeoutError(f"WinRM command timed out after {timeout}s") + except Exception as e: # noqa: BLE001 + raise ConnectorError(f"WinRM exec failed: {e}") from e + + return ExecResult( + exit_code=res.status_code, + stdout=(res.std_out or b"").decode("utf-8", errors="replace"), + stderr=(res.std_err or b"").decode("utf-8", errors="replace"), + duration_ms=int((time.monotonic() - start) * 1000), + ) diff --git a/src/ops_tools/executor/__init__.py b/src/ops_tools/executor/__init__.py new file mode 100644 index 0000000..d7ee428 --- /dev/null +++ b/src/ops_tools/executor/__init__.py @@ -0,0 +1,10 @@ +from .runner import IntentRunner, IntentRunResult, ConfirmationRequired +from .risk import classify_risk, sanitize_params + +__all__ = [ + "IntentRunner", + "IntentRunResult", + "ConfirmationRequired", + "classify_risk", + "sanitize_params", +] diff --git a/src/ops_tools/executor/risk.py b/src/ops_tools/executor/risk.py new file mode 100644 index 0000000..94050a3 --- /dev/null +++ b/src/ops_tools/executor/risk.py @@ -0,0 +1,64 @@ +"""Risk classification and parameter sanitization.""" +from __future__ import annotations + +import shlex +from typing import Any + +from ops_tools.intents.models import IntentTemplate, RiskLevel + + +def classify_risk( + template: IntentTemplate, + rendered_command: str, + destructive_keywords: list[str], +) -> RiskLevel: + """Possibly upgrade the template's declared risk level based on the rendered command. + + A template author may say WRITE but if user-supplied parameters cause the + command to contain `rm -rf`, we promote it to DESTRUCTIVE. + """ + cmd_lower = rendered_command.lower() + for kw in destructive_keywords or []: + if kw.lower() in cmd_lower: + return RiskLevel.DESTRUCTIVE + return template.risk_level + + +def sanitize_params(template: IntentTemplate, params: dict[str, Any]) -> dict[str, Any]: + """Validate against the declared param schema and shell-escape string values. + + Notes + ----- + Shell-escaping with `shlex.quote` is correct for POSIX shells. For PowerShell + we rely on single-quote wrapping inside templates (see service.yaml). This is + intentionally conservative — better to over-quote than allow injection. + """ + out: dict[str, Any] = {} + declared = {p.name: p for p in template.params} + + for name, spec in declared.items(): + if name in params: + value = params[name] + elif spec.default is not None: + value = spec.default + elif spec.required: + raise ValueError(f"Missing required param: {name}") + else: + continue + + if spec.type == "integer": + value = int(value) + elif spec.type == "boolean": + value = bool(value) + else: + # Strings get shell-quoted (POSIX). PowerShell templates wrap them in + # single quotes themselves; the extra escaping is safe. + value = shlex.quote(str(value)).strip("'") + out[name] = value + + # Reject undeclared params to keep the surface tight. + extra = set(params) - set(declared) + if extra: + raise ValueError(f"Unknown params for intent {template.intent}: {extra}") + + return out diff --git a/src/ops_tools/executor/runner.py b/src/ops_tools/executor/runner.py new file mode 100644 index 0000000..7881f4b --- /dev/null +++ b/src/ops_tools/executor/runner.py @@ -0,0 +1,175 @@ +"""Run a named intent against one or more hosts, with risk gating + audit.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from jinja2 import Environment, StrictUndefined + +from ops_tools.audit.store import AuditStore +from ops_tools.config import get_settings +from ops_tools.connectors import ExecResult, get_connector +from ops_tools.inventory.facts import probe_facts +from ops_tools.inventory.models import Host, OSFamily +from ops_tools.intents.models import IntentTemplate, RiskLevel +from ops_tools.intents.registry import IntentRegistry + +from .risk import classify_risk, sanitize_params + +_jinja = Environment(undefined=StrictUndefined, autoescape=False, keep_trailing_newline=False) + + +class ConfirmationRequired(Exception): + """Raised when a WRITE/DESTRUCTIVE intent is dispatched without `confirmed=True`.""" + + def __init__(self, message: str, preview: list["CommandPreview"]): + super().__init__(message) + self.preview = preview + + +@dataclass +class CommandPreview: + host: str + os_family: str + command: str + risk_level: RiskLevel + + +@dataclass +class HostExecution: + host: str + os_family: str + command: str + result: ExecResult | None = None + error: str | None = None + + +@dataclass +class IntentRunResult: + intent: str + risk_level: RiskLevel + executions: list[HostExecution] = field(default_factory=list) + + +class IntentRunner: + """Top-level entry point used by the CLI, the API, and the AI agent.""" + + def __init__( + self, + registry: IntentRegistry, + audit: AuditStore, + ): + self.registry = registry + self.audit = audit + self.settings = get_settings() + self.runtime = self.settings.load_runtime_yaml() + + async def run( + self, + intent_name: str, + hosts: list[Host], + params: dict[str, Any] | None = None, + *, + confirmed: bool = False, + nl_query: str | None = None, + user: str = "system", + ) -> IntentRunResult: + params = params or {} + template = self.registry.get(intent_name) + sanitized = sanitize_params(template, params) + + # 1) Build per-host previews (command + risk after rendering). + previews: list[CommandPreview] = [] + rendered: dict[str, tuple[str, str]] = {} # host -> (os_family, command) + + for host in hosts: + connector = get_connector(host) + async with connector as conn: + facts = await probe_facts(host, conn) + os_family = facts.get("os_family") or OSFamily.UNKNOWN.value + impl = template.implementation_for(os_family) + if impl is None: + raise ValueError( + f"Intent '{intent_name}' has no implementation for OS family " + f"'{os_family}' (host {host.name})" + ) + cmd = _jinja.from_string(impl.command).render(**sanitized) + rendered[host.name] = (os_family, cmd) + previews.append( + CommandPreview( + host=host.name, + os_family=os_family, + command=cmd, + risk_level=classify_risk( + template, + cmd, + self._destructive_keywords(), + ), + ) + ) + + # 2) Apply risk gate. Use the highest level across hosts. + effective_risk = max((p.risk_level for p in previews), key=_risk_order, default=RiskLevel.READ) + + if effective_risk == RiskLevel.DESTRUCTIVE and not self.settings.allow_destructive: + raise ConfirmationRequired( + f"Intent '{intent_name}' classified as DESTRUCTIVE and OPS_ALLOW_DESTRUCTIVE is false.", + previews, + ) + if effective_risk != RiskLevel.READ and not confirmed and not ( + effective_risk == RiskLevel.WRITE and self.settings.auto_execute_write + ): + raise ConfirmationRequired( + f"Intent '{intent_name}' is {effective_risk.value}; confirmation required.", + previews, + ) + + # 3) Execute and collect results per host. + result = IntentRunResult(intent=intent_name, risk_level=effective_risk) + for host in hosts: + os_family, cmd = rendered[host.name] + exec_record = HostExecution(host=host.name, os_family=os_family, command=cmd) + try: + connector = get_connector(host) + async with connector as conn: + exec_record.result = await conn.run( + cmd, + timeout=self.runtime.get("default_timeout_seconds", 30), + ) + except Exception as e: # noqa: BLE001 — record then continue to next host + exec_record.error = str(e) + result.executions.append(exec_record) + + # 4) Audit log. + await self.audit.record( + user=user, + intent=intent_name, + risk_level=effective_risk.value, + hosts=[h.name for h in hosts], + params=sanitized, + nl_query=nl_query, + executions=[ + { + "host": e.host, + "os_family": e.os_family, + "command": e.command, + "exit_code": e.result.exit_code if e.result else None, + "error": e.error, + "duration_ms": e.result.duration_ms if e.result else None, + } + for e in result.executions + ], + ) + + return result + + def _destructive_keywords(self) -> list[str]: + policy = self.runtime.get("risk_policy", {}) or {} + return policy.get("destructive_keywords", []) or [] + + +_RISK_RANK = {RiskLevel.READ: 0, RiskLevel.WRITE: 1, RiskLevel.DESTRUCTIVE: 2} + + +def _risk_order(r: RiskLevel) -> int: + return _RISK_RANK[r] diff --git a/src/ops_tools/intents/__init__.py b/src/ops_tools/intents/__init__.py new file mode 100644 index 0000000..4098489 --- /dev/null +++ b/src/ops_tools/intents/__init__.py @@ -0,0 +1,11 @@ +from .models import IntentTemplate, IntentParam, RiskLevel, Implementation +from .registry import IntentRegistry, load_intent_registry + +__all__ = [ + "IntentTemplate", + "IntentParam", + "RiskLevel", + "Implementation", + "IntentRegistry", + "load_intent_registry", +] diff --git a/src/ops_tools/intents/models.py b/src/ops_tools/intents/models.py new file mode 100644 index 0000000..a2cc337 --- /dev/null +++ b/src/ops_tools/intents/models.py @@ -0,0 +1,45 @@ +"""Intent template data models.""" +from __future__ import annotations + +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + + +class RiskLevel(str, Enum): + READ = "READ" # 自动执行 + WRITE = "WRITE" # 需用户确认 + DESTRUCTIVE = "DESTRUCTIVE" # 默认禁用 + 二次确认 + + +class IntentParam(BaseModel): + name: str + type: str = "string" # string | integer | boolean + required: bool = True + default: Any = None + description: str | None = None + + +class Implementation(BaseModel): + """How an intent is realized on a specific OS family / variant. + + The key in IntentTemplate.implementations identifies the OS — typically one of + `linux`, `windows`, `darwin`, `aix`. More specific keys like `linux_systemd` + or `ubuntu` are reserved for the future when we need finer routing. + """ + + command: str + parser: str | None = None # 可选:输出后处理器名称 + shell: str | None = None # 显式指定 shell(如 powershell / bash) + + +class IntentTemplate(BaseModel): + intent: str + description: str + risk_level: RiskLevel = RiskLevel.READ + params: list[IntentParam] = Field(default_factory=list) + implementations: dict[str, Implementation] + + def implementation_for(self, os_family: str) -> Implementation | None: + return self.implementations.get(os_family) diff --git a/src/ops_tools/intents/registry.py b/src/ops_tools/intents/registry.py new file mode 100644 index 0000000..0a07cc8 --- /dev/null +++ b/src/ops_tools/intents/registry.py @@ -0,0 +1,46 @@ +"""Load all *.yaml files under templates/ into an IntentRegistry.""" +from __future__ import annotations + +from pathlib import Path + +import yaml + +from .models import IntentTemplate + + +class IntentRegistry: + def __init__(self, intents: dict[str, IntentTemplate]): + self._intents = intents + + def get(self, name: str) -> IntentTemplate: + if name not in self._intents: + raise KeyError(f"Unknown intent: {name}") + return self._intents[name] + + def all(self) -> list[IntentTemplate]: + return list(self._intents.values()) + + def names(self) -> list[str]: + return list(self._intents.keys()) + + def __contains__(self, name: str) -> bool: + return name in self._intents + + +def load_intent_registry(templates_dir: str | Path) -> IntentRegistry: + base = Path(templates_dir) + intents: dict[str, IntentTemplate] = {} + + if not base.exists(): + return IntentRegistry(intents) + + for yml in sorted(base.glob("*.yaml")): + with yml.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + for spec in data.get("intents", []): + tpl = IntentTemplate(**spec) + if tpl.intent in intents: + raise ValueError(f"Duplicate intent name: {tpl.intent} (in {yml})") + intents[tpl.intent] = tpl + + return IntentRegistry(intents) diff --git a/src/ops_tools/inventory/__init__.py b/src/ops_tools/inventory/__init__.py new file mode 100644 index 0000000..ae72ea1 --- /dev/null +++ b/src/ops_tools/inventory/__init__.py @@ -0,0 +1,4 @@ +from .models import Host, Inventory, OSFamily, ConnectionType +from .loader import load_inventory + +__all__ = ["Host", "Inventory", "OSFamily", "ConnectionType", "load_inventory"] diff --git a/src/ops_tools/inventory/facts.py b/src/ops_tools/inventory/facts.py new file mode 100644 index 0000000..8b6853d --- /dev/null +++ b/src/ops_tools/inventory/facts.py @@ -0,0 +1,63 @@ +"""Probe OS facts on first connect; cache in-memory so we only run uname once.""" +from __future__ import annotations + +from .models import Host, OSFamily + +# In-memory cache. For production, persist to DB so facts survive restarts. +_FACTS_CACHE: dict[str, dict] = {} + + +def get_cached_facts(host: Host) -> dict | None: + return _FACTS_CACHE.get(host.name) + + +def set_cached_facts(host: Host, facts: dict) -> None: + _FACTS_CACHE[host.name] = facts + + +async def probe_facts(host: Host, connector) -> dict: + """Run a tiny shell snippet to discover OS family + distribution. + + `connector` is an already-open connector instance for this host. + Returns a facts dict and caches it. + """ + cached = get_cached_facts(host) + if cached: + return cached + + facts: dict = {"os_family": host.os_family.value} + + # If we already know it from inventory, trust it. + if host.os_family != OSFamily.UNKNOWN: + set_cached_facts(host, facts) + return facts + + # Heuristic: try Linux/Unix first, then Windows. + # Note: on Windows hosts with Git-Bash installed, `uname -s` may return + # MINGW/MSYS/CYGWIN — those still mean the underlying OS is Windows. + try: + res = await connector.run("uname -s", timeout=5) + if res.exit_code == 0 and res.stdout.strip(): + kernel = res.stdout.strip().lower() + if any(tok in kernel for tok in ("mingw", "msys", "cygwin")): + facts["os_family"] = OSFamily.WINDOWS.value + elif "linux" in kernel: + facts["os_family"] = OSFamily.LINUX.value + elif "darwin" in kernel: + facts["os_family"] = OSFamily.DARWIN.value + elif "aix" in kernel: + facts["os_family"] = OSFamily.AIX.value + + if facts["os_family"] == OSFamily.UNKNOWN.value: + # Try Windows PowerShell as last resort. + res = await connector.run( + "(Get-CimInstance Win32_OperatingSystem).Caption", timeout=5 + ) + if res.exit_code == 0 and res.stdout.strip(): + facts["os_family"] = OSFamily.WINDOWS.value + facts["os_caption"] = res.stdout.strip() + except Exception as e: # noqa: BLE001 — probing is best-effort + facts["probe_error"] = str(e) + + set_cached_facts(host, facts) + return facts diff --git a/src/ops_tools/inventory/loader.py b/src/ops_tools/inventory/loader.py new file mode 100644 index 0000000..5ea2713 --- /dev/null +++ b/src/ops_tools/inventory/loader.py @@ -0,0 +1,47 @@ +"""Load inventory.yaml with ${ENV_VAR} interpolation for secrets.""" +from __future__ import annotations + +import os +import re +from pathlib import Path + +import yaml + +from .models import Host, Inventory + +_ENV_RE = re.compile(r"\$\{([A-Z0-9_]+)\}") + + +def _interp(value): + """Recursively replace ${ENV_VAR} placeholders with environment values.""" + if isinstance(value, str): + return _ENV_RE.sub(lambda m: os.getenv(m.group(1), ""), value) + if isinstance(value, dict): + return {k: _interp(v) for k, v in value.items()} + if isinstance(value, list): + return [_interp(v) for v in value] + return value + + +def load_inventory(path: str | Path) -> Inventory: + p = Path(path) + if not p.exists(): + # Fall back to the example file so the framework boots on a fresh checkout. + example = p.with_name("inventory.example.yaml") + if example.exists(): + p = example + else: + return Inventory() + + with p.open("r", encoding="utf-8") as f: + raw = yaml.safe_load(f) or {} + + raw = _interp(raw) + hosts_dict = raw.get("hosts") or {} + hosts: dict[str, Host] = {} + for name, spec in hosts_dict.items(): + spec = dict(spec) + spec["name"] = name + hosts[name] = Host(**spec) + + return Inventory(hosts=hosts, groups=raw.get("groups") or {}) diff --git a/src/ops_tools/inventory/models.py b/src/ops_tools/inventory/models.py new file mode 100644 index 0000000..357c7e3 --- /dev/null +++ b/src/ops_tools/inventory/models.py @@ -0,0 +1,61 @@ +"""Inventory data models — hosts, groups, auth metadata.""" +from __future__ import annotations + +from enum import Enum +from typing import Literal + +from pydantic import BaseModel, Field + + +class OSFamily(str, Enum): + LINUX = "linux" + WINDOWS = "windows" + DARWIN = "darwin" + AIX = "aix" + UNKNOWN = "unknown" + + +class ConnectionType(str, Enum): + SSH = "ssh" + WINRM = "winrm" + LOCAL = "local" + + +class AuthSpec(BaseModel): + type: Literal["key", "password"] = "key" + key_path: str | None = None + password: str | None = None # supports ${ENV_VAR} interpolation at load time + + +class Host(BaseModel): + name: str + address: str + port: int | None = None + os_family: OSFamily = OSFamily.UNKNOWN + os_distribution: str | None = None + connection: ConnectionType = ConnectionType.SSH + user: str | None = None + auth: AuthSpec | None = None + tags: list[str] = Field(default_factory=list) + + +class Inventory(BaseModel): + hosts: dict[str, Host] = Field(default_factory=dict) + groups: dict[str, list[str]] = Field(default_factory=dict) + + def get(self, name: str) -> Host: + if name not in self.hosts: + raise KeyError(f"Host not found in inventory: {name}") + return self.hosts[name] + + def resolve(self, selector: str) -> list[Host]: + """Resolve a selector that may be a host name, group name, or tag.""" + if selector in self.hosts: + return [self.hosts[selector]] + if selector in self.groups: + return [self.hosts[n] for n in self.groups[selector] if n in self.hosts] + # tag-based + matched = [h for h in self.hosts.values() if selector in h.tags] + if matched: + return matched + raise KeyError(f"Selector matches no host/group/tag: {selector}") diff --git a/src/ops_tools/main.py b/src/ops_tools/main.py new file mode 100644 index 0000000..db000c4 --- /dev/null +++ b/src/ops_tools/main.py @@ -0,0 +1,43 @@ +"""FastAPI app factory.""" +from __future__ import annotations + +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from ops_tools.api.routes import build_router +from ops_tools.audit.store import AuditStore, init_db +from ops_tools.config import get_settings +from ops_tools.executor.runner import IntentRunner +from ops_tools.intents.registry import load_intent_registry +from ops_tools.inventory.loader import load_inventory + + +def _wire(): + settings = get_settings() + inventory = load_inventory(settings.resolve(settings.inventory_path)) + registry = load_intent_registry(settings.resolve(settings.templates_dir)) + runner = IntentRunner(registry, AuditStore()) + return inventory, registry, runner + + +@asynccontextmanager +async def lifespan(app: FastAPI): + await init_db() + yield + + +app = FastAPI( + title="ai-app-ops-tools", + description="AI-powered natural language ops terminal", + version="0.1.0", + lifespan=lifespan, +) + +_inventory, _registry, _runner = _wire() +app.include_router(build_router(_inventory, _registry, _runner), prefix="/api/v1") + + +@app.get("/health") +def health(): + return {"status": "ok"} diff --git a/templates/disk.yaml b/templates/disk.yaml new file mode 100644 index 0000000..d2aeff4 --- /dev/null +++ b/templates/disk.yaml @@ -0,0 +1,27 @@ +# 磁盘相关意图模板 +intents: + - intent: check_disk_usage + description: 查看主机各分区磁盘使用率 + risk_level: READ + params: [] + implementations: + linux: + command: "df -h --output=source,size,used,avail,pcent,target" + darwin: + command: "df -h" + windows: + command: "Get-PSDrive -PSProvider FileSystem | Select-Object Name,@{N='UsedGB';E={[math]::Round($_.Used/1GB,1)}},@{N='FreeGB';E={[math]::Round($_.Free/1GB,1)}} | ConvertTo-Json" + + - intent: check_largest_dirs + description: 在指定路径下找出占用最大的目录(Top 10) + risk_level: READ + params: + - name: path + type: string + required: true + description: 要扫描的根路径,如 /var + implementations: + linux: + command: "du -h --max-depth=1 {{path}} 2>/dev/null | sort -hr | head -n 10" + windows: + command: "Get-ChildItem -Path '{{path}}' -Directory | ForEach-Object { [PSCustomObject]@{ Path=$_.FullName; SizeMB=[math]::Round(((Get-ChildItem $_.FullName -Recurse -ErrorAction SilentlyContinue | Measure-Object -Property Length -Sum).Sum)/1MB,1) } } | Sort-Object SizeMB -Descending | Select-Object -First 10 | ConvertTo-Json" diff --git a/templates/service.yaml b/templates/service.yaml new file mode 100644 index 0000000..abe2a7b --- /dev/null +++ b/templates/service.yaml @@ -0,0 +1,53 @@ +# 服务管理意图(含 WRITE 类,需确认后才会执行) +intents: + - intent: check_service_status + description: 查看服务当前状态 + risk_level: READ + params: + - name: service + type: string + required: true + implementations: + linux: + command: "systemctl status {{service}} --no-pager || service {{service}} status" + windows: + command: "Get-Service -Name '{{service}}' | Select-Object Name,Status,StartType | ConvertTo-Json" + + - intent: restart_service + description: 重启指定服务 + risk_level: WRITE + params: + - name: service + type: string + required: true + implementations: + linux: + command: "sudo systemctl restart {{service}}" + windows: + command: "Restart-Service -Name '{{service}}' -Force" + + - intent: stop_service + description: 停止指定服务 + risk_level: WRITE + params: + - name: service + type: string + required: true + implementations: + linux: + command: "sudo systemctl stop {{service}}" + windows: + command: "Stop-Service -Name '{{service}}' -Force" + + - intent: start_service + description: 启动指定服务 + risk_level: WRITE + params: + - name: service + type: string + required: true + implementations: + linux: + command: "sudo systemctl start {{service}}" + windows: + command: "Start-Service -Name '{{service}}'" diff --git a/templates/system.yaml b/templates/system.yaml new file mode 100644 index 0000000..4bbfd5e --- /dev/null +++ b/templates/system.yaml @@ -0,0 +1,53 @@ +# 系统资源与进程相关意图 +intents: + - intent: check_os_info + description: 查看操作系统版本与内核信息(用于 facts 探测和人工核对) + risk_level: READ + params: [] + implementations: + linux: + command: "uname -a && cat /etc/os-release 2>/dev/null || true" + darwin: + command: "uname -a && sw_vers" + windows: + command: "Get-CimInstance Win32_OperatingSystem | Select-Object Caption,Version,BuildNumber,OSArchitecture | ConvertTo-Json" + + - intent: check_memory + description: 查看内存使用情况 + risk_level: READ + params: [] + implementations: + linux: + command: "free -h" + darwin: + command: "vm_stat" + windows: + command: "Get-CimInstance Win32_OperatingSystem | Select-Object @{N='TotalGB';E={[math]::Round($_.TotalVisibleMemorySize/1MB,2)}},@{N='FreeGB';E={[math]::Round($_.FreePhysicalMemory/1MB,2)}} | ConvertTo-Json" + + - intent: top_cpu_processes + description: 查看 CPU 占用最高的 N 个进程 + risk_level: READ + params: + - name: top_n + type: integer + required: false + default: 10 + implementations: + linux: + command: "ps -eo pid,user,%cpu,%mem,comm --sort=-%cpu | head -n {{top_n + 1}}" + darwin: + command: "ps -eo pid,user,%cpu,%mem,comm -r | head -n {{top_n + 1}}" + windows: + command: "Get-Process | Sort-Object CPU -Descending | Select-Object -First {{top_n}} Id,ProcessName,CPU,WorkingSet | ConvertTo-Json" + + - intent: check_uptime + description: 查看主机已运行时间 + risk_level: READ + params: [] + implementations: + linux: + command: "uptime" + darwin: + command: "uptime" + windows: + command: "(Get-CimInstance Win32_OperatingSystem).LastBootUpTime" diff --git a/tests/test_smoke.py b/tests/test_smoke.py new file mode 100644 index 0000000..abcc6b9 --- /dev/null +++ b/tests/test_smoke.py @@ -0,0 +1,46 @@ +"""Smoke tests: framework loads, registry parses, local connector executes.""" +from __future__ import annotations + +import asyncio +import platform + +import pytest + +from ops_tools.config import PROJECT_ROOT, get_settings +from ops_tools.connectors.local import LocalConnector +from ops_tools.inventory.loader import load_inventory +from ops_tools.inventory.models import ConnectionType, Host, OSFamily +from ops_tools.intents.registry import load_intent_registry + + +def test_inventory_loads_example(): + settings = get_settings() + inv = load_inventory(settings.resolve(settings.inventory_path)) + # The example file ships with localhost; loader falls back to it if no real + # inventory.yaml exists. + assert "localhost" in inv.hosts + assert inv.hosts["localhost"].connection == ConnectionType.LOCAL + + +def test_intent_registry_has_disk_check(): + reg = load_intent_registry(PROJECT_ROOT / "templates") + assert "check_disk_usage" in reg + intent = reg.get("check_disk_usage") + assert "linux" in intent.implementations + assert "windows" in intent.implementations + + +@pytest.mark.asyncio +async def test_local_connector_runs(): + host = Host( + name="localhost", + address="127.0.0.1", + connection=ConnectionType.LOCAL, + os_family=(OSFamily.WINDOWS if platform.system().lower() == "windows" else OSFamily.LINUX), + ) + async with LocalConnector(host) as conn: + # Pick a command that exists on both Windows and POSIX. + cmd = "Get-Date" if platform.system().lower() == "windows" else "echo hello" + result = await conn.run(cmd, timeout=10) + assert result.exit_code == 0 + assert result.stdout.strip() != ""