第03章 基础工具实战:文件、HTTP、系统命令

第03章 基础工具实战:文件、HTTP、系统命令

“最有用的工具往往最简单:读文件、发请求、跑命令。掌握这三类,你的Agent就能完成80%的实际任务。” —— 实用主义工程师


理论够了,动手实现。本章构建三类最常用的基础工具,并讲解每类工具的实现要点和坑。


3.1 文件系统工具

# 文件工具:读、写、列目录

import os
import json
from pathlib import Path
from typing import Optional

# ===== 读文件 =====

def read_file(path: str, encoding: str = "utf-8", max_chars: int = 10000) -> dict:
    """
    读取文件内容
    
    安全要点:
    1. 路径遍历攻击防护(不允许读取工作目录外的文件)
    2. 文件大小限制(防止读取超大文件撑爆context)
    3. 编码错误处理
    """
    try:
        # 路径安全检查
        abs_path = Path(path).resolve()
        allowed_root = Path("./workspace").resolve()  # 限制读取范围
        
        if not str(abs_path).startswith(str(allowed_root)):
            return {
                "error": f"路径超出允许范围",
                "allowed_root": str(allowed_root),
                "tip": "只能读取 workspace/ 目录下的文件"
            }
        
        if not abs_path.exists():
            return {"error": f"文件不存在: {path}"}
        
        if not abs_path.is_file():
            return {"error": f"路径是目录,不是文件: {path}"}
        
        # 文件大小检查
        file_size = abs_path.stat().st_size
        if file_size > 500_000:  # 500KB 上限
            return {
                "error": f"文件太大 ({file_size/1024:.1f}KB),超过500KB限制",
                "suggestion": "请指定要读取的行范围,或先列出文件内容摘要"
            }
        
        content = abs_path.read_text(encoding=encoding)
        
        # 内容截断(防止超出LLM context)
        truncated = False
        if len(content) > max_chars:
            content = content[:max_chars]
            truncated = True
        
        return {
            "path": str(abs_path.relative_to(allowed_root)),
            "content": content,
            "lines": content.count("\n") + 1,
            "size_bytes": file_size,
            "truncated": truncated,
            "truncated_at_chars": max_chars if truncated else None,
        }
    
    except UnicodeDecodeError:
        return {"error": f"文件编码错误,无法以 {encoding} 解码。请尝试 encoding='latin-1'"}
    except PermissionError:
        return {"error": f"权限不足,无法读取文件: {path}"}
    except Exception as e:
        return {"error": f"读取失败: {str(e)}"}

def write_file(path: str, content: str, mode: str = "overwrite") -> dict:
    """
    写文件
    
    mode: overwrite(覆盖)| append(追加)| create_new(仅创建新文件,已存在则失败)
    """
    try:
        abs_path = Path(path).resolve()
        allowed_root = Path("./workspace").resolve()
        
        if not str(abs_path).startswith(str(allowed_root)):
            return {"error": "路径超出允许范围,只能写入 workspace/ 目录"}
        
        if mode == "create_new" and abs_path.exists():
            return {"error": f"文件已存在: {path},使用 mode='overwrite' 覆盖"}
        
        # 创建父目录
        abs_path.parent.mkdir(parents=True, exist_ok=True)
        
        write_mode = "a" if mode == "append" else "w"
        abs_path.write_text(content, encoding="utf-8")
        
        return {
            "success": True,
            "path": str(abs_path.relative_to(allowed_root)),
            "bytes_written": len(content.encode("utf-8")),
            "mode": mode,
        }
    
    except Exception as e:
        return {"error": f"写文件失败: {str(e)}"}

def list_directory(path: str = ".", include_hidden: bool = False) -> dict:
    """列出目录内容"""
    try:
        abs_path = Path(path).resolve()
        allowed_root = Path("./workspace").resolve()
        
        if not str(abs_path).startswith(str(allowed_root)):
            return {"error": "路径超出允许范围"}
        
        if not abs_path.is_dir():
            return {"error": f"路径不是目录: {path}"}
        
        items = []
        for item in sorted(abs_path.iterdir()):
            if not include_hidden and item.name.startswith("."):
                continue
            stat = item.stat()
            items.append({
                "name": item.name,
                "type": "directory" if item.is_dir() else "file",
                "size_bytes": stat.st_size if item.is_file() else None,
                "modified": stat.st_mtime,
            })
        
        return {
            "path": str(abs_path.relative_to(allowed_root)),
            "items": items,
            "count": len(items),
        }
    
    except Exception as e:
        return {"error": f"列目录失败: {str(e)}"}

