[AURON #2015] Add Native Scan Support for Apache Iceberg Copy-On-Write Tables. #2016
slfan1989 wants to merge 7 commits into apache:master
Conversation
…n-Write Tables. Signed-off-by: slfan1989 <slfan1989@apache.org>
Force-pushed from f954ce5 to 41e9318.
```bash
# Check or format all code, including third-party code, with spark-3.4
sparkver=spark-3.5
```

Should the comment be spark-3.5?
```diff
 done

-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
```

This change seems unnecessary.
Pull request overview
This PR adds native scan support for Apache Iceberg Copy-On-Write (COW) tables to the Auron execution engine, enabling direct reads of Iceberg data files through Auron's native path for improved performance. The implementation follows the established SPI (Service Provider Interface) pattern used by other data source integrations like Paimon, with automatic fallback to Spark's execution path for unsupported scenarios.
Changes:
- Adds IcebergConvertProvider SPI extension to detect and convert Iceberg BatchScanExec nodes to native execution (see the sketch after this list)
- Implements validation logic to determine COW table eligibility (no delete files, no metadata columns, supported data types)
- Creates NativeIcebergTableScanExec to execute native Iceberg scans with Parquet/ORC format support
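To make the detect-convert-fallback flow concrete, here is a minimal sketch of the shape such a provider could take. Only IcebergConvertProvider, BatchScanExec, and NativeIcebergTableScanExec are names from this PR and Spark; the method names and `???` eligibility helpers below are hypothetical placeholders, not the actual implementation.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Hypothetical sketch of the SPI provider pattern described above,
// with illustrative method names; the real checks live in IcebergScanSupport.
class IcebergConvertProviderSketch {

  // Detect an Iceberg BatchScanExec and convert it to a native scan,
  // returning None to fall back to Spark's execution path.
  def convert(plan: SparkPlan): Option[SparkPlan] = plan match {
    case scan: BatchScanExec if isEligibleCowScan(scan) =>
      Some(toNativeIcebergTableScan(scan)) // would build a NativeIcebergTableScanExec
    case _ => None // unsupported: leave the plan to Spark
  }

  // Eligibility mirrors the validation rules listed above: COW only
  // (no delete files), no metadata columns, supported data types.
  private def isEligibleCowScan(scan: BatchScanExec): Boolean =
    hasNoDeleteFiles(scan) && hasNoMetadataColumns(scan) && hasSupportedTypes(scan)

  // Placeholders standing in for the reflection-based FileScanTask inspection.
  private def hasNoDeleteFiles(scan: BatchScanExec): Boolean = ???
  private def hasNoMetadataColumns(scan: BatchScanExec): Boolean = ???
  private def hasSupportedTypes(scan: BatchScanExec): Boolean = ???
  private def toNativeIcebergTableScan(scan: BatchScanExec): SparkPlan = ???
}
```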
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala | SPI provider that checks version compatibility and delegates to IcebergScanSupport |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | Core validation logic to determine native scan eligibility and extract FileScanTask metadata via reflection |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala | Native execution node that converts Iceberg tasks to FilePartitions and generates protobuf scan plans |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | Integration tests covering COW tables, projections, partitioning, ORC format, and fallback scenarios |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala | Test base configuration with Auron and Iceberg extensions enabled |
| thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider | SPI registration file for IcebergConvertProvider |
| thirdparty/auron-iceberg/pom.xml | Maven enforcer rules to validate Iceberg version (1.10.1) and Spark version (3.4-4.0) compatibility |
| spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java | Adds ENABLE_ICEBERG_SCAN configuration option (see the usage sketch after this table) |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Adds default value handling for shuffle manager configuration |
| spark-extension/pom.xml | Adds arrow-memory-core and arrow-memory-netty dependencies |
| pom.xml | Adds Iceberg version properties and enforcer rules for all Spark version profiles |
| auron-build.sh | Updates Iceberg version support to 1.10.1 and Spark version range to 3.4-4.0 |
| dev/reformat | Updates formatting script to include Iceberg module with version 1.10.1 |
| .github/workflows/iceberg.yml | CI workflow for testing Iceberg integration across Spark 3.4, 3.5, 4.0 with multiple Java versions |
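For reference, the new flag is toggled like any other Spark conf. A hypothetical usage sketch follows; only the config key spark.auron.enable.iceberg.scan comes from this PR, while the session setup and table name are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; only the config key is from this PR.
val spark = SparkSession.builder()
  .appName("auron-iceberg-demo")
  .config("spark.auron.enable.iceberg.scan", "true") // set "false" to force Spark's path
  .getOrCreate()

// With the flag on, an eligible COW table scan should show up as a native
// Iceberg scan node in the physical plan.
spark.sql("select * from local.db.t_cow").explain()
```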
```diff
 done

-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
```
The sparkvers array should still include "spark-3.4" to match the array iteration below. The comment on line 51 and the code on line 52 both reference spark-3.5, so it appears spark-3.4 was removed from the array when spark-3.5 should have been removed instead, since spark-3.5 is already handled in the celeborn loop above. As written, the code would not be formatted for spark-3.4.
Suggested change:

```diff
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
```
The PR description claims "Empty table handling" is tested in the suite, but there is no explicit test case for empty tables. While the code in IcebergScanSupport.scala (line 61-62) handles empty partitions, adding an explicit integration test would provide better coverage and documentation of this behavior. Consider adding a test like: test("iceberg native scan is applied for empty COW table") that creates an empty table and verifies NativeIcebergTableScan is used.
| test("iceberg native scan is applied for empty COW table") { | |
| withTable("local.db.t_empty") { | |
| sql(""" | |
| |create table local.db.t_empty (id int, v string) | |
| |using iceberg | |
| |tblproperties ( | |
| | 'format-version' = '2' | |
| |) | |
| |""".stripMargin) | |
| val df = sql("select * from local.db.t_empty") | |
| df.collect() | |
| val plan = df.queryExecution.executedPlan.toString() | |
| assert(plan.contains("NativeIcebergTableScan")) | |
| } | |
| } |
```scala
  Some(IcebergPartitionView(fileTasks))
} catch {
  case _: ReflectiveOperationException => None
```
Catching all ReflectiveOperationException here silently swallows important debugging information. When reflection fails, it could be due to Iceberg version incompatibility or internal API changes. Consider logging a warning with the exception details to help diagnose issues in production environments, similar to how IcebergConvertProvider logs warnings for unsupported Spark versions.
Suggested change:

```diff
-      case _: ReflectiveOperationException => None
+      case e: ReflectiveOperationException =>
+        logWarning(
+          "Failed to read Iceberg SparkInputPartition via reflection; falling back to None. " +
+            "This may indicate an Iceberg or Spark version incompatibility.",
+          e)
+        None
```
```scala
    Seq.empty
  }
} catch {
  case _: Throwable => Seq.empty
```
The code at line 111 swallows all exceptions from the reflection operation. For better observability, consider logging exceptions similar to the pattern used in NativeRDD.scala where setAccessible failures are logged. This would help diagnose issues when accessing Iceberg's internal partition structure fails.
Suggested change:

```diff
-      case _: Throwable => Seq.empty
+      case t: Throwable =>
+        logWarning(
+          "Failed to plan input partitions via DataSource V2 batch API for " +
+            s"${exec.getClass.getName}; falling back to reflective methods.",
+          t)
+        Seq.empty
```
```diff
 fi

-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
```
The comment says "spark-3.4" but the code uses "spark-3.5". Update the comment to accurately reflect the Spark version being used.
Suggested change:

```diff
-# Check or format all code, including third-party code, with spark-3.4
+# Check or format all code, including third-party code, with spark-3.5
```
Which issue does this PR close?
Closes #2015
Rationale for this change
This PR adds native scan support for Apache Iceberg Copy-On-Write (COW) tables to improve query performance. Currently, Auron has no direct integration with Iceberg, so all Iceberg queries go through Spark's default execution path and miss opportunities for native engine acceleration.
Key Motivations:
What changes are included in this PR?
Core Implementation:
Build & Configuration:
- pom.xml with Iceberg version management and Maven enforcer rules
- auron-build.sh to support Iceberg build parameters
- spark.auron.enable.iceberg.scan (default: true)

Supported Features:
Version Support:
Are there any user-facing changes?
No Breaking Changes: Existing functionality remains unchanged. Iceberg support is additive and automatically falls back to Spark's execution path in unsupported scenarios.
How was this patch tested?
Unit & Integration Tests:
Test Environment: