Skip to content

Commit a692133

Browse files
committed
feat: worker 支持通过 cron 触发
1 parent 6a43a9f commit a692133

33 files changed

Lines changed: 1570 additions & 213 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ dist/
1111

1212
# Go
1313
coverage.out
14+
15+
# other
16+
.work_note.md

.vscode/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{
2+
}

AGENTS.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@
2424

2525
```text
2626
cmd/main.go # 程序入口,启动 Router/Admin 双服务
27-
internal/admin # Admin API(鉴权、Worker 管理、文件管理)
28-
internal/router # Router 动态执行入口
29-
internal/executor # 脚本执行与超时控制
30-
internal/requestparse # JSON/form/multipart 解析
31-
internal/registry # 路由内存索引
32-
internal/db # SQLite 持久化
33-
internal/model # 领域模型(包含 FuncInput/FuncRequest/FuncOutput)
34-
pages/src # Admin 服务的前端源码(Settings/Workers/WorkerInfo/Dependencies)
35-
public # 前端构建产物(由后端直接托管)
27+
internal/admin # admin 服务,负责项目所有管理功能
28+
internal/router # worker 的 http 路由定义
29+
internal/executor # http、cron 触发调用 worker 时的执行器
30+
internal/common # 公共通用模块,JSON/form/multipart 解析
31+
internal/db # 数据库层,使用 ORM 提供操作数据的接口
32+
internal/model # 数据库实体与 Go 实体
33+
pages/ # 前端源代码
3634
migrations/001_init.sql # 建表 SQL
3735
.github/workflows # CI(镜像构建推送)
3836
```
@@ -138,6 +136,7 @@ docker compose up --build
138136
3. 关键流程允许添加简洁中文注释,避免注释噪音。
139137
4. 新增路径、包名、模型名时,必须全局搜索引用并一次性更新。
140138
5. 禁止使用无意义命名和难以理解的随意缩写;允许使用行业内约定俗成、语义明确的常见缩写(如 `cfg``reg``dao`),但所有变量名、函数名、方法名都必须名如其意,看到名称就能理解其职责和用途。
139+
6. 避免做不需要的兜底开发,一般情况下,你只需要考虑当下就行,不必考虑到未来的各种参数值不合法问题。在不明确时,询问我是否需要进行兜底。
141140

142141
## 8. 变更流程(代理执行)
143142

