Skip to content

OpenClaw 自动化工作流设计模式

自动化工作流是 OpenClaw 实现高效任务处理的核心。本文详解 cron 定时任务、heartbeat 心跳机制、工作流编排等设计模式,帮助你构建可靠的自动化系统。

概述

OpenClaw 提供多种自动化机制:

  • Cron 定时任务 - 精确时间调度的任务
  • 💓 Heartbeat 心跳 - 周期性检查与批处理
  • 🔄 工作流编排 - 多步骤任务协调
  • 🎯 事件驱动 - 响应外部事件触发
  • 📊 条件执行 - 基于状态的决策

一、Cron 定时任务

1.1 Cron 基础

OpenClaw 的 cron 系统支持三种调度类型:

javascript
// 1. 一次性任务(at)
{
  kind: "at",
  at: "2026-03-20T08:00:00+08:00"  // ISO-8601 格式
}

// 2. 周期性任务(every)
{
  kind: "every",
  everyMs: 3600000,  // 1 小时
  anchorMs: 1710900000000  // 可选:起始时间
}

// 3. Cron 表达式(cron)
{
  kind: "cron",
  expr: "0 8 * * *",  // 每天 8:00
  tz: "Asia/Shanghai"  // 时区
}

1.2 创建定时任务

javascript
// 创建每日早报任务
await cron({
  action: 'add',
  job: {
    name: '每日早报',
    schedule: {
      kind: 'cron',
      expr: '0 8 * * *',  // 每天 8:00
      tz: 'Asia/Shanghai'
    },
    payload: {
      kind: 'agentTurn',
      message: '生成每日早报:百度热搜 + 驻马店天气',
      timeoutSeconds: 300
    },
    sessionTarget: 'isolated',  // 隔离会话
    enabled: true
  }
})

// 创建每小时检查任务
await cron({
  action: 'add',
  job: {
    name: '每小时检查',
    schedule: {
      kind: 'every',
      everyMs: 3600000  // 1 小时
    },
    payload: {
      kind: 'systemEvent',
      text: 'HEARTBEAT: 检查邮箱、日历、通知'
    },
    sessionTarget: 'main'  // 主会话
  }
})

// 创建一次性提醒
const reminderTime = new Date()
reminderTime.setHours(reminderTime.getHours() + 1)

await cron({
  action: 'add',
  job: {
    name: '一小时后提醒',
    schedule: {
      kind: 'at',
      at: reminderTime.toISOString()
    },
    payload: {
      kind: 'agentTurn',
      message: '提醒:休息眼睛,起来活动一下',
      timeoutSeconds: 60
    },
    sessionTarget: 'isolated',
    delivery: {
      mode: 'announce'  // 通知用户
    }
  }
})

1.3 管理定时任务

javascript
// 列出所有任务
async function listCronJobs() {
  return await cron({
    action: 'list',
    includeDisabled: true
  })
}

// 获取任务状态
async function getCronStatus() {
  return await cron({ action: 'status' })
}

// 立即触发任务
async function runCronJob(jobId) {
  return await cron({
    action: 'run',
    jobId,
    runMode: 'force'  // force | due
  })
}

// 更新任务
async function updateCronJob(jobId, patch) {
  return await cron({
    action: 'update',
    jobId,
    patch
  })
}

// 删除任务
async function removeCronJob(jobId) {
  return await cron({
    action: 'remove',
    jobId
  })
}

// 获取运行历史
async function getCronHistory(jobId) {
  return await cron({
    action: 'runs',
    jobId
  })
}

1.4 实战案例 1:智能早报系统

javascript
class MorningBriefing {
  constructor() {
    this.enabled = true
  }
  
