第03章:表单与消息——自动填写与批量回复

第03章:表单与消息——自动填写与批量回复


3.1 为什么表单和消息是最值得自动化的两件事

表单和消息是日常工作中最高频的两个操作:

表单:出差、报销、入职、采购、审批……每周填 N 次,每次10-30分钟
消息:回复客户、通知团队、跟进待办……每天几十条,每条2-5分钟

而且这两件事有个共同特点:规则明确、数据结构清晰,最适合自动化。


3.2 表单自动化的三种模式

模式一:固定模板表单(最简单)

表单格式不变,只是数据每次不同。比如每周填一次工作周报:

def auto_submit_weekly_report(name, week, accomplishments, blockers, next_week_plan):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto("https://internal.company.com/weekly-report/new")
        page.wait_for_load_state("networkidle")

        page.fill('input[name="employee_name"]', name)
        page.fill('input[name="week_of"]', week)
        page.fill('textarea[name="accomplishments"]', accomplishments)
        page.fill('textarea[name="blockers"]', blockers)
        page.fill('textarea[name="next_week"]', next_week_plan)

        page.click('button:has-text("提交")')
        page.wait_for_url("**/submitted**")
        print(f"✅ {week} 周报已提交")
        browser.close()

模式二:动态多行表单(最常见)

报销单、采购单这种,一个表单里有多行数据行:

def auto_fill_multi_row_order(items):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto("https://internal.company.com/purchase/new")

        # 填写基本信息
        page.fill('input[name="requester"]', "张明")
        page.fill('input[name="department"]', "产品部")
        page.fill('input[name="purchase_date"]', "2026-04-15")

        # 逐行添加采购项
        for i, item in enumerate(items):
            page.click('button:has-text("添加一行")')
            page.wait_for_timeout(300)

            rows = page.locator(".form-row[data-index]")
            current_row = rows.nth(i)

            current_row.locator('input[name="item_name"]').fill(item["name"])
            current_row.locator('input[name="quantity"]').fill(str(item["qty"]))
            current_row.locator('input[name="unit_price"]').fill(str(item["price"]))
            current_row.locator('select[name="category"]').select_option(item["category"])

        # 总额自动计算(等前端算完)
        page.wait_for_timeout(500)
        total = page.locator("#total-amount").inner_text()
        print(f"合计金额:{total},确认后提交……")

        page.click('button:has-text("提交审批")')
        print("✅ 采购单已提交")
        browser.close()

模式三:文件导入型表单(最省力)

某些系统支持上传 Excel/CSV 文件批量导入,比逐行填快10倍:

import csv

def prepare_purchase_csv(items, output_path):
    """生成符合系统导入格式的 CSV 文件"""
    with open(output_path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=["品名", "数量", "单价", "类别", "备注"])
        writer.writeheader()
        writer.writerows(items)
    return output_path

def import_purchase_csv(csv_path):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto("https://internal.company.com/purchase/import")
        page.set_input_files('input[type="file"]', csv_path)
        page.click('button:has-text("开始导入")')
        page.wait_for_selector(".import-result")
        result = page.locator(".import-result").inner_text()
        print(result)
        browser.close()

3.3 批量消息回复:消息模板 + 变量替换

场景:批量发送个性化消息

def batch_send_messages(recipients, template, channel="feishu"):
    """
    recipients: [{"name": "张三", "phone": "138xxxx", "package": "基础套餐"}, ...]
    template: "亲爱的{name},您的{package}将于明天到期……"
    """

    sent = []
    failed = []

    for person in recipients:
        # 变量替换
        message = template
        for key, value in person.items():
            message = message.replace(f"{{{key}}}", str(value))

        if channel == "feishu":
            result = send_feishu_message(person["feishu_id"], message)
        elif channel == "wechat":
            result = send_wechat_message(person["wechat_id"], message)
        elif channel == "email":
            result = send_email(person["email"], "套餐到期提醒", message)

        if result["success"]:
            sent.append(person["name"])
            print(f"✅ {person['name']} → {message[:20]}……")
        else:
            failed.append({"name": person["name"], "error": result["error"]})

    return {"sent": len(sent), "failed": len(failed), "failed_list": failed}


# 使用示例
batch_send_messages(
    recipients=[
        {"name": "王小姐", "phone": "13800138001", "package": "焕白管理季卡", "feishu_id": "ou_xxx1"},
        {"name": "李总", "phone": "13900139001", "package": "抗衰年卡", "feishu_id": "ou_xxx2"},
    ],
    template="亲爱的{name},您的{package}将于3天后到期。为保障您的持续体验,请及时联系您的美容师预约续费。",
    channel="feishu"
)

3.4 消息队列:把发送任务排队执行

发送量大了以后,不要一口气全部发出——会被平台限流。使用任务队列:

from queue import Queue
import time, threading

message_queue = Queue()

def worker(thread_name):
    while True:
        task = message_queue.get()
        if task is None:
            break

        person, message = task["recipient"], task["message"]
        try:
            send_feishu_message(person["feishu_id"], message)
            print(f"[{thread_name}] ✅ {person['name']}")
        except Exception as e:
            print(f"[{thread_name}] ❌ {person['name']}: {e}")

        message_queue.task_done()
        time.sleep(1)  # 每条消息间隔1秒,避免触发平台限流


# 启动3个并发worker
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"worker-{i}",))
    t.start()
    threads.append(t)

# 把任务加入队列
for person in recipients:
    message_queue.put({"recipient": person, "message": personalized_msg})

# 等待所有任务完成
message_queue.join()

# 停止worker
for _ in range(3):
    message_queue.put(None)
for t in threads:
    t.join()

落地动作

  1. 找出你工作中最高频填写的表单(TOP3),记录字段
  2. 为每个表单写一个数据填充脚本(数据可以先 hardcode)
  3. 设计一套你的消息模板(客户通知/团队通知/跟进提醒)
  4. 配置第一个批量消息发送任务(先用自己测试)
  5. 加入任务队列逻辑,确保大量发送时不被平台限制