Skip to content

Commit eed1bd6

Browse files
Merge pull request #5142 from hajnalmt/fix/preempt-queue-order
enhancement(scheduler): honor QueueOrderFn in preempt action
2 parents 4aa23c2 + 1db1000 commit eed1bd6

2 files changed

Lines changed: 47 additions & 10 deletions

File tree

pkg/scheduler/actions/preempt/preempt.go

Lines changed: 19 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -109,8 +109,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
109109
preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
110110
preemptorTasks := map[api.JobID]*util.PriorityQueue{}
111111

112-
var underRequest []*api.JobInfo
113-
queues := map[api.QueueID]*api.QueueInfo{}
112+
underRequestByQueue := map[api.QueueID][]*api.JobInfo{}
114113

115114
for _, job := range ssn.Jobs {
116115
if job.IsPending() {
@@ -122,12 +121,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
122121
continue
123122
}
124123

125-
if queue, found := ssn.Queues[job.Queue]; !found {
124+
if _, found := ssn.Queues[job.Queue]; !found {
125+
klog.V(3).Infof("Queue <%s> not found for Job <%s/%s>, skip preemption", job.Queue, job.Namespace, job.Name)
126126
continue
127-
} else if _, existed := queues[queue.UID]; !existed {
128-
klog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
129-
queue.Name, job.Namespace, job.Name)
130-
queues[queue.UID] = queue
131127
}
132128

133129
// check job if starving for more resources.
@@ -146,7 +142,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
146142
preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
147143
}
148144
preemptorsMap[job.Queue].Push(job)
149-
underRequest = append(underRequest, job)
145+
underRequestByQueue[job.Queue] = append(underRequestByQueue[job.Queue], job)
150146
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
151147
for _, task := range job.TaskStatusIndex[api.Pending] {
152148
if task.SchGated {
@@ -156,9 +152,22 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
156152
}
157153
}
158154

155+
// If plugin defines queue order function, use it to order queues.
156+
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
157+
for queueID := range preemptorsMap {
158+
if queue, found := ssn.Queues[queueID]; found {
159+
queues.Push(queue)
160+
}
161+
}
162+
159163
ph := util.NewPredicateHelper()
160164
// Preemption between Jobs within Queue.
161-
for _, queue := range queues {
165+
for {
166+
if queues.Empty() {
167+
break
168+
}
169+
170+
queue := queues.Pop().(*api.QueueInfo)
162171
for {
163172
preemptors := preemptorsMap[queue.UID]
164173

@@ -226,7 +235,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
226235
}
227236

228237
// Preemption between Task within Job.
229-
for _, job := range underRequest {
238+
for _, job := range underRequestByQueue[queue.UID] {
230239
// Here we need to use a scoped intraJob priority queue instead of overwriting preemptorTasks[job.UID].
231240
// The original preemptorTasks map is populated during job discovery (lines above)
232241
// and consumed by the "Preemption between Jobs within Queue" loop.

pkg/scheduler/actions/preempt/preempt_test.go

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ import (
3535
"volcano.sh/volcano/pkg/scheduler/api"
3636
"volcano.sh/volcano/pkg/scheduler/conf"
3737
"volcano.sh/volcano/pkg/scheduler/framework"
38+
"volcano.sh/volcano/pkg/scheduler/plugins/capacity"
3839
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
3940
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
4041
"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
@@ -425,6 +426,7 @@ func TestPreempt(t *testing.T) {
425426

426427
func TestTopologyAwarePreempt(t *testing.T) {
427428
plugins := map[string]framework.PluginBuilder{
429+
capacity.PluginName: capacity.New,
428430
conformance.PluginName: conformance.New,
429431
gang.PluginName: gang.New,
430432
priority.PluginName: priority.New,
@@ -663,6 +665,28 @@ func TestTopologyAwarePreempt(t *testing.T) {
663665
ExpectEvictNum: 1,
664666
ExpectEvicted: []string{"c1/preemptee2"},
665667
},
668+
{
669+
Name: "preemption with priority queues",
670+
PodGroups: []*schedulingv1beta1.PodGroup{
671+
util.BuildPodGroupWithPrio("pg3", "c1", "q2", 1, nil, schedulingv1beta1.PodGroupRunning, "high-priority"),
672+
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, nil, schedulingv1beta1.PodGroupRunning, "low-priority"),
673+
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, nil, schedulingv1beta1.PodGroupInqueue, "high-priority"),
674+
},
675+
Pods: []*v1.Pod{
676+
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg3", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
677+
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
678+
util.BuildPodWithPreemptionPolicy("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string), v1.PreemptLowerPriority),
679+
},
680+
Nodes: []*v1.Node{
681+
util.BuildNode("n1", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), make(map[string]string)),
682+
},
683+
Queues: []*schedulingv1beta1.Queue{
684+
util.BuildQueueWithPriorityAndResourcesQuantity("q1", 1, api.BuildResourceList("1", "1G"), api.BuildResourceList("1", "1G")),
685+
util.BuildQueueWithPriorityAndResourcesQuantity("q2", 10, api.BuildResourceList("1", "1G"), api.BuildResourceList("1", "1G")),
686+
},
687+
ExpectEvictNum: 1,
688+
ExpectEvicted: []string{"c1/preemptee1"},
689+
},
666690
}
667691

668692
trueValue := true
@@ -699,6 +723,10 @@ func TestTopologyAwarePreempt(t *testing.T) {
699723
EnabledPreemptable: &trueValue,
700724
EnabledPredicate: &trueValue,
701725
},
726+
{
727+
Name: capacity.PluginName,
728+
EnabledQueueOrder: &trueValue,
729+
},
702730
},
703731
}}
704732

0 commit comments

Comments
 (0)