# Schema 定义
FILE_TOOLS_SCHEMA = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "读取文件内容。只能读取 workspace/ 目录下的文件。文件超过500KB时会返回错误。",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "文件路径,相对于 workspace/ 目录。例如:'docs/readme.txt'"
                    },
                    "max_chars": {
                        "type": "integer",
                        "description": "最多读取多少字符。默认10000。大文件时可以减小以节省token",
                        "default": 10000,
                    }
                },
                "required": ["path"],
                "additionalProperties": False,
            }
        }
    },
]

3.2 HTTP 请求工具

# HTTP工具:调用外部API

import httpx
import asyncio
from urllib.parse import urlparse

# 安全白名单:只允许调用指定域名
ALLOWED_DOMAINS = {
    "api.openweathermap.org",
    "api.github.com",
    "api.cloudflare.com",
    "api.resend.com",
    # 开发时可以允许任意域名,但生产环境必须限制
}

async def http_get(
    url: str,
    headers: dict = None,
    params: dict = None,
    timeout_seconds: int = 10,
) -> dict:
    """
    发送HTTP GET请求
    
    安全要点:
    1. 域名白名单(防止SSRF攻击)
    2. 超时设置(防止LLM等待过久)
    3. 响应大小限制
    """
    # 安全检查:验证域名
    parsed = urlparse(url)
    if parsed.scheme not in ("https", "http"):
        return {"error": "只支持HTTP/HTTPS协议"}
    
    if ALLOWED_DOMAINS and parsed.netloc not in ALLOWED_DOMAINS:
        return {
            "error": f"域名 '{parsed.netloc}' 不在允许列表中",
            "allowed_domains": list(ALLOWED_DOMAINS),
        }
    
    # 生产环境强制HTTPS
    if parsed.scheme == "http":
        return {"error": "生产环境只允许HTTPS请求"}
    
    try:
        async with httpx.AsyncClient(timeout=timeout_seconds) as client:
            response = await client.get(url, headers=headers or {}, params=params or {})
            
            # 响应大小检查
            content = response.text
            if len(content) > 50_000:
                content = content[:50_000]
                truncated = True
            else:
                truncated = False
            
            # 尝试解析JSON
            try:
                data = response.json()
                if len(str(data)) > 10_000:
                    # JSON太大,只返回顶层结构
                    data = {k: type(v).__name__ + "(...)" for k, v in data.items()} if isinstance(data, dict) else f"Array[{len(data)}]"
            except Exception:
                data = None
            
            return {
                "url": url,
                "status_code": response.status_code,
                "ok": response.is_success,
                "json": data,
                "text": content if not data else None,
                "truncated": truncated,
                "headers": dict(response.headers),
            }
    
    except httpx.TimeoutException:
        return {"error": f"请求超时({timeout_seconds}秒)", "url": url}
    except httpx.ConnectError:
        return {"error": f"无法连接到 {parsed.netloc},请检查网络或域名是否正确"}
    except Exception as e:
        return {"error": f"HTTP请求失败: {str(e)}"}

async def http_post(
    url: str,
    body: dict,
    headers: dict = None,
    timeout_seconds: int = 15,
) -> dict:
    """发送HTTP POST请求(JSON body)"""
    parsed = urlparse(url)
    if ALLOWED_DOMAINS and parsed.netloc not in ALLOWED_DOMAINS:
        return {"error": f"域名不在允许列表中"}
    
    try:
        async with httpx.AsyncClient(timeout=timeout_seconds) as client:
            response = await client.post(
                url,
                json=body,
                headers={"Content-Type": "application/json", **(headers or {})},
            )
            
            try:
                result_data = response.json()
            except Exception:
                result_data = response.text[:5000]
            
            return {
                "url": url,
                "status_code": response.status_code,
                "ok": response.is_success,
                "response": result_data,
            }
    
    except httpx.TimeoutException:
        return {"error": f"请求超时({timeout_seconds}秒)"}
    except Exception as e:
        return {"error": f"POST请求失败: {str(e)}"}

3.3 系统命令工具(高风险,需特别小心)

# 系统命令工具:最强大也最危险

import subprocess
import shlex

# 极其重要的安全白名单
ALLOWED_COMMANDS = {
    "ls", "pwd", "echo", "cat", "grep", "wc", "head", "tail",
    "df", "du", "free", "uptime", "date", "whoami",
    "ping",          # 网络测试
    "curl",          # HTTP(但要限制参数)
    "systemctl",     # 服务管理(要限制操作类型)
}

# 绝对禁止的命令(黑名单)
FORBIDDEN_COMMANDS = {
    "rm", "rmdir",   # 删除
    "dd",            # 磁盘写入
    "mkfs",          # 格式化
    "fdisk",         # 磁盘分区
    "sudo",          # 提权
    "su",            # 切换用户
    "passwd",        # 修改密码
    "chmod 777",     # 危险权限
    "wget", "curl",  # 如果不在白名单中
    ">", ">>",       # 重定向(在shell中可能覆盖文件)
    "|",             # 管道(可能被用于命令注入)
    ";",             # 命令分隔符(命令注入)
    "&&", "||",      # 条件执行(命令注入)
    "`",             # 命令替换(注入)
    "$(", "${",      # 命令替换(注入)
}

