调度管理(定时任务 + 任务队列)
Admin API 的自动化调度系统,包含定时 Cron Jobs 和异步任务队列。
最后更新:2026-03-10
1. 定时任务(Cron Jobs)
使用 robfig/cron 运行,与 task_queue 任务队列协同工作。
| 任务 | 频率 | 说明 |
|---|---|---|
queueWorker | 每 5 秒 | 消费 task_queue 表中待处理任务 |
jobHealthCheck | 每 5 分钟 | SSH 检查所有实例存活状态 |
jobAutoRecovery | 每 10 分钟 | 自动恢复不健康的实例(restart service) |
jobServerMetrics | 每 5 分钟 | 采集服务器 CPU/内存/磁盘指标 |
jobTokenUsageSync | 每 15 分钟 | 同步实例 token 使用量 |
jobLLMSelfHeal | 每 5 分钟 | LLM 子 Key 健康检查 + 自动修复 |
jobSyncLLMSpend | 每 10 分钟 | 同步子 Key 日/月花费 + 零点重置 |
任务详情
jobHealthCheck
- 遍历所有未删除实例
- SSH 执行
systemctl is-active <service> - 更新
health_status和health_fail_count - 连续失败 ≥ 3 次标记为
unhealthy
jobAutoRecovery
对 unhealthy 且 health_fail_count >= 3 的实例:
- SSH 执行
systemctl restart <service> - 等待 10 秒后重新检查
- 恢复则重置计数,否则继续累加
jobServerMetrics
对所有活跃服务器 SSH 采集 CPU、内存、磁盘、负载,写入 server_metrics 表。
jobTokenUsageSync
SSH 读取 OC gateway 的 usage 统计,更新 oc_instances.total_requests / total_tokens。
jobLLMSelfHeal
LLM 网关子 Key 自动修复(详见 LLM 网关文档):
- 验证子 Key 有效性 → 无效则重新生成
- 检查实例配置一致性
- 异常去重后告警(仅状态变化 + >50% 失败时发 TG)
jobSyncLLMSpend
- 零点重置
current_daily_spend - 每月 1 号重置
current_monthly_spend - 从
oneapi.logs聚合花费
运行日志
CREATE TABLE job_runs (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
job_name VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
duration_ms INT DEFAULT 0,
detail TEXT,
created_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3)
);
2. 任务队列(Task Queue)
任务类型
| type | 说明 | 处理函数 |
|---|---|---|
init_instance | 初始化 OC 实例 | runInitInstance |
apply_marketplace_item | 安装捕虾场物品 | runApplyMarketplaceItem |
bootstrap_profession_instance | 初始化职业虾实例 | runBootstrapProfession |
sync_models_to_instance | 同步模型列表到实例 | runSyncModelsV2 |
sync_key_to_instance | 同步 LLM 子 Key 到实例 | runSyncKeyToInstance |
任务生命周期
pending → processing → done
↘ failed (attempts < 5 → 自动重试)
↘ dead (attempts >= 5)
数据库
CREATE TABLE task_queue (
id VARCHAR(64) PRIMARY KEY,
type VARCHAR(50) NOT NULL,
payload TEXT,
status VARCHAR(20) DEFAULT 'pending',
attempts INT DEFAULT 0,
error TEXT,
created_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
updated_at DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
KEY idx_status (status)
);
3. API
定时任务日志
GET /api/v1/job-runs?job_name=llm_self_heal&limit=20
任务队列
GET /api/v1/task-queue?status=pending&type=sync_key_to_instance&limit=20
POST /api/v1/task-queue/:id/retry
管理任务(别名)
GET /api/v1/tasks → 列表
POST /api/v1/tasks → 手动创建任务
GET /api/v1/tasks/:id → 详情
POST /api/v1/tasks/:id/retry → 重试
4. 如何添加新任务类型
- 在
scheduler.go的queueWorkerswitch 中添加 case - 实现处理函数
func runXxx(payload map[string]interface{}) (string, bool) - 在需要的地方调用
enqueueTask(type, payload, source)入队