  // 注册定时任务
  async register() {
    await cron({
      action: 'add',
      job: {
        name: '每日早报',
        schedule: {
          kind: 'cron',
          expr: '0 8 * * *',
          tz: 'Asia/Shanghai'
        },
        payload: {
          kind: 'agentTurn',
          message: '生成每日早报',
          timeoutSeconds: 300
        },
        sessionTarget: 'isolated',
        delivery: {
          mode: 'announce'
        }
      }
    })
    
    console.log('✅ 每日早报任务已注册')
  }
  
  // 生成早报
  async generate() {
    console.log('📰 生成每日早报...')
    
    const sections = []
    
    // 1. 日期信息
    const today = new Date()
    sections.push(`# 每日早报\n\n`)
    sections.push(`📅 ${today.toLocaleDateString('zh-CN', { 
      year: 'numeric', 
      month: 'long', 
      day: 'numeric',
      weekday: 'long'
    })}\n\n`)
    
    // 2. 天气
    const weather = await this.getWeather('驻马店')
    sections.push(`## 🌤️ 天气\n\n${weather}\n\n`)
    
    // 3. 百度热搜
    const hotSearch = await this.getHotSearch()
    sections.push(`## 🔥 百度热搜\n\n${hotSearch}\n\n`)
    
    // 4. 日历事件
    const events = await this.getCalendarEvents()
    if (events.length > 0) {
      sections.push(`## 📅 今日日程\n\n${events}\n\n`)
    }
    
    // 5. 待办提醒
    const todos = await this.getPendingTodos()
    if (todos.length > 0) {
      sections.push(`## ✅ 今日待办\n\n${todos}\n\n`)
    }
    
    // 6. 励志语录
    const quote = await this.getDailyQuote()
    sections.push(`## 💭 每日一句\n\n${quote}\n\n`)
    
    // 合并并发送
    const briefing = sections.join('')
    
    await message({
      action: 'send',
      target: '老大',
      message: briefing
    })
    
    console.log('✅ 早报已发送')
    return briefing
  }
  
  async getWeather(location) {
    try {
      const result = await exec({
        command: `curl -s "wttr.in/${encodeURIComponent(location)}?format=3"`
      })
      return result.stdout.trim() || '天气数据暂不可用'
    } catch (e) {
      return '天气数据获取失败'
    }
  }
  
  async getHotSearch() {
    try {
      // 使用 web_search 或 web_fetch 获取热搜
      const search = await web_search({
        query: '百度热搜榜',
        count: 5
      })
      
      return search.results?.slice(0, 5).map((r, i) => 
        `${i + 1}. [${r.title}](${r.url})`
      ).join('\n') || '热搜数据暂不可用'
    } catch (e) {
      return '热搜数据获取失败'
    }
  }
  
  async getCalendarEvents() {
    // 这里可以集成日历 API
    // 简化处理
    return '暂无日程安排'
  }
  
  async getPendingTodos() {
    // 从 MEMORY.md 或任务系统获取
    try {
      const memory = await read({
        path: '/home/pao/.openclaw/workspace/MEMORY.md'
      })
      
      const todos = memory.match(/\[ \] .*/g) || []
      
      if (todos.length === 0) {
        return '暂无待办事项'
      }
      
      return todos.slice(0, 5).map(t => t.replace('[ ] ', '- ')).join('\n')
    } catch (e) {
      return '待办数据获取失败'
    }
  }
  
  async getDailyQuote() {
    const quotes = [
      '千里之行,始于足下。',
      '不积跬步,无以至千里。',
      '学而不思则罔,思而不学则殆。',
      '知行合一。',
      '每天进步一点点。'
    ]
    
    const today = new Date()
    const index = today.getDate() % quotes.length
    
    return quotes[index]
  }
}

// 使用
const briefing = new MorningBriefing()
await briefing.register()

// 手动触发
await briefing.generate()

1.5 实战案例 2:晚报总结系统

javascript
class EveningSummary {
  constructor() {
    this.workLog = []
  }
  
