Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3009,12 +3009,6 @@
"<errors>"
]
},
"COLUMN_ID_MISMATCH" : {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we have in-memory-table testings like in #55376 also where we test connector have implemented various scenarios?

"message" : [
"Column IDs have changed:",
"<errors>"
]
},
"METADATA_COLUMNS_MISMATCH" : {
"message" : [
"Metadata columns have changed:",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

object FieldMetadataUtils {
// Metadata key for the field ID used to track column identity across schema evolution
val FIELD_ID_METADATA_KEY = "FIELD_ID"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.json4s.JsonDSL._
import org.apache.spark.SparkException
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.util.{CollationFactory, QuotingUtils, StringConcat}
import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY}
import org.apache.spark.util.SparkSchemaUtils

Expand Down Expand Up @@ -243,6 +244,43 @@ case class StructField(
metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
}

/**
* Updates the field with an ID for column identity tracking.
*/
def withId(id: String): StructField = {
val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putString(FIELD_ID_METADATA_KEY, id)
.build()
copy(metadata = newMetadata)
}

/**
* Returns the ID of this field, if set.
*/
def id: Option[String] = {
if (metadata.contains(FIELD_ID_METADATA_KEY)) {
Some(metadata.getString(FIELD_ID_METADATA_KEY))
} else {
None
}
}

/**
* Returns a copy of this field with the field ID removed, or this field if no ID is set.
*/
def clearId(): StructField = {
if (metadata.contains(FIELD_ID_METADATA_KEY)) {
val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.remove(FIELD_ID_METADATA_KEY)
.build()
copy(metadata = newMetadata)
} else {
this
}
}

private def getDDLDefault = getDefault()
.orElse(getCurrentDefaultValue())
.map(" DEFAULT " + _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.spark.sql.connector.catalog;

import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.connector.ColumnImpl;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;

/**
* An interface representing a column of a {@link Table}. It defines basic properties of a column,
Expand All @@ -40,11 +44,11 @@
public interface Column {

static Column create(String name, DataType dataType) {
return create(name, dataType, true);
return newBuilder(name, dataType).build();
}

static Column create(String name, DataType dataType, boolean nullable) {
return create(name, dataType, nullable, null, null);
return newBuilder(name, dataType).nullable(nullable).build();
}

static Column create(
Expand All @@ -53,16 +57,11 @@ static Column create(
boolean nullable,
String comment,
String metadataInJSON) {
return new ColumnImpl(
name,
dataType,
nullable,
comment,
/* defaultValue = */ null,
/* generationExpression = */ null,
/* identityColumnSpec = */ null,
metadataInJSON,
/* id = */ null);
return newBuilder(name, dataType)
.nullable(nullable)
.comment(comment)
.metadataInJSON(metadataInJSON)
.build();
}

static Column create(
Expand All @@ -72,88 +71,54 @@ static Column create(
String comment,
ColumnDefaultValue defaultValue,
String metadataInJSON) {
return new ColumnImpl(
name,
dataType,
nullable,
comment,
defaultValue,
/* generationExpression = */ null,
/* identityColumnSpec = */ null,
metadataInJSON,
/* id = */ null);
return newBuilder(name, dataType)
.nullable(nullable)
.comment(comment)
.defaultValue(defaultValue)
.metadataInJSON(metadataInJSON)
.build();
}

/**

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching newly added create methods to a builder. Hasn't been released yet.

* Creates a column with a generation expression in SQL string form.
*
* @since 4.3.0
* @deprecated Use
* {@link #create(String, DataType, boolean, String, GenerationExpression, String)} instead.
*/
@Deprecated
static Column create(
String name,
DataType dataType,
boolean nullable,
String comment,
String generationExpression,
String metadataInJSON) {
GenerationExpression genExpr = generationExpression != null
? new GenerationExpression(generationExpression) : null;
return new ColumnImpl(
name,
dataType,
nullable,
comment,
/* defaultValue = */ null,
genExpr,
/* identityColumnSpec = */ null,
metadataInJSON,
/* id = */ null);
return newBuilder(name, dataType)
.nullable(nullable)
.comment(comment)
.generationExpression(generationExpression)
.metadataInJSON(metadataInJSON)
.build();
}

/**
* Creates a column with a generation expression object.
*
* @since 4.3.0
*/
static Column create(
String name,
DataType dataType,
boolean nullable,
String comment,
GenerationExpression generationExpression,
IdentityColumnSpec identityColumnSpec,
String metadataInJSON) {
return new ColumnImpl(
name,
dataType,
nullable,
comment,
/* defaultValue = */ null,
generationExpression,
/* identityColumnSpec = */ null,
metadataInJSON,
/* id = */ null);
return newBuilder(name, dataType)
.nullable(nullable)
.comment(comment)
.identityColumnSpec(identityColumnSpec)
.metadataInJSON(metadataInJSON)
.build();
}

static Column create(
String name,
DataType dataType,
boolean nullable,
String comment,
IdentityColumnSpec identityColumnSpec,
String metadataInJSON) {
return new ColumnImpl(
name,
dataType,
nullable,
comment,
/* defaultValue = */ null,
/* generationExpression = */ null,
identityColumnSpec,
metadataInJSON,
/* id = */ null);
/**
* Creates a builder for a column.
*
* @param name the name of the column
* @param dataType the data type of the column
* @return a new builder
* @since 4.2.0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional to be cherry-picked into 4.2.

*/
static Builder newBuilder(String name, DataType dataType) {
return new Builder(name, dataType);
}

/**
Expand Down Expand Up @@ -243,12 +208,97 @@ default GenerationExpression columnGenerationExpression() {
* others.
* <p>
* This API covers top-level columns only. Nested struct fields, array elements, and map
* keys/values do not have separate IDs. Connectors that track nested field IDs can encode
* them into the returned top-level Column ID string to detect nested changes, since Spark
* only compares string equality.
* keys/values carry their own IDs in struct field metadata. Spark validates both top-level and
* nested field IDs as part of schema compatibility checks. See {@link StructField#id()}.
*/
@Nullable
default String id() {
return null;
}

/**
* A builder for {@link Column}.
*
* @since 4.2.0
*/
class Builder {
private final String name;
private final DataType dataType;
private boolean nullable = true;
private String comment = null;
private ColumnDefaultValue defaultValue = null;
private GenerationExpression genExpr = null;
private IdentityColumnSpec identityColumnSpec = null;
private String metadataInJSON = null;
private String id = null;

private Builder(String name, DataType dataType) {
this.name = Objects.requireNonNull(name, "name must not be null");
this.dataType = Objects.requireNonNull(dataType, "dataType must not be null");
}

public Builder nullable(boolean nullable) {
this.nullable = nullable;
return this;
}

public Builder comment(String comment) {
this.comment = comment;
return this;
}

public Builder defaultValue(ColumnDefaultValue defaultValue) {
this.defaultValue = defaultValue;
return this;
}

public Builder generationExpression(String sql) {
this.genExpr = sql != null ? new GenerationExpression(sql) : null;
return this;
}

public Builder generationExpression(GenerationExpression generationExpr) {
this.genExpr = generationExpr;
return this;
}

public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
this.identityColumnSpec = identityColumnSpec;
return this;
}

public Builder metadataInJSON(String metadataInJSON) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably call it metadata.

this.metadataInJSON = metadataInJSON;
return this;
}

public Builder id(String id) {
this.id = id;
return this;
}

public Column build() {
validateState();
return new ColumnImpl(
name, dataType, nullable, comment, defaultValue,
genExpr, identityColumnSpec, metadataInJSON, id);
}

private void validateState() {
if (hasConflictingDefinitions()) {
throw new SparkIllegalArgumentException(
"INTERNAL_ERROR",
Map.of("message",
"Column '" + name + "' cannot have more than one definition of: " +
"default value, generation expression, identity column spec"));
}
}

private boolean hasConflictingDefinitions() {
long definitionCount = Stream.of(defaultValue, genExpr, identityColumnSpec)
.filter(Objects::nonNull)
.count();
return definitionCount > 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,6 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper {
// Make sure the table was not dropped and recreated.
ref.info.tableId.foreach(V2TableUtil.validateTableId(ref.name, _, table))

// Detect columns that were dropped and re-added with the same name but a different
// column ID. This catches replacements that preserve the schema but change identity.
val colIdErrors = V2TableUtil.validateColumnIds(
table = table,
originalCapturedCols = ref.info.columns)
if (colIdErrors.nonEmpty) {
throw QueryCompilationErrors.columnIdMismatchAfterAnalysis(ref.name, colIdErrors)
}

// Do not allow schema evolution to pre-analysed dataframes that are later used in
// transactional writes. This is because the entire plans was built based on the original schema
// and any schema change would make the plan structurally invalid. This is inline with the
Expand Down Expand Up @@ -187,7 +178,8 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper {
val dataErrors = V2TableUtil.validateCapturedColumns(
table,
ref.info.columns,
mode = ALLOW_NEW_TOP_LEVEL_FIELDS)
mode = ALLOW_NEW_TOP_LEVEL_FIELDS,
checkFieldIds = false)
if (dataErrors.nonEmpty) {
throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
ctx.viewName,
Expand Down
Loading