
[AURON #2030] Add Native Scan Support for Apache Hudi Copy-On-Write Tables.#2031

Open
slfan1989 wants to merge 2 commits into apache:master from slfan1989:auron-2030

Conversation

@slfan1989
Contributor

Which issue does this PR close?

Closes #2030

Rationale for this change

This PR adds native scan support for Hudi Copy-On-Write (COW) tables, enabling Auron to accelerate Hudi table reads by converting FileSourceScanExec operations to native Parquet/ORC scan implementations.

What changes are included in this PR?

1. New Module: thirdparty/auron-hudi

  • HudiConvertProvider: Implements AuronConvertProvider SPI to intercept and convert Hudi FileSourceScanExec to native scans

    • Detects Hudi file formats (HoodieParquetFileFormat, HoodieOrcFileFormat)
    • Converts to NativeParquetScanExec or NativeOrcScanExec
    • Handles timestamp fallback logic automatically
  • HudiScanSupport: Core detection and validation logic

    • File format recognition with NewHoodie* format rejection
    • Table type resolution via multi-source metadata fallback:
      • Options → Catalog → .hoodie/hoodie.properties
    • MOR table detection and rejection
    • Time travel query detection (via as.of.instant, as.of.timestamp options)
    • FileIndex class hierarchy verification
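
The multi-source table-type fallback described above (options → catalog → `.hoodie/hoodie.properties`) can be sketched roughly like this; the object name and the by-name parameters are hypothetical stand-ins, not the PR's actual `HudiScanSupport` code:

```scala
// Hypothetical sketch of the multi-source table-type fallback. Each by-name
// argument stands in for one real lookup: scan options, catalog table
// properties, and .hoodie/hoodie.properties read from the filesystem.
object TableTypeResolution {
  def resolve(
      fromOptions: => Option[String],
      fromCatalog: => Option[String],
      fromMeta: => Option[String]): Option[String] =
    // Later sources are only consulted when earlier ones are empty.
    fromOptions.orElse(fromCatalog).orElse(fromMeta)
}
```

Because the arguments are by-name, the more expensive catalog and filesystem lookups are skipped entirely whenever the scan options already carry the table type.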

2. Configuration

  • Added spark.auron.enable.hudi.scan config option (default: true)
  • Respects existing Parquet/ORC timestamp scanning configurations
  • Runtime Spark version validation (3.0–3.5 only)

3. Build & Integration

  • Maven: New profile hudi-0.15 with enforcer rules

    • Validates hudiEnabled=true property
    • Restricts Spark to 3.0–3.5
    • Pins Hudi version to 0.15.0
  • Build Script: Enhanced auron-build.sh

    • Added --hudi <VERSION> parameter
    • Version compatibility validation
    • Auto-enables hudiEnabled property
  • CI/CD: New workflow .github/workflows/hudi.yml

    • Matrix testing: Spark 3.0–3.5 × JDK 8/17/21 × Scala 2.12
    • Independent Hudi test pipeline
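
The `--hudi <VERSION>` gate in the build script might validate versions along these lines; the function name and the mapping to the `hudi-0.15` Maven profile are an illustrative guess, not code copied from `auron-build.sh`:

```shell
# Illustrative sketch of --hudi version validation in a build script;
# the real auron-build.sh logic may differ.
validate_hudi_version() {
  case "$1" in
    0.15|0.15.*) echo "hudi-0.15" ;;   # map version to the Maven profile name
    *) echo "unsupported Hudi version: $1" >&2; return 1 ;;
  esac
}
```

A matching invocation, per the PR description, would be something like `./auron-build.sh --hudi 0.15`, which auto-enables the `hudiEnabled` property.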

Are there any user-facing changes?

New Configuration Option

// Enable Hudi native scan (enabled by default)
spark.conf.set("spark.auron.enable.hudi.scan", "true")

How was this patch tested?

Added JUnit tests.

…rite Tables.

Signed-off-by: slfan1989 <slfan1989@apache.org>
@slfan1989
Contributor Author

Spark 3.0 and 3.1 don’t support time travel yet, so I’ll revise the unit tests accordingly.

Copilot AI left a comment

Pull request overview

Adds a new optional auron-hudi integration module that detects Apache Hudi Copy-On-Write scans and converts eligible FileSourceScanExec nodes into Auron native Parquet/ORC scans, with CI/build plumbing to validate Spark/Hudi compatibility.

Changes:

  • Introduce thirdparty/auron-hudi module with HudiConvertProvider + HudiScanSupport and a new test suite.
  • Wire external convert providers into AuronConverters scan conversion path, guarded by a new ENABLE_HUDI_SCAN SQL conf option.
  • Add build profiles/scripts and a dedicated GitHub Actions workflow to build/test the Hudi module across Spark/JDK matrix.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala Hudi detection + metadata fallbacks (table type/base format/time travel) used to decide native conversion.
thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala SPI provider that converts eligible Hudi scans into native Parquet/ORC scans with timestamp fallback.
thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider Registers the Hudi convert provider via Java ServiceLoader.
thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala Adds unit/integration-style tests for detection, fallback, and provider conversion.
thirdparty/auron-hudi/src/test/resources/log4j2.properties Test logging configuration for the new module.
thirdparty/auron-hudi/pom.xml Module build + Maven enforcer constraints for Spark/Hudi compatibility.
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala Prefers enabled external convert providers for FileSourceScanExec conversion.
spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java Adds ENABLE_HUDI_SCAN configuration option.
pom.xml Adds a hudi-0.15 Maven profile to include the new module and set Hudi properties.
dev/reformat Enables the Hudi profile during formatting checks.
auron-build.sh Adds --hudi option and corresponding Maven profile/property wiring.
.github/workflows/hudi.yml Adds a dedicated CI workflow to build/test the Hudi module across Spark/JDK versions.


Comment on lines +48 to +55
case Some(HudiScanSupport.ParquetFormat) =>
  assert(SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get())
  // Hudi falls back to Spark when timestamp scanning is disabled.
  if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) {
    if (scan.requiredSchema.exists(e =>
        NativeConverters.existTimestampType(e.dataType))) {
      return exec
    }
Copilot AI Feb 25, 2026

This conversion relies on assert(ENABLE_SCAN_PARQUET.get()) to enforce config, but JVM assertions may be disabled in production, which would allow native conversion even when Parquet scanning is configured off. Replace the assert with an explicit runtime check (and return the original exec when disabled) to ensure config is always respected without throwing/logging warnings.
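
A minimal sketch of the explicit check this comment suggests, using plain strings in place of Spark plans; `scanParquetEnabled` is a stand-in for `SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()`, and the function is illustrative rather than the PR's actual conversion code:

```scala
// Sketch: gate on the config explicitly so behavior does not depend on
// whether JVM assertions (-ea) are enabled at runtime.
def convertParquetScan(scanParquetEnabled: Boolean, originalPlan: String): String =
  if (!scanParquetEnabled) originalPlan            // return the Spark plan untouched
  else s"NativeParquetScanExec($originalPlan)"
```

Unlike `assert`, the `if` branch is always evaluated, so a disabled config can never silently admit a native conversion.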

<rules>
<requireProperty>
<property>hudiEnabled</property>
<regex>true</regex>
Copilot AI Feb 25, 2026

The enforcer rule hudiEnabled uses regex true without anchors, so values like true1 would incorrectly pass. Anchor the regex (e.g., ^true$) to ensure only the exact expected value is accepted.

Suggested change
<regex>true</regex>
<regex>^true$</regex>

for celebornver in celeborn-0.5 celeborn-0.6
do
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi,hudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
Copilot AI Feb 25, 2026

-Phudi,hudi-0.15 enables a hudi profile that doesn't appear to exist in the root pom (only hudi-0.15 is defined). This will at least emit Maven warnings during reformat and may hide real profile issues; consider dropping the nonexistent hudi profile and only enabling -Phudi-0.15 (or just -Phudi-0.15 consistent with auron-build.sh).

Suggested change
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi,hudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Phudi-0.15 -Pflink,flink-1.18 -Piceberg,iceberg-1.9

Comment on lines +115 to +123
private def tableTypeFromMeta(options: Map[String, String]): Option[String] = {
  val basePath = options.get("path").map(normalizePath)
  basePath.flatMap { path =>
    try {
      val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
      val base = new Path(path)
      val fs = base.getFileSystem(hadoopConf)
      val propsPath = new Path(base, ".hoodie/hoodie.properties")
      if (!fs.exists(propsPath)) {
Copilot AI Feb 25, 2026

tableTypeFromMeta/baseFileFormatFromMeta read and parse .hoodie/hoodie.properties via Hadoop FS on every isSupported/fileFormat evaluation when options/catalog don’t contain the needed keys. Since conversion runs during planning, repeated FS opens here can add noticeable latency for queries with many scans; consider caching the resolved table type/base format per base path (and possibly downgrading exception logs to debug to avoid noisy logs on transient FS issues).
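
The per-base-path caching this comment proposes could look roughly like the following; the object and method names are hypothetical, not from the PR:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical cache keyed by table base path, so hoodie.properties is
// read from the filesystem at most once per table during planning.
object TableTypeCache {
  private val cache = new ConcurrentHashMap[String, Option[String]]()

  def getOrLoad(basePath: String)(load: String => Option[String]): Option[String] =
    // computeIfAbsent invokes the loader only on the first miss for this path.
    cache.computeIfAbsent(basePath, new JFunction[String, Option[String]] {
      override def apply(p: String): Option[String] = load(p)
    })
}
```

`ConcurrentHashMap.computeIfAbsent` also makes the cache safe for concurrent planning threads; note this simple sketch caches negative results too, which may or may not be desirable.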

Comment on lines +59 to +66
case Some(HudiScanSupport.OrcFormat) =>
  assert(SparkAuronConfiguration.ENABLE_SCAN_ORC.get())
  // ORC follows the same timestamp fallback rule as Parquet.
  if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) {
    if (scan.requiredSchema.exists(e =>
        NativeConverters.existTimestampType(e.dataType))) {
      return exec
    }
Copilot AI Feb 25, 2026

Same issue as the Parquet branch: using assert(ENABLE_SCAN_ORC.get()) as a config gate is unsafe if assertions are disabled, and it forces fallback via exceptions/log warnings when ORC scanning is off. Use an explicit check and return the original Spark plan when disabled.

Comment on lines +25 to +31

class HudiConvertProvider extends AuronConvertProvider with Logging {

  override def isEnabled: Boolean = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    val major = sparkVersion.takeWhile(_.isDigit)
    val supported = major.nonEmpty && major.toInt >= 3 && major.toInt < 4
Copilot AI Feb 25, 2026

Spark version gating only checks the major version (>=3 && <4), which would also enable this provider on any future Spark 3.x beyond the supported range (the PR scope says 3.0–3.5). Consider parsing major+minor and explicitly restricting to 3.0–3.5 to avoid accidentally enabling on unsupported Spark versions.

Suggested change
class HudiConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    val major = sparkVersion.takeWhile(_.isDigit)
    val supported = major.nonEmpty && major.toInt >= 3 && major.toInt < 4

import scala.util.Try

class HudiConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    val versionParts = sparkVersion.split("[\\.-]", 3)
    val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption)
    val maybeMinor =
      if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None
    val supported = (for {
      major <- maybeMajor
      minor <- maybeMinor
    } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)

</requireProperty>
<requireProperty>
<property>shortSparkVersion</property>
<regex>3\.0|3\.1|3\.2|3\.3|3\.4|3\.5</regex>
Copilot AI Feb 25, 2026

The shortSparkVersion regex is not anchored, so unsupported values like 3.10 would match the 3\.1 alternative and incorrectly pass. Anchor the regex to the full string (e.g., ^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$).

Suggested change
<regex>3\.0|3\.1|3\.2|3\.3|3\.4|3\.5</regex>
<regex>^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$</regex>

</requireProperty>
<requireProperty>
<property>hudiVersion</property>
<regex>0\.15\.0</regex>
Copilot AI Feb 25, 2026

The hudiVersion regex is not anchored, so a version like 0.15.0-SNAPSHOT could still match 0\.15\.0 and pass. If the intent is to pin strictly to 0.15.0, anchor the regex (e.g., ^0\.15\.0$).

Suggested change
<regex>0\.15\.0</regex>
<regex>^0\.15\.0$</regex>

Comment on lines +245 to +255
private def normalizePath(rawPath: String): String = {
  try {
    val uri = new URI(rawPath)
    if (uri.getScheme == null) {
      rawPath
    } else {
      uri.getPath
    }
  } catch {
    case _: Throwable => rawPath
  }
Copilot AI Feb 25, 2026

normalizePath() drops URI scheme/authority by returning only uri.getPath for schemed paths (e.g., s3a://bucket/path becomes /path). That will cause Hadoop Path/FS resolution to use the wrong filesystem and fail to find .hoodie/hoodie.properties on non-local stores. Preserve the full URI (or use Hadoop Path parsing) instead of stripping to the path component.
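
One way to preserve the scheme and authority, as the comment suggests, is to return the full URI instead of `uri.getPath`; this is a sketch of that idea, not the PR's actual fix:

```scala
import java.net.URI

// Sketch: keep scheme and authority (e.g. s3a://bucket/tbl) so Hadoop
// resolves the correct filesystem when locating .hoodie/hoodie.properties.
def normalizePath(rawPath: String): String =
  try {
    val uri = new URI(rawPath)
    if (uri.getScheme == null) rawPath else uri.toString
  } catch {
    case _: Exception => rawPath
  }
```

Alternatively, constructing `new org.apache.hadoop.fs.Path(rawPath)` directly and letting Hadoop parse the URI would avoid hand-rolled normalization entirely.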

-am
-Pscala-${{ matrix.scalaver }}
-Pspark-${{ matrix.sparkver }}
-Phudi-0.15
Contributor

If in the future we will be supporting multiple versions of hudi? Can we model this to be similar to how we have it for our other third party plugins? (check comment above for more info)

Comment on lines +43 to +63
- sparkver: "3.0"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.1"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.2"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.3"
  scalaver: "2.12"
  javaver: "8"
- sparkver: "3.4"
  scalaver: "2.12"
  javaver: "17"
- sparkver: "3.5"
  scalaver: "2.12"
  javaver: "17"
- sparkver: "3.5"
  scalaver: "2.12"
  javaver: "21"
Contributor

Can we simplify this like so, similar to the structure we have in flink/paimon related yml files?

      matrix:
        paimon: [ "0.15" ]
        javaver: [ "8", "17" ]
        scalaver: [ "2.12" ]
        module: [ "thirdparty/auron-hudi" ]
        sparkver: [ "spark-3.5" ]

.withDefaultValue(true);

public static final ConfigOption<Boolean> ENABLE_HUDI_SCAN = new SQLConfOption<>(Boolean.class)
.withKey("auron.enable.hudi.scan")
Contributor

Should there be a granularity for COW tables?



Development

Successfully merging this pull request may close these issues.

[Feature] Add Native Scan Support for Apache Hudi Copy-On-Write Tables

3 participants