  async register() {
    await cron({
      action: 'add',
      job: {
        name: '每日晚报',
        schedule: {
          kind: 'cron',
          expr: '0 19 * * *',  // 每天 19:00
          tz: 'Asia/Shanghai'
        },
        payload: {
          kind: 'agentTurn',
          message: '生成每日工作总结',
          timeoutSeconds: 300
        },
        sessionTarget: 'isolated',
        delivery: {
          mode: 'announce'
        }
      }
    })
    
    console.log('✅ 每日晚报任务已注册')
  }
  
  async generate() {
    console.log('📝 生成每日工作总结...')
    
    // 1. 读取今日日志
    const today = new Date().toISOString().split('T')[0]
    let dailyLog = ''
    
    try {
      dailyLog = await read({
        path: `/home/pao/.openclaw/workspace/memory/${today}.md`
      })
    } catch (e) {
      dailyLog = '今日无日志记录'
    }
    
    // 2. 使用 AI 总结
    const summary = await sessions_spawn({
      task: `基于以下今日日志生成工作总结:
      
      ${dailyLog}
      
      包括:
      1. 完成的主要工作(3-5 项)
      2. 遇到的问题和解决
      3. 学到的经验
      4. 明日计划建议
      
      格式简洁,使用 Markdown。`,
      mode: 'run',
      timeoutSeconds: 180
    })
    
    // 3. 发送总结
    await message({
      action: 'send',
      target: '老大',
      message: `# 每日工作总结\n\n日期:${today}\n\n${summary.output}`
    })
    
    console.log('✅ 晚报已发送')
    return summary.output
  }
}

// 使用
const summary = new EveningSummary()
await summary.register()

二、Heartbeat 心跳机制

2.1 Heartbeat 基础

Heartbeat 是 OpenClaw 的周期性检查机制,用于:

  • 批量检查多个数据源
  • 保持会话活跃
  • 执行周期性维护任务

2.2 配置 Heartbeat

yaml
# ~/.openclaw/workspace/HEARTBEAT.md
# 心跳检查清单

## 检查项目
- [ ] 邮箱 - 检查未读邮件
- [ ] 日历 - 查看 24 小时内事件
- [ ] 通知 - 检查重要提醒
- [ ] 天气 - 如有外出计划

## 检查频率
- 工作日:每 30 分钟
- 周末:每 2 小时
- 夜间(23:00-08:00):静默

## 注意事项
- 不要重复检查相同内容
- 记录检查状态到 memory/heartbeat-state.json
- 发现重要事项及时通知

2.3 Heartbeat 实现

javascript
class HeartbeatManager {
  constructor() {
    this.stateFile = '/home/pao/.openclaw/workspace/memory/heartbeat-state.json'
    this.state = this.loadState()
  }
  
  loadState() {
    try {
      const content = await read({ path: this.stateFile })
      return JSON.parse(content)
    } catch (e) {
      return {
        lastChecks: {
          email: null,
          calendar: null,
          notifications: null,
          weather: null
        },
        checkCount: 0
      }
    }
  }
  
  saveState() {
    write({
      path: this.stateFile,
      content: JSON.stringify(this.state, null, 2)
    })
  }
  
