Skip to content

Commit a9002ef

Browse files
committed
add some pages
1 parent 58b6306 commit a9002ef

6 files changed

Lines changed: 397 additions & 0 deletions

File tree

content/zh/docs/features/event.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
---
2+
weight: 202
3+
title: "事件"
4+
description: "关于如何发送事件和监听事件"
5+
icon: "article"
6+
date: "2025-04-20T17:38:13+08:00"
7+
lastmod: "2025-04-20T17:38:13+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
事件一般用于代码解耦。在 OGraph 中,可以让节点发送事件,然后在 pipeline 上监听节点所发送的事件。
13+
14+
例如,节点可以发送一个代表发生了错误的事件,pipeline 捕获到这个事件后可以通过 webhook 上报。
15+
16+
这样节点只要专注于自己的业务逻辑,在 pipeline 层面也不用针对某个节点做错误处理。
17+
18+
## 示例
19+
20+
要让节点支持发送事件,那么首先需要让节点继承(内嵌)ograph.BaseEventNode,然后节点就可以调用 Emit 方法发送事件了。
21+
22+
```go
23+
type TEventNode struct {
24+
ograph.BaseEventNode
25+
}
26+
27+
func (node *TEventNode) Run(ctx context.Context, state ogcore.State) error {
28+
state.Set("msg", "hi, it is a test event.")
29+
node.Emit("test", state) // pass event info by state
30+
return nil
31+
}
32+
```
33+
34+
然后就是在 pipeline 中监听和处理事件了。
35+
36+
```go
37+
pipeline := ograph.NewPipeline()
38+
39+
pipeline.Subscribe(func(event string, obj ogcore.State) bool {
40+
msg, _ := obj.Get("msg")
41+
fmt.Printf("get message from %s event: %v\n", event, msg)
42+
return true
43+
}, eventd.On("test"))
44+
45+
pipeline.Register(ograph.NewElement("n").UseNode(&TEventNode{}))
46+
47+
if err := pipeline.Run(context.Background(), nil); err != nil {
48+
fmt.Println(err)
49+
}
50+
```
51+
52+
OGraph 使用了 github.com/symphony09/eventd 库来实现事件机制,eventd.On("test") 表示监听 test 事件,支持正则匹配事件名,详细用法可以参考 eventd 库的文档。
53+
54+
输出结果如下:
55+
56+
```
57+
get message from test event: hi, it is a test event.
58+
```
59+
60+
## 同步和异步
61+
62+
遵循 go 的哲学, OGraph 发送事件默认是同步的。也就是说 `node.Emit()` 会阻塞节点,如果你需要异步,那么就用 `go node.Emit()`
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
---
2+
weight: 202
3+
title: "并发度限制"
4+
description: "关于如何限制节点并发度"
5+
icon: "article"
6+
date: "2025-04-20T19:25:06+08:00"
7+
lastmod: "2025-04-20T19:25:06+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
在现实世界中,资源总是受限的。CPU 资源是受限的,内存容量是受限的,网络带宽是受限的。
13+
14+
所以如果 pipeline 节点并发度过高可能出现:
15+
16+
1. CPU:严重影响其他进程运行
17+
2. 内存:OOM
18+
19+
OGraph 支持设置并发度限制来避免这些问题。
20+
21+
## 示例
22+
23+
```go
24+
pipeline := ograph.NewPipeline()
25+
26+
n1 := ograph.NewElement("n1").UseFn(func() error {
27+
fmt.Println("n1 running")
28+
time.Sleep(time.Second)
29+
fmt.Println("n1 stop")
30+
return nil
31+
})
32+
33+
n2 := ograph.NewElement("n2").UseFn(func() error {
34+
fmt.Println("n2 running")
35+
fmt.Println("n2 stop")
36+
return nil
37+
})
38+
pipeline.Register(n1).Register(n2)
39+
40+
pipeline.ParallelismLimit = 1
41+
42+
if err := pipeline.Run(context.Background(), nil); err != nil {
43+
fmt.Println(err)
44+
}
45+
```
46+
47+
输出结果:
48+
49+
```
50+
n1 running
51+
n1 stop
52+
n2 running
53+
n2 stop
54+
```
55+
56+
从输出结果可以看到,虽然 n1 节点和 n2 节点之间没有依赖关系,但在设置并发度为一后,同一时间只能有一个节点在运行。因此即使 n1 节点陷入睡眠了 1 秒,期间 n2 也不能开始运行。
57+
58+
{{< alert context="info" text="这里说的并发度,更准确的说,应该是指并行度。" />}}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
---
2+
weight: 202
3+
title: "暂停、继续和取消"
4+
description: "关于如何暂停、继续和取消 pipeline"
5+
icon: "article"
6+
date: "2025-04-20T15:22:48+08:00"
7+
lastmod: "2025-04-20T15:22:48+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
## 示例
13+
14+
OGraph Pipeline 支持暂停运行、继续运行和取消运行,示例代码如下:
15+
16+
```go
17+
pipeline := ograph.NewPipeline()
18+
19+
var startTime time.Time
20+
21+
a := ograph.NewElement("a").UseFn(func() error {
22+
fmt.Println("a running")
23+
startTime = time.Now()
24+
time.Sleep(time.Second)
25+
return nil
26+
})
27+
28+
b := ograph.NewElement("b").UseFn(func() error {
29+
fmt.Println("b running, after", time.Since(startTime))
30+
time.Sleep(time.Second)
31+
return nil
32+
})
33+
34+
c := ograph.NewElement("c").UseFn(func() error {
35+
fmt.Println("c running, after", time.Since(startTime))
36+
return nil
37+
})
38+
39+
pipeline.Register(a).Register(b, ograph.Rely(a)).Register(c, ograph.Rely(b))
40+
41+
ctx, cancel := context.WithCancel(context.Background())
42+
43+
pause, continueRun, wait := pipeline.AsyncRun(ctx, nil)
44+
45+
time.Sleep(time.Millisecond * 500)
46+
47+
pause() // pause before run node b
48+
49+
time.Sleep(time.Second)
50+
51+
continueRun() // b continue run, after 1.5s (0.5+1), not 1s
52+
53+
time.Sleep(time.Millisecond * 500)
54+
55+
cancel() // cancel before run node c, so node c will never be ran
56+
57+
err := wait() // wait pipeline finish
58+
59+
fmt.Println(err) // should get context canceled err
60+
```
61+
62+
## 流程图
63+
64+
示例代码运行过程如下:
65+
66+
```mermaid
67+
%%{init: { 'logLevel': 'debug', 'theme': 'default', 'timeline': {'disableMulticolor': true} } }%%
68+
timeline
69+
title Execution process
70+
0s~0.5s : Node a running
71+
0.5s~1s : Node a finish
72+
: Pipeline pause
73+
1s~1.5s : Pipeline pause
74+
1.5s~2s : Pipeline continue : Node b running
75+
2s~2.5s : Node b finish : Pipeline cancel
76+
77+
```
78+
79+
{{< alert context="info" text="暂停不会影响正在运行中的节点。如果节点本身不支持取消,那么取消也不会影响运行中的节点。" />}}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
---
2+
weight: 202
3+
title: "优先级调度"
4+
description: "关于如何在资源有限的情况下使节点分优先级调度"
5+
icon: "article"
6+
date: "2025-04-20T18:46:29+08:00"
7+
lastmod: "2025-04-20T18:46:29+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
在某些资源受限的情况下,可以使用 OGraph 提供的优先级调度功能,使得重要节点优先执行。
13+
14+
例如在图片处理程序中,需要对一批图片进行计算处理。这些图片大小不一,希望优先处理小图片,这样在短时间内可以处理尽可能多的图片(图片越小,处理越快)。
15+
16+
这种情况下就需要限制并发度,并且让小图片任务先执行。(在完全并发情况下,大图片任务会挤占小图片任务的CPU时间)
17+
18+
## 示例
19+
20+
```go
21+
pipeline := ograph.NewPipeline()
22+
23+
n1 := ograph.NewElement("n1").UseFn(func() error {
24+
fmt.Println("n1 running")
25+
return nil
26+
}).SetPriority(99)
27+
28+
n2 := ograph.NewElement("n2").UseFn(func() error {
29+
fmt.Println("n2 running")
30+
return nil
31+
}).SetPriority(1)
32+
33+
pipeline.Register(n1).Register(n2)
34+
35+
pipeline.ParallelismLimit = 1
36+
37+
if err := pipeline.Run(context.Background(), nil); err != nil {
38+
fmt.Println(err)
39+
}
40+
```
41+
42+
上面代码中用 `pipeline.ParallelismLimit = 1` 限制了并发度,如果不限制并发度,节点 n2 还是会马上执行。
43+
44+
建议对计算密集任务限制并发度为 CPU 核数,对于需要占用大量内存或者其他资源的任务,设置为总的资源量除于任务平均消耗量。
45+
46+
输出结果:
47+
48+
```
49+
n1 running
50+
n2 running
51+
```
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
---
2+
weight: 201
3+
title: "子 Pipeline"
4+
description: "关于如何嵌套子 pipeline"
5+
icon: "article"
6+
date: "2025-04-20T18:19:09+08:00"
7+
lastmod: "2025-04-20T18:19:09+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
OGraph 支持嵌套子 Pipeline 来分治复杂的工作流,并且对嵌套深度没有限制。
13+
14+
## 示例
15+
16+
实现嵌套 pipeline 是非常简单的,只要声明一个 pipeline,然后像普通节点一样在另一个 pipeline 中注册它就可以了。
17+
18+
```go
19+
study := newSubPipeline("LearnProgramming", "LearnEnglish")
20+
relax := newSubPipeline("PlayGame", "Sleep")
21+
22+
studyThings := ograph.NewElement("StudyThings").UseNode(study)
23+
relaxThings := ograph.NewElement("RelaxThings").UseNode(relax)
24+
25+
day := ograph.NewPipeline()
26+
day.Register(studyThings).Register(relaxThings, ograph.Rely(studyThings))
27+
28+
if err := day.Run(context.TODO(), nil); err != nil {
29+
fmt.Println(err)
30+
}
31+
```
32+
33+
上面代码中,创建了两个子 pipeline,模拟两类事情,然后再在主 pipeline 中注册为两个节点,并指定依赖关系。
34+
35+
为了代码简洁,构建子pipeline的逻辑放到了一个函数中,实际上和一般的创建过程没有任何区别
36+
37+
```go
38+
func newSubPipeline(things ...string) *ograph.Pipeline {
39+
pipeline := ograph.NewPipeline()
40+
41+
for _, thing := range things {
42+
thing := thing
43+
pipeline.Register(ograph.NewElement(thing).UseFn(func() error {
44+
fmt.Printf("->%s", thing)
45+
return nil
46+
}))
47+
}
48+
49+
return pipeline
50+
}
51+
```
52+
53+
输出结果:
54+
55+
```
56+
->LearnEnglish->LearnProgramming->Sleep->PlayGame
57+
```
58+
59+
子pipeline内部节点没有指定依赖关系,随机顺序打印,主 pipeline 指定了子 pipeline 的依赖关系,以固定顺序打印。

0 commit comments

Comments
 (0)