[AURON #2030] Add Native Scan Support for Apache Hudi Copy-On-Write Tables. #2031

slfan1989 wants to merge 2 commits into apache:master
Conversation
…rite Tables. Signed-off-by: slfan1989 <slfan1989@apache.org>
…rite Tables. Signed-off-by: slfan1989 <slfan1989@apache.org>
Spark 3.0 and 3.1 don't support time travel yet, so I'll revise the unit tests accordingly.
Pull request overview
Adds a new optional auron-hudi integration module that detects Apache Hudi Copy-On-Write scans and converts eligible FileSourceScanExec nodes into Auron native Parquet/ORC scans, with CI/build plumbing to validate Spark/Hudi compatibility.
Changes:
- Introduce a `thirdparty/auron-hudi` module with `HudiConvertProvider` + `HudiScanSupport` and a new test suite.
- Wire external convert providers into the `AuronConverters` scan conversion path, guarded by a new `ENABLE_HUDI_SCAN` SQL conf option.
- Add build profiles/scripts and a dedicated GitHub Actions workflow to build/test the Hudi module across a Spark/JDK matrix.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala | Hudi detection + metadata fallbacks (table type/base format/time travel) used to decide native conversion. |
| thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala | SPI provider that converts eligible Hudi scans into native Parquet/ORC scans with timestamp fallback. |
| thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider | Registers the Hudi convert provider via Java ServiceLoader. |
| thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala | Adds unit/integration-style tests for detection, fallback, and provider conversion. |
| thirdparty/auron-hudi/src/test/resources/log4j2.properties | Test logging configuration for the new module. |
| thirdparty/auron-hudi/pom.xml | Module build + Maven enforcer constraints for Spark/Hudi compatibility. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Prefers enabled external convert providers for FileSourceScanExec conversion. |
| spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java | Adds ENABLE_HUDI_SCAN configuration option. |
| pom.xml | Adds a hudi-0.15 Maven profile to include the new module and set Hudi properties. |
| dev/reformat | Enables the Hudi profile during formatting checks. |
| auron-build.sh | Adds --hudi option and corresponding Maven profile/property wiring. |
| .github/workflows/hudi.yml | Adds a dedicated CI workflow to build/test the Hudi module across Spark/JDK versions. |
```scala
case Some(HudiScanSupport.ParquetFormat) =>
  assert(SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get())
  // Hudi falls back to Spark when timestamp scanning is disabled.
  if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) {
    if (scan.requiredSchema.exists(e =>
        NativeConverters.existTimestampType(e.dataType))) {
      return exec
    }
  }
```
This conversion relies on assert(ENABLE_SCAN_PARQUET.get()) to enforce config, but JVM assertions may be disabled in production, which would allow native conversion even when Parquet scanning is configured off. Replace the assert with an explicit runtime check (and return the original exec when disabled) to ensure config is always respected without throwing/logging warnings.
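A minimal sketch of the suggested pattern, with `ConfigGate` and `scanParquetEnabled` as hypothetical stand-ins for the real `SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()` and plan types — the point is that the fallback happens via an explicit branch rather than an `assert` that may be compiled away when `-ea` is off:

```java
// Sketch only: ConfigGate and scanParquetEnabled are hypothetical stand-ins
// for SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get() and the plan nodes.
public class ConfigGate {
    static boolean scanParquetEnabled = false;

    // Returns the native plan only when the config allows it; otherwise
    // returns the original exec unconditionally, independent of whether
    // JVM assertions (-ea) are enabled.
    static String convert(String originalExec) {
        if (!scanParquetEnabled) {
            return originalExec; // explicit runtime check, not assert
        }
        return "NativeParquetScanExec";
    }

    public static void main(String[] args) {
        System.out.println(convert("FileSourceScanExec"));
        scanParquetEnabled = true;
        System.out.println(convert("FileSourceScanExec"));
    }
}
```

With assertions disabled (the JVM default), `assert ENABLE_SCAN_PARQUET.get()` is a no-op, so the explicit `if` is the only gate that is guaranteed to run in production.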
```xml
<rules>
  <requireProperty>
    <property>hudiEnabled</property>
    <regex>true</regex>
```
The enforcer rule hudiEnabled uses regex true without anchors, so values like true1 would incorrectly pass. Anchor the regex (e.g., ^true$) to ensure only the exact expected value is accepted.
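The difference is easy to demonstrate with `java.util.regex` (a standalone sketch, assuming find-style matching rather than a full-string match; the values are made up):

```java
import java.util.regex.Pattern;

public class AnchorDemo {
    public static void main(String[] args) {
        // Unanchored: find() accepts any string *containing* "true".
        System.out.println(Pattern.compile("true").matcher("true1").find());   // true
        // Anchored: only the exact value passes.
        System.out.println(Pattern.compile("^true$").matcher("true1").find()); // false
        System.out.println(Pattern.compile("^true$").matcher("true").find());  // true
    }
}
```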
Suggested change:

```diff
-<regex>true</regex>
+<regex>^true$</regex>
```
```sh
for celebornver in celeborn-0.5 celeborn-0.6
do
  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi,hudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
```
`-Phudi,hudi-0.15` enables a `hudi` profile that doesn't appear to exist in the root pom (only `hudi-0.15` is defined). This will at least emit Maven warnings during reformat and may hide real profile issues; consider dropping the nonexistent `hudi` profile and enabling only `-Phudi-0.15`, consistent with `auron-build.sh`.
Suggested change:

```diff
-  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi,hudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
```
```scala
private def tableTypeFromMeta(options: Map[String, String]): Option[String] = {
  val basePath = options.get("path").map(normalizePath)
  basePath.flatMap { path =>
    try {
      val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
      val base = new Path(path)
      val fs = base.getFileSystem(hadoopConf)
      val propsPath = new Path(base, ".hoodie/hoodie.properties")
      if (!fs.exists(propsPath)) {
```
tableTypeFromMeta/baseFileFormatFromMeta read and parse .hoodie/hoodie.properties via Hadoop FS on every isSupported/fileFormat evaluation when options/catalog don’t contain the needed keys. Since conversion runs during planning, repeated FS opens here can add noticeable latency for queries with many scans; consider caching the resolved table type/base format per base path (and possibly downgrading exception logs to debug to avoid noisy logs on transient FS issues).
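One possible shape for such a cache (a sketch, not the PR's code; `HoodieMetaCache` and `loadTableType` are hypothetical names standing in for the actual `.hoodie/hoodie.properties` read):

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Sketch: memoize the resolved table type per base path so the filesystem
// is consulted at most once per table across repeated planning calls.
public class HoodieMetaCache {
    private final ConcurrentMap<String, Optional<String>> tableTypeByPath =
            new ConcurrentHashMap<>();

    public Optional<String> tableType(
            String basePath, Function<String, Optional<String>> loadTableType) {
        // computeIfAbsent runs the loader only on the first lookup for a path.
        return tableTypeByPath.computeIfAbsent(basePath, loadTableType);
    }
}
```

Note that `computeIfAbsent` also caches `Optional.empty()` results, so a transient FS failure would be remembered for the session; a TTL or explicit invalidation may be wanted if that matters.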
```scala
case Some(HudiScanSupport.OrcFormat) =>
  assert(SparkAuronConfiguration.ENABLE_SCAN_ORC.get())
  // ORC follows the same timestamp fallback rule as Parquet.
  if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) {
    if (scan.requiredSchema.exists(e =>
        NativeConverters.existTimestampType(e.dataType))) {
      return exec
    }
  }
```
Same issue as the Parquet branch: using assert(ENABLE_SCAN_ORC.get()) as a config gate is unsafe if assertions are disabled, and it forces fallback via exceptions/log warnings when ORC scanning is off. Use an explicit check and return the original Spark plan when disabled.
```scala
class HudiConvertProvider extends AuronConvertProvider with Logging {

  override def isEnabled: Boolean = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    val major = sparkVersion.takeWhile(_.isDigit)
    val supported = major.nonEmpty && major.toInt >= 3 && major.toInt < 4
```
Spark version gating only checks the major version (>=3 && <4), which would also enable this provider on any future Spark 3.x beyond the supported range (the PR scope says 3.0–3.5). Consider parsing major+minor and explicitly restricting to 3.0–3.5 to avoid accidentally enabling on unsupported Spark versions.
Suggested change:

```diff
-class HudiConvertProvider extends AuronConvertProvider with Logging {
-  override def isEnabled: Boolean = {
-    val sparkVersion = org.apache.spark.SPARK_VERSION
-    val major = sparkVersion.takeWhile(_.isDigit)
-    val supported = major.nonEmpty && major.toInt >= 3 && major.toInt < 4
+import scala.util.Try
+
+class HudiConvertProvider extends AuronConvertProvider with Logging {
+  override def isEnabled: Boolean = {
+    val sparkVersion = org.apache.spark.SPARK_VERSION
+    val versionParts = sparkVersion.split("[\\.-]", 3)
+    val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption)
+    val maybeMinor =
+      if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None
+    val supported = (for {
+      major <- maybeMajor
+      minor <- maybeMinor
+    } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
```
```xml
</requireProperty>
<requireProperty>
  <property>shortSparkVersion</property>
  <regex>3\.0|3\.1|3\.2|3\.3|3\.4|3\.5</regex>
```
The shortSparkVersion regex is not anchored, so unsupported values like 3.10 would match the 3\.1 alternative and incorrectly pass. Anchor the regex to the full string (e.g., ^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$).
Suggested change:

```diff
-<regex>3\.0|3\.1|3\.2|3\.3|3\.4|3\.5</regex>
+<regex>^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$</regex>
```
```xml
</requireProperty>
<requireProperty>
  <property>hudiVersion</property>
  <regex>0\.15\.0</regex>
```
The hudiVersion regex is not anchored, so a version like 0.15.0-SNAPSHOT could still match 0\.15\.0 and pass. If the intent is to pin strictly to 0.15.0, anchor the regex (e.g., ^0\.15\.0$).
Suggested change:

```diff
-<regex>0\.15\.0</regex>
+<regex>^0\.15\.0$</regex>
```
```scala
private def normalizePath(rawPath: String): String = {
  try {
    val uri = new URI(rawPath)
    if (uri.getScheme == null) {
      rawPath
    } else {
      uri.getPath
    }
  } catch {
    case _: Throwable => rawPath
  }
}
```
normalizePath() drops URI scheme/authority by returning only uri.getPath for schemed paths (e.g., s3a://bucket/path becomes /path). That will cause Hadoop Path/FS resolution to use the wrong filesystem and fail to find .hoodie/hoodie.properties on non-local stores. Preserve the full URI (or use Hadoop Path parsing) instead of stripping to the path component.
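The lossy behavior can be reproduced with `java.net.URI` alone (a standalone demo; `s3a://bucket/tables/t1` is a made-up path):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class UriDemo {
    public static void main(String[] args) throws URISyntaxException {
        URI uri = new URI("s3a://bucket/tables/t1");
        // getPath() strips the scheme and authority: the bucket is lost,
        // so Hadoop would resolve the result against the default filesystem.
        System.out.println(uri.getPath());  // /tables/t1
        // Keeping the full URI preserves the filesystem information.
        System.out.println(uri);            // s3a://bucket/tables/t1
    }
}
```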
```yaml
-am
-Pscala-${{ matrix.scalaver }}
-Pspark-${{ matrix.sparkver }}
-Phudi-0.15
```
What if we support multiple versions of Hudi in the future? Can we model this similarly to how we handle our other third-party plugins? (Check the comment above for more info.)
```yaml
- sparkver: "3.0"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.1"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.2"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.3"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.4"
  scalaver: "2.12"
  javaver: "17"
- sparkver: "3.5"
  scalaver: "2.12"
  javaver: "17"
- sparkver: "3.5"
  scalaver: "2.12"
  javaver: "21"
```
Can we simplify this like so, similar to the structure we have in the flink/paimon-related yml files?

```yaml
matrix:
  paimon: [ "0.15" ]
  javaver: [ "8", "17" ]
  scalaver: [ "2.12" ]
  module: [ "thirdparty/auron-hudi" ]
  sparkver: [ "spark-3.5" ]
```
```java
        .withDefaultValue(true);

public static final ConfigOption<Boolean> ENABLE_HUDI_SCAN = new SQLConfOption<>(Boolean.class)
        .withKey("auron.enable.hudi.scan")
```
Should this option have COW-table-specific granularity?
Which issue does this PR close?
Closes #2030
Rationale for this change
This PR adds native scan support for Hudi Copy-On-Write (COW) tables, enabling Auron to accelerate Hudi table reads by converting `FileSourceScanExec` operations to native Parquet/ORC scan implementations.

What changes are included in this PR?
1. New Module: `thirdparty/auron-hudi`
   - `HudiConvertProvider`: Implements the `AuronConvertProvider` SPI to intercept Hudi `FileSourceScanExec` nodes and convert them to native scans
     - Detects Hudi base file formats (`HoodieParquetFileFormat`, `HoodieOrcFileFormat`) and converts to `NativeParquetScanExec` or `NativeOrcScanExec`
   - `HudiScanSupport`: Core detection and validation logic
     - `NewHoodie*` format rejection
     - Metadata fallback via `.hoodie/hoodie.properties`
     - Time travel detection (`as.of.instant`, `as.of.timestamp` options)
2. Configuration
   - New `spark.auron.enable.hudi.scan` config option (default: `true`)
3. Build & Integration
   - Maven: New profile `hudi-0.15` with enforcer rules and a `hudiEnabled=true` property
   - Build Script: Enhanced `auron-build.sh` with a `--hudi <VERSION>` parameter and `hudiEnabled` property wiring
   - CI/CD: New workflow `.github/workflows/hudi.yml`

Are there any user-facing changes?
Yes, a new configuration option.
How was this patch tested?
Added JUnit tests.