  // 处理心跳
  async handleHeartbeat() {
    console.log('💓 心跳检查...')
    
    const now = Date.now()
    const hour = new Date().getHours()
    
    // 夜间静默
    if (hour >= 23 || hour < 8) {
      console.log('夜间静默,跳过检查')
      return 'HEARTBEAT_OK'
    }
    
    const results = []
    
    // 检查邮箱(每 30 分钟)
    if (this.shouldCheck('email', 30 * 60 * 1000)) {
      const emailResult = await this.checkEmail()
      results.push({ type: 'email', ...emailResult })
      this.state.lastChecks.email = now
    }
    
    // 检查日历(每 60 分钟)
    if (this.shouldCheck('calendar', 60 * 60 * 1000)) {
      const calendarResult = await this.checkCalendar()
      results.push({ type: 'calendar', ...calendarResult })
      this.state.lastChecks.calendar = now
    }
    
    // 检查通知(每 30 分钟)
    if (this.shouldCheck('notifications', 30 * 60 * 1000)) {
      const notificationResult = await this.checkNotifications()
      results.push({ type: 'notifications', ...notificationResult })
      this.state.lastChecks.notifications = now
    }
    
    // 检查天气(每 2 小时)
    if (this.shouldCheck('weather', 2 * 60 * 60 * 1000)) {
      const weatherResult = await this.checkWeather()
      results.push({ type: 'weather', ...weatherResult })
      this.state.lastChecks.weather = now
    }
    
    this.state.checkCount++
    this.saveState()
    
    // 汇总结果
    if (results.some(r => r.hasNew)) {
      await this.sendSummary(results)
    }
    
    return results.length > 0 ? `完成 ${results.length} 项检查` : 'HEARTBEAT_OK'
  }
  
  shouldCheck(type, interval) {
    const lastCheck = this.state.lastChecks[type]
    if (!lastCheck) return true
    return Date.now() - lastCheck >= interval
  }
  
  async checkEmail() {
    // 实现邮箱检查逻辑
    return { hasNew: false, count: 0 }
  }
  
  async checkCalendar() {
    // 实现日历检查逻辑
    const upcomingEvents = []  // 获取 24 小时内事件
    
    return {
      hasNew: upcomingEvents.length > 0,
      events: upcomingEvents
    }
  }
  
  async checkNotifications() {
    // 实现通知检查逻辑
    return { hasNew: false, notifications: [] }
  }
  
  async checkWeather() {
    try {
      const result = await exec({
        command: 'curl -s "wttr.in/驻马店?format=3"'
      })
      
      return {
        hasNew: true,
        weather: result.stdout.trim()
      }
    } catch (e) {
      return { hasNew: false }
    }
  }
  
  async sendSummary(results) {
    let summary = '💓 心跳检查摘要\n\n'
    
    for (const result of results) {
      if (result.hasNew) {
        summary += `## ${this.getTypeIcon(result.type)} ${result.type}\n`
        
        if (result.events) {
          summary += result.events.map(e => `- ${e.title} (${e.time})\n`).join('')
        }
        
        if (result.weather) {
          summary += `${result.weather}\n`
        }
        
        summary += '\n'
      }
    }
    
    if (summary.trim().split('\n').length > 2) {
      await message({
        action: 'send',
        target: '老大',
        message: summary
      })
    }
  }
  
  getTypeIcon(type) {
    const icons = {
      email: '📧',
      calendar: '📅',
      notifications: '🔔',
      weather: '🌤️'
    }
    return icons[type] || '📌'
  }
}

// 使用
const heartbeat = new HeartbeatManager()

// 在 HEARTBEAT.md 中配置的检查
// 收到心跳消息时调用
async function onHeartbeat(message) {
  return await heartbeat.handleHeartbeat()
}

2.4 实战案例 3:Heartbeat 与 Cron 对比

javascript
// Heartbeat vs Cron 选择指南

/*
使用 Heartbeat 当:
✅ 多个检查可以批量处理
✅ 需要会话上下文
✅ 时间可以略有漂移
✅ 想减少 API 调用

使用 Cron 当:
✅ 精确时间很重要
✅ 任务需要隔离
✅ 不同模型/配置
✅ 一次性提醒
*/

// 示例:批处理 vs 独立任务

// ❌ 不推荐:多个独立 cron 任务
await cron({ action: 'add', job: { name: '检查邮箱', schedule: { kind: 'every', everyMs: 1800000 } } })
await cron({ action: 'add', job: { name: '检查日历', schedule: { kind: 'every', everyMs: 1800000 } } })
await cron({ action: 'add', job: { name: '检查天气', schedule: { kind: 'every', everyMs: 1800000 } } })

