
[SPARK-55075][K8S] Track executor pod creation errors with ExecutorFailureTracker#53840

Closed
parthchandra wants to merge 4 commits into apache:master from parthchandra:k8s-failures

Conversation

@parthchandra
Contributor

@parthchandra parthchandra commented Jan 16, 2026

What changes were proposed in this pull request?

Adds tracking of executor pod creation with the ExecutorFailureTracker.

Why are the changes needed?

If there are unrecoverable pod creation errors, Spark continues trying to create pods instead of failing. An example is a notebook server constrained to a maximum number of pods, where the user tries to start a notebook with twice as many executors as the limit allows. In this case the user gets an 'Unauthorized' message in the logs, but Spark keeps trying to spin up new pods. By tracking pod creation failures we can stop trying after reaching the maximum number of executor failures.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit tests added

Was this patch authored or co-authored using generative AI tooling?

Unit tests generated using Claude Code.

@github-actions

JIRA Issue Information

=== Improvement SPARK-55075 ===
Summary: Fail application after too many executor pod creation errors
Assignee: None
Status: Open
Affected: ["4.1.1"]


This comment was automatically generated by GitHub Actions

@parthchandra
Contributor Author

@dongjoon-hyun, please take a look.

@dongjoon-hyun
Member

Thank you for pinging me, @parthchandra !

Member

@dongjoon-hyun dongjoon-hyun left a comment

It would be great to clarify your target error case examples in the PR description, because ExecutorPodsAllocator already has spark.kubernetes.allocation.maxPendingPods management.

If there are unrecoverable pod creation errors then Spark continues to try and create pods instead of failing.

@pan3793
Member

pan3793 commented Jan 19, 2026

@parthchandra, do you think we can merge this case into spark.executor.maxNumFailures? I think that failing to create the executor pod is also a kind of executor failure.

@parthchandra
Contributor Author

ConfigBuilder("spark.kubernetes.allocation.pod.creation.retries")

@parthchandra
Contributor Author

@dongjoon-hyun @pan3793 let me get some more information from the customer that reported the issue about whether spark.kubernetes.allocation.maxPendingPods and spark.executor.maxNumFailures help with the issue.

@dongjoon-hyun
Member

Got it. Thank you, @parthchandra .

@parthchandra
Contributor Author

Sorry for taking so long to get back on this -
Here's an example of the use case we are talking about. In this case we have a notebook server configured by an administrator for a maximum of 16 pods. The user requests a notebook with a Spark configuration with 32 executors. Because this exceeds the 'quota', we get pod creation failures and Spark keeps trying to request pods.
Setting spark.kubernetes.allocation.maxPendingPods had no effect.

The Splunk log for this has the following repeated for every attempt -

timestamp="2026-02-05T00:54:20,029+0000",level="WARN",threadName="kubernetes-executor-snapshots-subscribers-0",appName="spark-driver",logger="org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl",jobName="notebooks-spark-job",sourceId="91xvti54j6iq",jobInstanceId="1770251444978-uiqn8uubzrtjq1t1f38rkpk00m33z",organizationName="Default",instanceName="SparkDriver",version="04106e4c-0ef4-4db4-addb-2de65a0d7c17",attemptId="1",message="Exception when notifying snapshot subscriber.",exception="io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://*************** Message: Unauthorized. Received status: Status(apiVersion=v1, code=401, details=null, kind=Status, message=Unauthorized, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Unauthorized, status=Failure, additionalProperties={}).
	at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:507)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:340)
	at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:754)
	at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:98)
	at io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
	at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.create(BaseOperation.java:1155)
	at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.create(BaseOperation.java:98)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:440)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:417)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36(ExecutorPodsAllocator.scala:370)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36$adapted(ExecutorPodsAllocator.scala:363)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:363)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:134)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:134)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:143)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:131)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:85)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1:443/************. Message: Unauthorized. Received status: Status(apiVersion=v1, code=401, details=null, kind=Status, message=Unauthorized, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Unauthorized, status=Failure, additionalProperties={}).
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:660)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:640)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:589)
	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:549)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:142)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at io.fabric8.kubernetes.client.http.ByteArrayBodyHandler.onBodyDone(ByteArrayBodyHandler.java:51)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$OkHttpAsyncBody.doConsume(OkHttpClientImpl.java:136)
	... 3 more"

timestamp="2026-02-05T00:54:20,010+0000",level="INFO",threadName="kubernetes-executor-snapshots-subscribers-0",appName="spark-driver",logger="org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator",jobName="notebooks-spark-job",sourceId="91xvti54j6iq",jobInstanceId="1770251444978-uiqn8uubzrtjq1t1f38rkpk00m33z",organizationName="Default",instanceName="SparkDriver",version="04106e4c-0ef4-4db4-addb-2de65a0d7c17",attemptId="1",message="Cannot list PVC resources. Please check account permissions."

timestamp="2026-02-05T00:54:19,996+0000",level="INFO",threadName="kubernetes-executor-snapshots-subscribers-0",appName="spark-driver",logger="org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator",jobName="notebooks-spark-job",sourceId="91xvti54j6iq",jobInstanceId="1770251444978-uiqn8uubzrtjq1t1f38rkpk00m33z",organizationName="Default",instanceName="SparkDriver",version="04106e4c-0ef4-4db4-addb-2de65a0d7c17",attemptId="1",message="Going to request 30 executors from Kubernetes for ResourceProfile Id: 0, target: 32, known: 0, sharedSlotFromPendingPods: 64."

timestamp="2026-02-05T00:54:19,996+0000",level="DEBUG",threadName="kubernetes-executor-snapshots-subscribers-0",appName="spark-driver",logger="org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator",jobName="notebooks-spark-job",sourceId="91xvti54j6iq",jobInstanceId="1770251444978-uiqn8uubzrtjq1t1f38rkpk00m33z",organizationName="Default",instanceName="SparkDriver",version="04106e4c-0ef4-4db4-addb-2de65a0d7c17",attemptId="1",message="ResourceProfile Id: 0 (pod allocation status: 0 running, 0 unknown pending, 0 scheduler backend known pending, 0 unknown newly created, 0 scheduler backend known newly created)"

@parthchandra
Contributor Author

@parthchandra, do you think we can merge this case into spark.executor.maxNumFailures? I think that failing to create the executor pod is also a kind of executor failure.

I've added a new API in the ExecutorPodsLifecycleManager to call failureTracker.registerExecutorFailure, and I pass the ExecutorPodsLifecycleManager to the ExecutorPodsAllocator. Pod creation failures should now get propagated as executor failures.
This required changes in a number of places, since I had to add an additional parameter to the ExecutorPodsAllocator constructor. There are no functional changes in the test suites that had to change as a result.
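A minimal sketch of this wiring, using stand-in classes rather than the real Spark types (the method names `registerPodCreationFailure` and `registerExecutorFailure` follow the discussion; everything else here is illustrative):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for Spark's ExecutorFailureTracker: counts failures and
// reports when the configured maximum is reached.
class FailureTracker(maxFailures: Int) {
  private val failures = new AtomicInteger(0)
  def registerExecutorFailure(): Unit = failures.incrementAndGet()
  def numFailures: Int = failures.get()
  def maxFailuresExceeded: Boolean = failures.get() >= maxFailures
}

// Stand-in lifecycle manager exposing the new API described above:
// pod creation failures are forwarded to the shared failure tracker.
class LifecycleManager(tracker: FailureTracker) {
  def registerPodCreationFailure(): Unit = tracker.registerExecutorFailure()
}

// Stand-in allocator: a pod creation error is reported through the
// lifecycle manager instead of being silently swallowed and retried.
class Allocator(lifecycleManager: LifecycleManager) {
  def requestNewExecutor(create: () => Unit): Boolean =
    try { create(); true }
    catch {
      case scala.util.control.NonFatal(_) =>
        lifecycleManager.registerPodCreationFailure()
        false
    }
}
```

With this shape, repeated 'Unauthorized' pod creation errors accumulate in the tracker and can fail the application once the limit is reached, instead of looping forever.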

@pan3793
Member

pan3793 commented Feb 6, 2026

I've added a new api in the ExecutorPodsLifecycleManager to call failureTracker.registerExecutorFailure and pass the ExecutorPodsLifecycleManager to the ExecutorPodsAllocator. This should now get propagated as an executor failure.

@parthchandra, yeah, I think this is sufficient to fix your problem.

Adds a retry for executor pod creation ...

this does not help in your case or, more generally, for permanent errors. I would rather not add such logic, because:

  • ExecutorPodsAllocator will continue to request new pods as long as the pod number does not reach the requested number, so a few transient pod creation errors do not matter.

  • I think ExecutorFailureTracker is designed to capture all kinds of executor failures, e.g.

    • executor (pod on K8s, container on YARN) launch failures,
    • executor bootstrap failures, e.g., due to wrong setup of env, network, or config
    • executor running failures, e.g., due to OOM.
    • etc.

    without pod creation retry logic,

    1. for permanent errors (your case), it fails fast
    2. for rare transient errors, it won't reach spark.executor.maxNumFailures
    3. for frequent transient errors, it usually indicates that your cluster is overloaded or some services are unstable; in that case, the user should either increase spark.executor.maxNumFailures or let the app fail to expose those potential issues.

If you really want a separate configuration for pod creation errors, maybe you can enhance ExecutorFailureTracker to accept a kind on registerExecutorFailure?
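The kind-aware tracker suggested here might look roughly like the following hypothetical sketch (not what the PR ultimately implemented; the `FailureKind` names just mirror the taxonomy above):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical failure kinds, following the taxonomy in the comment above.
object FailureKind extends Enumeration {
  val PodCreation, Bootstrap, Runtime = Value
}

// A tracker that counts failures per kind, so a separate limit could be
// applied to pod creation errors versus other executor failures.
class KindAwareFailureTracker {
  private val counts = TrieMap.empty[FailureKind.Value, AtomicInteger]

  def registerExecutorFailure(kind: FailureKind.Value = FailureKind.Runtime): Unit =
    counts.getOrElseUpdate(kind, new AtomicInteger(0)).incrementAndGet()

  def numFailures(kind: FailureKind.Value): Int =
    counts.get(kind).map(_.get()).getOrElse(0)

  def totalFailures: Int = counts.values.map(_.get()).sum
}
```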

@parthchandra
Contributor Author

@pan3793 I think what you say makes sense. Now that we've tied the failure tracker in, the retry logic is somewhat redundant. Removing it will also reduce the number of configuration options, making the feature easier to use.

@parthchandra
Contributor Author

updated to remove the retry logic

@parthchandra
Contributor Author

@dongjoon-hyun @pan3793 ptal.
Also, the CI failure is in the linter check, which cannot find Python. Re-running has made no difference. Please suggest what I can do here.

@pan3793
Member

pan3793 commented Feb 10, 2026

There were some infra changes recently; rebasing your patch onto the latest master branch might help.

Member

@pan3793 pan3793 left a comment

@parthchandra, thanks for the update; overall LGTM, only small nits.

I would like @dongjoon-hyun and @attilapiros (who modified ExecutorFailureTracker last time) to have a look.

BTW, please update the PR title and description to reflect the final state


test("Pod creation failures are tracked immediately without retries") {
// Make all pod creation attempts fail
when(podResource.create()).thenThrow(new KubernetesClientException("Simulated failure"))
Member

small nit: make the error message more accurate

Suggested change
when(podResource.create()).thenThrow(new KubernetesClientException("Simulated failure"))
when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod creation failure"))

k8sConf.resourceProfileId.toInt), Seq.empty)
}

test("Pod creation failures are tracked immediately without retries") {
Member

As we discussed before, the retry mechanism does not help here, so we don't need to mention it; the test name might be

Suggested change
test("Pod creation failures are tracked immediately without retries") {
test("Pod creation failures are tracked by ExecutorFailureTracker") {

log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
None
}
if (optCreatedExecutorPod.nonEmpty) {
Member

For cases without an else branch, you can write it as

optCreatedExecutorPod.foreach { createdExecutorPod =>
  ...
}

@parthchandra parthchandra changed the title [SPARK-55075][K8S] Fail application after too many executor pod creation errors [SPARK-55075][K8S] Track executor pod creation errors with ExecutorFailureTracker Feb 12, 2026
@parthchandra
Contributor Author

@pan3793 addressed your comments.
@dongjoon-hyun, @attilapiros, TIA for taking a look.

@dongjoon-hyun
Member

Ack, @pan3793 and @parthchandra . Will take a look Today.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Unfortunately, this seems to introduce a breaking change to the developer API that has been exposed since 3.3.0, @parthchandra.


Please note that we documented this as "a full class name of a class implementing AbstractPodsAllocator", which has been unchanged for the last 5 years. I believe we can change it in Apache Spark 5.0.0 in 2027 (if we need this).


Please introduce a new way to hand over ExecutorPodsLifecycleManager instead of modifying the existing StatefulSetPodsAllocator or DeploymentPodsAllocator. If we need to change them, it means the user-implemented classes are broken already.

@pan3793
Member

pan3793 commented Feb 13, 2026

To avoid touching the constructor of the *PodsAllocators, how about adding a new method in AbstractPodsAllocator? It would be called immediately after the constructor.

@DeveloperApi
abstract class AbstractPodsAllocator {
  ...
  def setExecutorPodsLifecycleManager(lifecycleManager: ExecutorPodsLifecycleManager): Unit = {}
  ...
}

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 13, 2026

+1 for @pan3793 's direction.

BTW, it should be invoked only for the new classes which support it. Existing classes might be compiled against the old AbstractPodsAllocator. So, there are two ways.

  1. It could be a new interface.
  2. Or, we can use Java reflection to check the existence of the class.
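The reflection option (2) could be sketched roughly like this; the method name follows the proposal above, while the allocator classes are stand-ins for illustration:

```scala
// Stand-ins: an allocator compiled against the old base class (no such
// method) and one that implements the proposed setter.
class OldAllocator
class NewAllocator {
  var manager: AnyRef = _
  def setExecutorPodsLifecycleManager(m: AnyRef): Unit = { manager = m }
}

// Invoke setExecutorPodsLifecycleManager only if the allocator's class
// declares it, so old user-implemented allocators keep working unchanged.
def trySetLifecycleManager(allocator: AnyRef, manager: AnyRef): Boolean =
  try {
    val method = allocator.getClass.getMethod(
      "setExecutorPodsLifecycleManager", classOf[AnyRef])
    method.invoke(allocator, manager)
    true
  } catch {
    case _: NoSuchMethodException => false
  }
```

As the later review discussion notes, reflection turned out to be unnecessary once the method was added to AbstractPodsAllocator itself with an empty default body.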

@parthchandra
Contributor Author

@dongjoon-hyun Thank you for catching this! Let me go with the direction from @pan3793 and also ensure that it is backward compatible.

@parthchandra
Contributor Author

@dongjoon-hyun @pan3793 Would you be able to look at this one more time? Thanks

None
}
optCreatedExecutorPod.foreach { createdExecutorPod =>
try {
Contributor

Why is an error during pod creation handled differently from an error that occurs while adding the owner reference or creating the PVC?

In both cases the nonfatal error ends in deleting the pod, so why is the second case not tracked as a failure?

Contributor Author

It appears that errors from owner reference and PVC creation are handled differently (since they throw an exception). I've added them to the tracking by the lifecycle manager but also retained the current behaviour of throwing an exception for these cases.
Would you prefer we stop throwing an exception and have everything handled by the lifecycle manager?

Contributor

Actually, I had missed that line. Could you please run a manual test and check what happens with the exception thrown here (by throwing an exception here directly and running one of the integration tests)?
I am afraid it can go all the way up to CoarseGrainedSchedulerBackend.

Contributor Author

I think the exception being thrown is fine. It is propagated up through requestNewExecutors() → onNewSnapshots(). The exception is caught at ExecutorPodsSnapshotStoreImpl.processSnapshotsInternal and logged as a warning.
The test test("SPARK-41410: An exception during PVC creation should not increase PVC counter") is testing explicitly for an exception to be thrown in this case. I modified the code to throw an Exception and the test passed (as long as the exception is a KubernetesClientException).

@dongjoon-hyun
Member

Ack, @parthchandra . I'll take a look again today. Thank you!

@dongjoon-hyun dongjoon-hyun dismissed their stale review February 18, 2026 16:56

Stale review.

k8sConf.resourceProfileId.toInt), Seq.empty)
}

test("Pod creation failures are tracked by ExecutorFailureTracker") {
Member

indentation?

Contributor Author

Fixed

k8sConf.resourceProfileId.toInt), Seq.empty)
}

test("Pod creation failures are tracked by ExecutorFailureTracker") {
Member

In addition, it would be great to add a JIRA ID.

- test("Pod creation failures are tracked by ExecutorFailureTracker") {
+ test("SPARK-55075: Pod creation failures are tracked by ExecutorFailureTracker") {

Contributor Author

Done

sc: SparkContext,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
lifecycleManager: ExecutorPodsLifecycleManager) = {
Member

@dongjoon-hyun dongjoon-hyun Feb 18, 2026

Shall we use the following style to minimize the change. For example, KubernetesClusterManagerSuite?

-      lifecycleManager: ExecutorPodsLifecycleManager) = {
+      lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = {

Contributor Author

Good suggestion. Done.

@dongjoon-hyun
Member

Thank you for updating, @parthchandra.

* Optional lifecycle manager for tracking executor pod lifecycle events.
* Set via setExecutorPodsLifecycleManager for backward compatibility.
*/
protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _
Member

Like the comment, Optional lifecycle manager, we had better follow the Scala style.

- protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _
+ protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None

Contributor Author

Done

* This method is optional and may not exist in custom implementations based on older versions.
*/
def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = {
executorPodsLifecycleManager = manager
Member

Maybe,

- executorPodsLifecycleManager = manager
+ executorPodsLifecycleManager = Some(manager)

Contributor Author

Done

val failureCount = totalFailedPodCreations.incrementAndGet()
if (executorPodsLifecycleManager != null) {
executorPodsLifecycleManager.registerPodCreationFailure()
}
Member

Instead of null check, Scala prefers the following.

- if (executorPodsLifecycleManager != null) {
-   executorPodsLifecycleManager.registerPodCreationFailure()
- }
+ executorPodsLifecycleManager.foreach(_.registerPodCreationFailure)

Contributor Author

Done

executorPodsLifecycleManager.registerPodCreationFailure()
}
logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
Member

@dongjoon-hyun dongjoon-hyun Feb 19, 2026

Lines 473-479 seem to be repeated at 506-513. Could you extract a method to remove the code duplication?

Contributor Author

Done

} catch {
case _: NoSuchMethodException =>
logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " +
"Pod creation failures will not be tracked.")
Member

For K8s Deployment and StatefulSet, the following will be correct. Could you revise the message?

- logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " +
-   "Pod creation failures will not be tracked.")
+ logInfo("No need to track pod creation failure because allocator does not require it.")

Member

Method setExecutorPodsLifecycleManager is added in AbstractPodsAllocator, not the derived class. Is reflection really required here?

Contributor Author

I was following the previous comment about needing to use reflection to maintain backwards compatibility. But you're right, reflection is not needed. Removed.

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 19, 2026

To @parthchandra , from my side, the PR looks almost ready. I left a few more comments. Have a safe travel, 🛬 !

@dongjoon-hyun dongjoon-hyun self-assigned this Feb 19, 2026
Contributor Author

@parthchandra parthchandra left a comment


Also rebased on latest


Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM (Pending CIs). Thank you, @parthchandra.

Let's wait for a few days more to make it sure that we addressed all other reviewers' comments.

Contributor

@attilapiros attilapiros left a comment

lgtm

@dongjoon-hyun
Member

Thank you all! Merged to master for Apache Spark 4.2.0.

@parthchandra
Contributor Author

Thank you @dongjoon-hyun, @pan3793, @attilapiros !
