Commit 3df2364

Author: Sumedh Wale (committed)
[SNAP-2231] Limit maximum cores for a job to physical cores on a node
- overrides in SnappyTaskSchedulerImpl to track the per-executor cores used by a job and cap them to the number of physical cores on a node
- combined some maps in TaskSchedulerImpl to recover the performance lost to the change above and to improve further over the base TaskSchedulerImpl
- the property "spark.scheduler.limitJobCores=false" can be set to revert to the previous behaviour
1 parent 1cf9817 commit 3df2364

4 files changed: 264 additions & 9 deletions
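Before the diffs, a minimal sketch of the scheduling idea in this commit (illustrative only, not code from the patch: CoreCapModel, physicalCores and the method names are invented here). Each stage keeps a per-executor core budget seeded with the node's physical core count; the budget is debited when a task launches and credited back when it finishes, so a single job cannot occupy the extra (2x physical) cores an executor advertises:

import scala.collection.mutable

// Toy model of per-stage core capping (assumed simplification of the patch).
object CoreCapModel {
  val CPUS_PER_TASK = 1
  // Executors advertise 2x the physical cores; the cap uses the physical count.
  val physicalCores = Map("exec-1" -> 4) // exec-1 advertises 8 task slots

  // Per-executor budget for one stage, created lazily on the first offer.
  private val stageAvailableCores = mutable.Map.empty[String, Int]

  def tryLaunch(execId: String): Boolean = {
    val available = stageAvailableCores.getOrElseUpdate(
      execId, physicalCores.getOrElse(execId, Int.MaxValue)) // no cap if unknown
    if (available >= CPUS_PER_TASK) {
      stageAvailableCores(execId) = available - CPUS_PER_TASK // debit on launch
      true
    } else false // this job already holds all physical cores on the executor
  }

  def onTaskFinished(execId: String): Unit =
    stageAvailableCores.get(execId).foreach { c =>
      stageAvailableCores(execId) = c + CPUS_PER_TASK // credit on finish
    }

  def main(args: Array[String]): Unit = {
    // Only 4 of the 8 advertised slots can be taken by this one stage.
    val launched = (1 to 8).count(_ => tryLaunch("exec-1"))
    println(s"launched $launched of 8 offers") // prints: launched 4 of 8 offers
  }
}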


cluster/src/main/scala/org/apache/spark/scheduler/SnappyTaskSchedulerImpl.scala

Lines changed: 248 additions & 2 deletions
@@ -16,13 +16,259 @@
  */
 package org.apache.spark.scheduler

-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SnappyContext
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.{LongFunction, ToLongFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.koloboke.function.{LongObjPredicate, ObjLongToLongFunction}
+import io.snappydata.Property
+import io.snappydata.collection.{LongObjectHashMap, ObjectLongHashMap}
+
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
+import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}
+import org.apache.spark.{SparkContext, SparkException, TaskNotSerializableException}

 private[spark] class SnappyTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

   override def postStartHook(): Unit = {
     SnappyContext.initGlobalSparkContext(sc)
     super.postStartHook()
   }
+
+  private type CoresAndAttempts = (ObjectLongHashMap[String], LongObjectHashMap[TaskSetManager])
+
+  private val limitJobCores = Property.LimitJobCores.get(sc.conf)
+
+  private val (maxExecutorTaskCores, numExecutors) = {
+    val map = new ConcurrentHashMap[String, Integer](16, 0.7f, 1)
+    if (limitJobCores) {
+      for ((executorId, blockId) <- SnappyContext.getAllBlockIds) {
+        addBlockId(executorId, blockId, map)
+      }
+    }
+    (map, new AtomicInteger(map.size()))
+  }
+  private val stageCoresAndAttempts =
+    LongObjectHashMap.withExpectedSize[CoresAndAttempts](32)
+  private val taskIdExecutorAndManager =
+    LongObjectHashMap.withExpectedSize[(String, TaskSetManager)](32)
+
+  private val createNewStageMap = new LongFunction[CoresAndAttempts] {
+    override def apply(stageId: Long): CoresAndAttempts =
+      ObjectLongHashMap.withExpectedSize[String](8) ->
+          LongObjectHashMap.withExpectedSize[TaskSetManager](2)
+  }
+  private val lookupExecutorCores = new ToLongFunction[String] {
+    override def applyAsLong(executorId: String): Long = {
+      maxExecutorTaskCores.get(executorId) match {
+        case null => Int.MaxValue // no restriction
+        case c => c.intValue()
+      }
+    }
+  }
+  private val addCPUsOnTaskFinish = new ObjLongToLongFunction[String] {
+    override def applyAsLong(execId: String, availableCores: Long): Long =
+      availableCores + CPUS_PER_TASK
+  }
+
+  private def addBlockId(executorId: String, blockId: BlockAndExecutorId,
+      map: ConcurrentHashMap[String, Integer]): Boolean = {
+    if (limitJobCores && blockId.executorCores > 0 && blockId.numProcessors > 0 &&
+        blockId.numProcessors < blockId.executorCores) {
+      map.put(executorId, Int.box(blockId.numProcessors)) == null
+    } else false
+  }
+
+  private[spark] def addBlockId(executorId: String, blockId: BlockAndExecutorId): Unit = {
+    if (addBlockId(executorId, blockId, maxExecutorTaskCores)) {
+      numExecutors.incrementAndGet()
+    }
+  }
+
+  private[spark] def removeBlockId(executorId: String): Unit = {
+    maxExecutorTaskCores.remove(executorId) match {
+      case null =>
+      case _ => numExecutors.decrementAndGet()
+    }
+  }
+
+  override protected def getTaskSetManagerForSubmit(taskSet: TaskSet): TaskSetManager = {
+    val manager = createTaskSetManager(taskSet, maxTaskFailures)
+    val stage = taskSet.stageId
+    val (stageAvailableCores, stageTaskSets) = stageCoresAndAttempts.computeIfAbsent(
+      stage, createNewStageMap)
+    val conflictingTaskSet = !stageTaskSets.forEachWhile(new LongObjPredicate[TaskSetManager] {
+      override def test(attempt: Long, ts: TaskSetManager): Boolean = {
+        ts.taskSet == taskSet || ts.isZombie
+      }
+    })
+    if (conflictingTaskSet) {
+      throw new IllegalStateException(
+        s"more than one active taskSet for stage $stage: $stageTaskSets")
+    }
+    if (stageAvailableCores.size() > 0) stageAvailableCores.clear()
+    stageTaskSets.justPut(taskSet.stageAttemptId, manager)
+    manager
+  }
+
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
+    logInfo(s"Cancelling stage $stageId")
+    stageCoresAndAttempts.get(stageId) match {
+      case null =>
+      case (_, attempts) => attempts.forEachWhile(new LongObjPredicate[TaskSetManager] {
+        override def test(attempt: Long, tsm: TaskSetManager): Boolean = {
+          // There are two possible cases here:
+          // 1. The task set manager has been created and some tasks have been scheduled.
+          //    In this case, send a kill signal to the executors to kill the task
+          //    and then abort the stage.
+          // 2. The task set manager has been created but no tasks have been scheduled.
+          //    In this case, simply abort the stage.
+          tsm.runningTasksSet.foreach { tid =>
+            val execId = taskIdExecutorAndManager.get(tid)._1
+            backend.killTask(tid, execId, interruptThread)
+          }
+          val msg = s"Stage $stageId cancelled"
+          tsm.abort(msg)
+          logInfo(msg)
+          true
+        }
+      })
+    }
+  }
+
+  override def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
+    val taskSet = manager.taskSet
+    stageCoresAndAttempts.get(taskSet.stageId) match {
+      case null =>
+      case (_, taskSetsForStage) =>
+        taskSetsForStage.remove(taskSet.stageAttemptId)
+        if (taskSetsForStage.size() == 0) {
+          stageCoresAndAttempts.remove(taskSet.stageId)
+        }
+    }
+    manager.parent.removeSchedulable(manager)
+    if (isInfoEnabled) {
+      logInfo(s"Removed TaskSet ${taskSet.id}, whose tasks have all completed, from pool " +
+          manager.parent.name)
+    }
+  }
+
+  /**
+   * Avoid giving all the available cores on a node to a single task. This serves two purposes:
+   *
+   * a) Keeps some cores free for any concurrent tasks that may be submitted after
+   *    the first has been scheduled.
+   *
+   * b) Since the snappy executors use (2 * physical cores) to aid in more concurrency,
+   *    it helps reduce disk activity for a single task and improves performance for
+   *    disk intensive queries.
+   */
+  override protected def resourceOfferSingleTaskSet(
+      taskSet: TaskSetManager,
+      maxLocality: TaskLocality,
+      shuffledOffers: Seq[WorkerOffer],
+      availableCpus: Array[Int],
+      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
+    // reduce the available CPUs for a single taskSet if more than physical cores are exposed
+    val availableCores = if (numExecutors.get() > 0) {
+      val coresAndAttempts = stageCoresAndAttempts.get(taskSet.taskSet.stageId)
+      if (coresAndAttempts ne null) coresAndAttempts._1 else null
+    } else null
+    var launchedTask = false
+    for (i <- shuffledOffers.indices) {
+      val execId = shuffledOffers(i).executorId
+      if ((availableCpus(i) >= CPUS_PER_TASK) &&
+          ((availableCores eq null) ||
+              (availableCores.computeIfAbsent(execId, lookupExecutorCores) >= CPUS_PER_TASK))) {
+        try {
+          val host = shuffledOffers(i).host
+          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+            tasks(i) += task
+            val tid = task.taskId
+            taskIdExecutorAndManager.justPut(tid, execId -> taskSet)
+            executorIdToRunningTaskIds(execId).add(tid)
+            if (availableCores ne null) {
+              availableCores.addValue(execId, -CPUS_PER_TASK)
+            }
+            availableCpus(i) -= CPUS_PER_TASK
+            assert(availableCpus(i) >= 0)
+            launchedTask = true
+          }
+        } catch {
+          case _: TaskNotSerializableException =>
+            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
+            // Do not offer resources for this task, but don't throw an error to allow other
+            // task sets to be submitted.
+            return launchedTask
+        }
+      }
+    }
+    launchedTask
+  }
+
+  override protected[scheduler] def getTaskSetManager(taskId: Long): Option[TaskSetManager] = {
+    taskIdExecutorAndManager.get(taskId) match {
+      case null => None
+      case (_, manager) => Some(manager)
+    }
+  }
+
+  override protected def getExecutorAndManager(
+      taskId: Long): Option[(() => String, TaskSetManager)] = {
+    taskIdExecutorAndManager.get(taskId) match {
+      case null => None
+      case (execId, manager) => Some(() => execId, manager)
+    }
+  }
+
+  override def error(message: String): Unit = synchronized {
+    if (stageCoresAndAttempts.size() > 0) {
+      // Have each task set throw a SparkException with the error
+      stageCoresAndAttempts.forEachWhile(new LongObjPredicate[CoresAndAttempts] {
+        override def test(stageId: Long, p: CoresAndAttempts): Boolean = {
+          p._2.forEachWhile(new LongObjPredicate[TaskSetManager] {
+            override def test(attempt: Long, manager: TaskSetManager): Boolean = {
+              try {
+                manager.abort(message)
+              } catch {
+                case e: Exception => logError("Exception in error callback", e)
+              }
+              true
+            }
+          })
+        }
+      })
+    } else {
+      // No task sets are active but we still got an error. Just exit since this
+      // must mean the error is during registration.
+      // It might be good to do something smarter here in the future.
+      throw new SparkException(s"Exiting due to error from cluster scheduler: $message")
+    }
+  }
+
+  override protected def cleanupTaskState(tid: Long): Unit = {
+    taskIdExecutorAndManager.remove(tid) match {
+      case null =>
+      case (executorId, taskSet) =>
+        executorIdToRunningTaskIds.get(executorId) match {
+          case Some(s) => s.remove(tid)
+          case None =>
+        }
+        stageCoresAndAttempts.get(taskSet.taskSet.stageId) match {
+          case null =>
+          case (cores, _) => cores.computeIfPresent(executorId, addCPUsOnTaskFinish)
+        }
+    }
+  }
+
+  override private[scheduler] def taskSetManagerForAttempt(
+      stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = {
+    stageCoresAndAttempts.get(stageId) match {
+      case null => None
+      case (_, attempts) => Option(attempts.get(stageAttemptId))
+    }
+  }
 }
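The commit message also mentions combining maps to recover performance. For context (an assumption based on stock Spark, not shown in this diff): the base TaskSchedulerImpl tracks taskIdToExecutorId and taskIdToTaskSetManager as two separate maps, whereas taskIdExecutorAndManager above holds the pair under one key, so launch, lookup and cleanup each touch a single map. A rough sketch of that consolidation (TaskInfo and TaskBookkeeping are invented names):

// Sketch of the map consolidation; TaskInfo stands in for the
// (executorId, TaskSetManager) pair kept by the scheduler above.
final case class TaskInfo[M](executorId: String, manager: M)

final class TaskBookkeeping[M] {
  private val byTaskId = new java.util.HashMap[java.lang.Long, TaskInfo[M]]()

  // One put on task launch instead of two.
  def register(tid: Long, execId: String, manager: M): Unit =
    byTaskId.put(tid, TaskInfo(execId, manager))

  // One lookup serves both "which executor?" and "which task set?".
  def lookup(tid: Long): Option[TaskInfo[M]] = Option(byTaskId.get(tid))

  // One remove on task end instead of two.
  def cleanup(tid: Long): Option[TaskInfo[M]] = Option(byTaskId.remove(tid))
}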

cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala

Lines changed: 10 additions & 6 deletions
@@ -20,7 +20,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rpc.{RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
 import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}
 
 class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl,
@@ -87,18 +87,22 @@ class BlockManagerIdListener(sc: SparkContext)
   override def onBlockManagerAdded(
       msg: SparkListenerBlockManagerAdded): Unit = synchronized {
     val executorId = msg.blockManagerId.executorId
-    SnappyContext.getBlockIdIfNull(executorId) match {
+    val blockId = SnappyContext.getBlockIdIfNull(executorId) match {
       case None =>
         val numCores = sc.schedulerBackend.defaultParallelism()
-        SnappyContext.addBlockId(executorId, new BlockAndExecutorId(
-          msg.blockManagerId, numCores, numCores))
-      case Some(b) => b._blockId = msg.blockManagerId
+        val bid = new BlockAndExecutorId(msg.blockManagerId, numCores, numCores)
+        SnappyContext.addBlockId(executorId, bid)
+        bid
+      case Some(b) => b._blockId = msg.blockManagerId; b
     }
+    sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(executorId, blockId)
   }
 
   override def onBlockManagerRemoved(
       msg: SparkListenerBlockManagerRemoved): Unit = {
-    SnappyContext.removeBlockId(msg.blockManagerId.executorId)
+    val executorId = msg.blockManagerId.executorId
+    SnappyContext.removeBlockId(executorId)
+    sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].removeBlockId(executorId)
   }
 
   override def onExecutorRemoved(msg: SparkListenerExecutorRemoved): Unit =
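The listener changes above forward block-manager add and remove events to the scheduler so its physical-core map tracks executors as they come and go. A minimal sketch of this listener pattern, under the assumption that ExecutorRegistry (invented here) stands in for the combined SnappyContext plus SnappyTaskSchedulerImpl calls in the patch:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}

// Stand-in for the two registries updated in the real listener.
trait ExecutorRegistry {
  def add(executorId: String, cores: Int): Unit
  def remove(executorId: String): Unit
}

// Mirrors block-manager membership into the registry, in the same way
// BlockManagerIdListener updates SnappyContext and the task scheduler.
class ExecutorTrackingListener(sc: SparkContext, registry: ExecutorRegistry)
    extends SparkListener {

  override def onBlockManagerAdded(msg: SparkListenerBlockManagerAdded): Unit =
    registry.add(msg.blockManagerId.executorId, sc.defaultParallelism)

  override def onBlockManagerRemoved(msg: SparkListenerBlockManagerRemoved): Unit =
    registry.remove(msg.blockManagerId.executorId)
}

// Usage: sc.addSparkListener(new ExecutorTrackingListener(sc, registry))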

core/src/main/scala/io/snappydata/Literals.scala

Lines changed: 5 additions & 0 deletions
@@ -225,6 +225,11 @@ object Property extends Enumeration {
     "If true then cluster startup will wait for Spark jobserver to be fully initialized " +
       "before marking lead as 'RUNNING'. Default is false.", Some(false), prefix = null)
 
+  val LimitJobCores = Val(s"${Constant.SPARK_PREFIX}scheduler.limitJobCores",
+    "If true then cores given to a single job will be limited to the physical cores on a host. " +
+      "This allows executors to be configured with more than physical cores to allow for some " +
+      "concurrency of long jobs with short jobs. Default is true.", Some(true), prefix = null)
+
   val SnappyConnection = Val[String](s"${Constant.PROPERTY_PREFIX}connection",
     "Host and client port combination in the form [host:clientPort]. This " +
       "is used by smart connector to connect to SnappyData cluster using " +

spark

Submodule spark updated from 9f2322a to 630426d