// ✅ 推荐:使用 Heartbeat 批量处理
// 在 HEARTBEAT.md 中配置,一次检查所有项目

三、工作流编排

3.1 顺序工作流

javascript
class SequentialWorkflow {
  constructor(name) {
    this.name = name
    this.steps = []
  }
  
  addStep(name, fn, options = {}) {
    this.steps.push({
      name,
      fn,
      timeout: options.timeout || 300,
      retries: options.retries || 0
    })
    return this
  }
  
  async execute(context = {}) {
    console.log(`🚀 开始工作流:${this.name}`)
    
    const results = {}
    const startTime = Date.now()
    
    for (let i = 0; i < this.steps.length; i++) {
      const step = this.steps[i]
      console.log(`步骤 ${i + 1}/${this.steps.length}: ${step.name}`)
      
      try {
        const result = await this.executeWithRetry(step, context)
        results[step.name] = result
        
        // 更新上下文
        Object.assign(context, result.context || {})
        
      } catch (error) {
        console.error(`步骤失败:${step.name}`, error)
        
        if (step.onFailure === 'continue') {
          results[step.name] = { error: error.message }
        } else {
          throw new Error(`工作流在步骤 "${step.name}" 失败:${error.message}`)
        }
      }
    }
    
    const duration = Date.now() - startTime
    console.log(`✅ 工作流完成,耗时:${duration}ms`)
    
    return { results, context, duration }
  }
  
  async executeWithRetry(step, context) {
    let lastError
    
    for (let attempt = 0; attempt <= step.retries; attempt++) {
      try {
        return await Promise.race([
          step.fn(context),
          this.timeout(step.timeout * 1000)
        ])
      } catch (error) {
        lastError = error
        
        if (attempt < step.retries) {
          console.log(`重试 ${attempt + 1}/${step.retries}`)
          await this.sleep(2000 * (attempt + 1))
        }
      }
    }
    
    throw lastError
  }
  
  timeout(ms) {
    return new Promise((_, reject) => {
      setTimeout(() => reject(new Error('超时')), ms)
    })
  }
  
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}

// 使用:文章发布工作流
const publishWorkflow = new SequentialWorkflow('文章发布')
  .addStep('验证草稿', async (ctx) => {
    const exists = await this.verifyDraft(ctx.filePath)
    return { verified: exists, context: { ...ctx } }
  })
  .addStep('添加 frontmatter', async (ctx) => {
    await this.addFrontmatter(ctx.filePath, ctx.title, ctx.date)
    return { frontmatterAdded: true }
  })
  .addStep('更新侧边栏', async (ctx) => {
    await this.updateSidebar(ctx.filePath, ctx.title)
    return { sidebarUpdated: true }
  })
  .addStep('构建网站', async (ctx) => {
    const result = await exec({ command: 'npm run build', timeout: 120 })
    return { buildSuccess: result.exitCode === 0 }
  })
  .addStep('部署文件', async (ctx) => {
    await exec({ command: 'rsync -av dist/ user@server:/var/www/' })
    return { deployed: true }
  })
  .addStep('验证发布', async (ctx) => {
    const response = await fetch(ctx.url)
    return { verified: response.ok }
  })
  .addStep('提交 Git', async (ctx) => {
    await exec({ command: 'git add -A && git commit -m "publish: ..." && git push' })
    return { committed: true }
  })

// 执行
const result = await publishWorkflow.execute({
  filePath: '/docs/guide/article.md',
  title: '新文章',
  date: '2026-03-20',
  url: 'https://example.com/guide/article.html'
})

3.2 并行工作流

javascript
class ParallelWorkflow {
  constructor(name) {
    this.name = name
    this.tasks = []
    this.concurrency = 5
  }
  
  addTask(name, fn) {
    this.tasks.push({ name, fn })
    return this
  }
  
