Skip to content

Commit 6365d39

Browse files
committed
add example
1 parent a9002ef commit 6365d39

1 file changed

Lines changed: 105 additions & 0 deletions

File tree

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
---
2+
weight: 999
3+
title: "并行合并排序"
4+
description: ""
5+
icon: "article"
6+
date: "2025-05-04T22:58:38+08:00"
7+
lastmod: "2025-05-04T22:58:38+08:00"
8+
draft: false
9+
toc: true
10+
---
11+
12+
## 样例说明
13+
14+
一个 OGraph 的主要使用场景是并行计算加速。以合并排序为例,常见的合并排序算法没有利用多核 CPU 并行计算。
15+
16+
在合并排序的过程,需要对每个子序列进行排序,如果每个子序列的排序可以同时进行,那么就极大的的缩短计算时间。
17+
18+
在这种情况下,OGraph 可以让计算任务并行化,并且不让代码变成一团乱麻。
19+
20+
## 样例代码
21+
22+
以下代码实现了并行化的合并排序,代码中首先定义了一个合并函数用于合并两个已经排序完成的序列。
23+
24+
然后定义了一些排序任务,用于对子序列进行排序,并用合并函数合并到最终结果中。还定义了一个检查任务,用于对最终结果进行检验。
25+
26+
最后新建一个pipeline,注册这些任务并执行。
27+
28+
(为了代码简洁,对子序列的排序直接使用了标准库,而不是像常规合并排序一样递归地使用合并函数。)
29+
30+
```go
31+
package main
32+
33+
import (
34+
"context"
35+
"fmt"
36+
"log"
37+
"math/rand"
38+
"slices"
39+
"sync"
40+
41+
"github.com/symphony09/ograph"
42+
)
43+
44+
func merge(left, right []int) []int {
45+
result := make([]int, 0, len(left)+len(right))
46+
for len(left) > 0 || len(right) > 0 {
47+
if len(left) == 0 {
48+
return append(result, right...)
49+
}
50+
if len(right) == 0 {
51+
return append(result, left...)
52+
}
53+
if left[0] < right[0] {
54+
result = append(result, left[0])
55+
left = left[1:]
56+
} else {
57+
result = append(result, right[0])
58+
right = right[1:]
59+
}
60+
}
61+
return result
62+
}
63+
64+
func main() {
65+
size := 1000
66+
randomInts := make([]int, 0, size)
67+
sortedInts := make([]int, 0, size)
68+
mux := &sync.Mutex{}
69+
70+
for i := 0; i < size; i++ {
71+
randomInts = append(randomInts, rand.Intn(10000))
72+
}
73+
74+
sortTasks := make([]*ograph.Element, 0, 10)
75+
76+
for i := 0; i < 10; i++ {
77+
part := randomInts[i*100 : (i+1)*100]
78+
79+
t := ograph.NewElement(fmt.Sprintf("t%d", i)).UseFn(func() error {
80+
slices.Sort(part)
81+
mux.Lock()
82+
sortedInts = merge(sortedInts, part)
83+
mux.Unlock()
84+
return nil
85+
})
86+
87+
sortTasks = append(sortTasks, t)
88+
}
89+
90+
checkTask := ograph.NewElement("check").UseFn(func() error {
91+
if len(sortedInts) != 1000 || !slices.IsSorted(sortedInts) {
92+
return fmt.Errorf("got wrong sort result")
93+
}
94+
return nil
95+
})
96+
97+
p := ograph.NewPipeline()
98+
err := p.Register(checkTask, ograph.Rely(sortTasks...)).Run(context.Background(), nil)
99+
if err != nil {
100+
log.Fatalf("run merge sort pipeline failed, err: %v\n", err)
101+
}
102+
103+
fmt.Println("result:", sortedInts)
104+
}
105+
```

0 commit comments

Comments
 (0)