Skip to content

Commit fa39ddf

Browse files
enhancement(scheduler): honor QueueOrderFn in preempt action
Cherry-pick of #5142 to release-1.14.

- Replace the non-deterministic map iteration over queues with a priority queue ordered by ssn.QueueOrderFn
- Replace the shared underRequest slice with a per-queue underRequestByQueue map
- Add a regression test for the multi-queue preemptorTasks overwrite in TestPreempt
- Add a "preemption with priority queues" test in TestTopologyAwarePreempt
- Add the capacity plugin to TestTopologyAwarePreempt for queue ordering

Agent-Logs-Url: https://github.com/volcano-sh/volcano/sessions/f63fac78-512b-47db-9996-ce7e75c36606
Co-authored-by: JesseStutler <38534065+JesseStutler@users.noreply.github.com>
1 parent 852c6a4 commit fa39ddf

2 files changed

Lines changed: 91 additions & 10 deletions

File tree

pkg/scheduler/actions/preempt/preempt.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
109109
preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
110110
preemptorTasks := map[api.JobID]*util.PriorityQueue{}
111111

112-
var underRequest []*api.JobInfo
113-
queues := map[api.QueueID]*api.QueueInfo{}
112+
underRequestByQueue := map[api.QueueID][]*api.JobInfo{}
114113

115114
for _, job := range ssn.Jobs {
116115
if job.IsPending() {
@@ -122,12 +121,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
122121
continue
123122
}
124123

125-
if queue, found := ssn.Queues[job.Queue]; !found {
124+
if _, found := ssn.Queues[job.Queue]; !found {
125+
klog.V(3).Infof("Queue <%s> not found for Job <%s/%s>, skip preemption", job.Queue, job.Namespace, job.Name)
126126
continue
127-
} else if _, existed := queues[queue.UID]; !existed {
128-
klog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
129-
queue.Name, job.Namespace, job.Name)
130-
queues[queue.UID] = queue
131127
}
132128

133129
// check job if starving for more resources.
@@ -146,7 +142,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
146142
preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
147143
}
148144
preemptorsMap[job.Queue].Push(job)
149-
underRequest = append(underRequest, job)
145+
underRequestByQueue[job.Queue] = append(underRequestByQueue[job.Queue], job)
150146
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
151147
for _, task := range job.TaskStatusIndex[api.Pending] {
152148
if task.SchGated {
@@ -156,9 +152,22 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
156152
}
157153
}
158154