  async execute(context = {}) {
    console.log(`🚀 开始并行工作流:${this.name}`)
    console.log(`任务数:${this.tasks.length}, 并发数:${this.concurrency}`)
    
    const results = []
    const startTime = Date.now()
    
    // 分批执行
    for (let i = 0; i < this.tasks.length; i += this.concurrency) {
      const batch = this.tasks.slice(i, i + this.concurrency)
      console.log(`执行批次 ${Math.floor(i / this.concurrency) + 1}`)
      
      const batchResults = await Promise.allSettled(
        batch.map(task => task.fn(context).then(r => ({ name: task.name, result: r })))
      )
      
      results.push(...batchResults)
    }
    
    const duration = Date.now() - startTime
    const success = results.filter(r => r.status === 'fulfilled').length
    const failed = results.filter(r => r.status === 'rejected').length
    
    console.log(`✅ 工作流完成:${success} 成功,${failed} 失败,耗时:${duration}ms`)
    
    return {
      results: results.map(r => r.status === 'fulfilled' ? r.value : { error: r.reason.message }),
      success,
      failed,
      duration
    }
  }
}

// 使用:批量文章处理
const batchWorkflow = new ParallelWorkflow('批量文章处理')

for (const article of articles) {
  batchWorkflow.addTask(article.title, async () => {
    // 处理单篇文章
    await processArticle(article)
    return { processed: true, title: article.title }
  })
}

const result = await batchWorkflow.execute()

3.3 条件工作流

javascript
class ConditionalWorkflow {
  constructor(name) {
    this.name = name
    this.branches = []
  }
  
  addBranch(condition, workflow) {
    this.branches.push({ condition, workflow })
    return this
  }
  
  async execute(context) {
    console.log(`🚀 开始条件工作流:${this.name}`)
    
    for (const branch of this.branches) {
      const shouldRun = await branch.condition(context)
      
      if (shouldRun) {
        console.log(`执行分支:${branch.workflow.name}`)
        return await branch.workflow.execute(context)
      }
    }
    
    console.log('没有匹配的条件分支')
    return { executed: false }
  }
}

// 使用:根据文件类型选择处理流程
const fileWorkflow = new ConditionalWorkflow('文件处理')
  .addBranch(
    async (ctx) => ctx.fileType === 'image',
    new SequentialWorkflow('图片处理')
      .addStep('压缩', compressImage)
      .addStep('添加水印', addWatermark)
      .addStep('上传 CDN', uploadToCDN)
  )
  .addBranch(
    async (ctx) => ctx.fileType === 'document',
    new SequentialWorkflow('文档处理')
      .addStep('转换格式', convertFormat)
      .addStep('提取文本', extractText)
      .addStep('建立索引', buildIndex)
  )
  .addBranch(
    async (ctx) => ctx.fileType === 'video',
    new SequentialWorkflow('视频处理')
      .addStep('转码', transcode)
      .addStep('生成缩略图', generateThumbnail)
      .addStep('上传存储', uploadToStorage)
  )

// 执行
const result = await fileWorkflow.execute({
  fileType: 'image',
  filePath: '/path/to/image.jpg'
})

3.4 实战案例 4:完整的内容发布系统

javascript
class ContentPublishingSystem {
  constructor() {
    this.workflows = {
      article: this.createArticleWorkflow(),
      video: this.createVideoWorkflow(),
      podcast: this.createPodcastWorkflow()
    }
  }
  
