第03章 基础工具实战:文件、HTTP、系统命令
第03章 基础工具实战:文件、HTTP、系统命令
“最有用的工具往往最简单:读文件、发请求、跑命令。掌握这三类,你的Agent就能完成80%的实际任务。” —— 实用主义工程师
理论够了,动手实现。本章构建三类最常用的基础工具,并讲解每类工具的实现要点和坑。
3.1 文件系统工具
# 文件工具:读、写、列目录
import os
import json
from pathlib import Path
from typing import Optional
# ===== 读文件 =====
def read_file(path: str, encoding: str = "utf-8", max_chars: int = 10000) -> dict:
"""
读取文件内容
安全要点:
1. 路径遍历攻击防护(不允许读取工作目录外的文件)
2. 文件大小限制(防止读取超大文件撑爆context)
3. 编码错误处理
"""
try:
# 路径安全检查
abs_path = Path(path).resolve()
allowed_root = Path("./workspace").resolve() # 限制读取范围
if not str(abs_path).startswith(str(allowed_root)):
return {
"error": f"路径超出允许范围",
"allowed_root": str(allowed_root),
"tip": "只能读取 workspace/ 目录下的文件"
}
if not abs_path.exists():
return {"error": f"文件不存在: {path}"}
if not abs_path.is_file():
return {"error": f"路径是目录,不是文件: {path}"}
# 文件大小检查
file_size = abs_path.stat().st_size
if file_size > 500_000: # 500KB 上限
return {
"error": f"文件太大 ({file_size/1024:.1f}KB),超过500KB限制",
"suggestion": "请指定要读取的行范围,或先列出文件内容摘要"
}
content = abs_path.read_text(encoding=encoding)
# 内容截断(防止超出LLM context)
truncated = False
if len(content) > max_chars:
content = content[:max_chars]
truncated = True
return {
"path": str(abs_path.relative_to(allowed_root)),
"content": content,
"lines": content.count("\n") + 1,
"size_bytes": file_size,
"truncated": truncated,
"truncated_at_chars": max_chars if truncated else None,
}
except UnicodeDecodeError:
return {"error": f"文件编码错误,无法以 {encoding} 解码。请尝试 encoding='latin-1'"}
except PermissionError:
return {"error": f"权限不足,无法读取文件: {path}"}
except Exception as e:
return {"error": f"读取失败: {str(e)}"}
def write_file(path: str, content: str, mode: str = "overwrite") -> dict:
"""
写文件
mode: overwrite(覆盖)| append(追加)| create_new(仅创建新文件,已存在则失败)
"""
try:
abs_path = Path(path).resolve()
allowed_root = Path("./workspace").resolve()
if not str(abs_path).startswith(str(allowed_root)):
return {"error": "路径超出允许范围,只能写入 workspace/ 目录"}
if mode == "create_new" and abs_path.exists():
return {"error": f"文件已存在: {path},使用 mode='overwrite' 覆盖"}
# 创建父目录
abs_path.parent.mkdir(parents=True, exist_ok=True)
write_mode = "a" if mode == "append" else "w"
abs_path.write_text(content, encoding="utf-8")
return {
"success": True,
"path": str(abs_path.relative_to(allowed_root)),
"bytes_written": len(content.encode("utf-8")),
"mode": mode,
}
except Exception as e:
return {"error": f"写文件失败: {str(e)}"}
def list_directory(path: str = ".", include_hidden: bool = False) -> dict:
"""列出目录内容"""
try:
abs_path = Path(path).resolve()
allowed_root = Path("./workspace").resolve()
if not str(abs_path).startswith(str(allowed_root)):
return {"error": "路径超出允许范围"}
if not abs_path.is_dir():
return {"error": f"路径不是目录: {path}"}
items = []
for item in sorted(abs_path.iterdir()):
if not include_hidden and item.name.startswith("."):
continue
stat = item.stat()
items.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size_bytes": stat.st_size if item.is_file() else None,
"modified": stat.st_mtime,
})
return {
"path": str(abs_path.relative_to(allowed_root)),
"items": items,
"count": len(items),
}
except Exception as e:
return {"error": f"列目录失败: {str(e)}"}
# Schema 定义
FILE_TOOLS_SCHEMA = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "读取文件内容。只能读取 workspace/ 目录下的文件。文件超过500KB时会返回错误。",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径,相对于 workspace/ 目录。例如:'docs/readme.txt'"
},
"max_chars": {
"type": "integer",
"description": "最多读取多少字符。默认10000。大文件时可以减小以节省token",
"default": 10000,
}
},
"required": ["path"],
"additionalProperties": False,
}
}
},
]
3.2 HTTP 请求工具
# HTTP工具:调用外部API
import httpx
import asyncio
from urllib.parse import urlparse
# 安全白名单:只允许调用指定域名
ALLOWED_DOMAINS = {
"api.openweathermap.org",
"api.github.com",
"api.cloudflare.com",
"api.resend.com",
# 开发时可以允许任意域名,但生产环境必须限制
}
async def http_get(
url: str,
headers: dict = None,
params: dict = None,
timeout_seconds: int = 10,
) -> dict:
"""
发送HTTP GET请求
安全要点:
1. 域名白名单(防止SSRF攻击)
2. 超时设置(防止LLM等待过久)
3. 响应大小限制
"""
# 安全检查:验证域名
parsed = urlparse(url)
if parsed.scheme not in ("https", "http"):
return {"error": "只支持HTTP/HTTPS协议"}
if ALLOWED_DOMAINS and parsed.netloc not in ALLOWED_DOMAINS:
return {
"error": f"域名 '{parsed.netloc}' 不在允许列表中",
"allowed_domains": list(ALLOWED_DOMAINS),
}
# 生产环境强制HTTPS
if parsed.scheme == "http":
return {"error": "生产环境只允许HTTPS请求"}
try:
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
response = await client.get(url, headers=headers or {}, params=params or {})
# 响应大小检查
content = response.text
if len(content) > 50_000:
content = content[:50_000]
truncated = True
else:
truncated = False
# 尝试解析JSON
try:
data = response.json()
if len(str(data)) > 10_000:
# JSON太大,只返回顶层结构
data = {k: type(v).__name__ + "(...)" for k, v in data.items()} if isinstance(data, dict) else f"Array[{len(data)}]"
except Exception:
data = None
return {
"url": url,
"status_code": response.status_code,
"ok": response.is_success,
"json": data,
"text": content if not data else None,
"truncated": truncated,
"headers": dict(response.headers),
}
except httpx.TimeoutException:
return {"error": f"请求超时({timeout_seconds}秒)", "url": url}
except httpx.ConnectError:
return {"error": f"无法连接到 {parsed.netloc},请检查网络或域名是否正确"}
except Exception as e:
return {"error": f"HTTP请求失败: {str(e)}"}
async def http_post(
url: str,
body: dict,
headers: dict = None,
timeout_seconds: int = 15,
) -> dict:
"""发送HTTP POST请求(JSON body)"""
parsed = urlparse(url)
if ALLOWED_DOMAINS and parsed.netloc not in ALLOWED_DOMAINS:
return {"error": f"域名不在允许列表中"}
try:
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
response = await client.post(
url,
json=body,
headers={"Content-Type": "application/json", **(headers or {})},
)
try:
result_data = response.json()
except Exception:
result_data = response.text[:5000]
return {
"url": url,
"status_code": response.status_code,
"ok": response.is_success,
"response": result_data,
}
except httpx.TimeoutException:
return {"error": f"请求超时({timeout_seconds}秒)"}
except Exception as e:
return {"error": f"POST请求失败: {str(e)}"}
3.3 系统命令工具(高风险,需特别小心)
# 系统命令工具:最强大也最危险
import subprocess
import shlex
# 极其重要的安全白名单
ALLOWED_COMMANDS = {
"ls", "pwd", "echo", "cat", "grep", "wc", "head", "tail",
"df", "du", "free", "uptime", "date", "whoami",
"ping", # 网络测试
"curl", # HTTP(但要限制参数)
"systemctl", # 服务管理(要限制操作类型)
}
# 绝对禁止的命令(黑名单)
FORBIDDEN_COMMANDS = {
"rm", "rmdir", # 删除
"dd", # 磁盘写入
"mkfs", # 格式化
"fdisk", # 磁盘分区
"sudo", # 提权
"su", # 切换用户
"passwd", # 修改密码
"chmod 777", # 危险权限
"wget", "curl", # 如果不在白名单中
">", ">>", # 重定向(在shell中可能覆盖文件)
"|", # 管道(可能被用于命令注入)
";", # 命令分隔符(命令注入)
"&&", "||", # 条件执行(命令注入)
"`", # 命令替换(注入)
"$(", "${", # 命令替换(注入)
}
def run_shell_command(command: str, timeout_seconds: int = 30) -> dict:
"""
执行系统命令(带严格安全控制)
强烈建议:仅在受控环境(Docker容器、沙箱)中使用
"""
# 安全检查1:检查是否包含危险字符(命令注入防护)
for forbidden in FORBIDDEN_COMMANDS:
if forbidden in command:
return {
"error": f"命令包含不允许的操作: '{forbidden}'",
"blocked": True,
}
# 安全检查2:解析命令,检查程序是否在白名单中
try:
parts = shlex.split(command)
except ValueError as e:
return {"error": f"命令解析失败: {str(e)}"}
if not parts:
return {"error": "命令为空"}
base_command = parts[0]
if base_command not in ALLOWED_COMMANDS:
return {
"error": f"命令 '{base_command}' 不在允许列表中",
"allowed_commands": sorted(ALLOWED_COMMANDS),
}
try:
result = subprocess.run(
parts,
capture_output=True,
text=True,
timeout=timeout_seconds,
# 关键:不通过shell执行,防止shell注入
# shell=False 是默认值,但显式设置以强调
)
stdout = result.stdout[:10000] if result.stdout else ""
stderr = result.stderr[:2000] if result.stderr else ""
return {
"command": command,
"exit_code": result.returncode,
"success": result.returncode == 0,
"stdout": stdout,
"stderr": stderr,
"truncated": len(result.stdout) > 10000 if result.stdout else False,
}
except subprocess.TimeoutExpired:
return {"error": f"命令超时({timeout_seconds}秒)", "command": command}
except FileNotFoundError:
return {"error": f"命令 '{base_command}' 不存在,请检查是否已安装"}
except Exception as e:
return {"error": f"命令执行失败: {str(e)}"}
本章小结
五个核心认知:
-
文件工具必须限制访问路径:
Path.resolve()+ 根目录白名单是最简单有效的路径遍历防护;允许Agent读取任意路径是严重安全漏洞 -
HTTP工具必须有域名白名单:允许Agent发送HTTP请求到任意URL = SSRF攻击向量;生产环境必须限制可访问的域名列表
-
系统命令工具是最后手段:优先用Python库替代系统命令(
shutil替代rm,os.listdir替代ls);如果必须用命令行,绝不能使用shell=True -
工具返回值要做大小截断:工具返回10MB的文件内容会让整个session崩溃;每个工具都要有明确的输出大小上限和截断逻辑
-
错误信息要帮助LLM自我纠正:返回
{"error": "权限不足"}帮助不大;返回{"error": "权限不足", "allowed_root": "./workspace", "tip": "只能读取workspace目录"}让LLM知道怎么修正
核心行动:
# 今天的实践任务:
# 实现一个"目录浏览器Agent"
# 工具:list_directory + read_file
# 任务:让Agent自动找到workspace/里最大的文件
#
# 注意检查:
# - Agent会不会尝试读取 workspace/ 外的文件?
# - 如果文件很大,Agent怎么处理截断?
# - 错误信息是否够清晰让Agent自我纠正?
本章提示词模板
模板一:设计文件操作工具集
我需要为我的Agent设计文件操作工具集。我的场景是:
[描述Agent的用途,例如:代码分析助手、文档整理助手]
Agent需要的文件操作:[列出需要的操作:读/写/移动/搜索等]
工作目录:[描述文件存储的位置和结构]
安全要求:[有哪些文件/目录是绝对不能访问的]
请帮我:
1. 设计工具列表(哪些工具是必要的,哪些可以合并)
2. 为每个工具设计Schema和实现骨架
3. 指出每个工具的主要安全风险和防护措施
4. 推荐路径限制的实现方式
5. 大文件处理策略(超过多大时怎么处理)
模板二:调试工具返回异常
我的工具调用返回了以下结果,但Agent没有正确处理:
工具名称:[名称]
调用参数:[JSON]
返回结果:[JSON]
Agent的行为:[描述Agent做了什么,或没做什么]
期望Agent的行为:[描述期望]
请帮我分析:
1. 这个返回结果的格式有什么问题?
2. LLM可能如何误解这个结果?
3. 如何修改返回格式让LLM更容易理解?
4. System Prompt中是否需要添加对这类结果的处理指导?