第03章:表单与消息——自动填写与批量回复
第03章:表单与消息——自动填写与批量回复
3.1 为什么表单和消息是最值得自动化的两件事
表单和消息是日常工作中最高频的两个操作:
表单:出差、报销、入职、采购、审批……每周填 N 次,每次10-30分钟
消息:回复客户、通知团队、跟进待办……每天几十条,每条2-5分钟
而且这两件事有个共同特点:规则明确、数据结构清晰,最适合自动化。
3.2 表单自动化的三种模式
模式一:固定模板表单(最简单)
表单格式不变,只是数据每次不同。比如每周填一次工作周报:
def auto_submit_weekly_report(name, week, accomplishments, blockers, next_week_plan):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("https://internal.company.com/weekly-report/new")
page.wait_for_load_state("networkidle")
page.fill('input[name="employee_name"]', name)
page.fill('input[name="week_of"]', week)
page.fill('textarea[name="accomplishments"]', accomplishments)
page.fill('textarea[name="blockers"]', blockers)
page.fill('textarea[name="next_week"]', next_week_plan)
page.click('button:has-text("提交")')
page.wait_for_url("**/submitted**")
print(f"✅ {week} 周报已提交")
browser.close()
模式二:动态多行表单(最常见)
报销单、采购单这种,一个表单里有多行数据行:
def auto_fill_multi_row_order(items):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("https://internal.company.com/purchase/new")
# 填写基本信息
page.fill('input[name="requester"]', "张明")
page.fill('input[name="department"]', "产品部")
page.fill('input[name="purchase_date"]', "2026-04-15")
# 逐行添加采购项
for i, item in enumerate(items):
page.click('button:has-text("添加一行")')
page.wait_for_timeout(300)
rows = page.locator(".form-row[data-index]")
current_row = rows.nth(i)
current_row.locator('input[name="item_name"]').fill(item["name"])
current_row.locator('input[name="quantity"]').fill(str(item["qty"]))
current_row.locator('input[name="unit_price"]').fill(str(item["price"]))
current_row.locator('select[name="category"]').select_option(item["category"])
# 总额自动计算(等前端算完)
page.wait_for_timeout(500)
total = page.locator("#total-amount").inner_text()
print(f"合计金额:{total},确认后提交……")
page.click('button:has-text("提交审批")')
print("✅ 采购单已提交")
browser.close()
模式三:文件导入型表单(最省力)
某些系统支持上传 Excel/CSV 文件批量导入,比逐行填快10倍:
import csv
def prepare_purchase_csv(items, output_path):
"""生成符合系统导入格式的 CSV 文件"""
with open(output_path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["品名", "数量", "单价", "类别", "备注"])
writer.writeheader()
writer.writerows(items)
return output_path
def import_purchase_csv(csv_path):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("https://internal.company.com/purchase/import")
page.set_input_files('input[type="file"]', csv_path)
page.click('button:has-text("开始导入")')
page.wait_for_selector(".import-result")
result = page.locator(".import-result").inner_text()
print(result)
browser.close()
3.3 批量消息回复:消息模板 + 变量替换
场景:批量发送个性化消息
def batch_send_messages(recipients, template, channel="feishu"):
"""
recipients: [{"name": "张三", "phone": "138xxxx", "package": "基础套餐"}, ...]
template: "亲爱的{name},您的{package}将于明天到期……"
"""
sent = []
failed = []
for person in recipients:
# 变量替换
message = template
for key, value in person.items():
message = message.replace(f"{{{key}}}", str(value))
if channel == "feishu":
result = send_feishu_message(person["feishu_id"], message)
elif channel == "wechat":
result = send_wechat_message(person["wechat_id"], message)
elif channel == "email":
result = send_email(person["email"], "套餐到期提醒", message)
if result["success"]:
sent.append(person["name"])
print(f"✅ {person['name']} → {message[:20]}……")
else:
failed.append({"name": person["name"], "error": result["error"]})
return {"sent": len(sent), "failed": len(failed), "failed_list": failed}
# 使用示例
batch_send_messages(
recipients=[
{"name": "王小姐", "phone": "13800138001", "package": "焕白管理季卡", "feishu_id": "ou_xxx1"},
{"name": "李总", "phone": "13900139001", "package": "抗衰年卡", "feishu_id": "ou_xxx2"},
],
template="亲爱的{name},您的{package}将于3天后到期。为保障您的持续体验,请及时联系您的美容师预约续费。",
channel="feishu"
)
3.4 消息队列:把发送任务排队执行
发送量大了以后,不要一口气全部发出——会被平台限流。使用任务队列:
from queue import Queue
import time, threading
message_queue = Queue()
def worker(thread_name):
while True:
task = message_queue.get()
if task is None:
break
person, message = task["recipient"], task["message"]
try:
send_feishu_message(person["feishu_id"], message)
print(f"[{thread_name}] ✅ {person['name']}")
except Exception as e:
print(f"[{thread_name}] ❌ {person['name']}: {e}")
message_queue.task_done()
time.sleep(1) # 每条消息间隔1秒,避免触发平台限流
# 启动3个并发worker
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"worker-{i}",))
t.start()
threads.append(t)
# 把任务加入队列
for person in recipients:
message_queue.put({"recipient": person, "message": personalized_msg})
# 等待所有任务完成
message_queue.join()
# 停止worker
for _ in range(3):
message_queue.put(None)
for t in threads:
t.join()
落地动作
- 找出你工作中最高频填写的表单(TOP3),记录字段
- 为每个表单写一个数据填充脚本(数据可以先 hardcode)
- 设计一套你的消息模板(客户通知/团队通知/跟进提醒)
- 配置第一个批量消息发送任务(先用自己测试)
- 加入任务队列逻辑,确保大量发送时不被平台限制