  createArticleWorkflow() {
    return new SequentialWorkflow('文章发布')
      .addStep('验证内容', async (ctx) => {
        const content = await read({ path: ctx.filePath })
        if (content.length < 1000) {
          throw new Error('文章内容过短')
        }
        return { wordCount: content.length }
      })
      .addStep('AI 审校', async (ctx) => {
        const content = await read({ path: ctx.filePath })
        const review = await sessions_spawn({
          task: `审校以下文章:
          - 语法错误
          - 逻辑问题
          - 改进建议
          
          ${content}`,
          mode: 'run'
        })
        return { review: review.output }
      })
      .addStep('添加元数据', async (ctx) => {
        await this.addFrontmatter(ctx)
        return { frontmatterAdded: true }
      })
      .addStep('更新导航', async (ctx) => {
        await this.updateNavigation(ctx)
        return { navigationUpdated: true }
      })
      .addStep('构建部署', async (ctx) => {
        await exec({ command: 'npm run build && npm run deploy', timeout: 300 })
        return { deployed: true }
      })
      .addStep('通知订阅', async (ctx) => {
        await this.notifySubscribers(ctx)
        return { notified: true }
      })
  }
  
  createVideoWorkflow() {
    return new SequentialWorkflow('视频发布')
      .addStep('视频转码', async (ctx) => {
        await exec({ command: `ffmpeg -i ${ctx.input} -c:v libx264 ${ctx.output}` })
        return { transcoded: true }
      })
      .addStep('生成缩略图', async (ctx) => {
        await exec({ command: `ffmpeg -i ${ctx.input} -ss 00:00:05 -vframes 1 ${ctx.thumbnail}` })
        return { thumbnailGenerated: true }
      })
      .addStep('上传视频', async (ctx) => {
        await this.uploadVideo(ctx.output)
        return { uploaded: true }
      })
      .addStep('创建页面', async (ctx) => {
        await this.createVideoPage(ctx)
        return { pageCreated: true }
      })
  }
  
  async publish(type, context) {
    const workflow = this.workflows[type]
    
    if (!workflow) {
      throw new Error(`未知内容类型:${type}`)
    }
    
    console.log(`📤 开始发布 ${type} 内容`)
    
    try {
      const result = await workflow.execute(context)
      
      // 记录发布历史
      await this.recordPublish(type, context, result)
      
      return { success: true, ...result }
      
    } catch (error) {
      console.error('发布失败:', error)
      
      // 通知失败
      await message({
        action: 'send',
        target: '老大',
        message: `❌ 内容发布失败\n\n类型:${type}\n错误:${error.message}\n时间:${new Date().toLocaleString('zh-CN')}`
      })
      
      return { success: false, error: error.message }
    }
  }
  
  async recordPublish(type, context, result) {
    const entry = {
      type,
      context,
      result,
      timestamp: Date.now()
    }
    
    // 追加到发布历史
    const historyPath = '/home/pao/.openclaw/workspace/memory/publish-history.jsonl'
    await exec({
      command: `echo '${JSON.stringify(entry)}' >> ${historyPath}`
    })
  }
  
  async notifySubscribers(ctx) {
    // 实现订阅通知逻辑
    await message({
      action: 'send',
      target: 'content-subscribers',
      message: `📰 新文章发布:${ctx.title}\n\n${ctx.url}`
    })
  }
}

// 使用
const publisher = new ContentPublishingSystem()

// 发布文章
const result = await publisher.publish('article', {
  filePath: '/docs/guide/new-article.md',
  title: '新文章标题',
  url: 'https://example.com/guide/new-article.html'
})

console.log('发布结果:', result)

四、事件驱动自动化

4.1 文件系统监听

javascript
class FileWatcher {
  constructor(watchPath, handler) {
    this.watchPath = watchPath
    this.handler = handler
    this.watching = false
  }
  
  async start() {
    this.watching = true
    console.log(`开始监听:${this.watchPath}`)
    
    // 使用 inotifywait 或 fs.watch
    const process = await exec({
      command: `inotifywait -m -r -e modify,create,delete ${this.watchPath}`,
      background: true
    })
    
    // 处理事件
    process.on('output', (output) => {
      const event = this.parseEvent(output)
      if (event) {
        this.handler(event)
      }
    })
  }
  
