diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index e06d0d05549c..b60158ee1e1a 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1375,6 +1375,8 @@ jobs:
           export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
+          $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
+            -Pspark-ut -Pdelta -DskipTests -Dmaven.source.skip
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
             -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
             -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
index 6df44e779d25..137956f5c248 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
     tablePath: Option[String] = None,
     isCDCRead: Boolean = false)
   extends GlutenParquetFileFormat
-  with LoggingShims {
+  with Logging {
   // Validate either we have all arguments for DV enabled read or none of them.
   if (hasTablePath) {
     SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
       case AlwaysTrue() => Some(AlwaysTrue())
       case AlwaysFalse() => Some(AlwaysFalse())
       case _ =>
-        logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
+        logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
         None
     }
   }
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index f609a6130b84..73b9a8fdbaf5 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
 
 import org.apache.spark._
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
  * values to data files.
 *   Specifically L123-126, L132, and L140 where it adds option
 *   WRITE_PARTITION_COLUMNS
 */
-object GlutenDeltaFileFormatWriter extends LoggingShims {
+object GlutenDeltaFileFormatWriter extends Logging {
 
   /**
    * A variable used in tests to check whether the output ordering of the query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       val ret = f
       val commitMsgs = ret.map(_.commitMsg)
 
-      logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+      logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
       val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
-      logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
-        log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+      logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
+        log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
 
       processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
       logInfo(log"Finished processing stats for write job " +
-        log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+        log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
 
       // return a set of all the partition paths that were updated during this job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
     } catch {
       case cause: Throwable =>
-        logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
+        logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
         committer.abortJob(job)
         throw cause
     }
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       })(catchBlock = {
         // If there is an error, abort the task
         dataWriter.abort()
-        logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+        logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
       }, finallyBlock = {
         dataWriter.close()
       })
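The two Delta 4.0 source files above make the same change: they stop mixing in LoggingShims, extend Spark's plain Logging trait instead, and alias MDC to SparkMDC so the structured-logging call sites stay unambiguous. A minimal sketch of the resulting pattern, mirroring the call sites in this patch; the object name and the use of Spark's own LogKeys.JOB_ID (in place of the DeltaLogKeys constants the patch uses) are illustrative assumptions:

import org.apache.spark.internal.{Logging, LogKeys, MDC => SparkMDC}

// Sketch only: LoggingSketch and LogKeys.JOB_ID stand in for the patched classes
// and their DeltaLogKeys constants; the mix-in and interpolation mirror the diff above.
object LoggingSketch extends Logging {
  def onCommit(jobId: String): Unit = {
    // Structured logging: the MDC value (aliased to SparkMDC to avoid name clashes)
    // is attached to the log record under the given key.
    logInfo(log"Start to commit write Job ${SparkMDC.of(LogKeys.JOB_ID, jobId)}.")
  }

  def onFailure(jobId: String, cause: Throwable): Unit = {
    logError(log"Aborting job ${SparkMDC.of(LogKeys.JOB_ID, jobId)}", cause)
  }
}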
diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
index a9ae44921351..85cde8a79b51 100644
--- a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
+++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -17,8 +17,19 @@
 package org.apache.spark.sql.delta
 
 object DeltaInsertIntoTableSuiteShims {
-  val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+  private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")
 
-  // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
-  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+  val INSERT_INTO_TMP_VIEW_ERROR_MSG =
+    if (isSpark41) {
+      "[TABLE_OR_VIEW_NOT_FOUND]"
+    } else {
+      "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+    }
+
+  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
+    if (isSpark41) {
+      "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+    } else {
+      "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+    }
 }
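The shim now picks the expected error class by Spark version: on 4.1 it expects TABLE_OR_VIEW_NOT_FOUND and INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION, otherwise the older EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE and INVALID_DEFAULT_VALUE.NOT_CONSTANT classes. A hedged sketch of how a suite could consume these constants; the helper, view name, and SQL are hypothetical, not taken from the Gluten tests:

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims._
import org.scalatest.Assertions._

// Illustrative only: a version-agnostic assertion helper. The shimmed constant
// supplies whichever error class matches the running Spark version.
object InsertErrorShimSketch {
  def assertInsertIntoTempViewFails(spark: SparkSession): Unit = {
    spark.range(10).createOrReplaceTempView("tmp_view")
    val e = intercept[AnalysisException] {
      spark.sql("INSERT INTO tmp_view VALUES (1)")
    }
    assert(e.getMessage.contains(INSERT_INTO_TMP_VIEW_ERROR_MSG))
  }
}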
diff --git a/pom.xml b/pom.xml
index 676c7252ac1b..e3becb24d173 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1332,8 +1332,8 @@
         spark-sql-columnar-shims-spark41
         4.1.1
         1.10.0
-        delta-spark
-        4.0.0
+        delta-spark_4.1
+        4.1.0
         40
         1.1.0
         2.18.2
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
new file mode 100644
index 000000000000..9595883c75f9
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter}
+
+import java.io.OutputStream
+
+/**
+ * Binary compatibility shim for Delta 4.0, which was compiled against Spark 4.0's
+ * CheckpointFileManager package before Spark 4.1 moved it under streaming.checkpointing.
+ */
+trait CheckpointFileManager {
+  def createAtomic(
+      path: Path,
+      overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream
+
+  def open(path: Path): FSDataInputStream
+
+  def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+  def list(path: Path): Array[FileStatus] = {
+    list(
+      path,
+      new PathFilter {
+        override def accept(path: Path): Boolean = true
+      })
+  }
+
+  def mkdirs(path: Path): Unit
+
+  def exists(path: Path): Boolean
+
+  def delete(path: Path): Unit
+
+  def isLocal: Boolean
+
+  def createCheckpointDirectory(): Path
+}
+
+object CheckpointFileManager {
+  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
+    new Spark41CheckpointFileManagerAdapter(
+      Spark41CheckpointFileManager.create(path, hadoopConf))
+  }
+
+  abstract class CancellableFSDataOutputStream(outputStream: OutputStream)
+    extends org.apache.hadoop.fs.FSDataOutputStream(
+      outputStream,
+      null.asInstanceOf[FileSystem.Statistics]) {
+    def cancel(): Unit
+  }
+
+  private class Spark41CheckpointFileManagerAdapter(
+      delegate: Spark41CheckpointFileManager)
+    extends CheckpointFileManager {
+    override def createAtomic(
+        path: Path,
+        overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+      new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible))
+    }
+
+    override def open(path: Path): FSDataInputStream = delegate.open(path)
+
+    override def list(path: Path, filter: PathFilter): Array[FileStatus] =
+      delegate.list(path, filter)
+
+    override def mkdirs(path: Path): Unit = delegate.mkdirs(path)
+
+    override def exists(path: Path): Boolean = delegate.exists(path)
+
+    override def delete(path: Path): Unit = delegate.delete(path)
+
+    override def isLocal: Boolean = delegate.isLocal
+
+    override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory()
+  }
+
+  private class CancellableFSDataOutputStreamAdapter(
+      delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream)
+    extends CancellableFSDataOutputStream(delegate) {
+    override def close(): Unit = delegate.close()
+
+    override def cancel(): Unit = delegate.cancel()
+  }
+}
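The shim above restores the pre-4.1 binary location of CheckpointFileManager and forwards every call to the relocated streaming.checkpointing implementation. A hedged usage sketch under that assumption; the checkpoint path and payload are hypothetical, and in Delta the path would come from the table's log directory:

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager

object CheckpointShimSketch {
  def main(args: Array[String]): Unit = {
    // The shim's factory delegates to the Spark 4.1 implementation, so callers
    // compiled against the old package keep resolving this class.
    val fm = CheckpointFileManager.create(new Path("/tmp/example-checkpoint"), new Configuration())

    // createAtomic returns a cancellable stream: close() commits the write,
    // cancel() discards the partial file.
    val out = fm.createAtomic(
      new Path("/tmp/example-checkpoint/metadata"), overwriteIfPossible = true)
    try {
      out.write("example".getBytes(StandardCharsets.UTF_8))
      out.close()
    } catch {
      case t: Throwable =>
        out.cancel()
        throw t
    }
  }
}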
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
new file mode 100644
index 000000000000..0101198e360f
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}
+
+object MemoryStream {
+  def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+    RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext)
+  }
+
+  def apply[A: Encoder](
+      numPartitions: Int)(
+      implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+    RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
+  }
+}
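The MemoryStream forwarder plays the same role for the test-side API: callers that construct MemoryStream through the old package get Spark 4.1's runtime.MemoryStream back. An illustrative sketch; the local session, query name, and data are assumptions, not code from the Delta or Gluten suites:

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamShimSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("memory-stream-shim").getOrCreate()
    import spark.implicits._
    // The shim's apply takes an implicit SQLContext, matching the pre-4.1 signature.
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Returns Spark 4.1's runtime MemoryStream, so the usual test flow is unchanged.
    val input = MemoryStream[Int]
    input.addData(1, 2, 3)

    val query = input.toDF().writeStream.format("memory").queryName("shim_demo").start()
    query.processAllAvailable()
    spark.table("shim_demo").show()

    query.stop()
    spark.stop()
  }
}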