Skip to content

Commit 62d4b56

Browse files
authored
Merge pull request #6 from joshjms/add-start-finish-times-in-report
sandbox: Add max concurrency to sandbox manager
2 parents 25be7bd + cc74c23 commit 62d4b56

8 files changed

Lines changed: 183 additions & 68 deletions

File tree

job/job.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,12 @@ func (j *Job) execute(ctx context.Context) (sandbox.Report, error) {
9595
cfg.Stdin = proc.Stdin
9696

9797
containerId := fmt.Sprintf("%s-%d", j.ID, j.step)
98-
s, err := sandbox.GetManager().NewSandbox(containerId, cfg)
99-
defer sandbox.GetManager().DestroySandbox(containerId)
100-
101-
if err != nil {
98+
if err := sandbox.GetManager().NewSandbox(containerId, cfg); err != nil {
10299
return sandbox.Report{}, fmt.Errorf("cannot create sandbox for process %d: %v", j.step, err)
103100
}
101+
defer sandbox.GetManager().DestroySandbox(containerId)
104102

105-
report, err := s.Run(ctx)
103+
report, err := sandbox.GetManager().RunSandbox(ctx, containerId)
106104
if err != nil {
107105
return sandbox.Report{}, fmt.Errorf("error running process %d: %v", j.step, err)
108106
}

job/job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestMain(m *testing.M) {
1717
config.UseDefaults()
1818

1919
job.NewJobPool()
20-
sandbox.NewManager()
20+
sandbox.NewManager(config.MaxConcurrency)
2121

2222
exitCode := m.Run()
2323

sandbox/manager.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sandbox
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67

@@ -13,21 +14,22 @@ type Manager struct {
1314
sandboxes map[string]*Sandbox
1415
allocatedRanges map[string]int
1516

16-
allocator *allocator.Allocator
17+
allocator *allocator.Allocator
18+
maxConcurrency int
1719

18-
mu sync.Mutex
20+
mu sync.Mutex
21+
sem chan struct{}
1922
}
2023

21-
func NewManager() error {
22-
alloc, err := allocator.NewAllocator()
23-
if err != nil {
24-
return err
25-
}
24+
func NewManager(maxConcurrency int) error {
25+
alloc := allocator.NewAllocator()
2626

2727
m = &Manager{
2828
sandboxes: make(map[string]*Sandbox),
2929
allocatedRanges: make(map[string]int),
3030
allocator: alloc,
31+
maxConcurrency: maxConcurrency,
32+
sem: make(chan struct{}, maxConcurrency),
3133
}
3234
return nil
3335
}
@@ -36,17 +38,17 @@ func GetManager() *Manager {
3638
return m
3739
}
3840

39-
func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) {
41+
func (m *Manager) NewSandbox(id string, cfg *Config) error {
4042
m.mu.Lock()
4143
defer m.mu.Unlock()
4244

4345
if _, exists := m.sandboxes[id]; exists {
44-
return nil, fmt.Errorf("sandbox with id %q already exists", id)
46+
return fmt.Errorf("sandbox with id %q already exists", id)
4547
}
4648

4749
idx, rng := m.allocator.Allocate()
4850
if idx == -1 {
49-
return nil, fmt.Errorf("no available uid/gid ranges")
51+
return fmt.Errorf("no available uid/gid ranges")
5052
}
5153

5254
cfg.UserNamespace = &UserNamespaceConfig{
@@ -66,7 +68,27 @@ func (m *Manager) NewSandbox(id string, cfg *Config) (*Sandbox, error) {
6668
m.sandboxes[id] = sandbox
6769
m.allocatedRanges[id] = idx
6870

69-
return sandbox, nil
71+
return nil
72+
}
73+
74+
func (m *Manager) RunSandbox(ctx context.Context, id string) (Report, error) {
75+
m.sem <- struct{}{}
76+
defer func() { <-m.sem }()
77+
78+
m.mu.Lock()
79+
sandbox, exists := m.sandboxes[id]
80+
m.mu.Unlock()
81+
82+
if !exists {
83+
return Report{}, fmt.Errorf("sandbox with id %q does not exist", id)
84+
}
85+
86+
report, err := sandbox.Run(ctx)
87+
if err != nil {
88+
return Report{}, fmt.Errorf("error running sandbox %q: %w", id, err)
89+
}
90+
91+
return report, nil
7092
}
7193

7294
func (m *Manager) DestroySandbox(id string) error {

sandbox/report.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"os"
77
"syscall"
8+
"time"
89
)
910

1011
type Status string
@@ -29,6 +30,9 @@ type Report struct {
2930
CPUTime uint64
3031
Memory uint64
3132
WallTime int64
33+
34+
StartAt time.Time
35+
FinishAt time.Time
3236
}
3337

3438
func (r Report) String() string {
@@ -45,7 +49,7 @@ func (r Report) String() string {
4549
return fmt.Sprintf("status: %s\nexit code: %d\nsignal: %d\nstdout: %s\nstderr:%s\ncpu:%d usec\nmemory:%d bytes\n", r.Status, r.ExitCode, r.Signal, stdoutTrim, stderrTrim, r.CPUTime, r.Memory)
4650
}
4751

48-
func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool) (Report, error) {
52+
func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessState, timeLimitExceeded bool, startAt, finishAt time.Time) (Report, error) {
4953
stdout, err := io.ReadAll(stdoutBuf)
5054
if err != nil {
5155
return Report{}, fmt.Errorf("error reading stdout: %w", err)
@@ -87,5 +91,7 @@ func (s *Sandbox) makeReport(stdoutBuf, stderrBuf io.Reader, state *os.ProcessSt
8791
Stderr: string(stderr),
8892
CPUTime: stats.GetCPU().GetUsageUsec(),
8993
Memory: stats.GetMemory().GetMaxUsage(),
94+
StartAt: startAt,
95+
FinishAt: finishAt,
9096
}, nil
9197
}

sandbox/sandbox.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) {
8080
Init: true,
8181
}
8282

83+
startAt := time.Now()
84+
8385
if err := container.Run(process); err != nil {
8486
return Report{}, fmt.Errorf("error running container: %w", err)
8587
}
@@ -99,7 +101,9 @@ func (s *Sandbox) Run(ctx context.Context) (Report, error) {
99101
state, _ := process.Wait()
100102
processFinished <- struct{}{}
101103

102-
return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded)
104+
finishAt := time.Now()
105+
106+
return s.makeReport(&stdoutBuf, &stderrBuf, state, timeLimitExceeded, startAt, finishAt)
103107
}
104108

105109
func getRlimits(cfg *RlimitConfig) []configs.Rlimit {

sandbox/sandbox_test.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sandbox_test
33
import (
44
"os"
55
"path/filepath"
6+
"sort"
67
"testing"
78

89
"github.com/joshjms/castletown/config"
@@ -14,7 +15,7 @@ func TestMain(m *testing.M) {
1415
sandbox.Init()
1516
config.UseDefaults()
1617

17-
sandbox.NewManager()
18+
sandbox.NewManager(2)
1819

1920
files, err := os.ReadDir("test_files")
2021
require.NoError(nil, err, "failed to read test files directory: %v", err)
@@ -113,7 +114,8 @@ func TestSandboxRusageConsistency(t *testing.T) {
113114
var minCpuUsage, maxCpuUsage uint64
114115

115116
for i := 0; i < 10; i++ {
116-
report := tc.Run(t)
117+
reports := tc.Run(t)
118+
report := reports[0]
117119

118120
if i == 0 {
119121
minCpuUsage = report.CPUTime
@@ -128,3 +130,37 @@ func TestSandboxRusageConsistency(t *testing.T) {
128130

129131
require.Less(t, maxCpuUsage-minCpuUsage, uint64(10000), "cpu usage inconsistent")
130132
}
133+
134+
func TestSandboxConcurrency(t *testing.T) {
135+
expectedStatus := sandbox.STATUS_OK
136+
137+
tc := sandbox.Testcase{
138+
File: "test_files/sleep.cpp",
139+
ExpectedStatus: &expectedStatus,
140+
TimeLimitMs: 3000,
141+
Concurrency: 5,
142+
}
143+
144+
reports := tc.Run(t)
145+
146+
startTimes := make([]int64, len(reports))
147+
finishTimes := make([]int64, len(reports))
148+
149+
for i, report := range reports {
150+
startTimes[i] = report.StartAt.UnixMilli()
151+
finishTimes[i] = report.FinishAt.UnixMilli()
152+
}
153+
154+
sort.Slice(startTimes, func(i, j int) bool {
155+
return startTimes[i] < startTimes[j]
156+
})
157+
sort.Slice(finishTimes, func(i, j int) bool {
158+
return finishTimes[i] < finishTimes[j]
159+
})
160+
161+
for i := 2; i < len(startTimes); i++ {
162+
require.Less(t, finishTimes[i-2], startTimes[i], "semaphore didn't work correctly")
163+
}
164+
165+
tc.Run(t)
166+
}

sandbox/test_files/sleep.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#include <chrono>
2+
#include <iostream>
3+
4+
int main() {
5+
using clock = std::chrono::steady_clock;
6+
using namespace std::chrono;
7+
8+
std::cout << "Spinning for ~2 seconds...\n";
9+
10+
const auto start = clock::now();
11+
const auto target = start + seconds(2);
12+
13+
// Busy loop until we reach target time (100% CPU on one core)
14+
while (clock::now() < target) {
15+
// nothing: pure spin
16+
}
17+
18+
std::cout << "Done.\n";
19+
return 0;
20+
}

0 commit comments

Comments
 (0)