feat: initial framework for AI-powered ops terminal

Scaffold an MVP of the natural-language ops terminal: inventory + intent
template registry, SSH/WinRM/local connectors, risk-gated executor with
SQLite audit log, Claude-driven agent layer using Function Calling, plus a
Typer CLI and FastAPI surface.

Includes 10 cross-OS intents (disk/system/service) and example inventory.
Verified end-to-end on the local Windows host: hosts/intents listing,
check_disk_usage execution, and WRITE-class confirmation gating.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
This commit is contained in:
2026-05-21 11:01:43 +09:00
commit dc2d2acc82
38 changed files with 1882 additions and 0 deletions
+25
View File
@@ -0,0 +1,25 @@
# Anthropic API
ANTHROPIC_API_KEY=sk-ant-xxx
# Model selection
OPS_MODEL_MAIN=claude-sonnet-4-6
OPS_MODEL_FAST=claude-haiku-4-5-20251001
# Storage
OPS_DB_URL=sqlite+aiosqlite:///./data/ops.db
# Paths (relative to project root)
OPS_INVENTORY_PATH=config/inventory.yaml
OPS_TEMPLATES_DIR=templates
OPS_APPS_DIR=apps
OPS_PLAYBOOKS_DIR=playbooks
# Safety
# Whether WRITE-class intents auto-execute (default false: require confirm)
OPS_AUTO_EXECUTE_WRITE=false
# Whether DESTRUCTIVE intents are allowed at all
OPS_ALLOW_DESTRUCTIVE=false
# Server
OPS_HOST=127.0.0.1
OPS_PORT=8000
+30
View File
@@ -0,0 +1,30 @@
# Python
__pycache__/
*.py[cod]
*.egg-info/
.venv/
venv/
.python-version
# Environment
.env
.env.local
# Editors / OS / Tools
.vscode/
.idea/
.DS_Store
Thumbs.db
.claude/
# Project data
data/
*.db
*.sqlite
*.sqlite3
audit.log
logs/
# Inventory secrets (only ship the example)
config/inventory.yaml
!config/inventory.example.yaml
+2
View File
@@ -0,0 +1,2 @@
# 预留目录:未来用于存放应用台账 (L3 应用层)
# 例如 apps/order-service.yaml 描述某个应用的部署元数据
+49
View File
@@ -0,0 +1,49 @@
# 主机清单示例 —— 复制为 inventory.yaml 后填入真实信息
# Inventory.yaml is gitignored to avoid leaking credentials.
hosts:
# ───── Linux 示例 ─────
app-prod-01:
address: 10.0.1.10
port: 22
os_family: linux # 可省略,首次连接会自动探测
os_distribution: ubuntu # 可选
connection: ssh
user: ops
auth:
type: key # key | password
key_path: ~/.ssh/id_rsa
tags: [prod, app]
db-prod-01:
address: 10.0.1.20
connection: ssh
user: ops
auth:
type: password
password: "${SECRET_DB_PROD_01}" # 支持环境变量插值
tags: [prod, db]
# ───── Windows 示例 ─────
win-app-01:
address: 10.0.2.10
port: 5985
os_family: windows
connection: winrm
user: Administrator
auth:
type: password
password: "${SECRET_WIN_APP_01}"
tags: [prod, app]
# ───── 本地测试 ─────
localhost:
address: 127.0.0.1
connection: local # 直接调用本机 shell,方便联调
tags: [dev]
groups:
prod: [app-prod-01, db-prod-01, win-app-01]
linux: [app-prod-01, db-prod-01]
windows: [win-app-01]
dev: [localhost]
+22
View File
@@ -0,0 +1,22 @@
# 全局运行时设置
default_timeout_seconds: 30
ssh:
known_hosts: null # 设为 null 表示不校验 known_hosts(实验环境);生产建议指定路径
connect_timeout: 10
winrm:
transport: ntlm # ntlm | basic | kerberos
server_cert_validation: ignore # 实验环境;生产请用 validate
risk_policy:
read_auto: true # READ 类是否自动执行
write_require_confirm: true
destructive_default_block: true
destructive_keywords: # 命令文本中出现这些词时升级为 DESTRUCTIVE
- "rm -rf"
- "mkfs"
- "format "
- "dd if="
- "shutdown"
- "reboot"
- "Remove-Item -Recurse"
- "Format-Volume"
+2
View File
@@ -0,0 +1,2 @@
# 预留目录:未来用于存放部署/编排剧本 (L3-L4)
# 例如 playbooks/deploy_java_jar.yaml 描述一次部署的有序步骤
+46
View File
@@ -0,0 +1,46 @@
[project]
name = "ai-app-ops-tools"
version = "0.1.0"
description = "AI-powered natural language ops terminal for heterogeneous servers"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.110",
"uvicorn[standard]>=0.27",
"pydantic>=2.6",
"pydantic-settings>=2.2",
"PyYAML>=6.0",
"Jinja2>=3.1",
"asyncssh>=2.14",
"pywinrm>=0.4.3",
"sqlalchemy[asyncio]>=2.0",
"aiosqlite>=0.19",
"anthropic>=0.40",
"typer>=0.12",
"python-dotenv>=1.0",
"rich>=13.7",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"ruff>=0.3",
]
[project.scripts]
ops = "ops_tools.cli:app"
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
+3
View File
@@ -0,0 +1,3 @@
"""ai-app-ops-tools — natural-language ops terminal for heterogeneous servers."""
__version__ = "0.1.0"
+3
View File
@@ -0,0 +1,3 @@
from .agent import OpsAgent, AgentTurn
__all__ = ["OpsAgent", "AgentTurn"]
+169
View File
@@ -0,0 +1,169 @@
"""Claude-powered conversation loop that dispatches to IntentRunner."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from anthropic import AsyncAnthropic
from ops_tools.config import get_settings
from ops_tools.executor.runner import ConfirmationRequired, IntentRunner
from ops_tools.inventory.models import Inventory
from .tools import SYSTEM_PROMPT, build_tools
@dataclass
class AgentTurn:
"""One round-trip with the model, including any tool calls + final text."""
user_message: str
assistant_text: str = ""
tool_calls: list[dict] = field(default_factory=list)
class OpsAgent:
def __init__(self, inventory: Inventory, runner: IntentRunner):
self.inventory = inventory
self.runner = runner
self.settings = get_settings()
self.client = AsyncAnthropic(api_key=self.settings.anthropic_api_key)
self.tools = build_tools(runner.registry)
# The full message history (multi-turn aware).
self.messages: list[dict] = []
async def chat(self, user_message: str, max_tool_loops: int = 6) -> AgentTurn:
turn = AgentTurn(user_message=user_message)
self.messages.append({"role": "user", "content": user_message})
for _ in range(max_tool_loops):
resp = await self.client.messages.create(
model=self.settings.model_main,
max_tokens=2048,
system=SYSTEM_PROMPT,
tools=self.tools,
messages=self.messages,
)
# Collect assistant message exactly as returned so future tool_use
# blocks can be referenced by id.
assistant_blocks: list[dict] = []
text_parts: list[str] = []
tool_uses: list[Any] = []
for block in resp.content:
if block.type == "text":
assistant_blocks.append({"type": "text", "text": block.text})
text_parts.append(block.text)
elif block.type == "tool_use":
assistant_blocks.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
tool_uses.append(block)
self.messages.append({"role": "assistant", "content": assistant_blocks})
turn.assistant_text = "\n".join(text_parts)
if not tool_uses:
return turn
# Execute each tool, attach result to messages.
tool_results: list[dict] = []
for tu in tool_uses:
turn.tool_calls.append({"name": tu.name, "input": tu.input})
result = await self._dispatch_tool(tu.name, tu.input or {})
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tu.id,
"content": result,
}
)
self.messages.append({"role": "user", "content": tool_results})
return turn
async def _dispatch_tool(self, name: str, args: dict) -> str:
try:
if name == "list_hosts":
return _format_hosts(self.inventory, args.get("selector"))
if name == "list_intents":
return _format_intents(self.runner.registry)
if name == "run_intent":
return await self._run_intent(args)
return f"ERROR: unknown tool '{name}'"
except Exception as e: # noqa: BLE001 — surface to model, don't crash loop
return f"ERROR: {type(e).__name__}: {e}"
async def _run_intent(self, args: dict) -> str:
intent = args["intent"]
host_selectors = args["hosts"]
params = args.get("params") or {}
confirmed = bool(args.get("confirmed"))
hosts = []
seen = set()
for sel in host_selectors:
for h in self.inventory.resolve(sel):
if h.name not in seen:
seen.add(h.name)
hosts.append(h)
try:
result = await self.runner.run(
intent, hosts, params, confirmed=confirmed, nl_query=None, user="ai-agent"
)
except ConfirmationRequired as e:
preview = "\n".join(
f" - [{p.host}|{p.os_family}|{p.risk_level.value}] {p.command}"
for p in e.preview
)
return (
f"CONFIRM_REQUIRED: {e}\n"
f"将要执行的命令预览:\n{preview}\n"
f"如用户确认,请用 confirmed=true 再次调用 run_intent。"
)
lines = [f"intent={result.intent} risk={result.risk_level.value}"]
for ex in result.executions:
if ex.error:
lines.append(f" [{ex.host}] ERROR: {ex.error}")
else:
r = ex.result
lines.append(
f" [{ex.host}|{ex.os_family}] exit={r.exit_code} "
f"duration={r.duration_ms}ms"
)
if r.stdout:
lines.append(f" stdout:\n{_indent(r.stdout, 6)}")
if r.stderr.strip():
lines.append(f" stderr:\n{_indent(r.stderr, 6)}")
return "\n".join(lines)
def _format_hosts(inv: Inventory, selector: str | None) -> str:
hosts = list(inv.hosts.values()) if not selector else inv.resolve(selector)
if not hosts:
return "(no hosts)"
return "\n".join(
f"- {h.name} address={h.address} os={h.os_family.value} "
f"conn={h.connection.value} tags={h.tags}"
for h in hosts
)
def _format_intents(reg) -> str:
return "\n".join(
f"- {t.intent} [{t.risk_level.value}] {t.description} "
f"(params: {[p.name for p in t.params]})"
for t in reg.all()
)
def _indent(text: str, n: int) -> str:
pad = " " * n
return "\n".join(pad + line for line in text.rstrip().splitlines())
+82
View File
@@ -0,0 +1,82 @@
"""Tool (function-call) definitions exposed to Claude.
We intentionally do NOT give the model a generic "run shell command" tool.
All execution flows through pre-defined intents in the registry. This is the
foundation of safety: the LLM can only compose what we've vetted.
"""
from __future__ import annotations
from ops_tools.intents.registry import IntentRegistry
SYSTEM_PROMPT = """你是一个运维助手,帮助用户用自然语言操作异构服务器(Linux/Windows 等)。
工作方式:
1. 用户描述需求时,先用 list_hosts / list_intents 探查可用资源
2. 然后调用 run_intent 执行已知意图,不要凭空构造命令
3. 如果某个需求没有对应意图,明确告诉用户"暂无该意图",并建议在 templates/ 中新增
4. 执行结果中如果有 risk_level=WRITE 而被拒绝,要把命令预览展示给用户,请用户确认后用 confirmed=true 再次调用
5. 输出结果时,把原始 stdout 翻译成人类易读的摘要
不要:
- 编造主机名、意图名或命令
- 在没有用户明确确认前执行 WRITE/DESTRUCTIVE 类意图
"""
def build_tools(registry: IntentRegistry) -> list[dict]:
"""Return Anthropic-format tool schemas. Intent names are enumerated so the
model gets autocomplete-style guidance instead of free-form strings."""
intent_names = registry.names()
return [
{
"name": "list_hosts",
"description": "列出 inventory 中所有主机,可按 tag/group 过滤。",
"input_schema": {
"type": "object",
"properties": {
"selector": {
"type": "string",
"description": "可选:主机名/组名/tag。不填则返回全部。",
}
},
},
},
{
"name": "list_intents",
"description": "列出已注册的意图(命令模板)及其用途。",
"input_schema": {"type": "object", "properties": {}},
},
{
"name": "run_intent",
"description": (
"在指定主机上执行一个意图。READ 类自动执行;WRITE/DESTRUCTIVE "
"类首次调用会被拒绝并返回命令预览,得到用户确认后用 confirmed=true 重试。"
),
"input_schema": {
"type": "object",
"properties": {
"intent": {
"type": "string",
"enum": intent_names or ["__none__"],
"description": "意图名(必须来自 list_intents",
},
"hosts": {
"type": "array",
"items": {"type": "string"},
"description": "主机名/组名/tag 列表",
},
"params": {
"type": "object",
"description": "意图参数,键值对",
},
"confirmed": {
"type": "boolean",
"description": "用户已确认风险?默认 false",
"default": False,
},
},
"required": ["intent", "hosts"],
},
},
]
View File
+130
View File
@@ -0,0 +1,130 @@
"""HTTP routes — kept minimal: list hosts/intents, run an intent, chat."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from ops_tools.ai.agent import OpsAgent
from ops_tools.audit.store import AuditStore
from ops_tools.executor.runner import ConfirmationRequired, IntentRunner
from ops_tools.intents.registry import IntentRegistry
from ops_tools.inventory.models import Inventory
def build_router(
inventory: Inventory,
registry: IntentRegistry,
runner: IntentRunner,
) -> APIRouter:
router = APIRouter()
@router.get("/hosts")
def hosts():
return [
{
"name": h.name,
"address": h.address,
"os_family": h.os_family.value,
"connection": h.connection.value,
"tags": h.tags,
}
for h in inventory.hosts.values()
]
@router.get("/intents")
def intents():
return [
{
"intent": t.intent,
"description": t.description,
"risk_level": t.risk_level.value,
"params": [p.model_dump() for p in t.params],
"os_families": list(t.implementations.keys()),
}
for t in registry.all()
]
class RunBody(BaseModel):
intent: str
hosts: list[str]
params: dict[str, Any] = Field(default_factory=dict)
confirmed: bool = False
user: str = "api"
@router.post("/run")
async def run(body: RunBody):
host_objs = []
seen = set()
for sel in body.hosts:
try:
for h in inventory.resolve(sel):
if h.name not in seen:
seen.add(h.name)
host_objs.append(h)
except KeyError as e:
raise HTTPException(status_code=404, detail=str(e))
try:
result = await runner.run(
body.intent,
host_objs,
body.params,
confirmed=body.confirmed,
user=body.user,
)
except ConfirmationRequired as e:
return {
"status": "confirmation_required",
"message": str(e),
"preview": [
{
"host": p.host,
"os_family": p.os_family,
"command": p.command,
"risk_level": p.risk_level.value,
}
for p in e.preview
],
}
except KeyError as e:
raise HTTPException(status_code=404, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"status": "ok",
"intent": result.intent,
"risk_level": result.risk_level.value,
"executions": [
{
"host": ex.host,
"os_family": ex.os_family,
"command": ex.command,
"exit_code": ex.result.exit_code if ex.result else None,
"stdout": ex.result.stdout if ex.result else None,
"stderr": ex.result.stderr if ex.result else None,
"duration_ms": ex.result.duration_ms if ex.result else None,
"error": ex.error,
}
for ex in result.executions
],
}
class ChatBody(BaseModel):
message: str
# Naive single-agent-per-process state. Real impl: session_id -> agent.
_agent: dict = {"instance": None}
@router.post("/chat")
async def chat(body: ChatBody):
agent = _agent["instance"] or OpsAgent(inventory, runner)
_agent["instance"] = agent
turn = await agent.chat(body.message)
return {
"assistant": turn.assistant_text,
"tool_calls": turn.tool_calls,
}
return router
+3
View File
@@ -0,0 +1,3 @@
from .store import AuditStore, init_db
__all__ = ["AuditStore", "init_db"]
+88
View File
@@ -0,0 +1,88 @@
"""Persistent audit log.
A single executions table is enough for L1/L2. When L3 playbooks land, add a
`playbook_runs` table and let `executions` reference it via a nullable FK.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import JSON, DateTime, Integer, String, Text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from ops_tools.config import get_settings
class Base(DeclarativeBase):
pass
class AuditRecord(Base):
__tablename__ = "audit_records"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True))
user: Mapped[str] = mapped_column(String(64))
intent: Mapped[str] = mapped_column(String(128))
risk_level: Mapped[str] = mapped_column(String(16))
hosts: Mapped[str] = mapped_column(Text) # JSON array
params: Mapped[str] = mapped_column(Text) # JSON object
nl_query: Mapped[str | None] = mapped_column(Text, nullable=True)
executions: Mapped[dict] = mapped_column(JSON) # list[dict]
_engine = None
_session_maker: async_sessionmaker[AsyncSession] | None = None
async def init_db() -> None:
global _engine, _session_maker
if _engine is not None:
return
settings = get_settings()
# Make sure the data/ directory exists for SQLite URLs.
if settings.db_url.startswith("sqlite"):
db_file = settings.db_url.split("///")[-1]
Path(db_file).parent.mkdir(parents=True, exist_ok=True)
_engine = create_async_engine(settings.db_url, echo=False, future=True)
_session_maker = async_sessionmaker(_engine, expire_on_commit=False)
async with _engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
class AuditStore:
async def record(
self,
*,
user: str,
intent: str,
risk_level: str,
hosts: list[str],
params: dict,
executions: list[dict],
nl_query: str | None = None,
) -> None:
if _session_maker is None:
await init_db()
assert _session_maker is not None
async with _session_maker() as s:
s.add(
AuditRecord(
timestamp=datetime.now(timezone.utc),
user=user,
intent=intent,
risk_level=risk_level,
hosts=json.dumps(hosts),
params=json.dumps(params, default=str),
nl_query=nl_query,
executions=executions,
)
)
await s.commit()
+144
View File
@@ -0,0 +1,144 @@
"""Typer-based CLI — fastest way to drive the framework end-to-end."""
from __future__ import annotations
import asyncio
import json
import typer
from rich.console import Console
from rich.table import Table
from ops_tools.ai.agent import OpsAgent
from ops_tools.audit.store import AuditStore, init_db
from ops_tools.config import get_settings
from ops_tools.executor.runner import ConfirmationRequired, IntentRunner
from ops_tools.inventory.loader import load_inventory
from ops_tools.intents.registry import load_intent_registry
app = typer.Typer(help="ai-app-ops-tools CLI")
console = Console()
def _bootstrap():
settings = get_settings()
inventory = load_inventory(settings.resolve(settings.inventory_path))
registry = load_intent_registry(settings.resolve(settings.templates_dir))
return settings, inventory, registry
@app.command()
def hosts():
"""List hosts in inventory."""
_, inv, _ = _bootstrap()
table = Table(title="Inventory")
for col in ("name", "address", "os_family", "connection", "tags"):
table.add_column(col)
for h in inv.hosts.values():
table.add_row(
h.name, h.address, h.os_family.value, h.connection.value, ",".join(h.tags)
)
console.print(table)
@app.command()
def intents():
"""List registered intents."""
_, _, reg = _bootstrap()
table = Table(title="Intents")
for col in ("intent", "risk", "description", "params"):
table.add_column(col)
for t in reg.all():
table.add_row(
t.intent,
t.risk_level.value,
t.description,
",".join(p.name for p in t.params),
)
console.print(table)
@app.command()
def run(
intent: str = typer.Argument(..., help="Intent name"),
target: str = typer.Argument(..., help="Host / group / tag selector"),
params: str = typer.Option("{}", "--params", "-p", help="JSON object of params"),
confirm: bool = typer.Option(False, "--confirm", "-y", help="Approve WRITE-class run"),
):
"""Execute an intent directly without invoking the AI agent."""
asyncio.run(_run(intent, target, params, confirm))
async def _run(intent: str, target: str, params_json: str, confirmed: bool):
settings, inv, reg = _bootstrap()
await init_db()
runner = IntentRunner(reg, AuditStore())
hosts_resolved = inv.resolve(target)
try:
result = await runner.run(
intent,
hosts_resolved,
json.loads(params_json),
confirmed=confirmed,
user="cli",
)
except ConfirmationRequired as e:
console.print(f"[yellow]{e}[/yellow]")
for p in e.preview:
console.print(
f" [{p.host} | {p.os_family} | {p.risk_level.value}] {p.command}"
)
console.print("\nRe-run with --confirm to execute.")
raise typer.Exit(code=2)
for ex in result.executions:
console.rule(f"[bold]{ex.host}[/bold] ({ex.os_family})")
if ex.error:
console.print(f"[red]ERROR:[/red] {ex.error}")
continue
r = ex.result
console.print(f"exit={r.exit_code} duration={r.duration_ms}ms")
if r.stdout:
console.print(r.stdout)
if r.stderr.strip():
console.print(f"[yellow]stderr:[/yellow]\n{r.stderr}")
@app.command()
def chat(message: str = typer.Argument(..., help="Natural language request")):
"""Single-turn chat with the AI agent."""
asyncio.run(_chat(message))
async def _chat(message: str):
settings, inv, reg = _bootstrap()
if not settings.anthropic_api_key:
console.print("[red]ANTHROPIC_API_KEY 未设置(参考 .env.example[/red]")
raise typer.Exit(code=1)
await init_db()
runner = IntentRunner(reg, AuditStore())
agent = OpsAgent(inv, runner)
turn = await agent.chat(message)
console.rule("[bold]Assistant[/bold]")
console.print(turn.assistant_text)
if turn.tool_calls:
console.rule("[dim]Tool calls[/dim]")
for tc in turn.tool_calls:
console.print(f"{tc['name']}({tc['input']})")
@app.command()
def serve():
"""Start the FastAPI server."""
import uvicorn
settings = get_settings()
uvicorn.run(
"ops_tools.main:app",
host=settings.host,
port=settings.port,
reload=False,
)
if __name__ == "__main__":
app()
+64
View File
@@ -0,0 +1,64 @@
"""Global settings loaded from .env + config/settings.yaml."""
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from typing import Any
import yaml
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
PROJECT_ROOT = Path(__file__).resolve().parents[2]
class Settings(BaseSettings):
"""Environment-driven settings (see .env.example)."""
model_config = SettingsConfigDict(
env_file=PROJECT_ROOT / ".env",
env_file_encoding="utf-8",
extra="ignore",
env_prefix="OPS_",
)
# Anthropic
anthropic_api_key: str = Field(default="", alias="ANTHROPIC_API_KEY")
model_main: str = "claude-sonnet-4-6"
model_fast: str = "claude-haiku-4-5-20251001"
# Storage
db_url: str = "sqlite+aiosqlite:///./data/ops.db"
# Paths
inventory_path: str = "config/inventory.yaml"
templates_dir: str = "templates"
apps_dir: str = "apps"
playbooks_dir: str = "playbooks"
settings_yaml: str = "config/settings.yaml"
# Safety toggles
auto_execute_write: bool = False
allow_destructive: bool = False
# Server
host: str = "127.0.0.1"
port: int = 8000
def resolve(self, relative: str) -> Path:
"""Resolve a path relative to project root."""
p = Path(relative)
return p if p.is_absolute() else PROJECT_ROOT / p
def load_runtime_yaml(self) -> dict[str, Any]:
"""Load config/settings.yaml (non-secret runtime tuning)."""
path = self.resolve(self.settings_yaml)
if not path.exists():
return {}
with path.open("r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()
+4
View File
@@ -0,0 +1,4 @@
from .base import BaseConnector, ExecResult, ConnectorError
from .factory import get_connector
__all__ = ["BaseConnector", "ExecResult", "ConnectorError", "get_connector"]
+40
View File
@@ -0,0 +1,40 @@
"""Abstract connector contract.
All transport-specific connectors (SSH, WinRM, local) implement BaseConnector
and return a uniform ExecResult, so upper layers don't care about transport.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from ops_tools.inventory.models import Host
class ConnectorError(Exception):
"""Raised for connection/transport failures (auth, network, timeout)."""
@dataclass
class ExecResult:
exit_code: int
stdout: str
stderr: str
duration_ms: int
class BaseConnector(ABC):
"""One connector instance is bound to one host. Use as an async context manager."""
def __init__(self, host: Host):
self.host = host
@abstractmethod
async def __aenter__(self) -> "BaseConnector": ...
@abstractmethod
async def __aexit__(self, exc_type, exc, tb) -> None: ...
@abstractmethod
async def run(self, command: str, timeout: int = 30) -> ExecResult:
"""Execute a single command and capture stdout/stderr."""
+35
View File
@@ -0,0 +1,35 @@
"""Pick a connector implementation based on the host's connection type."""
from __future__ import annotations
from ops_tools.config import get_settings
from ops_tools.inventory.models import ConnectionType, Host
from .base import BaseConnector
from .local import LocalConnector
from .ssh import SSHConnector
from .winrm import WinRMConnector
def get_connector(host: Host) -> BaseConnector:
"""Return a (not yet opened) connector instance for the given host."""
runtime = get_settings().load_runtime_yaml()
ssh_cfg = runtime.get("ssh", {}) or {}
winrm_cfg = runtime.get("winrm", {}) or {}
match host.connection:
case ConnectionType.SSH:
return SSHConnector(
host,
known_hosts=ssh_cfg.get("known_hosts"),
connect_timeout=ssh_cfg.get("connect_timeout", 10),
)
case ConnectionType.WINRM:
return WinRMConnector(
host,
transport=winrm_cfg.get("transport", "ntlm"),
server_cert_validation=winrm_cfg.get("server_cert_validation", "ignore"),
)
case ConnectionType.LOCAL:
return LocalConnector(host)
case _:
raise ValueError(f"Unsupported connection type: {host.connection}")
+53
View File
@@ -0,0 +1,53 @@
"""Local connector — executes commands on the controller machine.
Useful for development, testing, and managing the controller itself. Picks the
shell based on the controller's OS rather than the Host record so that an
inventory entry of `connection: local` always works.
"""
from __future__ import annotations
import asyncio
import platform
import time
from .base import BaseConnector, ExecResult
class LocalConnector(BaseConnector):
async def __aenter__(self) -> "LocalConnector":
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
async def run(self, command: str, timeout: int = 30) -> ExecResult:
start = time.monotonic()
is_windows = platform.system().lower() == "windows"
if is_windows:
# Use PowerShell on Windows so the same intent templates work locally.
proc = await asyncio.create_subprocess_exec(
"powershell.exe", "-NoProfile", "-Command", command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
else:
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise TimeoutError(f"Local command timed out after {timeout}s")
return ExecResult(
exit_code=proc.returncode or 0,
stdout=stdout.decode("utf-8", errors="replace"),
stderr=stderr.decode("utf-8", errors="replace"),
duration_ms=int((time.monotonic() - start) * 1000),
)
+60
View File
@@ -0,0 +1,60 @@
"""SSH connector for Linux/Unix hosts, built on asyncssh."""
from __future__ import annotations
import time
import asyncssh
from ops_tools.inventory.models import Host
from .base import BaseConnector, ConnectorError, ExecResult
class SSHConnector(BaseConnector):
def __init__(self, host: Host, *, known_hosts=None, connect_timeout: int = 10):
super().__init__(host)
self._known_hosts = known_hosts
self._connect_timeout = connect_timeout
self._conn: asyncssh.SSHClientConnection | None = None
async def __aenter__(self) -> "SSHConnector":
h = self.host
auth = h.auth
opts: dict = {
"username": h.user,
"known_hosts": self._known_hosts, # None disables host-key check (dev only)
"connect_timeout": self._connect_timeout,
}
if h.port:
opts["port"] = h.port
if auth and auth.type == "key" and auth.key_path:
opts["client_keys"] = [auth.key_path]
elif auth and auth.type == "password" and auth.password:
opts["password"] = auth.password
try:
self._conn = await asyncssh.connect(h.address, **opts)
except (OSError, asyncssh.Error) as e:
raise ConnectorError(f"SSH connect failed to {h.name}: {e}") from e
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
if self._conn is not None:
self._conn.close()
await self._conn.wait_closed()
self._conn = None
async def run(self, command: str, timeout: int = 30) -> ExecResult:
if self._conn is None:
raise ConnectorError("SSH session not open — use 'async with' context")
start = time.monotonic()
try:
res = await self._conn.run(command, check=False, timeout=timeout)
except asyncssh.Error as e:
raise ConnectorError(f"SSH exec failed: {e}") from e
return ExecResult(
exit_code=res.exit_status if res.exit_status is not None else -1,
stdout=res.stdout or "",
stderr=res.stderr or "",
duration_ms=int((time.monotonic() - start) * 1000),
)
+80
View File
@@ -0,0 +1,80 @@
"""WinRM connector for Windows hosts.
pywinrm is synchronous; we run blocking calls in a thread executor so the rest
of the system stays async.
"""
from __future__ import annotations
import asyncio
import time
from ops_tools.inventory.models import Host
from .base import BaseConnector, ConnectorError, ExecResult
class WinRMConnector(BaseConnector):
def __init__(
self,
host: Host,
*,
transport: str = "ntlm",
server_cert_validation: str = "ignore",
):
super().__init__(host)
self._transport = transport
self._server_cert_validation = server_cert_validation
self._session = None
async def __aenter__(self) -> "WinRMConnector":
try:
import winrm # imported lazily so the package is optional in dev
except ImportError as e:
raise ConnectorError(
"pywinrm not installed — `pip install pywinrm` to use WinRM"
) from e
h = self.host
if not h.user or not h.auth or not h.auth.password:
raise ConnectorError(f"WinRM host {h.name} requires user + password auth")
endpoint = f"http://{h.address}:{h.port or 5985}/wsman"
loop = asyncio.get_running_loop()
try:
self._session = await loop.run_in_executor(
None,
lambda: winrm.Session(
endpoint,
auth=(h.user, h.auth.password),
transport=self._transport,
server_cert_validation=self._server_cert_validation,
),
)
except Exception as e: # noqa: BLE001 — bubble to connector layer
raise ConnectorError(f"WinRM session init failed for {h.name}: {e}") from e
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
self._session = None # pywinrm Session has no explicit close
async def run(self, command: str, timeout: int = 30) -> ExecResult:
if self._session is None:
raise ConnectorError("WinRM session not open")
start = time.monotonic()
loop = asyncio.get_running_loop()
try:
res = await asyncio.wait_for(
loop.run_in_executor(None, self._session.run_ps, command),
timeout=timeout,
)
except asyncio.TimeoutError:
raise TimeoutError(f"WinRM command timed out after {timeout}s")
except Exception as e: # noqa: BLE001
raise ConnectorError(f"WinRM exec failed: {e}") from e
return ExecResult(
exit_code=res.status_code,
stdout=(res.std_out or b"").decode("utf-8", errors="replace"),
stderr=(res.std_err or b"").decode("utf-8", errors="replace"),
duration_ms=int((time.monotonic() - start) * 1000),
)
+10
View File
@@ -0,0 +1,10 @@
from .runner import IntentRunner, IntentRunResult, ConfirmationRequired
from .risk import classify_risk, sanitize_params
__all__ = [
"IntentRunner",
"IntentRunResult",
"ConfirmationRequired",
"classify_risk",
"sanitize_params",
]
+64
View File
@@ -0,0 +1,64 @@
"""Risk classification and parameter sanitization."""
from __future__ import annotations
import shlex
from typing import Any
from ops_tools.intents.models import IntentTemplate, RiskLevel
def classify_risk(
template: IntentTemplate,
rendered_command: str,
destructive_keywords: list[str],
) -> RiskLevel:
"""Possibly upgrade the template's declared risk level based on the rendered command.
A template author may say WRITE but if user-supplied parameters cause the
command to contain `rm -rf`, we promote it to DESTRUCTIVE.
"""
cmd_lower = rendered_command.lower()
for kw in destructive_keywords or []:
if kw.lower() in cmd_lower:
return RiskLevel.DESTRUCTIVE
return template.risk_level
def sanitize_params(template: IntentTemplate, params: dict[str, Any]) -> dict[str, Any]:
"""Validate against the declared param schema and shell-escape string values.
Notes
-----
Shell-escaping with `shlex.quote` is correct for POSIX shells. For PowerShell
we rely on single-quote wrapping inside templates (see service.yaml). This is
intentionally conservative — better to over-quote than allow injection.
"""
out: dict[str, Any] = {}
declared = {p.name: p for p in template.params}
for name, spec in declared.items():
if name in params:
value = params[name]
elif spec.default is not None:
value = spec.default
elif spec.required:
raise ValueError(f"Missing required param: {name}")
else:
continue
if spec.type == "integer":
value = int(value)
elif spec.type == "boolean":
value = bool(value)
else:
# Strings get shell-quoted (POSIX). PowerShell templates wrap them in
# single quotes themselves; the extra escaping is safe.
value = shlex.quote(str(value)).strip("'")
out[name] = value
# Reject undeclared params to keep the surface tight.
extra = set(params) - set(declared)
if extra:
raise ValueError(f"Unknown params for intent {template.intent}: {extra}")
return out
+175
View File
@@ -0,0 +1,175 @@
"""Run a named intent against one or more hosts, with risk gating + audit."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from jinja2 import Environment, StrictUndefined
from ops_tools.audit.store import AuditStore
from ops_tools.config import get_settings
from ops_tools.connectors import ExecResult, get_connector
from ops_tools.inventory.facts import probe_facts
from ops_tools.inventory.models import Host, OSFamily
from ops_tools.intents.models import IntentTemplate, RiskLevel
from ops_tools.intents.registry import IntentRegistry
from .risk import classify_risk, sanitize_params
_jinja = Environment(undefined=StrictUndefined, autoescape=False, keep_trailing_newline=False)
class ConfirmationRequired(Exception):
"""Raised when a WRITE/DESTRUCTIVE intent is dispatched without `confirmed=True`."""
def __init__(self, message: str, preview: list["CommandPreview"]):
super().__init__(message)
self.preview = preview
@dataclass
class CommandPreview:
host: str
os_family: str
command: str
risk_level: RiskLevel
@dataclass
class HostExecution:
host: str
os_family: str
command: str
result: ExecResult | None = None
error: str | None = None
@dataclass
class IntentRunResult:
intent: str
risk_level: RiskLevel
executions: list[HostExecution] = field(default_factory=list)
class IntentRunner:
"""Top-level entry point used by the CLI, the API, and the AI agent."""
def __init__(
self,
registry: IntentRegistry,
audit: AuditStore,
):
self.registry = registry
self.audit = audit
self.settings = get_settings()
self.runtime = self.settings.load_runtime_yaml()
async def run(
self,
intent_name: str,
hosts: list[Host],
params: dict[str, Any] | None = None,
*,
confirmed: bool = False,
nl_query: str | None = None,
user: str = "system",
) -> IntentRunResult:
params = params or {}
template = self.registry.get(intent_name)
sanitized = sanitize_params(template, params)
# 1) Build per-host previews (command + risk after rendering).
previews: list[CommandPreview] = []
rendered: dict[str, tuple[str, str]] = {} # host -> (os_family, command)
for host in hosts:
connector = get_connector(host)
async with connector as conn:
facts = await probe_facts(host, conn)
os_family = facts.get("os_family") or OSFamily.UNKNOWN.value
impl = template.implementation_for(os_family)
if impl is None:
raise ValueError(
f"Intent '{intent_name}' has no implementation for OS family "
f"'{os_family}' (host {host.name})"
)
cmd = _jinja.from_string(impl.command).render(**sanitized)
rendered[host.name] = (os_family, cmd)
previews.append(
CommandPreview(
host=host.name,
os_family=os_family,
command=cmd,
risk_level=classify_risk(
template,
cmd,
self._destructive_keywords(),
),
)
)
# 2) Apply risk gate. Use the highest level across hosts.
effective_risk = max((p.risk_level for p in previews), key=_risk_order, default=RiskLevel.READ)
if effective_risk == RiskLevel.DESTRUCTIVE and not self.settings.allow_destructive:
raise ConfirmationRequired(
f"Intent '{intent_name}' classified as DESTRUCTIVE and OPS_ALLOW_DESTRUCTIVE is false.",
previews,
)
if effective_risk != RiskLevel.READ and not confirmed and not (
effective_risk == RiskLevel.WRITE and self.settings.auto_execute_write
):
raise ConfirmationRequired(
f"Intent '{intent_name}' is {effective_risk.value}; confirmation required.",
previews,
)
# 3) Execute and collect results per host.
result = IntentRunResult(intent=intent_name, risk_level=effective_risk)
for host in hosts:
os_family, cmd = rendered[host.name]
exec_record = HostExecution(host=host.name, os_family=os_family, command=cmd)
try:
connector = get_connector(host)
async with connector as conn:
exec_record.result = await conn.run(
cmd,
timeout=self.runtime.get("default_timeout_seconds", 30),
)
except Exception as e: # noqa: BLE001 — record then continue to next host
exec_record.error = str(e)
result.executions.append(exec_record)
# 4) Audit log.
await self.audit.record(
user=user,
intent=intent_name,
risk_level=effective_risk.value,
hosts=[h.name for h in hosts],
params=sanitized,
nl_query=nl_query,
executions=[
{
"host": e.host,
"os_family": e.os_family,
"command": e.command,
"exit_code": e.result.exit_code if e.result else None,
"error": e.error,
"duration_ms": e.result.duration_ms if e.result else None,
}
for e in result.executions
],
)
return result
def _destructive_keywords(self) -> list[str]:
policy = self.runtime.get("risk_policy", {}) or {}
return policy.get("destructive_keywords", []) or []
_RISK_RANK = {RiskLevel.READ: 0, RiskLevel.WRITE: 1, RiskLevel.DESTRUCTIVE: 2}
def _risk_order(r: RiskLevel) -> int:
return _RISK_RANK[r]
+11
View File
@@ -0,0 +1,11 @@
from .models import IntentTemplate, IntentParam, RiskLevel, Implementation
from .registry import IntentRegistry, load_intent_registry
__all__ = [
"IntentTemplate",
"IntentParam",
"RiskLevel",
"Implementation",
"IntentRegistry",
"load_intent_registry",
]
+45
View File
@@ -0,0 +1,45 @@
"""Intent template data models."""
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class RiskLevel(str, Enum):
READ = "READ" # 自动执行
WRITE = "WRITE" # 需用户确认
DESTRUCTIVE = "DESTRUCTIVE" # 默认禁用 + 二次确认
class IntentParam(BaseModel):
name: str
type: str = "string" # string | integer | boolean
required: bool = True
default: Any = None
description: str | None = None
class Implementation(BaseModel):
"""How an intent is realized on a specific OS family / variant.
The key in IntentTemplate.implementations identifies the OS — typically one of
`linux`, `windows`, `darwin`, `aix`. More specific keys like `linux_systemd`
or `ubuntu` are reserved for the future when we need finer routing.
"""
command: str
parser: str | None = None # 可选:输出后处理器名称
shell: str | None = None # 显式指定 shell(如 powershell / bash
class IntentTemplate(BaseModel):
intent: str
description: str
risk_level: RiskLevel = RiskLevel.READ
params: list[IntentParam] = Field(default_factory=list)
implementations: dict[str, Implementation]
def implementation_for(self, os_family: str) -> Implementation | None:
return self.implementations.get(os_family)
+46
View File
@@ -0,0 +1,46 @@
"""Load all *.yaml files under templates/ into an IntentRegistry."""
from __future__ import annotations
from pathlib import Path
import yaml
from .models import IntentTemplate
class IntentRegistry:
def __init__(self, intents: dict[str, IntentTemplate]):
self._intents = intents
def get(self, name: str) -> IntentTemplate:
if name not in self._intents:
raise KeyError(f"Unknown intent: {name}")
return self._intents[name]
def all(self) -> list[IntentTemplate]:
return list(self._intents.values())
def names(self) -> list[str]:
return list(self._intents.keys())
def __contains__(self, name: str) -> bool:
return name in self._intents
def load_intent_registry(templates_dir: str | Path) -> IntentRegistry:
base = Path(templates_dir)
intents: dict[str, IntentTemplate] = {}
if not base.exists():
return IntentRegistry(intents)
for yml in sorted(base.glob("*.yaml")):
with yml.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
for spec in data.get("intents", []):
tpl = IntentTemplate(**spec)
if tpl.intent in intents:
raise ValueError(f"Duplicate intent name: {tpl.intent} (in {yml})")
intents[tpl.intent] = tpl
return IntentRegistry(intents)
+4
View File
@@ -0,0 +1,4 @@
from .models import Host, Inventory, OSFamily, ConnectionType
from .loader import load_inventory
__all__ = ["Host", "Inventory", "OSFamily", "ConnectionType", "load_inventory"]
+63
View File
@@ -0,0 +1,63 @@
"""Probe OS facts on first connect; cache in-memory so we only run uname once."""
from __future__ import annotations
from .models import Host, OSFamily
# In-memory cache. For production, persist to DB so facts survive restarts.
_FACTS_CACHE: dict[str, dict] = {}
def get_cached_facts(host: Host) -> dict | None:
return _FACTS_CACHE.get(host.name)
def set_cached_facts(host: Host, facts: dict) -> None:
_FACTS_CACHE[host.name] = facts
async def probe_facts(host: Host, connector) -> dict:
"""Run a tiny shell snippet to discover OS family + distribution.
`connector` is an already-open connector instance for this host.
Returns a facts dict and caches it.
"""
cached = get_cached_facts(host)
if cached:
return cached
facts: dict = {"os_family": host.os_family.value}
# If we already know it from inventory, trust it.
if host.os_family != OSFamily.UNKNOWN:
set_cached_facts(host, facts)
return facts
# Heuristic: try Linux/Unix first, then Windows.
# Note: on Windows hosts with Git-Bash installed, `uname -s` may return
# MINGW/MSYS/CYGWIN — those still mean the underlying OS is Windows.
try:
res = await connector.run("uname -s", timeout=5)
if res.exit_code == 0 and res.stdout.strip():
kernel = res.stdout.strip().lower()
if any(tok in kernel for tok in ("mingw", "msys", "cygwin")):
facts["os_family"] = OSFamily.WINDOWS.value
elif "linux" in kernel:
facts["os_family"] = OSFamily.LINUX.value
elif "darwin" in kernel:
facts["os_family"] = OSFamily.DARWIN.value
elif "aix" in kernel:
facts["os_family"] = OSFamily.AIX.value
if facts["os_family"] == OSFamily.UNKNOWN.value:
# Try Windows PowerShell as last resort.
res = await connector.run(
"(Get-CimInstance Win32_OperatingSystem).Caption", timeout=5
)
if res.exit_code == 0 and res.stdout.strip():
facts["os_family"] = OSFamily.WINDOWS.value
facts["os_caption"] = res.stdout.strip()
except Exception as e: # noqa: BLE001 — probing is best-effort
facts["probe_error"] = str(e)
set_cached_facts(host, facts)
return facts
+47
View File
@@ -0,0 +1,47 @@
"""Load inventory.yaml with ${ENV_VAR} interpolation for secrets."""
from __future__ import annotations
import os
import re
from pathlib import Path
import yaml
from .models import Host, Inventory
_ENV_RE = re.compile(r"\$\{([A-Z0-9_]+)\}")
def _interp(value):
"""Recursively replace ${ENV_VAR} placeholders with environment values."""
if isinstance(value, str):
return _ENV_RE.sub(lambda m: os.getenv(m.group(1), ""), value)
if isinstance(value, dict):
return {k: _interp(v) for k, v in value.items()}
if isinstance(value, list):
return [_interp(v) for v in value]
return value
def load_inventory(path: str | Path) -> Inventory:
p = Path(path)
if not p.exists():
# Fall back to the example file so the framework boots on a fresh checkout.
example = p.with_name("inventory.example.yaml")
if example.exists():
p = example
else:
return Inventory()
with p.open("r", encoding="utf-8") as f:
raw = yaml.safe_load(f) or {}
raw = _interp(raw)
hosts_dict = raw.get("hosts") or {}
hosts: dict[str, Host] = {}
for name, spec in hosts_dict.items():
spec = dict(spec)
spec["name"] = name
hosts[name] = Host(**spec)
return Inventory(hosts=hosts, groups=raw.get("groups") or {})
+61
View File
@@ -0,0 +1,61 @@
"""Inventory data models — hosts, groups, auth metadata."""
from __future__ import annotations
from enum import Enum
from typing import Literal
from pydantic import BaseModel, Field
class OSFamily(str, Enum):
LINUX = "linux"
WINDOWS = "windows"
DARWIN = "darwin"
AIX = "aix"
UNKNOWN = "unknown"
class ConnectionType(str, Enum):
SSH = "ssh"
WINRM = "winrm"
LOCAL = "local"
class AuthSpec(BaseModel):
type: Literal["key", "password"] = "key"
key_path: str | None = None
password: str | None = None # supports ${ENV_VAR} interpolation at load time
class Host(BaseModel):
name: str
address: str
port: int | None = None
os_family: OSFamily = OSFamily.UNKNOWN
os_distribution: str | None = None
connection: ConnectionType = ConnectionType.SSH
user: str | None = None
auth: AuthSpec | None = None
tags: list[str] = Field(default_factory=list)
class Inventory(BaseModel):
hosts: dict[str, Host] = Field(default_factory=dict)
groups: dict[str, list[str]] = Field(default_factory=dict)
def get(self, name: str) -> Host:
if name not in self.hosts:
raise KeyError(f"Host not found in inventory: {name}")
return self.hosts[name]
def resolve(self, selector: str) -> list[Host]:
"""Resolve a selector that may be a host name, group name, or tag."""
if selector in self.hosts:
return [self.hosts[selector]]
if selector in self.groups:
return [self.hosts[n] for n in self.groups[selector] if n in self.hosts]
# tag-based
matched = [h for h in self.hosts.values() if selector in h.tags]
if matched:
return matched
raise KeyError(f"Selector matches no host/group/tag: {selector}")
+43
View File
@@ -0,0 +1,43 @@
"""FastAPI app factory."""
from __future__ import annotations
from contextlib import asynccontextmanager
from fastapi import FastAPI
from ops_tools.api.routes import build_router
from ops_tools.audit.store import AuditStore, init_db
from ops_tools.config import get_settings
from ops_tools.executor.runner import IntentRunner
from ops_tools.intents.registry import load_intent_registry
from ops_tools.inventory.loader import load_inventory
def _wire():
settings = get_settings()
inventory = load_inventory(settings.resolve(settings.inventory_path))
registry = load_intent_registry(settings.resolve(settings.templates_dir))
runner = IntentRunner(registry, AuditStore())
return inventory, registry, runner
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
yield
app = FastAPI(
title="ai-app-ops-tools",
description="AI-powered natural language ops terminal",
version="0.1.0",
lifespan=lifespan,
)
_inventory, _registry, _runner = _wire()
app.include_router(build_router(_inventory, _registry, _runner), prefix="/api/v1")
@app.get("/health")
def health():
return {"status": "ok"}
+27
View File
@@ -0,0 +1,27 @@
# 磁盘相关意图模板
intents:
- intent: check_disk_usage
description: 查看主机各分区磁盘使用率
risk_level: READ
params: []
implementations:
linux:
command: "df -h --output=source,size,used,avail,pcent,target"
darwin:
command: "df -h"
windows:
command: "Get-PSDrive -PSProvider FileSystem | Select-Object Name,@{N='UsedGB';E={[math]::Round($_.Used/1GB,1)}},@{N='FreeGB';E={[math]::Round($_.Free/1GB,1)}} | ConvertTo-Json"
- intent: check_largest_dirs
description: 在指定路径下找出占用最大的目录(Top 10)
risk_level: READ
params:
- name: path
type: string
required: true
description: 要扫描的根路径,如 /var
implementations:
linux:
command: "du -h --max-depth=1 {{path}} 2>/dev/null | sort -hr | head -n 10"
windows:
command: "Get-ChildItem -Path '{{path}}' -Directory | ForEach-Object { [PSCustomObject]@{ Path=$_.FullName; SizeMB=[math]::Round(((Get-ChildItem $_.FullName -Recurse -ErrorAction SilentlyContinue | Measure-Object -Property Length -Sum).Sum)/1MB,1) } } | Sort-Object SizeMB -Descending | Select-Object -First 10 | ConvertTo-Json"
+53
View File
@@ -0,0 +1,53 @@
# 服务管理意图(含 WRITE 类,需确认后才会执行)
intents:
- intent: check_service_status
description: 查看服务当前状态
risk_level: READ
params:
- name: service
type: string
required: true
implementations:
linux:
command: "systemctl status {{service}} --no-pager || service {{service}} status"
windows:
command: "Get-Service -Name '{{service}}' | Select-Object Name,Status,StartType | ConvertTo-Json"
- intent: restart_service
description: 重启指定服务
risk_level: WRITE
params:
- name: service
type: string
required: true
implementations:
linux:
command: "sudo systemctl restart {{service}}"
windows:
command: "Restart-Service -Name '{{service}}' -Force"
- intent: stop_service
description: 停止指定服务
risk_level: WRITE
params:
- name: service
type: string
required: true
implementations:
linux:
command: "sudo systemctl stop {{service}}"
windows:
command: "Stop-Service -Name '{{service}}' -Force"
- intent: start_service
description: 启动指定服务
risk_level: WRITE
params:
- name: service
type: string
required: true
implementations:
linux:
command: "sudo systemctl start {{service}}"
windows:
command: "Start-Service -Name '{{service}}'"
+53
View File
@@ -0,0 +1,53 @@
# 系统资源与进程相关意图
intents:
- intent: check_os_info
description: 查看操作系统版本与内核信息(用于 facts 探测和人工核对)
risk_level: READ
params: []
implementations:
linux:
command: "uname -a && cat /etc/os-release 2>/dev/null || true"
darwin:
command: "uname -a && sw_vers"
windows:
command: "Get-CimInstance Win32_OperatingSystem | Select-Object Caption,Version,BuildNumber,OSArchitecture | ConvertTo-Json"
- intent: check_memory
description: 查看内存使用情况
risk_level: READ
params: []
implementations:
linux:
command: "free -h"
darwin:
command: "vm_stat"
windows:
command: "Get-CimInstance Win32_OperatingSystem | Select-Object @{N='TotalGB';E={[math]::Round($_.TotalVisibleMemorySize/1MB,2)}},@{N='FreeGB';E={[math]::Round($_.FreePhysicalMemory/1MB,2)}} | ConvertTo-Json"
- intent: top_cpu_processes
description: 查看 CPU 占用最高的 N 个进程
risk_level: READ
params:
- name: top_n
type: integer
required: false
default: 10
implementations:
linux:
command: "ps -eo pid,user,%cpu,%mem,comm --sort=-%cpu | head -n {{top_n + 1}}"
darwin:
command: "ps -eo pid,user,%cpu,%mem,comm -r | head -n {{top_n + 1}}"
windows:
command: "Get-Process | Sort-Object CPU -Descending | Select-Object -First {{top_n}} Id,ProcessName,CPU,WorkingSet | ConvertTo-Json"
- intent: check_uptime
description: 查看主机已运行时间
risk_level: READ
params: []
implementations:
linux:
command: "uptime"
darwin:
command: "uptime"
windows:
command: "(Get-CimInstance Win32_OperatingSystem).LastBootUpTime"
+46
View File
@@ -0,0 +1,46 @@
"""Smoke tests: framework loads, registry parses, local connector executes."""
from __future__ import annotations
import asyncio
import platform
import pytest
from ops_tools.config import PROJECT_ROOT, get_settings
from ops_tools.connectors.local import LocalConnector
from ops_tools.inventory.loader import load_inventory
from ops_tools.inventory.models import ConnectionType, Host, OSFamily
from ops_tools.intents.registry import load_intent_registry
def test_inventory_loads_example():
settings = get_settings()
inv = load_inventory(settings.resolve(settings.inventory_path))
# The example file ships with localhost; loader falls back to it if no real
# inventory.yaml exists.
assert "localhost" in inv.hosts
assert inv.hosts["localhost"].connection == ConnectionType.LOCAL
def test_intent_registry_has_disk_check():
reg = load_intent_registry(PROJECT_ROOT / "templates")
assert "check_disk_usage" in reg
intent = reg.get("check_disk_usage")
assert "linux" in intent.implementations
assert "windows" in intent.implementations
@pytest.mark.asyncio
async def test_local_connector_runs():
host = Host(
name="localhost",
address="127.0.0.1",
connection=ConnectionType.LOCAL,
os_family=(OSFamily.WINDOWS if platform.system().lower() == "windows" else OSFamily.LINUX),
)
async with LocalConnector(host) as conn:
# Pick a command that exists on both Windows and POSIX.
cmd = "Get-Date" if platform.system().lower() == "windows" else "echo hello"
result = await conn.run(cmd, timeout=10)
assert result.exit_code == 0
assert result.stdout.strip() != ""