cmd/main.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ import (
1616

1717
"callit/internal/admin"
1818
"callit/internal/config"
19+
"callit/internal/cron"
1920
"callit/internal/db"
20-
"callit/internal/registry"
21+
"callit/internal/executor"
2122
"callit/internal/router"
2223

2324
"golang.org/x/sync/errgroup"
@@ -38,15 +39,21 @@ func main() {
3839
log.Fatalf("加载应用配置失败: %v", err)
3940
}
4041

41-
reg := registry.New()
42+
reg := router.New()
4243
funcs, err := store.Worker.ListEnabled(context.Background())
4344
if err != nil {
4445
log.Fatalf("加载启用函数失败: %v", err)
4546
}
4647
reg.Reload(funcs)
4748

48-
routerEngine := router.NewEngine(store, reg, cfg.DataDir)
49-
adminEngine := admin.NewEngine(store, reg, cfg)
49+
invoker := executor.NewService(store, cfg.DataDir)
50+
cronManager := cron.NewManager(store, invoker, time.Local)
51+
if err := cronManager.Start(context.Background()); err != nil {
52+
log.Fatalf("启动 cron 调度器失败: %v", err)
53+
}
54+
55+
routerEngine := router.NewEngine(store, reg, cfg.DataDir, invoker)
56+
adminEngine := admin.NewEngine(store, reg, cronManager, cfg)
5057
handler := serverRouteHandler(adminEngine, routerEngine, cfg.AdminPrefix)
5158

5259
srv := &http.Server{

internal/admin/message/request.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,19 @@ type ChatStreamRequest struct {
5454
Message string `json:"message"`
5555
HistoryLimit int `json:"history_limit"`
5656
}
57+
58+
// CreateCronTaskRequest 表示创建 cron_task 的请求体。
59+
type CreateCronTaskRequest struct {
60+
Cron string `json:"cron"`
61+
}
62+
63+
// UpdateCronTaskRequest 表示更新 cron_task 的请求体。
64+
type UpdateCronTaskRequest struct {
65+
ID int64 `json:"id,string"`
66+
Cron string `json:"cron"`
67+
}
68+
69+
// CronTaskIDRequest 表示仅包含 cron_task 主键 ID 的请求体。
70+
type CronTaskIDRequest struct {
71+
ID int64 `json:"id,string"`
72+
}

internal/admin/server.go

Lines changed: 194 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"callit/internal/admin/chat"
2323
"callit/internal/admin/message"
2424
"callit/internal/common"
25+
"callit/internal/common/snowflake"
2526
"callit/internal/config"
27+
"callit/internal/cron"
2628
"callit/internal/db"
2729
"callit/internal/model"
28-
"callit/internal/registry"
30+
"callit/internal/router"
2931

3032
"github.com/gin-gonic/gin"
3133
"github.com/google/uuid"
@@ -41,12 +43,14 @@ const (
4143

4244
// Server 表示 Admin 服务。
4345
type Server struct {
44-
store *db.Store
45-
reg *registry.Registry
46-
dataDir string
47-
adminToken string
48-
chatHandler *chat.Handler
49-
configMu sync.RWMutex
46+
store *db.Store
47+
reg *router.Registry
48+
cronReloader interface{ Reload(context.Context) error }
49+
idGenerator *snowflake.Generator
50+
dataDir string
51+
adminToken string
52+
chatHandler *chat.Handler
53+
configMu sync.RWMutex
5054

5155
dependencyTaskMu sync.Mutex
5256
dependencyTaskRunning bool
@@ -69,13 +73,15 @@ func apiError(c *gin.Context, httpStatus int, msg string) {
6973
}
7074

7175
// NewEngine 创建 Admin Gin 引擎。
72-
func NewEngine(store *db.Store, reg *registry.Registry, cfg config.Config) *gin.Engine {
76+
func NewEngine(store *db.Store, reg *router.Registry, cronReloader interface{ Reload(context.Context) error }, cfg config.Config) *gin.Engine {
7377
s := &Server{
74-
store: store,
75-
reg: reg,
76-
dataDir: cfg.DataDir,
77-
adminToken: cfg.AdminToken,
78-
chatHandler: chat.NewHandler(store, cfg.DataDir, cfg.AppConfig),
78+
store: store,
79+
reg: reg,
80+
cronReloader: cronReloader,
81+
idGenerator: snowflake.NewGenerator(1),
82+
dataDir: cfg.DataDir,
83+
adminToken: cfg.AdminToken,
84+
chatHandler: chat.NewHandler(store, cfg.DataDir, cfg.AppConfig),
7985
}
8086
e := gin.New()
8187
e.Use(gin.Recovery(), common.RequestIDMiddleware())
@@ -102,6 +108,10 @@ func NewEngine(store *db.Store, reg *registry.Registry, cfg config.Config) *gin.
102108
api.POST("/workers/disable", s.disableWorker)
103109

104110
api.GET("/workers/:id/logs", s.listWorkerLogs)
111+
api.GET("/workers/:id/crons", s.listWorkerCrons)
112+
api.POST("/workers/:id/crons/create", s.createWorkerCron)
113+
api.POST("/workers/:id/crons/update", s.updateWorkerCron)
114+
api.POST("/workers/:id/crons/delete", s.deleteWorkerCron)
105115

106116
api.GET("/workers/:id/files", s.listWorkerFiles)
107117
api.GET("/workers/:id/files/:filename", s.getFileContent)
@@ -266,7 +276,7 @@ func (s *Server) createWorker(c *gin.Context) {
266276
return
267277
}
268278

269-
if err := s.reloadRegistry(c.Request.Context()); err != nil {
279+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
270280
apiError(c, http.StatusInternalServerError, err.Error())
271281
return
272282
}
@@ -343,7 +353,7 @@ func (s *Server) updateWorker(c *gin.Context) {
343353
return
344354
}
345355

346-
if err := s.reloadRegistry(c.Request.Context()); err != nil {
356+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
347357
apiError(c, http.StatusInternalServerError, err.Error())
348358
return
349359
}
@@ -451,13 +461,17 @@ func (s *Server) deleteWorker(c *gin.Context) {
451461
apiError(c, http.StatusInternalServerError, err.Error())
452462
return
453463
}
464+
if err := s.store.CronTask.DeleteByWorkerID(c.Request.Context(), id); err != nil {
465+
apiError(c, http.StatusInternalServerError, err.Error())
466+
return
467+
}
454468

455469
functionDir := filepath.Join(s.dataDir, "workers", id)
456470
if err := os.RemoveAll(functionDir); err != nil {
457471
apiError(c, http.StatusInternalServerError, fmt.Sprintf("删除函数目录失败: %v", err))
458472
return
459473
}
460-
if err := s.reloadRegistry(c.Request.Context()); err != nil {
474+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
461475
apiError(c, http.StatusInternalServerError, err.Error())
462476
return
463477
}
@@ -807,13 +821,25 @@ func (s *Server) setWorkerEnabled(c *gin.Context, enabled bool) {
807821
apiError(c, http.StatusInternalServerError, err.Error())
808822
return
809823
}
810-
if err := s.reloadRegistry(c.Request.Context()); err != nil {
824+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
811825
apiError(c, http.StatusInternalServerError, err.Error())
812826
return
813827
}
814828
apiSuccess(c, updated)
815829
}
816830

831+
func (s *Server) reloadWorkersState(ctx context.Context) error {
832+
if err := s.reloadRegistry(ctx); err != nil {
833+
return err
834+
}
835+
if s.cronReloader != nil {
836+
if err := s.cronReloader.Reload(ctx); err != nil {
837+
return fmt.Errorf("重载 cron 调度器失败: %w", err)
838+
}
839+
}
840+
return nil
841+
}
842+
817843
func (s *Server) reloadRegistry(ctx context.Context) error {
818844
reloadCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
819845
defer cancel()
@@ -825,6 +851,157 @@ func (s *Server) reloadRegistry(ctx context.Context) error {
825851
return nil
826852
}
827853

854+
func (s *Server) listWorkerCrons(c *gin.Context) {
855+
workerID, ok := s.requireWorker(c)
856+
if !ok {
857+
return
858+
}
859+
860+
tasks, err := s.store.CronTask.ListByWorkerID(c.Request.Context(), workerID)
861+
if err != nil {
862+
apiError(c, http.StatusInternalServerError, err.Error())
863+
return
864+
}
865+
apiSuccess(c, tasks)
866+
}
867+
868+
func (s *Server) createWorkerCron(c *gin.Context) {
869+
workerID, ok := s.requireWorker(c)
870+
if !ok {
871+
return
872+
}
873+
874+
var req message.CreateCronTaskRequest
875+
if err := c.ShouldBindJSON(&req); err != nil {
876+
apiError(c, http.StatusBadRequest, "请求体格式错误")
877+
return
878+
}
879+
880+
cronExpr := strings.TrimSpace(req.Cron)
881+
if err := s.validateCronExpression(cronExpr); err != nil {
882+
apiError(c, http.StatusBadRequest, err.Error())
883+
return
884+
}
885+
886+
taskID, err := s.idGenerator.NextID()
887+
if err != nil {
888+
apiError(c, http.StatusInternalServerError, err.Error())
889+
return
890+
}
891+
892+
created, err := s.store.CronTask.Create(c.Request.Context(), model.CronTask{
893+
ID: taskID,
894+
WorkerID: workerID,
895+
Cron: cronExpr,
896+
})
897+
if err != nil {
898+
apiError(c, http.StatusInternalServerError, err.Error())
899+
return
900+
}
901+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
902+
apiError(c, http.StatusInternalServerError, err.Error())
903+
return
904+
}
905+
apiSuccess(c, created)
906+
}
907+
908+
func (s *Server) updateWorkerCron(c *gin.Context) {
909+
workerID, ok := s.requireWorker(c)
910+
if !ok {
911+
return
912+
}
913+
914+
var req message.UpdateCronTaskRequest
915+
if err := c.ShouldBindJSON(&req); err != nil {
916+
apiError(c, http.StatusBadRequest, "请求体格式错误")
917+
return
918+
}
919+
if req.ID <= 0 {
920+
apiError(c, http.StatusBadRequest, "id 不能为空")
921+
return
922+
}
923+
924+
cronExpr := strings.TrimSpace(req.Cron)
925+
if err := s.validateCronExpression(cronExpr); err != nil {
926+
apiError(c, http.StatusBadRequest, err.Error())
927+
return
928+
}
929+
930+
updated, err := s.store.CronTask.Update(c.Request.Context(), model.CronTask{
931+
ID: req.ID,
932+
WorkerID: workerID,
933+
Cron: cronExpr,
934+
})
935+
if err != nil {
936+
if errors.Is(err, sql.ErrNoRows) {
937+
apiError(c, http.StatusNotFound, "cron_task 不存在")
938+
return
939+
}
940+
apiError(c, http.StatusInternalServerError, err.Error())
941+
return
942+
}
943+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
944+
apiError(c, http.StatusInternalServerError, err.Error())
945+
return
946+
}
947+
apiSuccess(c, updated)
948+
}
949+
950+
func (s *Server) deleteWorkerCron(c *gin.Context) {
951+
workerID, ok := s.requireWorker(c)
952+
if !ok {
953+
return
954+
}
955+
956+
var req message.CronTaskIDRequest
957+
if err := c.ShouldBindJSON(&req); err != nil {
958+
apiError(c, http.StatusBadRequest, "请求体格式错误")
959+
return
960+
}
961+
if req.ID <= 0 {
962+
apiError(c, http.StatusBadRequest, "id 不能为空")
963+
return
964+
}
965+
966+
if err := s.store.CronTask.Delete(c.Request.Context(), req.ID, workerID); err != nil {
967+
if errors.Is(err, sql.ErrNoRows) {
968+
apiError(c, http.StatusNotFound, "cron_task 不存在")
969+
return
970+
}
971+
apiError(c, http.StatusInternalServerError, err.Error())
972+
return
973+
}
974+
if err := s.reloadWorkersState(c.Request.Context()); err != nil {
975+
apiError(c, http.StatusInternalServerError, err.Error())
976+
return
977+
}
978+
apiSuccess(c, gin.H{"ok": true})
979+
}
980+
981+
func (s *Server) requireWorker(c *gin.Context) (string, bool) {
982+
workerID := strings.TrimSpace(c.Param("id"))
983+
if workerID == "" {
984+
apiError(c, http.StatusBadRequest, "id 不能为空")
985+
return "", false
986+
}
987+
if _, err := s.store.Worker.GetByID(c.Request.Context(), workerID); err != nil {
988+
if errors.Is(err, sql.ErrNoRows) {
989+
apiError(c, http.StatusNotFound, "函数不存在")
990+
return "", false
991+
}
992+
apiError(c, http.StatusInternalServerError, err.Error())
993+
return "", false
994+
}
995+
return workerID, true
996+
}
997+
998+
func (s *Server) validateCronExpression(expr string) error {
999+
if strings.TrimSpace(expr) == "" {
1000+
return fmt.Errorf("cron 不能为空")
1001+
}
1002+
return cron.ValidateExpression(expr)
1003+
}
1004+
8281005
func validateWorker(worker model.Worker) error {
8291006
if strings.TrimSpace(worker.Name) == "" {
8301007
return fmt.Errorf("name 不能为空")

0 commit comments

Comments
 (0)