  parseEvent(output) {
    // 解析 inotifywait 输出
    const match = output.match(/(\w+)\s+(\w+)\s+(.+)/)
    if (match) {
      return {
        directory: match[1],
        event: match[2],
        file: match[3]
      }
    }
    return null
  }
}

// 使用:监听文章目录
const watcher = new FileWatcher('/home/pao/projects/ai-knowledge-base/docs/guide', async (event) => {
  console.log('文件事件:', event)
  
  if (event.event === 'CREATE' && event.file.endsWith('.md')) {
    // 新文章创建,触发处理流程
    await processNewArticle(event.file)
  }
})

await watcher.start()

4.2 消息事件监听

javascript
class MessageEventListener {
  constructor() {
    this.listeners = new Map()
  }
  
  on(pattern, handler) {
    this.listeners.set(pattern, handler)
  }
  
  async handleMessage(message) {
    for (const [pattern, handler] of this.listeners) {
      if (this.matchPattern(message.content, pattern)) {
        await handler(message)
        return
      }
    }
  }
  
  matchPattern(content, pattern) {
    // 简单的模式匹配
    if (pattern.startsWith('/') && pattern.endsWith('/')) {
      const regex = new RegExp(pattern.slice(1, -1))
      return regex.test(content)
    }
    return content.includes(pattern)
  }
}

// 使用
const listener = new MessageEventListener()

// 监听发布命令
listener.on('/publish', async (message) => {
  const filePath = message.content.split(' ')[1]
  await publisher.publish('article', { filePath })
})

// 监听天气查询
listener.on('天气', async (message) => {
  const location = message.content.replace('天气', '').trim() || '驻马店'
  const weather = await getWeather(location)
  await message({ action: 'send', replyTo: message.id, message: weather })
})

五、最佳实践

5.1 错误处理

javascript
// 始终设置超时
await sessions_spawn({
  task: '...',
  timeoutSeconds: 300  // 5 分钟
})

// 实现重试
async function withRetry(fn, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn()
    } catch (error) {
      if (i === maxRetries - 1) throw error
      await sleep(2000 * (i + 1))
    }
  }
}

// 保存状态支持恢复
async function saveState(state) {
  await write({
    path: '/tmp/workflow-state.json',
    content: JSON.stringify(state)
  })
}

5.2 日志记录

javascript
class WorkflowLogger {
  constructor(name) {
    this.name = name
    this.logs = []
    this.startTime = Date.now()
  }
  
  log(step, status, details = {}) {
    this.logs.push({
      step,
      status,
      timestamp: Date.now(),
      details
    })
    
    console.log(`[${this.name}] ${step}: ${status}`)
  }
  
  generateReport() {
    const duration = Date.now() - this.startTime
    const failed = this.logs.filter(l => l.status === 'failed')
    
    return {
      workflow: this.name,
      duration,
      totalSteps: this.logs.length,
      failedSteps: failed.length,
      logs: this.logs
    }
  }
}

5.3 性能优化

javascript
// 限制并发数
const semaphore = { count: 0, max: 5, queue: [] }

async function acquire() {
  if (semaphore.count >= semaphore.max) {
    await new Promise(resolve => semaphore.queue.push(resolve))
  }
  semaphore.count++
}

async function release() {
  semaphore.count--
  if (semaphore.queue.length > 0) {
    semaphore.queue.shift()()
  }
}

// 使用
await acquire()
try {
  await processTask()
} finally {
  await release()
}

六、总结

核心要点

  1. Cron 用于精确时间调度
  2. Heartbeat 用于批量周期检查
  3. 工作流编排复杂任务
  4. 事件驱动响应外部变化
  5. 错误处理和日志至关重要

选择指南

需求推荐方案
每天固定时间Cron
周期性检查Heartbeat
多步骤任务顺序工作流
批量处理并行工作流
条件执行条件工作流
实时响应事件驱动

🟢🐉 开始构建你的自动化工作流吧!

Released under the MIT License.