跳到主要内容

调度管理(定时任务 + 任务队列)

Admin API 的自动化调度系统,包含定时 Cron Jobs 和异步任务队列。

最后更新:2026-03-10


1. 定时任务(Cron Jobs)

使用 robfig/cron 运行,与 task_queue 任务队列协同工作。

任务频率说明
queueWorker每 5 秒消费 task_queue 表中待处理任务
jobHealthCheck每 5 分钟SSH 检查所有实例存活状态
jobAutoRecovery每 10 分钟自动恢复不健康的实例(restart service)
jobServerMetrics每 5 分钟采集服务器 CPU/内存/磁盘指标
jobTokenUsageSync每 15 分钟同步实例 token 使用量
jobLLMSelfHeal每 5 分钟LLM 子 Key 健康检查 + 自动修复
jobSyncLLMSpend每 10 分钟同步子 Key 日/月花费 + 零点重置

任务详情

jobHealthCheck

  1. 遍历所有未删除实例
  2. SSH 执行 systemctl is-active <service>
  3. 更新 health_statushealth_fail_count
  4. 连续失败 ≥ 3 次标记为 unhealthy

jobAutoRecovery

unhealthyhealth_fail_count >= 3 的实例:

  1. SSH 执行 systemctl restart <service>
  2. 等待 10 秒后重新检查
  3. 恢复则重置计数,否则继续累加

jobServerMetrics

对所有活跃服务器 SSH 采集 CPU、内存、磁盘、负载,写入 server_metrics 表。

jobTokenUsageSync

SSH 读取 OC gateway 的 usage 统计,更新 oc_instances.total_requests / total_tokens

jobLLMSelfHeal

LLM 网关子 Key 自动修复(详见 LLM 网关文档):

  1. 验证子 Key 有效性 → 无效则重新生成
  2. 检查实例配置一致性
  3. 异常去重后告警(仅状态变化 + >50% 失败时发 TG)

jobSyncLLMSpend

  1. 零点重置 current_daily_spend
  2. 每月 1 号重置 current_monthly_spend
  3. oneapi.logs 聚合花费

运行日志

CREATE TABLE job_runs (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
job_name VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
duration_ms INT DEFAULT 0,
detail TEXT,
created_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3)
);

2. 任务队列(Task Queue)

任务类型

type说明处理函数
init_instance初始化 OC 实例runInitInstance
apply_marketplace_item安装捕虾场物品runApplyMarketplaceItem
bootstrap_profession_instance初始化职业虾实例runBootstrapProfession
sync_models_to_instance同步模型列表到实例runSyncModelsV2
sync_key_to_instance同步 LLM 子 Key 到实例runSyncKeyToInstance

任务生命周期

pending → processing → done
↘ failed (attempts < 5 → 自动重试)
↘ dead (attempts >= 5)

数据库

CREATE TABLE task_queue (
id VARCHAR(64) PRIMARY KEY,
type VARCHAR(50) NOT NULL,
payload TEXT,
status VARCHAR(20) DEFAULT 'pending',
attempts INT DEFAULT 0,
error TEXT,
created_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
updated_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
KEY idx_status (status)
);

3. API

定时任务日志

GET /api/v1/job-runs?job_name=llm_self_heal&limit=20

任务队列

GET  /api/v1/task-queue?status=pending&type=sync_key_to_instance&limit=20
POST /api/v1/task-queue/:id/retry

管理任务(别名)

GET  /api/v1/tasks             → 列表
POST /api/v1/tasks → 手动创建任务
GET /api/v1/tasks/:id → 详情
POST /api/v1/tasks/:id/retry → 重试

4. 如何添加新任务类型

  1. scheduler.goqueueWorker switch 中添加 case
  2. 实现处理函数 func runXxx(payload map[string]interface{}) (string, bool)
  3. 在需要的地方调用 enqueueTask(type, payload, source) 入队