diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index e06d0d05549c..b60158ee1e1a 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1375,6 +1375,8 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
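+ # Compile-only pass with the delta profile enabled so the Delta sources build under
+ # Spark 4.1; the Spark UT run below does not include -Pdelta.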
+ $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
+ -Pspark-ut -Pdelta -DskipTests -Dmaven.source.skip
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
index 6df44e779d25..137956f5c248 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
tablePath: Option[String] = None,
isCDCRead: Boolean = false)
extends GlutenParquetFileFormat
- with LoggingShims {
+ with Logging {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
case AlwaysTrue() => Some(AlwaysTrue())
case AlwaysFalse() => Some(AlwaysFalse())
case _ =>
- logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
+ logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
None
}
}
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index f609a6130b84..73b9a8fdbaf5 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
import org.apache.spark._
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
* values to data files. Specifically L123-126, L132, and L140 where it adds option
* WRITE_PARTITION_COLUMNS
*/
-object GlutenDeltaFileFormatWriter extends LoggingShims {
+object GlutenDeltaFileFormatWriter extends Logging {
/**
* A variable used in tests to check whether the output ordering of the query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val ret = f
val commitMsgs = ret.map(_.commitMsg)
- logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+ logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
- logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
- log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+ logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
+ log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(log"Finished processing stats for write job " +
- log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+ log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
// return a set of all the partition paths that were updated during this job
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
} catch {
case cause: Throwable =>
- logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
+ logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
committer.abortJob(job)
throw cause
}
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
- logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+ logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
}, finallyBlock = {
dataWriter.close()
})
diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
index a9ae44921351..85cde8a79b51 100644
--- a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
+++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -17,8 +17,19 @@
package org.apache.spark.sql.delta
object DeltaInsertIntoTableSuiteShims {
- val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+ private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")
- // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
- val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
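+ // Spark 4.1 reports INSERT INTO a temp view with TABLE_OR_VIEW_NOT_FOUND, while Spark 4.0
+ // reports it with EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE.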
+ val INSERT_INTO_TMP_VIEW_ERROR_MSG =
+ if (isSpark41) {
+ "[TABLE_OR_VIEW_NOT_FOUND]"
+ } else {
+ "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+ }
+
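+ // Spark 4.0.1 reports non-constant column defaults with INVALID_DEFAULT_VALUE.NOT_CONSTANT;
+ // Spark 4.1 reports them with INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION.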
+ val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
+ if (isSpark41) {
+ "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+ } else {
+ "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+ }
}
diff --git a/pom.xml b/pom.xml
index 676c7252ac1b..e3becb24d173 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1332,8 +1332,8 @@
spark-sql-columnar-shims-spark41
4.1.1
1.10.0
- delta-spark
- 4.0.0
+ delta-spark_4.1
+ 4.1.0
40
1.1.0
2.18.2
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
new file mode 100644
index 000000000000..9595883c75f9
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter}
+
+import java.io.OutputStream
+
+/**
+ * Binary compatibility shim for Delta 4.0, which was compiled against CheckpointFileManager
+ * at its Spark 4.0 location, before Spark 4.1 moved the class under streaming.checkpointing.
+ */
+trait CheckpointFileManager {
+ def createAtomic(
+ path: Path,
+ overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream
+
+ def open(path: Path): FSDataInputStream
+
+ def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+ def list(path: Path): Array[FileStatus] = {
+ list(
+ path,
+ new PathFilter {
+ override def accept(path: Path): Boolean = true
+ })
+ }
+
+ def mkdirs(path: Path): Unit
+
+ def exists(path: Path): Boolean
+
+ def delete(path: Path): Unit
+
+ def isLocal: Boolean
+
+ def createCheckpointDirectory(): Path
+}
+
+object CheckpointFileManager {
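+ // Creates the manager through the relocated Spark 4.1 factory and wraps it in the adapter.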
+ def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
+ new Spark41CheckpointFileManagerAdapter(
+ Spark41CheckpointFileManager.create(path, hadoopConf))
+ }
+
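+ // Re-exposes Spark's CancellableFSDataOutputStream under the pre-4.1 package so existing
+ // references to this location keep compiling.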
+ abstract class CancellableFSDataOutputStream(outputStream: OutputStream)
+ extends org.apache.hadoop.fs.FSDataOutputStream(
+ outputStream,
+ null.asInstanceOf[FileSystem.Statistics]) {
+ def cancel(): Unit
+ }
+
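+ // Forwards every CheckpointFileManager call to the relocated Spark 4.1 implementation.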
+ private class Spark41CheckpointFileManagerAdapter(
+ delegate: Spark41CheckpointFileManager)
+ extends CheckpointFileManager {
+ override def createAtomic(
+ path: Path,
+ overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+ new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible))
+ }
+
+ override def open(path: Path): FSDataInputStream = delegate.open(path)
+
+ override def list(path: Path, filter: PathFilter): Array[FileStatus] =
+ delegate.list(path, filter)
+
+ override def mkdirs(path: Path): Unit = delegate.mkdirs(path)
+
+ override def exists(path: Path): Boolean = delegate.exists(path)
+
+ override def delete(path: Path): Unit = delegate.delete(path)
+
+ override def isLocal: Boolean = delegate.isLocal
+
+ override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory()
+ }
+
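+ // Wraps the relocated cancellable stream, delegating close() and cancel() to it.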
+ private class CancellableFSDataOutputStreamAdapter(
+ delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream)
+ extends CancellableFSDataOutputStream(delegate) {
+ override def close(): Unit = delegate.close()
+
+ override def cancel(): Unit = delegate.cancel()
+ }
+}
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
new file mode 100644
index 000000000000..0101198e360f
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}
+
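+/**
+ * Binary compatibility shim for code compiled against the Spark 4.0 location of MemoryStream,
+ * which Spark 4.1 moved under streaming.runtime. The factory methods forward to the relocated
+ * implementation.
+ */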
+object MemoryStream {
+ def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+ RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext)
+ }
+
+ def apply[A: Encoder](
+ numPartitions: Int)(
+ implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+ RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
+ }
+}