Commit ef07730

committed
add some pages
1 parent 6365d39 commit ef07730

9 files changed
Lines changed: 519 additions & 0 deletions

Lines changed: 104 additions & 0 deletions
---
weight: 999
title: "Parallel Merge Sort"
description: ""
icon: "article"
date: "2025-05-04T22:58:38+08:00"
lastmod: "2025-05-04T22:58:38+08:00"
draft: false
toc: true
---
## Example Description

A primary use case for OGraph is accelerating parallel computations. Take merge sort as an example: conventional merge sort algorithms do not leverage multi-core CPU parallelism.

During the merge sort process, each subsequence needs to be sorted. If these sorts can be performed concurrently, computation time drops significantly.

In this scenario, OGraph enables task parallelism without complicating the code structure.
## Example Code

The following code implements a parallelized merge sort. It first defines a merge function to combine two already sorted sequences.

Then it defines sorting tasks for the subsequences, merges them into the final result using the merge function, and defines a validation task to check the final result.

Finally, a pipeline is created, these tasks are registered, and the pipeline is executed.

(For code brevity, standard library sorting is used for subsequences rather than recursively applying the merge function as in conventional merge sort.)
```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"slices"
	"sync"

	"github.com/symphony09/ograph"
)

func merge(left, right []int) []int {
	result := make([]int, 0, len(left)+len(right))
	for len(left) > 0 || len(right) > 0 {
		if len(left) == 0 {
			return append(result, right...)
		}
		if len(right) == 0 {
			return append(result, left...)
		}
		if left[0] < right[0] {
			result = append(result, left[0])
			left = left[1:]
		} else {
			result = append(result, right[0])
			right = right[1:]
		}
	}
	return result
}

func main() {
	size := 1000
	randomInts := make([]int, 0, size)
	sortedInts := make([]int, 0, size)
	mux := &sync.Mutex{}

	for i := 0; i < size; i++ {
		randomInts = append(randomInts, rand.Intn(10000))
	}

	sortTasks := make([]*ograph.Element, 0, 10)

	for i := 0; i < 10; i++ {
		part := randomInts[i*100 : (i+1)*100]

		t := ograph.NewElement(fmt.Sprintf("t%d", i)).UseFn(func() error {
			slices.Sort(part)
			mux.Lock()
			sortedInts = merge(sortedInts, part)
			mux.Unlock()
			return nil
		})

		sortTasks = append(sortTasks, t)
	}

	checkTask := ograph.NewElement("check").UseFn(func() error {
		if len(sortedInts) != 1000 || !slices.IsSorted(sortedInts) {
			return fmt.Errorf("got wrong sort result")
		}
		return nil
	})

	p := ograph.NewPipeline()
	err := p.Register(checkTask, ograph.Rely(sortTasks...)).Run(context.Background(), nil)
	if err != nil {
		log.Fatalf("run merge sort pipeline failed, err: %v\n", err)
	}

	fmt.Println("result:", sortedInts)
}
```

content/en/docs/features/event.md

Lines changed: 61 additions & 0 deletions
---
weight: 202
title: "Events"
description: "How to send events and listen for events"
icon: "article"
date: "2025-04-20T17:38:13+08:00"
lastmod: "2025-04-20T17:38:13+08:00"
draft: false
toc: true
---
Events are generally used for code decoupling. In OGraph, nodes can send events, and pipelines can listen for events sent by nodes.

For example, a node can send an event representing an error occurrence. After the pipeline captures this event, it can report it via webhook.

This way, nodes only need to focus on their business logic, and the pipeline layer doesn't need to handle errors for specific nodes.

## Example

To enable a node to send events, first embed ograph.BaseEventNode in the node. The node can then call the Emit method to send events.
```go
type TEventNode struct {
	ograph.BaseEventNode
}

func (node *TEventNode) Run(ctx context.Context, state ogcore.State) error {
	state.Set("msg", "hi, it is a test event.")
	node.Emit("test", state) // pass event info via state
	return nil
}
```

Then, listen for and handle events in the pipeline.

```go
pipeline := ograph.NewPipeline()

pipeline.Subscribe(func(event string, obj ogcore.State) bool {
	msg, _ := obj.Get("msg")
	fmt.Printf("get message from %s event: %v\n", event, msg)
	return true
}, eventd.On("test"))

pipeline.Register(ograph.NewElement("n").UseNode(&TEventNode{}))

if err := pipeline.Run(context.Background(), nil); err != nil {
	fmt.Println(err)
}
```
OGraph uses the github.com/symphony09/eventd library to implement the event mechanism. `eventd.On("test")` listens for the "test" event; regular-expression matching of event names is also supported. For detailed usage, please refer to the eventd library's documentation.

Output result:

```
get message from test event: hi, it is a test event.
```

## Synchronous and Asynchronous

Following Go's philosophy, OGraph sends events synchronously by default. That is, `node.Emit()` will block the node. If you need asynchronous processing, use `go node.Emit()`.
Lines changed: 58 additions & 0 deletions
---
weight: 202
title: "Parallelism Limit"
description: "How to limit node parallelism"
icon: "article"
date: "2025-04-20T19:25:06+08:00"
lastmod: "2025-04-20T19:25:06+08:00"
draft: false
toc: true
---
In the real world, resources are always limited: CPU time, memory capacity, and network bandwidth all have ceilings.