155+
// If plugin defines queue order function, use it to order queues.
156+
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
157+
for queueID := range preemptorsMap {
158+
if queue, found := ssn.Queues[queueID]; found {
159+
queues.Push(queue)
160+
}
161+
}
162+
159163
ph := util.NewPredicateHelper()
160164
// Preemption between Jobs within Queue.
161-
for _, queue := range queues {
165+
for {
166+
if queues.Empty() {
167+
break
168+
}
169+
170+
queue := queues.Pop().(*api.QueueInfo)
162171
for {
163172
preemptors := preemptorsMap[queue.UID]
164173

@@ -226,7 +235,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
226235
}
227236

228237
// Preemption between Task within Job.
229-
for _, job := range underRequest {
238+
for _, job := range underRequestByQueue[queue.UID] {
230239
// Here we need to use a scoped intraJob priority queue instead of overwriting preemptorTasks[job.UID].
231240
// The original preemptorTasks map is populated during job discovery (lines above)
232241
// and consumed by the "Preemption between Jobs within Queue" loop.

pkg/scheduler/actions/preempt/preempt_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"volcano.sh/volcano/pkg/scheduler/api"
3636
"volcano.sh/volcano/pkg/scheduler/conf"
3737
"volcano.sh/volcano/pkg/scheduler/framework"
38+
"volcano.sh/volcano/pkg/scheduler/plugins/capacity"
3839
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
3940
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
4041
"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
@@ -267,6 +268,50 @@ func TestPreempt(t *testing.T) {
267268
ExpectEvictNum: 0,
268269
ExpectEvicted: []string{}, // no victims should be reclaimed
269270
},
271+
{
272+
// Regression test for the preemptorTasks overwrite issue in multi-queue preemption.
273+
//
274+
// Instead of:
275+
// intraJobPreemptors := util.NewPriorityQueue(ssn.TaskOrderFn)
276+
// the buggy code previously used:
277+
// preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
278+
// in the "Preemption between Task within Job" loop, which caused preemptorTasks to be overwritten/drained across queues.
279+
// This test verifies that the preemptorTasks for pg3 (high-priority preemptor in q2) is not overwritten/drained when processing q1, so that pg3 can successfully preempt pg2.
280+
//
281+
// Scenario:
282+
// - q1 has a running non-starving job (pg1) and no preemptor.
283+
// - q2 has a low-priority running victim (pg2) and a high-priority starving
284+
// preemptor job (pg3).
285+
// - underRequest was shared across queues (before it became the per-queue underRequestByQueue).
286+
//
287+
// Buggy behavior:
288+
// - While processing q1, the intra-job pass overwrites/drains
289+
// preemptorTasks[pg3], so q2 later sees no preemptor and skips eviction.
290+
//
291+
// Why this was flaky:
292+
// - Queue iteration order came from a Go map, so the run usually passed when
293+
// q2 was visited first, but failed when q1 was visited first.
294+
Name: "multi-queue: preemptorTasks must not be overwritten by intra-job preemption of another queue",
295+
PodGroups: []*schedulingv1beta1.PodGroup{
296+
util.BuildPodGroup("pg1", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue),
297+
util.BuildPodGroupWithPrio("pg2", "c1", "q2", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
298+
util.BuildPodGroupWithPrio("pg3", "c1", "q2", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
299+
},
300+
Pods: []*v1.Pod{
301+
util.BuildPod("c1", "q1-runner1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
302+
util.BuildPod("c1", "q2-preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg2", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
303+
util.BuildPod("c1", "q2-preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg3", make(map[string]string), make(map[string]string)),
304+
},
305+
Nodes: []*v1.Node{
306+
util.BuildNode("n1", api.BuildResourceList("2", "2G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
307+
},
308+
Queues: []*schedulingv1beta1.Queue{
309+
util.BuildQueue("q1", 1, nil),
310+
util.BuildQueue("q2", 1, api.BuildResourceList("4", "4G")),
311+
},
312+
ExpectEvicted: []string{"c1/q2-preemptee1"},
313+
ExpectEvictNum: 1,
314+
},
270315
}
271316

272317
trueValue := true
@@ -318,6 +363,7 @@ func TestPreempt(t *testing.T) {
318363

319364
func TestTopologyAwarePreempt(t *testing.T) {
320365
plugins := map[string]framework.PluginBuilder{
366+
capacity.PluginName: capacity.New,
321367
conformance.PluginName: conformance.New,
322368
gang.PluginName: gang.New,
323369
priority.PluginName: priority.New,
@@ -600,6 +646,28 @@ func TestTopologyAwarePreempt(t *testing.T) {
600646
ExpectEvicted: []string{"c1/q2-preemptee1"},
601647
ExpectEvictNum: 1,
602648
},
649+
{
650+
Name: "preemption with priority queues",
651+
PodGroups: []*schedulingv1beta1.PodGroup{
652+
util.BuildPodGroupWithPrio("pg3", "c1", "q2", 1, nil, schedulingv1beta1.PodGroupRunning, "high-priority"),
653+
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, nil, schedulingv1beta1.PodGroupRunning, "low-priority"),
654+
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, nil, schedulingv1beta1.PodGroupInqueue, "high-priority"),
655+
},
656+
Pods: []*v1.Pod{
657+
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg3", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
658+
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
659+
util.BuildPodWithPreemptionPolicy("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string), v1.PreemptLowerPriority),
660+
},
661+
Nodes: []*v1.Node{
662+
util.BuildNode("n1", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), make(map[string]string)),
663+
},
664+
Queues: []*schedulingv1beta1.Queue{
665+
util.BuildQueueWithPriorityAndResourcesQuantity("q1", 1, api.BuildResourceList("1", "1G"), api.BuildResourceList("1", "1G")),
666+
util.BuildQueueWithPriorityAndResourcesQuantity("q2", 10, api.BuildResourceList("1", "1G"), api.BuildResourceList("1", "1G")),
667+
},
668+
ExpectEvictNum: 1,
669+
ExpectEvicted: []string{"c1/preemptee1"},
670+
},
603671
}
604672

605673
trueValue := true
@@ -636,6 +704,10 @@ func TestTopologyAwarePreempt(t *testing.T) {
636704
EnabledPreemptable: &trueValue,
637705
EnabledPredicate: &trueValue,
638706
},
707+
{
708+
Name: capacity.PluginName,
709+
EnabledQueueOrder: &trueValue,
710+
},
639711
},
640712
}}
641713

0 commit comments

Comments
 (0)