[SPARK-55075][K8S] Track executor pod creation errors with ExecutorFailureTracker #53840
parthchandra wants to merge 4 commits into apache:master
Conversation
JIRA Issue Information: === Improvement SPARK-55075 === This comment was automatically generated by GitHub Actions

@dongjoon-hyun, please take a look.

Thank you for pinging me, @parthchandra !
dongjoon-hyun left a comment:
It would be great to clarify your target error case examples in the PR description more clearly, because ExecutorPodsAllocator already has spark.kubernetes.allocation.maxPendingPods management.
If there are unrecoverable pod creation errors, Spark continues trying to create pods instead of failing.
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@parthchandra, do you think we can merge this case into

@dongjoon-hyun @pan3793 let me get some more information from the customer that reported the issue about whether

Got it. Thank you, @parthchandra .
Sorry for taking so long to get back on this - the Splunk log for this has the following repeated for every attempt -
I've added a new API in the
@parthchandra, yeah, I think this is sufficient to fix your problem.
This does not help in your case or, more generally, for permanent errors. I would rather not add such logic, because:
If you really want separate configuration for pod creation errors, maybe you can enhance the
@pan3793 I think what you say makes sense. Now that we've tied the failure tracker in, the retry logic is somewhat redundant. Removing it will also reduce the number of configuration options, making it easier to use.

Updated to remove the retry logic.
...netes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@dongjoon-hyun @pan3793 ptal.
There were some infra changes recently; rebasing your patch onto the latest master branch might help.
Force-pushed 8005dba to 8f9944c
@parthchandra, thanks for the update. Overall LGTM, only small nits.
Would like @dongjoon-hyun and @attilapiros (who modified ExecutorFailureTracker last time) to have a look.
BTW, please update the PR title and description to reflect the final state.
    test("Pod creation failures are tracked immediately without retries") {
      // Make all pod creation attempts fail
      when(podResource.create()).thenThrow(new KubernetesClientException("Simulated failure"))

small nit: make the error message more accurate

- when(podResource.create()).thenThrow(new KubernetesClientException("Simulated failure"))
+ when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod creation failure"))

      k8sConf.resourceProfileId.toInt), Seq.empty)
    }
    test("Pod creation failures are tracked immediately without retries") {

as we discussed before, the retry mechanism does not help here; we don't need to mention that. The case name might be:

- test("Pod creation failures are tracked immediately without retries") {
+ test("Pod creation failures are tracked by ExecutorFailureTracker") {
      log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
      None
    }
    if (optCreatedExecutorPod.nonEmpty) {

for cases without an else branch, you can write it as:

optCreatedExecutorPod.foreach { createdExecutorPod =>
  ...
}
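A runnable sketch of the suggested style (the Option value, the pod name, and the ListBuffer are placeholders for illustration, not the PR's actual code):

```scala
import scala.collection.mutable.ListBuffer

// Placeholder standing in for the PR's optCreatedExecutorPod value.
val optCreatedExecutorPod: Option[String] = Some("exec-pod-1")
val registered = ListBuffer.empty[String]

// Instead of: if (optCreatedExecutorPod.nonEmpty) { ... optCreatedExecutorPod.get ... }
optCreatedExecutorPod.foreach { createdExecutorPod =>
  registered += createdExecutorPod // runs only when a pod was actually created
}

// With an empty Option, the body is skipped entirely, so no null/isEmpty check is needed.
Option.empty[String].foreach(registered += _)
```

This keeps the happy path flat and avoids the `.get` call that a `nonEmpty` guard usually leads to.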
@pan3793 addressed your comments.

Ack, @pan3793 and @parthchandra . Will take a look today.
Unfortunately, this seems to introduce a breaking change to the exposed developer API since 3.3.0, @parthchandra .
Please note that we documented it as "a full class name of a class implementing AbstractPodsAllocator", which has been unchanged for the last 5 years. I believe we can change it at Apache Spark 5.0.0 in 2027 (if we need this).
Please introduce a new way to hand over ExecutorPodsLifecycleManager instead of modifying the existing StatefulSetPodsAllocator or DeploymentPodsAllocator. If we need to change them, it means the user-implemented classes are broken already.
To avoid touching the constructor of the @DeveloperApi class:

abstract class AbstractPodsAllocator {
  ...
  def setExecutorPodsLifecycleManager(lifecycleManager: ExecutorPodsLifecycleManager): Unit = {}
  ...
}
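A self-contained sketch of that direction (ExecutorPodsLifecycleManager is stubbed here as a plain trait, and trackPodCreationFailure is an illustrative helper name; this is not the merged code): the setter has a concrete body on the abstract class, so older custom allocators that never call it simply keep the field empty and failure tracking becomes a no-op for them.

```scala
// Stub for illustration only; the real ExecutorPodsLifecycleManager is a Spark class.
trait ExecutorPodsLifecycleManager {
  def registerPodCreationFailure(): Unit
}

abstract class AbstractPodsAllocator {
  // Option-typed field instead of a null-initialized var.
  protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None

  // Concrete setter on the abstract class: pre-existing custom allocators keep
  // compiling and running because nothing forces them to use it.
  def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = {
    executorPodsLifecycleManager = Some(manager)
  }

  // Call-site pattern: no null check; a no-op when the manager was never wired in.
  protected def trackPodCreationFailure(): Unit =
    executorPodsLifecycleManager.foreach(_.registerPodCreationFailure())
}

class CountingManager extends ExecutorPodsLifecycleManager {
  var failures = 0
  def registerPodCreationFailure(): Unit = failures += 1
}

class TestAllocator extends AbstractPodsAllocator {
  def simulateCreationFailure(): Unit = trackPodCreationFailure()
}
```

With this shape, only the cluster manager that knows about the new API needs to wire the lifecycle manager in; everything else is untouched.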
+1 for @pan3793 's direction. BTW, it should be invoked only for the new classes which support it. The existing class might be compiled against the old
@dongjoon-hyun Thank you for catching this! Let me go with the direction from @pan3793 and also ensure that it is backward compatible. |
Force-pushed a4a3b25 to 907cf42
@dongjoon-hyun @pan3793 Would you be able to look at this one more time? Thanks
      None
    }
    optCreatedExecutorPod.foreach { createdExecutorPod =>
      try {

Why is an error during pod creation handled differently from an error that occurs while adding the owner reference and creating the PVC?
In both cases the nonfatal error ends in deleting the pod, so why is the 2nd case not tracked as a failure?
Errors from owner reference and PVC creation are handled differently, it appears (since they throw an exception). I've added them to the tracking by the lifecycle manager but also retained the current behaviour of throwing an exception in these cases.
Would you prefer we stop throwing an exception and have everything handled by the lifecycle manager?
Actually I had missed that line. Could you please run a manual test and check what happens with the exception thrown here (by throwing an exception here directly and running one of the integration tests)?
I am afraid it can go all the way up to CoarseGrainedSchedulerBackend.
I think the exception being thrown is fine. It is propagated up through requestNewExecutors() → onNewSnapshots(). The exception is caught at ExecutorPodsSnapshotStoreImpl.processSnapshotsInternal and logged as a warning.
The test test("SPARK-41410: An exception during PVC creation should not increase PVC counter") explicitly checks that an exception is thrown in this case. I modified the code to throw an Exception and the test passed (as long as the exception is a KubernetesClientException).
Ack, @parthchandra . I'll take a look again today. Thank you!
      k8sConf.resourceProfileId.toInt), Seq.empty)
    }

    test("Pod creation failures are tracked by ExecutorFailureTracker") {

In addition, it would be great to add a JIRA ID.

- test("Pod creation failures are tracked by ExecutorFailureTracker") {
+ test("SPARK-55075: Pod creation failures are tracked by ExecutorFailureTracker") {

      sc: SparkContext,
      kubernetesClient: KubernetesClient,
      snapshotsStore: ExecutorPodsSnapshotsStore,
      lifecycleManager: ExecutorPodsLifecycleManager) = {
Shall we use the following style to minimize the change? For example, KubernetesClusterManagerSuite:

- lifecycleManager: ExecutorPodsLifecycleManager) = {
+ lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = {

Good suggestion. Done.

Thank you for updating, @parthchandra .
   * Optional lifecycle manager for tracking executor pod lifecycle events.
   * Set via setExecutorPodsLifecycleManager for backward compatibility.
   */
  protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _

Like the comment says, "Optional lifecycle manager", we had better follow the Scala style.

- protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _
+ protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None

   * This method is optional and may not exist in custom implementations based on older versions.
   */
  def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = {
    executorPodsLifecycleManager = manager

Maybe,

- executorPodsLifecycleManager = manager
+ executorPodsLifecycleManager = Some(manager)

    val failureCount = totalFailedPodCreations.incrementAndGet()
    if (executorPodsLifecycleManager != null) {
      executorPodsLifecycleManager.registerPodCreationFailure()
    }

Instead of a null check, Scala prefers the following.

- if (executorPodsLifecycleManager != null) {
-   executorPodsLifecycleManager.registerPodCreationFailure()
- }
+ executorPodsLifecycleManager.foreach(_.registerPodCreationFailure())

      executorPodsLifecycleManager.registerPodCreationFailure()
    }
    logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
      log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)

Lines 473-479 seem to be repeated twice, here and at 506-513. Could you extract a method to remove the code duplication?
    } catch {
      case _: NoSuchMethodException =>
        logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " +
          "Pod creation failures will not be tracked.")

For K8s Deployment and StatefulSet, the following would be correct. Could you revise the message?

- logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " +
-   "Pod creation failures will not be tracked.")
+ logInfo("No need to track pod creation failure because allocator does not require it.")

Method setExecutorPodsLifecycleManager is added in AbstractPodsAllocator, not the derived class. Is reflection really required here?

I was following the previous comment about needing to use reflection to maintain backwards compatibility, but you're right, reflection is not needed. Removed.
To @parthchandra , from my side, the PR looks almost ready. I left a few more comments. Have a safe travel, 🛬 !
...ore/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
Force-pushed fe26091 to 99b2112
parthchandra left a comment:
Also rebased on latest.
.../core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
dongjoon-hyun left a comment:
+1, LGTM (Pending CIs). Thank you, @parthchandra.
Let's wait a few more days to make sure we have addressed all other reviewers' comments.
Thank you all! Merged to master for Apache Spark 4.2.0.

Thank you @dongjoon-hyun, @pan3793, @attilapiros !
What changes were proposed in this pull request?
Adds tracking of executor pod creation failures with the ExecutorFailureTracker.
Why are the changes needed?
If there are unrecoverable pod creation errors, Spark continues trying to create pods instead of failing. An example is where a notebook server is constrained to a maximum number of pods and the user tries to start a notebook with twice the number of executors as the limit. In this case the user gets an 'Unauthorized' message in the logs, but Spark will keep trying to spin up new pods. By tracking pod creation failures we can stop retrying after reaching the maximum number of executor failures.
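The intended counting behaviour can be sketched as follows; SimpleFailureTracker and its members are illustrative stand-ins, not Spark's actual ExecutorFailureTracker API:

```scala
// Illustrative sketch: count pod creation failures and signal that the
// application should stop retrying once a configured maximum is reached,
// instead of looping forever against an unrecoverable error.
class SimpleFailureTracker(maxFailures: Int) {
  private var failures = 0

  // Called each time an executor pod fails to be created.
  def registerPodCreationFailure(): Unit = failures += 1

  // True once the failure count reaches the limit, e.g. to abort the app
  // rather than keep spinning up pods that will never be authorized.
  def reachedMaxFailures: Boolean = failures >= maxFailures
}
```

The real tracker also ages out old failures over a validity window; this sketch only shows the stop-after-N-failures idea.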
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests added
Was this patch authored or co-authored using generative AI tooling?
Unit tests generated using Claude Code.