If pipeline node parallelism is too high, it may lead to:

1. CPU: severely impacting other processes
2. Memory: triggering Out Of Memory (OOM) errors

OGraph supports setting a parallelism limit to avoid these issues.

## Example
```go
pipeline := ograph.NewPipeline()

n1 := ograph.NewElement("n1").UseFn(func() error {
	fmt.Println("n1 running")
	time.Sleep(time.Second)
	fmt.Println("n1 stop")
	return nil
})

n2 := ograph.NewElement("n2").UseFn(func() error {
	fmt.Println("n2 running")
	fmt.Println("n2 stop")
	return nil
})
pipeline.Register(n1).Register(n2)

pipeline.ParallelismLimit = 1

if err := pipeline.Run(context.Background(), nil); err != nil {
	fmt.Println(err)
}
```
Output result:

```
n1 running
n1 stop
n2 running
n2 stop
```

From the output you can see that although there is no dependency between nodes n1 and n2, with the parallelism limit set to 1 only one node can run at a time. So even though n1 sleeps for one second, n2 cannot start during that period.

{{< alert context="info" text="What's mentioned here as 'parallelism' is more accurately referred to as 'concurrency' in technical terms." />}}
Lines changed: 78 additions & 0 deletions
---
weight: 202
title: "Pause, Continue, and Cancel"
description: "How to pause, continue, and cancel a pipeline"
icon: "article"
date: "2025-04-20T15:22:48+08:00"
lastmod: "2025-04-20T15:22:48+08:00"
draft: false
toc: true
---
## Example

OGraph Pipeline supports pausing, resuming, and canceling execution. Here's an example:

```go
pipeline := ograph.NewPipeline()

var startTime time.Time

a := ograph.NewElement("a").UseFn(func() error {
	fmt.Println("a running")
	startTime = time.Now()
	time.Sleep(time.Second)
	return nil
})

b := ograph.NewElement("b").UseFn(func() error {
	fmt.Println("b running, after", time.Since(startTime))
	time.Sleep(time.Second)
	return nil
})

c := ograph.NewElement("c").UseFn(func() error {
	fmt.Println("c running, after", time.Since(startTime))
	return nil
})

pipeline.Register(a).Register(b, ograph.Rely(a)).Register(c, ograph.Rely(b))

ctx, cancel := context.WithCancel(context.Background())

pause, continueRun, wait := pipeline.AsyncRun(ctx, nil)

time.Sleep(time.Millisecond * 500)

pause() // pause before node b runs

time.Sleep(time.Second)

continueRun() // b continues running after 1.5s (0.5 + 1), not 1s

time.Sleep(time.Millisecond * 500)

cancel() // cancel before node c runs, so node c will never execute

err := wait() // wait for pipeline completion

fmt.Println(err) // should get a context canceled error
```

## Execution Process

The example code execution flow is as follows:

```mermaid
%%{init: { 'logLevel': 'debug', 'theme': 'default', 'timeline': {'disableMulticolor': true} } }%%
timeline
    title Execution process
    0s~0.5s : Node a running
    0.5s~1s : Node a finishes
            : Pipeline pauses
    1s~1.5s : Pipeline paused
    1.5s~2s : Pipeline continues : Node b running
    2s~2.5s : Node b finishes : Pipeline canceled
```

{{< alert context="info" text="Pausing does not affect nodes that are already running. Likewise, if a node does not support cancellation, canceling will not affect nodes that are already running." />}}
Lines changed: 51 additions & 0 deletions
---
weight: 202
title: "Priority Scheduling"
description: "How to prioritize node execution when resources are limited"
icon: "article"
date: "2025-04-20T18:46:29+08:00"
lastmod: "2025-04-20T18:46:29+08:00"
draft: false
toc: true
---
In resource-constrained scenarios, OGraph's priority scheduling feature allows important nodes to execute first.

For example, an image-processing application may need to process a batch of images of varying sizes. Smaller images should be prioritized so that as many images as possible are processed in a short time (smaller images take less time to process).

In this scenario, it is necessary to limit concurrency and let the smaller-image tasks execute first. (In a fully concurrent setup, large-image tasks would consume CPU time needed by the smaller ones.)

## Example
```go
pipeline := ograph.NewPipeline()

n1 := ograph.NewElement("n1").UseFn(func() error {
	fmt.Println("n1 running")
	return nil
}).SetPriority(99)

n2 := ograph.NewElement("n2").UseFn(func() error {
	fmt.Println("n2 running")
	return nil
}).SetPriority(1)

pipeline.Register(n1).Register(n2)

pipeline.ParallelismLimit = 1

if err := pipeline.Run(context.Background(), nil); err != nil {
	fmt.Println(err)
}
```
In the code above, `pipeline.ParallelismLimit = 1` limits concurrency. Without this limit, node n2 would still execute immediately, regardless of priority.

As a rule of thumb: for compute-intensive tasks, set the concurrency limit to the number of CPU cores; for tasks that consume significant memory or other resources, set it to the total available resources divided by the average resource consumption per task.

Output:

```
n1 running
n2 running
```

0 commit comments