def run_shell_command(command: str, timeout_seconds: int = 30) -> dict:
    """
    执行系统命令(带严格安全控制)
    
    强烈建议:仅在受控环境(Docker容器、沙箱)中使用
    """
    # 安全检查1:检查是否包含危险字符(命令注入防护)
    for forbidden in FORBIDDEN_COMMANDS:
        if forbidden in command:
            return {
                "error": f"命令包含不允许的操作: '{forbidden}'",
                "blocked": True,
            }
    
    # 安全检查2:解析命令,检查程序是否在白名单中
    try:
        parts = shlex.split(command)
    except ValueError as e:
        return {"error": f"命令解析失败: {str(e)}"}
    
    if not parts:
        return {"error": "命令为空"}
    
    base_command = parts[0]
    if base_command not in ALLOWED_COMMANDS:
        return {
            "error": f"命令 '{base_command}' 不在允许列表中",
            "allowed_commands": sorted(ALLOWED_COMMANDS),
        }
    
    try:
        result = subprocess.run(
            parts,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            # 关键:不通过shell执行,防止shell注入
            # shell=False 是默认值,但显式设置以强调
        )
        
        stdout = result.stdout[:10000] if result.stdout else ""
        stderr = result.stderr[:2000] if result.stderr else ""
        
        return {
            "command": command,
            "exit_code": result.returncode,
            "success": result.returncode == 0,
            "stdout": stdout,
            "stderr": stderr,
            "truncated": len(result.stdout) > 10000 if result.stdout else False,
        }
    
    except subprocess.TimeoutExpired:
        return {"error": f"命令超时({timeout_seconds}秒)", "command": command}
    except FileNotFoundError:
        return {"error": f"命令 '{base_command}' 不存在,请检查是否已安装"}
    except Exception as e:
        return {"error": f"命令执行失败: {str(e)}"}

本章小结

五个核心认知:

  1. 文件工具必须限制访问路径Path.resolve() + 根目录白名单是最简单有效的路径遍历防护;允许Agent读取任意路径是严重安全漏洞

  2. HTTP工具必须有域名白名单:允许Agent发送HTTP请求到任意URL = SSRF攻击向量;生产环境必须限制可访问的域名列表

  3. 系统命令工具是最后手段:优先用Python库替代系统命令(shutil替代rmos.listdir替代ls);如果必须用命令行,绝不能使用shell=True

  4. 工具返回值要做大小截断:工具返回10MB的文件内容会让整个session崩溃;每个工具都要有明确的输出大小上限和截断逻辑

  5. 错误信息要帮助LLM自我纠正:返回 {"error": "权限不足"} 帮助不大;返回 {"error": "权限不足", "allowed_root": "./workspace", "tip": "只能读取workspace目录"} 让LLM知道怎么修正

核心行动

# 今天的实践任务:
# 实现一个"目录浏览器Agent"
# 工具:list_directory + read_file
# 任务:让Agent自动找到workspace/里最大的文件
# 
# 注意检查:
# - Agent会不会尝试读取 workspace/ 外的文件?
# - 如果文件很大,Agent怎么处理截断?
# - 错误信息是否够清晰让Agent自我纠正?

本章提示词模板

模板一:设计文件操作工具集

我需要为我的Agent设计文件操作工具集。我的场景是:
[描述Agent的用途,例如:代码分析助手、文档整理助手]

Agent需要的文件操作:[列出需要的操作:读/写/移动/搜索等]
工作目录:[描述文件存储的位置和结构]
安全要求:[有哪些文件/目录是绝对不能访问的]

请帮我:
1. 设计工具列表(哪些工具是必要的,哪些可以合并)
2. 为每个工具设计Schema和实现骨架
3. 指出每个工具的主要安全风险和防护措施
4. 推荐路径限制的实现方式
5. 大文件处理策略(超过多大时怎么处理)

模板二:调试工具返回异常

我的工具调用返回了以下结果,但Agent没有正确处理:

工具名称:[名称]
调用参数:[JSON]
返回结果:[JSON]
Agent的行为:[描述Agent做了什么,或没做什么]
期望Agent的行为:[描述期望]

请帮我分析:
1. 这个返回结果的格式有什么问题?
2. LLM可能如何误解这个结果?
3. 如何修改返回格式让LLM更容易理解?
4. System Prompt中是否需要添加对这类结果的处理指导?

→ 第04章:并行工具调用:一次请求执行多个操作