From e3749adfaa34c7f1237162ff480bc9f9ecf5c6fc Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Fri, 27 Feb 2026 15:35:40 +0000 Subject: [PATCH 1/5] Support onSchemaChange for incremental tables This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes. End-to-end tests have been created to verify the new functionality. --- cli/api/BUILD | 15 ++ cli/api/dbadapters/execution_sql.ts | 305 +++++++++++++++++++++-- cli/api/dbadapters/execution_sql_test.ts | 126 ++++++++++ 3 files changed, 429 insertions(+), 17 deletions(-) create mode 100644 cli/api/dbadapters/execution_sql_test.ts diff --git a/cli/api/BUILD b/cli/api/BUILD index 352f3ef06..ed2562829 100644 --- a/cli/api/BUILD +++ b/cli/api/BUILD @@ -4,6 +4,8 @@ load("//tools:node_modules.bzl", "node_modules") package(default_visibility = ["//visibility:public"]) +load("//tools:ts_library.bzl", "ts_library") + ts_library( name = "api", srcs = glob( @@ -55,6 +57,19 @@ node_modules( ], ) +# ts_test_suite( +# name = "tests", +# srcs = ["dbadapters/execution_sql_test.ts"], +# deps = [ +# ":api", +# "//core", +# "//protos:ts", +# "//testing", +# "@npm//@types/chai", +# "@npm//chai", +# ], +# ) + ts_test_suite( name = "tests", srcs = [ diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index f226b9e91..25a44cc93 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -1,3 +1,4 @@ +import * as crypto from "crypto"; import * as semver from "semver"; import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks"; @@ -121,6 +122,232 @@ from (${query}) as insertions`; return this.CompilationSql.resolveTarget(target); } + private createProcedureName(target: dataform.ITarget, uniqueId: string): string { + // Procedure names cannot contain hyphens. 
+ const sanitizedUniqueId = uniqueId.replace(/-/g, "_"); + return this.resolveTarget({ + ...target, + name: `df_osc_${sanitizedUniqueId}` + }); + } + + private safeCallProcedure( + procedureName: string, + emptyTempTableName: string, + dataTempTableName: string + ): string { + return ` +BEGIN + CALL ${procedureName}(); +EXCEPTION WHEN ERROR THEN + DROP TABLE IF EXISTS ${emptyTempTableName}; + DROP TABLE IF EXISTS ${dataTempTableName}; + DROP PROCEDURE IF EXISTS ${procedureName}; + RAISE; +END; +DROP PROCEDURE IF EXISTS ${procedureName};`; + } + + private inferSchemaSql(emptyTempTableName: string, query: string): string { + return ` +-- Infer schema of new query. +CREATE OR REPLACE TABLE ${emptyTempTableName} AS ( + SELECT * FROM (${query}) AS insertions LIMIT 0 +);`; + } + + private compareSchemasSql( + database: string, + schema: string, + targetName: string, + shortEmptyTableName: string + ): string { + return ` +-- Compare schemas +DECLARE dataform_columns ARRAY; +DECLARE temp_table_columns ARRAY>; +DECLARE columns_added ARRAY>; +DECLARE columns_removed ARRAY; + +SET dataform_columns = ( + SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) + FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\` + WHERE table_name = '${targetName}' +); + +SET temp_table_columns = ( + SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) + FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\` + WHERE table_name = '${shortEmptyTableName}' +); + +SET columns_added = ( + SELECT IFNULL(ARRAY_AGG(column_info), []) + FROM UNNEST(temp_table_columns) AS column_info + WHERE column_info.column_name NOT IN UNNEST(dataform_columns) +); +SET columns_removed = ( + SELECT IFNULL(ARRAY_AGG(column_name), []) + FROM UNNEST(dataform_columns) AS column_name + WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col) +);`; + } + + private applySchemaChangeStrategySql( + table: dataform.ITable, + qualifiedTargetTableName: string + ): string { + const 
onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; + let sql = ` +-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`; + + switch (onSchemaChange) { + case dataform.OnSchemaChange.FAIL: + sql += ` +IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN + RAISE USING MESSAGE = FORMAT( + "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t", + columns_added, + columns_removed + ); +END IF; +`; + break; + case dataform.OnSchemaChange.EXTEND: + sql += ` +IF ARRAY_LENGTH(columns_removed) > 0 THEN + RAISE USING MESSAGE = FORMAT( + "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t", + columns_removed + ); +END IF; + +FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO + EXECUTE IMMEDIATE FORMAT( + "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", + column_info.column_name, + column_info.data_type + ); +END FOR; +`; + break; + case dataform.OnSchemaChange.SYNCHRONIZE: + const uniqueKeys = table.uniqueKey || []; + sql += ` +FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO + IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN + RAISE USING MESSAGE = FORMAT( + "Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}", + removed_column_name + ); + ELSE + EXECUTE IMMEDIATE FORMAT( + "ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s", + removed_column_name + ); + END IF; +END FOR; + +FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO + EXECUTE IMMEDIATE FORMAT( + "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", + column_info.column_name, + column_info.data_type + ); +END FOR; +`; + break; + } + return sql; + } + + private runFinalDmlSql( + table: dataform.ITable, + qualifiedTargetTableName: string, + dataTempTableName: string + ): string { + let finalDmlSql = "\n-- Run final 
MERGE/INSERT."; + + // Create temp table for incremental data. + finalDmlSql += ` +CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS ( + SELECT * FROM (${table.incrementalQuery || table.query}) +);`; + + // Generate dynamic column lists from temp_table_columns. + finalDmlSql += ` +DECLARE dataform_columns_list STRING; +SET dataform_columns_list = ( + SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '') + FROM UNNEST(temp_table_columns) +);`; + + // Run final MERGE/INSERT. + if (table.uniqueKey && table.uniqueKey.length > 0) { + const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); + finalDmlSql += ` +DECLARE dataform_columns_merge STRING; +SET dataform_columns_merge = ( + SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '') + FROM UNNEST(temp_table_columns) +); + +IF ARRAY_LENGTH(temp_table_columns) > 0 THEN + EXECUTE IMMEDIATE ( + "MERGE \`${qualifiedTargetTableName}\` T " || + "USING \`${dataTempTableName}\` S " || + "ON ${mergeOnClause} " || + "WHEN MATCHED THEN " || + " UPDATE SET " || dataform_columns_merge || " " || + "WHEN NOT MATCHED THEN " || + " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")" + ); +END IF; +`; + } else { + finalDmlSql += ` +IF ARRAY_LENGTH(temp_table_columns) > 0 THEN + EXECUTE IMMEDIATE ( + "INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " || + "SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`" + ); +END IF; +`; + } + return finalDmlSql; + } + + private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string { + return ` +-- Cleanup temporary tables. 
+DROP TABLE IF EXISTS ${emptyTempTableName}; +DROP TABLE IF EXISTS ${dataTempTableName}; + `; + } + + private incrementalSchemaChangeBody( + table: dataform.ITable, + qualifiedTargetTableName: string, + emptyTempTableName: string, + dataTempTableName: string, + shortEmptyTableName: string + ): string { + const statements: string[] = [ + this.inferSchemaSql(emptyTempTableName, table.incrementalQuery || table.query), + this.compareSchemasSql( + table.target.database, + table.target.schema, + table.target.name, + shortEmptyTableName + ), + this.applySchemaChangeStrategySql(table, qualifiedTargetTableName), + this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName), + this.cleanupSql(emptyTempTableName, dataTempTableName) + ]; + + return statements.join("\n\n"); + } + public publishTasks( table: dataform.ITable, runConfig: dataform.IRunConfig, @@ -141,23 +368,67 @@ from (${query}) as insertions`; if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) { tasks.add(Task.statement(this.createOrReplace(table))); } else { - tasks.add( - Task.statement( - table.uniqueKey && table.uniqueKey.length > 0 - ? 
this.mergeInto( - table.target, - tableMetadata?.fields.map(f => f.name), - this.where(table.incrementalQuery || table.query, table.where), - table.uniqueKey, - table.bigquery && table.bigquery.updatePartitionFilter - ) - : this.insertInto( - table.target, - tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), - this.where(table.incrementalQuery || table.query, table.where) - ) - ) - ); + const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; + switch (onSchemaChange) { + case dataform.OnSchemaChange.FAIL: + case dataform.OnSchemaChange.EXTEND: + case dataform.OnSchemaChange.SYNCHRONIZE: + const uniqueId = crypto.randomUUID().replace(/-/g, "_"); + + const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; + const emptyTempTableName = this.resolveTarget({ + ...table.target, + name: shortEmptyTableName + }); + + const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); + const dataTempTableName = this.resolveTarget({ + ...table.target, + name: shortDataTableName + }); + + const procedureName = this.createProcedureName(table.target, uniqueId); + const procedureBody = this.incrementalSchemaChangeBody( + table, + this.resolveTarget(table.target), + emptyTempTableName, + dataTempTableName, + shortEmptyTableName + ); + + const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() +OPTIONS(strict_mode=false) +BEGIN +${procedureBody} +END;`; + const callProcedureSql = this.safeCallProcedure( + procedureName, + emptyTempTableName, + dataTempTableName + ); + tasks.add(Task.statement(createProcedureSql + "\n" + callProcedureSql)); + break; + case dataform.OnSchemaChange.IGNORE: + default: + tasks.add( + Task.statement( + table.uniqueKey && table.uniqueKey.length > 0 + ? 
this.mergeInto( + table.target, + tableMetadata?.fields.map(f => f.name), + this.where(table.incrementalQuery || table.query, table.where), + table.uniqueKey, + table.bigquery && table.bigquery.updatePartitionFilter + ) + : this.insertInto( + table.target, + tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), + this.where(table.incrementalQuery || table.query, table.where) + ) + ) + ); + break; + } } } else { tasks.add(Task.statement(this.createOrReplace(table))); diff --git a/cli/api/dbadapters/execution_sql_test.ts b/cli/api/dbadapters/execution_sql_test.ts new file mode 100644 index 000000000..ee9e2a1f8 --- /dev/null +++ b/cli/api/dbadapters/execution_sql_test.ts @@ -0,0 +1,126 @@ +import { expect } from "chai"; + +import { ExecutionSql } from "df/cli/api/dbadapters/execution_sql"; +import { dataform } from "df/protos/ts"; +import { suite, test } from "df/testing"; + +suite("ExecutionSql with 'onSchemaChange'", () => { + const executionSql = new ExecutionSql( + { + defaultDatabase: "project-id", + defaultSchema: "dataset-id" + }, + "2.0.0" + ); + + const baseTable: dataform.ITable = { + type: "incremental", + enumType: dataform.TableType.INCREMENTAL, + target: { + database: "project-id", + schema: "dataset-id", + name: "incremental_on_schema_change" + }, + query: "select 1 as id, 'a' as field1", + incrementalQuery: "select 1 as id, 'a' as field1, 'new' as field2" + }; + + const tableMetadata: dataform.ITableMetadata = { + type: dataform.TableMetadata.Type.TABLE, + fields: [ + { + name: "id", + primitive: dataform.Field.Primitive.INTEGER + }, + { + name: "field1", + primitive: dataform.Field.Primitive.STRING + } + ] + }; + + test("generates procedure for FAIL strategy", () => { + const table = { + ...baseTable, + onSchemaChange: dataform.OnSchemaChange.FAIL + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); + const procedureSql = tasks.build()[0].statement; + expect(procedureSql).to.match( + /create or 
replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i + ); + expect(procedureSql).to.include( + `"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t"` + ); + expect(procedureSql).to.match(/call `project-id.dataset-id.df_osc_.*`\(\)/i); + expect(procedureSql).to.include("EXCEPTION WHEN ERROR THEN"); + expect(procedureSql).to.match(/drop procedure if exists `project-id.dataset-id.df_osc_.*`/i); + }); + + test("generates procedure for EXTEND strategy", () => { + const table = { + ...baseTable, + onSchemaChange: dataform.OnSchemaChange.EXTEND + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); + const procedureSql = tasks.build()[0].statement; + + expect(procedureSql).to.match( + /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i + ); + expect(procedureSql).to.include( + `"Column removals are not allowed when on_schema_change = 'EXTEND'. 
Removed columns: %t"` + ); + expect(procedureSql).to.include("ADD COLUMN IF NOT EXISTS"); + }); + + test("generates procedure for SYNCHRONIZE strategy", () => { + const table = { + ...baseTable, + onSchemaChange: dataform.OnSchemaChange.SYNCHRONIZE, + uniqueKey: ["id"] + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); + const procedureSql = tasks.build()[0].statement; + + expect(procedureSql).to.match( + /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i + ); + expect(procedureSql).to.include("ADD COLUMN IF NOT EXISTS"); + expect(procedureSql).to.include("DROP COLUMN IF EXISTS"); + }); + + test("SYNCHRONIZE strategy prevents dropping unique keys", () => { + const tableWithExtraField = { + ...baseTable, + onSchemaChange: dataform.OnSchemaChange.SYNCHRONIZE, + uniqueKey: ["field_to_be_removed"] + }; + const tasks = executionSql.publishTasks( + tableWithExtraField, + { fullRefresh: false }, + { + ...tableMetadata, + fields: [ + { name: "field_to_be_removed", primitive: dataform.Field.Primitive.STRING } + ] + } + ); + const procedureSql = tasks.build()[0].statement; + expect(procedureSql).to.include( + `"Cannot drop column %s as it is part of the unique key for table` + ); + }); + + test("generates simple merge for IGNORE strategy", () => { + const table = { + ...baseTable, + onSchemaChange: dataform.OnSchemaChange.IGNORE, + uniqueKey: ["id"] + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); + const mergeSql = tasks.build()[0].statement; + expect(mergeSql).to.include("merge `project-id.dataset-id.incremental_on_schema_change` T"); + expect(mergeSql).to.not.include("create or replace procedure"); + }); +}); From 2205cde9210be4526e2a09fb9e3be27ff0561bbc Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Tue, 3 Mar 2026 15:07:17 +0000 Subject: [PATCH 2/5] fix: Private functions moved after public functions --- 
cli/api/dbadapters/execution_sql.ts | 220 ++++++++++++++-------------- 1 file changed, 110 insertions(+), 110 deletions(-) diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index 25a44cc93..460e1a80e 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -122,7 +122,116 @@ from (${query}) as insertions`; return this.CompilationSql.resolveTarget(target); } - private createProcedureName(target: dataform.ITarget, uniqueId: string): string { + public publishTasks( + table: dataform.ITable, + runConfig: dataform.IRunConfig, + tableMetadata?: dataform.ITableMetadata + ): Tasks { + const tasks = new Tasks(); + + this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement)); + + const baseTableType = this.baseTableType(table.enumType); + if (tableMetadata && tableMetadata.type !== baseTableType) { + tasks.add( + Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType))) + ); + } + + if (table.enumType === dataform.TableType.INCREMENTAL) { + if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) { + tasks.add(Task.statement(this.createOrReplace(table))); + } else { + const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; + switch (onSchemaChange) { + case dataform.OnSchemaChange.FAIL: + case dataform.OnSchemaChange.EXTEND: + case dataform.OnSchemaChange.SYNCHRONIZE: + const uniqueId = crypto.randomUUID().replace(/-/g, "_"); + + const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; + const emptyTempTableName = this.resolveTarget({ + ...table.target, + name: shortEmptyTableName + }); + + const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); + const dataTempTableName = this.resolveTarget({ + ...table.target, + name: shortDataTableName + }); + + const procedureName = this.createProcedureName(table.target, uniqueId); + const procedureBody = 
this.incrementalSchemaChangeBody( + table, + this.resolveTarget(table.target), + emptyTempTableName, + dataTempTableName, + shortEmptyTableName + ); + + const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() +OPTIONS(strict_mode=false) +BEGIN +${procedureBody} +END;`; + const callProcedureSql = this.safeCallProcedure( + procedureName, + emptyTempTableName, + dataTempTableName + ); + tasks.add(Task.statement(createProcedureSql + "\n" + callProcedureSql)); + break; + case dataform.OnSchemaChange.IGNORE: + default: + tasks.add( + Task.statement( + table.uniqueKey && table.uniqueKey.length > 0 + ? this.mergeInto( + table.target, + tableMetadata?.fields.map(f => f.name), + this.where(table.incrementalQuery || table.query, table.where), + table.uniqueKey, + table.bigquery && table.bigquery.updatePartitionFilter + ) + : this.insertInto( + table.target, + tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), + this.where(table.incrementalQuery || table.query, table.where) + ) + ) + ); + break; + } + } + } else { + tasks.add(Task.statement(this.createOrReplace(table))); + } + + this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement)); + + return tasks.concatenate(); + } + + public assertTasks( + assertion: dataform.IAssertion, + projectConfig: dataform.IProjectConfig, + ): Tasks { + const tasks = new Tasks(); + const target = assertion.target; + // Create the view to check syntax of assertion + tasks.add(Task.statement(this.createOrReplaceView(target, assertion.query))); + + // Add assertion check + tasks.add(Task.assertion(`select sum(1) as row_count from ${this.resolveTarget(target)}`)); + return tasks; + } + + public dropIfExists(target: dataform.ITarget, type: dataform.TableMetadata.Type) { + return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`; + } + + private createProcedureName(target: dataform.ITarget, uniqueId: string): string { // Procedure names cannot contain 
hyphens. const sanitizedUniqueId = uniqueId.replace(/-/g, "_"); return this.resolveTarget({ @@ -348,115 +457,6 @@ DROP TABLE IF EXISTS ${dataTempTableName}; return statements.join("\n\n"); } - public publishTasks( - table: dataform.ITable, - runConfig: dataform.IRunConfig, - tableMetadata?: dataform.ITableMetadata - ): Tasks { - const tasks = new Tasks(); - - this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement)); - - const baseTableType = this.baseTableType(table.enumType); - if (tableMetadata && tableMetadata.type !== baseTableType) { - tasks.add( - Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType))) - ); - } - - if (table.enumType === dataform.TableType.INCREMENTAL) { - if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) { - tasks.add(Task.statement(this.createOrReplace(table))); - } else { - const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; - switch (onSchemaChange) { - case dataform.OnSchemaChange.FAIL: - case dataform.OnSchemaChange.EXTEND: - case dataform.OnSchemaChange.SYNCHRONIZE: - const uniqueId = crypto.randomUUID().replace(/-/g, "_"); - - const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; - const emptyTempTableName = this.resolveTarget({ - ...table.target, - name: shortEmptyTableName - }); - - const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); - const dataTempTableName = this.resolveTarget({ - ...table.target, - name: shortDataTableName - }); - - const procedureName = this.createProcedureName(table.target, uniqueId); - const procedureBody = this.incrementalSchemaChangeBody( - table, - this.resolveTarget(table.target), - emptyTempTableName, - dataTempTableName, - shortEmptyTableName - ); - - const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() -OPTIONS(strict_mode=false) -BEGIN -${procedureBody} -END;`; - const callProcedureSql = this.safeCallProcedure( - procedureName, 
- emptyTempTableName, - dataTempTableName - ); - tasks.add(Task.statement(createProcedureSql + "\n" + callProcedureSql)); - break; - case dataform.OnSchemaChange.IGNORE: - default: - tasks.add( - Task.statement( - table.uniqueKey && table.uniqueKey.length > 0 - ? this.mergeInto( - table.target, - tableMetadata?.fields.map(f => f.name), - this.where(table.incrementalQuery || table.query, table.where), - table.uniqueKey, - table.bigquery && table.bigquery.updatePartitionFilter - ) - : this.insertInto( - table.target, - tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), - this.where(table.incrementalQuery || table.query, table.where) - ) - ) - ); - break; - } - } - } else { - tasks.add(Task.statement(this.createOrReplace(table))); - } - - this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement)); - - return tasks.concatenate(); - } - - public assertTasks( - assertion: dataform.IAssertion, - projectConfig: dataform.IProjectConfig, - ): Tasks { - const tasks = new Tasks(); - const target = assertion.target; - // Create the view to check syntax of assertion - tasks.add(Task.statement(this.createOrReplaceView(target, assertion.query))); - - // Add assertion check - tasks.add(Task.assertion(`select sum(1) as row_count from ${this.resolveTarget(target)}`)); - return tasks; - } - - public dropIfExists(target: dataform.ITarget, type: dataform.TableMetadata.Type) { - return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`; - } - private createOrReplace(table: dataform.ITable) { const options = []; if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) { From fd68b672f9af850c9937c335ee34eecb25fec6bf Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Tue, 17 Mar 2026 20:24:22 +0000 Subject: [PATCH 3/5] - Split onSchemaChange queries to sub-tasks - Split sql queries to sub-functions - Fixed function names --- cli/api/dbadapters/execution_sql.ts | 124 
++++++++++++++++------------ 1 file changed, 72 insertions(+), 52 deletions(-) diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index 460e1a80e..9e97b5ac0 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -142,45 +142,12 @@ from (${query}) as insertions`; if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) { tasks.add(Task.statement(this.createOrReplace(table))); } else { - const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; + const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE; switch (onSchemaChange) { case dataform.OnSchemaChange.FAIL: case dataform.OnSchemaChange.EXTEND: case dataform.OnSchemaChange.SYNCHRONIZE: - const uniqueId = crypto.randomUUID().replace(/-/g, "_"); - - const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; - const emptyTempTableName = this.resolveTarget({ - ...table.target, - name: shortEmptyTableName - }); - - const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); - const dataTempTableName = this.resolveTarget({ - ...table.target, - name: shortDataTableName - }); - - const procedureName = this.createProcedureName(table.target, uniqueId); - const procedureBody = this.incrementalSchemaChangeBody( - table, - this.resolveTarget(table.target), - emptyTempTableName, - dataTempTableName, - shortEmptyTableName - ); - - const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() -OPTIONS(strict_mode=false) -BEGIN -${procedureBody} -END;`; - const callProcedureSql = this.safeCallProcedure( - procedureName, - emptyTempTableName, - dataTempTableName - ); - tasks.add(Task.statement(createProcedureSql + "\n" + callProcedureSql)); + this.buildIncrementalSchemaChangeTasks(tasks, table); break; case dataform.OnSchemaChange.IGNORE: default: @@ -231,7 +198,47 @@ END;`; return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`; 
} - private createProcedureName(target: dataform.ITarget, uniqueId: string): string { + private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) { + const uniqueId = crypto.randomUUID().replace(/-/g, "_"); + + const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; + const emptyTempTableName = this.resolveTarget({ + ...table.target, + name: shortEmptyTableName + }); + + const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); + const dataTempTableName = this.resolveTarget({ + ...table.target, + name: shortDataTableName + }); + + const procedureName = this.createProcedureName(table.target, uniqueId); + const procedureBody = this.incrementalSchemaChangeBody( + table, + this.resolveTarget(table.target), + emptyTempTableName, + dataTempTableName, + shortEmptyTableName + ); + + const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() +OPTIONS(strict_mode=false) +BEGIN +${procedureBody} +END;`; + + const callProcedureSql = this.safeCallAndDropProcedure( + procedureName, + emptyTempTableName, + dataTempTableName + ); + tasks.add(Task.statement(createProcedureSql)); + tasks.add(Task.statement(callProcedureSql)); + tasks.add(Task.statement(`DROP PROCEDURE IF EXISTS ${procedureName};`)); + } + + private createProcedureName(target: dataform.ITarget, uniqueId: string): string { // Procedure names cannot contain hyphens. 
const sanitizedUniqueId = uniqueId.replace(/-/g, "_"); return this.resolveTarget({ @@ -240,7 +247,7 @@ END;`; }); } - private safeCallProcedure( + private safeCallAndDropProcedure( procedureName: string, emptyTempTableName: string, dataTempTableName: string @@ -375,26 +382,42 @@ END FOR; qualifiedTargetTableName: string, dataTempTableName: string ): string { - let finalDmlSql = "\n-- Run final MERGE/INSERT."; + return [ + this.createIncrementalDataTempTableSql(table, dataTempTableName), + this.declareDataformColumnsListSql(), + this.executeMergeOrInsertSql(table, qualifiedTargetTableName, dataTempTableName) + ].join("\n"); + } - // Create temp table for incremental data. - finalDmlSql += ` + private createIncrementalDataTempTableSql(table: dataform.ITable, dataTempTableName: string): string { + return ` CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS ( - SELECT * FROM (${table.incrementalQuery || table.query}) + ${this.where(table.incrementalQuery || table.query, table.where)} );`; + } - // Generate dynamic column lists from temp_table_columns. - finalDmlSql += ` + private declareDataformColumnsListSql(): string { + return ` DECLARE dataform_columns_list STRING; SET dataform_columns_list = ( SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '') FROM UNNEST(temp_table_columns) );`; + } - // Run final MERGE/INSERT. + private executeMergeOrInsertSql( + table: dataform.ITable, + qualifiedTargetTableName: string, + dataTempTableName: string + ): string { if (table.uniqueKey && table.uniqueKey.length > 0) { const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); - finalDmlSql += ` + const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter; + const mergeOnClauseWithFilter = updatePartitionFilter + ? 
`${mergeOnClause} and T.${updatePartitionFilter}` + : mergeOnClause; + + return ` DECLARE dataform_columns_merge STRING; SET dataform_columns_merge = ( SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '') @@ -405,25 +428,22 @@ IF ARRAY_LENGTH(temp_table_columns) > 0 THEN EXECUTE IMMEDIATE ( "MERGE \`${qualifiedTargetTableName}\` T " || "USING \`${dataTempTableName}\` S " || - "ON ${mergeOnClause} " || + "ON ${mergeOnClauseWithFilter} " || "WHEN MATCHED THEN " || " UPDATE SET " || dataform_columns_merge || " " || "WHEN NOT MATCHED THEN " || " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")" ); -END IF; -`; +END IF;`; } else { - finalDmlSql += ` + return ` IF ARRAY_LENGTH(temp_table_columns) > 0 THEN EXECUTE IMMEDIATE ( "INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " || "SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`" ); -END IF; -`; +END IF;`; } - return finalDmlSql; } private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string { From 23aae8f4297a2e08e8c9411d6501a888aac643d1 Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Fri, 10 Apr 2026 13:23:16 +0000 Subject: [PATCH 4/5] fix: Address feedback for PR #2101 --- cli/api/BUILD | 18 +- cli/api/dbadapters/execution_sql.ts | 181 +++++++++--------- .../{dbadapters => }/execution_sql_test.ts | 33 +++- 3 files changed, 118 insertions(+), 114 deletions(-) rename cli/api/{dbadapters => }/execution_sql_test.ts (77%) diff --git a/cli/api/BUILD b/cli/api/BUILD index ed2562829..c7f90777c 100644 --- a/cli/api/BUILD +++ b/cli/api/BUILD @@ -1,11 +1,9 @@ -load("//tools:ts_library.bzl", "ts_library") load("//testing:index.bzl", "ts_test_suite") load("//tools:node_modules.bzl", "node_modules") +load("//tools:ts_library.bzl", "ts_library") package(default_visibility = ["//visibility:public"]) -load("//tools:ts_library.bzl", "ts_library") - ts_library( name = "api", srcs = glob( 
@@ -57,23 +55,11 @@ node_modules( ], ) -# ts_test_suite( -# name = "tests", -# srcs = ["dbadapters/execution_sql_test.ts"], -# deps = [ -# ":api", -# "//core", -# "//protos:ts", -# "//testing", -# "@npm//@types/chai", -# "@npm//chai", -# ], -# ) - ts_test_suite( name = "tests", srcs = [ "utils_test.ts", + "execution_sql_test.ts", ], data = [ ":node_modules", diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index 9e97b5ac0..840930310 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -1,4 +1,3 @@ -import * as crypto from "crypto"; import * as semver from "semver"; import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks"; @@ -122,6 +121,10 @@ from (${query}) as insertions`; return this.CompilationSql.resolveTarget(target); } + public getIncrementalQuery(table: dataform.ITable): string { + return this.where(table.incrementalQuery || table.query, table.where); + } + public publishTasks( table: dataform.ITable, runConfig: dataform.IRunConfig, @@ -157,14 +160,14 @@ from (${query}) as insertions`; ? 
this.mergeInto( table.target, tableMetadata?.fields.map(f => f.name), - this.where(table.incrementalQuery || table.query, table.where), + this.getIncrementalQuery(table), table.uniqueKey, table.bigquery && table.bigquery.updatePartitionFilter ) : this.insertInto( table.target, tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), - this.where(table.incrementalQuery || table.query, table.where) + this.getIncrementalQuery(table) ) ) ); @@ -199,7 +202,7 @@ from (${query}) as insertions`; } private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) { - const uniqueId = crypto.randomUUID().replace(/-/g, "_"); + const uniqueId = Math.random().toString(36).substring(2); const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; const emptyTempTableName = this.resolveTarget({ @@ -207,18 +210,11 @@ from (${query}) as insertions`; name: shortEmptyTableName }); - const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); - const dataTempTableName = this.resolveTarget({ - ...table.target, - name: shortDataTableName - }); - const procedureName = this.createProcedureName(table.target, uniqueId); const procedureBody = this.incrementalSchemaChangeBody( table, this.resolveTarget(table.target), emptyTempTableName, - dataTempTableName, shortEmptyTableName ); @@ -230,43 +226,37 @@ END;`; const callProcedureSql = this.safeCallAndDropProcedure( procedureName, - emptyTempTableName, - dataTempTableName + emptyTempTableName ); tasks.add(Task.statement(createProcedureSql)); tasks.add(Task.statement(callProcedureSql)); - tasks.add(Task.statement(`DROP PROCEDURE IF EXISTS ${procedureName};`)); } private createProcedureName(target: dataform.ITarget, uniqueId: string): string { - // Procedure names cannot contain hyphens. 
- const sanitizedUniqueId = uniqueId.replace(/-/g, "_"); return this.resolveTarget({ ...target, - name: `df_osc_${sanitizedUniqueId}` + name: `df_osc_${uniqueId}` }); } private safeCallAndDropProcedure( procedureName: string, - emptyTempTableName: string, - dataTempTableName: string + emptyTempTableName: string ): string { return ` BEGIN CALL ${procedureName}(); EXCEPTION WHEN ERROR THEN DROP TABLE IF EXISTS ${emptyTempTableName}; - DROP TABLE IF EXISTS ${dataTempTableName}; DROP PROCEDURE IF EXISTS ${procedureName}; RAISE; END; DROP PROCEDURE IF EXISTS ${procedureName};`; } - private inferSchemaSql(emptyTempTableName: string, query: string): string { + private createEmptyTempTableSql(emptyTempTableName: string, query: string): string { return ` --- Infer schema of new query. +-- Create empty table to extract schema of new query. CREATE OR REPLACE TABLE ${emptyTempTableName} AS ( SELECT * FROM (${query}) AS insertions LIMIT 0 );`; @@ -322,7 +312,7 @@ SET columns_removed = ( sql += ` IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN RAISE USING MESSAGE = FORMAT( - "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t", + "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T", columns_added, columns_removed ); @@ -333,69 +323,73 @@ END IF; sql += ` IF ARRAY_LENGTH(columns_removed) > 0 THEN RAISE USING MESSAGE = FORMAT( - "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t", + "Column removals are not allowed when on_schema_change = 'EXTEND'. 
Removed columns: %T", columns_removed ); END IF; -FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO - EXECUTE IMMEDIATE FORMAT( - "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", - column_info.column_name, - column_info.data_type - ); -END FOR; +${this.alterTableAddColumnsSql(qualifiedTargetTableName)} `; break; case dataform.OnSchemaChange.SYNCHRONIZE: const uniqueKeys = table.uniqueKey || []; sql += ` -FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO - IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN - RAISE USING MESSAGE = FORMAT( - "Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}", - removed_column_name - ); - ELSE - EXECUTE IMMEDIATE FORMAT( - "ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s", - removed_column_name - ); - END IF; -END FOR; - -FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO - EXECUTE IMMEDIATE FORMAT( - "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", - column_info.column_name, - column_info.data_type +DECLARE invalid_removed_columns ARRAY; +SET invalid_removed_columns = ( + SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(${JSON.stringify(uniqueKeys)}) +); + +IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN + RAISE USING MESSAGE = FORMAT( + "Cannot drop columns %T as they are part of the unique key for table ${qualifiedTargetTableName}", + invalid_removed_columns + ); +END IF; + +IF ARRAY_LENGTH(columns_removed) > 0 THEN + EXECUTE IMMEDIATE ( + "ALTER TABLE ${qualifiedTargetTableName} " || + ( + SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ") + FROM UNNEST(columns_removed) AS col + ) ); -END FOR; +END IF; + +${this.alterTableAddColumnsSql(qualifiedTargetTableName)} `; break; } return sql; } + private alterTableAddColumnsSql(qualifiedTargetTableName: string): string { + return `IF ARRAY_LENGTH(columns_added) > 0 
THEN + EXECUTE IMMEDIATE ( + "ALTER TABLE ${qualifiedTargetTableName} " || + ( + SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ") + FROM UNNEST(columns_added) AS column_info + ) + ); +END IF;`; + } + private runFinalDmlSql( table: dataform.ITable, - qualifiedTargetTableName: string, - dataTempTableName: string + qualifiedTargetTableName: string ): string { + const query = this.getIncrementalQuery(table); + const escapedQuery = query.replace(/\\/g, "\\\\").replace(/"/g, '\\"'); + return [ - this.createIncrementalDataTempTableSql(table, dataTempTableName), this.declareDataformColumnsListSql(), - this.executeMergeOrInsertSql(table, qualifiedTargetTableName, dataTempTableName) + table.uniqueKey && table.uniqueKey.length > 0 + ? this.buildDynamicMergeSql(table, qualifiedTargetTableName, escapedQuery) + : this.buildDynamicInsertSql(table, qualifiedTargetTableName, escapedQuery) ].join("\n"); } - private createIncrementalDataTempTableSql(table: dataform.ITable, dataTempTableName: string): string { - return ` -CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS ( - ${this.where(table.incrementalQuery || table.query, table.where)} -);`; - } - private declareDataformColumnsListSql(): string { return ` DECLARE dataform_columns_list STRING; @@ -405,52 +399,56 @@ SET dataform_columns_list = ( );`; } - private executeMergeOrInsertSql( + private buildDynamicMergeSql( table: dataform.ITable, qualifiedTargetTableName: string, - dataTempTableName: string + escapedQuery: string ): string { - if (table.uniqueKey && table.uniqueKey.length > 0) { - const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); - const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter; - const mergeOnClauseWithFilter = updatePartitionFilter - ? 
`${mergeOnClause} and T.${updatePartitionFilter}` - : mergeOnClause; - - return ` + const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); + const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter; + const mergeOnClauseWithFilter = updatePartitionFilter + ? `${mergeOnClause} and T.${updatePartitionFilter}` + : mergeOnClause; + const escapedMergeOnClauseWithFilter = mergeOnClauseWithFilter.replace(/\\/g, "\\\\").replace(/"/g, '\\"'); + + return ` DECLARE dataform_columns_merge STRING; SET dataform_columns_merge = ( SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '') FROM UNNEST(temp_table_columns) ); -IF ARRAY_LENGTH(temp_table_columns) > 0 THEN EXECUTE IMMEDIATE ( - "MERGE \`${qualifiedTargetTableName}\` T " || - "USING \`${dataTempTableName}\` S " || - "ON ${mergeOnClauseWithFilter} " || + "MERGE ${qualifiedTargetTableName} T " || + "USING (" || + """${escapedQuery}""" || + ") S " || + "ON ${escapedMergeOnClauseWithFilter} " || "WHEN MATCHED THEN " || " UPDATE SET " || dataform_columns_merge || " " || "WHEN NOT MATCHED THEN " || " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")" - ); -END IF;`; - } else { - return ` -IF ARRAY_LENGTH(temp_table_columns) > 0 THEN + );`; + } + + private buildDynamicInsertSql( + table: dataform.ITable, + qualifiedTargetTableName: string, + escapedQuery: string + ): string { + return ` EXECUTE IMMEDIATE ( - "INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " || - "SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`" - ); -END IF;`; - } + "INSERT INTO ${qualifiedTargetTableName} (" || dataform_columns_list || ") " || + "SELECT " || dataform_columns_list || " FROM (" || + """${escapedQuery}""" || + ")" + );`; } - private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string { + private cleanupSql(emptyTempTableName: string): string { 
return ` -- Cleanup temporary tables. DROP TABLE IF EXISTS ${emptyTempTableName}; -DROP TABLE IF EXISTS ${dataTempTableName}; `; } @@ -458,11 +456,10 @@ DROP TABLE IF EXISTS ${dataTempTableName}; table: dataform.ITable, qualifiedTargetTableName: string, emptyTempTableName: string, - dataTempTableName: string, shortEmptyTableName: string ): string { const statements: string[] = [ - this.inferSchemaSql(emptyTempTableName, table.incrementalQuery || table.query), + this.createEmptyTempTableSql(emptyTempTableName, this.getIncrementalQuery(table)), this.compareSchemasSql( table.target.database, table.target.schema, @@ -470,8 +467,8 @@ DROP TABLE IF EXISTS ${dataTempTableName}; shortEmptyTableName ), this.applySchemaChangeStrategySql(table, qualifiedTargetTableName), - this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName), - this.cleanupSql(emptyTempTableName, dataTempTableName) + this.runFinalDmlSql(table, qualifiedTargetTableName), + this.cleanupSql(emptyTempTableName) ]; return statements.join("\n\n"); diff --git a/cli/api/dbadapters/execution_sql_test.ts b/cli/api/execution_sql_test.ts similarity index 77% rename from cli/api/dbadapters/execution_sql_test.ts rename to cli/api/execution_sql_test.ts index ee9e2a1f8..67a4df9aa 100644 --- a/cli/api/dbadapters/execution_sql_test.ts +++ b/cli/api/execution_sql_test.ts @@ -50,7 +50,7 @@ suite("ExecutionSql with 'onSchemaChange'", () => { /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i ); expect(procedureSql).to.include( - `"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t"` + `"Schema mismatch defined by on_schema_change = 'FAIL'. 
Added columns: %T, removed columns: %T"` ); expect(procedureSql).to.match(/call `project-id.dataset-id.df_osc_.*`\(\)/i); expect(procedureSql).to.include("EXCEPTION WHEN ERROR THEN"); @@ -69,9 +69,17 @@ suite("ExecutionSql with 'onSchemaChange'", () => { /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i ); expect(procedureSql).to.include( - `"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t"` + `"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T"` + ); + expect(procedureSql).to.include( + `SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")` + ); + expect(procedureSql).to.include( + `"INSERT INTO \`project-id.dataset-id.incremental_on_schema_change\` (" || dataform_columns_list || ") " ||` + ); + expect(procedureSql).to.include( + `"""select 1 as id, 'a' as field1, 'new' as field2"""` ); - expect(procedureSql).to.include("ADD COLUMN IF NOT EXISTS"); }); test("generates procedure for SYNCHRONIZE strategy", () => { @@ -86,8 +94,21 @@ suite("ExecutionSql with 'onSchemaChange'", () => { expect(procedureSql).to.match( /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i ); - expect(procedureSql).to.include("ADD COLUMN IF NOT EXISTS"); - expect(procedureSql).to.include("DROP COLUMN IF EXISTS"); + expect(procedureSql).to.include( + `SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")` + ); + expect(procedureSql).to.include( + `SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")` + ); + expect(procedureSql).to.include( + `"MERGE \`project-id.dataset-id.incremental_on_schema_change\` T " ||` + ); + expect(procedureSql).to.include( + `"ON T.\`id\` = S.\`id\` " ||` + ); + expect(procedureSql).to.include( + `"""select 1 as id, 'a' as field1, 'new' as field2"""` + ); }); 
test("SYNCHRONIZE strategy prevents dropping unique keys", () => { @@ -108,7 +129,7 @@ suite("ExecutionSql with 'onSchemaChange'", () => { ); const procedureSql = tasks.build()[0].statement; expect(procedureSql).to.include( - `"Cannot drop column %s as it is part of the unique key for table` + `"Cannot drop columns %T as they are part of the unique key for table` ); }); From da913c2b808abad4b421375936b2b93ed4742af6 Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Wed, 22 Apr 2026 09:00:08 +0000 Subject: [PATCH 5/5] fix: Address feedback for PR #2101 and fix presubmit failures - Updated e2e tests - Added golden files - Consolidated merge and insert functions --- cli/api/BUILD | 2 +- cli/api/dbadapters/execution_sql.ts | 119 +++---------- cli/api/execution_sql_test.ts | 88 ++-------- cli/api/goldens/on_schema_change_extend.sql | 78 +++++++++ cli/api/goldens/on_schema_change_fail.sql | 69 ++++++++ cli/api/goldens/on_schema_change_ignore.sql | 9 + .../goldens/on_schema_change_synchronize.sql | 98 +++++++++++ cli/index_run_e2e_test.ts | 163 ++++++++++++++++++ 8 files changed, 455 insertions(+), 171 deletions(-) create mode 100644 cli/api/goldens/on_schema_change_extend.sql create mode 100644 cli/api/goldens/on_schema_change_fail.sql create mode 100644 cli/api/goldens/on_schema_change_ignore.sql create mode 100644 cli/api/goldens/on_schema_change_synchronize.sql diff --git a/cli/api/BUILD b/cli/api/BUILD index c7f90777c..d3b4f578a 100644 --- a/cli/api/BUILD +++ b/cli/api/BUILD @@ -67,7 +67,7 @@ ts_test_suite( "//test_credentials:bigquery.json", "@nodejs//:node", "@nodejs//:npm", - ], + ] + glob(["goldens/**"]), deps = [ "//cli/api", "//core", diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index 840930310..33d1990e4 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -18,7 +18,8 @@ export class ExecutionSql { constructor( private readonly project: dataform.IProjectConfig, - private 
readonly dataformCoreVersion: string + private readonly dataformCoreVersion: string, + private readonly uniqueIdGenerator: () => string = () => Math.random().toString(36).substring(2) ) { this.CompilationSql = new CompilationSql(project, dataformCoreVersion); } @@ -151,7 +152,7 @@ from (${query}) as insertions`; case dataform.OnSchemaChange.EXTEND: case dataform.OnSchemaChange.SYNCHRONIZE: this.buildIncrementalSchemaChangeTasks(tasks, table); - break; + // Fall through to run the static DML after the procedure alters the schema case dataform.OnSchemaChange.IGNORE: default: tasks.add( @@ -202,20 +203,18 @@ from (${query}) as insertions`; } private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) { - const uniqueId = Math.random().toString(36).substring(2); + const uniqueId = this.uniqueIdGenerator(); - const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; - const emptyTempTableName = this.resolveTarget({ + const emptyTempTableTarget = { ...table.target, - name: shortEmptyTableName - }); + name: `${table.target.name}_df_temp_${uniqueId}_empty` + }; const procedureName = this.createProcedureName(table.target, uniqueId); const procedureBody = this.incrementalSchemaChangeBody( table, this.resolveTarget(table.target), - emptyTempTableName, - shortEmptyTableName + emptyTempTableTarget ); const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() @@ -226,7 +225,7 @@ END;`; const callProcedureSql = this.safeCallAndDropProcedure( procedureName, - emptyTempTableName + this.resolveTarget(emptyTempTableTarget) ); tasks.add(Task.statement(createProcedureSql)); tasks.add(Task.statement(callProcedureSql)); @@ -263,10 +262,8 @@ CREATE OR REPLACE TABLE ${emptyTempTableName} AS ( } private compareSchemasSql( - database: string, - schema: string, - targetName: string, - shortEmptyTableName: string + target: dataform.ITarget, + emptyTempTableTarget: dataform.ITarget ): string { return ` -- Compare schemas @@ -277,14 +274,14 
@@ DECLARE columns_removed ARRAY; SET dataform_columns = ( SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) - FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\` - WHERE table_name = '${targetName}' + FROM \`${target.database}.${target.schema}.INFORMATION_SCHEMA.COLUMNS\` + WHERE table_name = '${target.name}' ); SET temp_table_columns = ( SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) - FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\` - WHERE table_name = '${shortEmptyTableName}' + FROM \`${emptyTempTableTarget.database}.${emptyTempTableTarget.schema}.INFORMATION_SCHEMA.COLUMNS\` + WHERE table_name = '${emptyTempTableTarget.name}' ); SET columns_added = ( @@ -375,76 +372,6 @@ ${this.alterTableAddColumnsSql(qualifiedTargetTableName)} END IF;`; } - private runFinalDmlSql( - table: dataform.ITable, - qualifiedTargetTableName: string - ): string { - const query = this.getIncrementalQuery(table); - const escapedQuery = query.replace(/\\/g, "\\\\").replace(/"/g, '\\"'); - - return [ - this.declareDataformColumnsListSql(), - table.uniqueKey && table.uniqueKey.length > 0 - ? this.buildDynamicMergeSql(table, qualifiedTargetTableName, escapedQuery) - : this.buildDynamicInsertSql(table, qualifiedTargetTableName, escapedQuery) - ].join("\n"); - } - - private declareDataformColumnsListSql(): string { - return ` -DECLARE dataform_columns_list STRING; -SET dataform_columns_list = ( - SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '') - FROM UNNEST(temp_table_columns) -);`; - } - - private buildDynamicMergeSql( - table: dataform.ITable, - qualifiedTargetTableName: string, - escapedQuery: string - ): string { - const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); - const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter; - const mergeOnClauseWithFilter = updatePartitionFilter - ? 
`${mergeOnClause} and T.${updatePartitionFilter}` - : mergeOnClause; - const escapedMergeOnClauseWithFilter = mergeOnClauseWithFilter.replace(/\\/g, "\\\\").replace(/"/g, '\\"'); - - return ` -DECLARE dataform_columns_merge STRING; -SET dataform_columns_merge = ( - SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '') - FROM UNNEST(temp_table_columns) -); - - EXECUTE IMMEDIATE ( - "MERGE ${qualifiedTargetTableName} T " || - "USING (" || - """${escapedQuery}""" || - ") S " || - "ON ${escapedMergeOnClauseWithFilter} " || - "WHEN MATCHED THEN " || - " UPDATE SET " || dataform_columns_merge || " " || - "WHEN NOT MATCHED THEN " || - " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")" - );`; - } - - private buildDynamicInsertSql( - table: dataform.ITable, - qualifiedTargetTableName: string, - escapedQuery: string - ): string { - return ` - EXECUTE IMMEDIATE ( - "INSERT INTO ${qualifiedTargetTableName} (" || dataform_columns_list || ") " || - "SELECT " || dataform_columns_list || " FROM (" || - """${escapedQuery}""" || - ")" - );`; - } - private cleanupSql(emptyTempTableName: string): string { return ` -- Cleanup temporary tables. 
@@ -455,19 +382,17 @@ DROP TABLE IF EXISTS ${emptyTempTableName}; private incrementalSchemaChangeBody( table: dataform.ITable, qualifiedTargetTableName: string, - emptyTempTableName: string, - shortEmptyTableName: string + emptyTempTableTarget: dataform.ITarget ): string { + const emptyTempTableName = this.resolveTarget(emptyTempTableTarget); + const query = this.getIncrementalQuery(table); const statements: string[] = [ - this.createEmptyTempTableSql(emptyTempTableName, this.getIncrementalQuery(table)), + this.createEmptyTempTableSql(emptyTempTableName, query), this.compareSchemasSql( - table.target.database, - table.target.schema, - table.target.name, - shortEmptyTableName + table.target, + emptyTempTableTarget ), this.applySchemaChangeStrategySql(table, qualifiedTargetTableName), - this.runFinalDmlSql(table, qualifiedTargetTableName), this.cleanupSql(emptyTempTableName) ]; @@ -506,7 +431,7 @@ DROP TABLE IF EXISTS ${emptyTempTableName}; create or replace view ${this.resolveTarget(target)} as ${query}`; } - private mergeInto( + private mergeInto( target: dataform.ITarget, columns: string[], query: string, diff --git a/cli/api/execution_sql_test.ts b/cli/api/execution_sql_test.ts index 67a4df9aa..4a8cf5568 100644 --- a/cli/api/execution_sql_test.ts +++ b/cli/api/execution_sql_test.ts @@ -1,4 +1,5 @@ import { expect } from "chai"; +import * as fs from "fs-extra"; import { ExecutionSql } from "df/cli/api/dbadapters/execution_sql"; import { dataform } from "df/protos/ts"; @@ -10,7 +11,8 @@ suite("ExecutionSql with 'onSchemaChange'", () => { defaultDatabase: "project-id", defaultSchema: "dataset-id" }, - "2.0.0" + "2.0.0", + () => "test_uuid" ); const baseTable: dataform.ITable = { @@ -45,16 +47,9 @@ suite("ExecutionSql with 'onSchemaChange'", () => { onSchemaChange: dataform.OnSchemaChange.FAIL }; const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); - const procedureSql = tasks.build()[0].statement; - expect(procedureSql).to.match( - 
/create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i - ); - expect(procedureSql).to.include( - `"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T"` - ); - expect(procedureSql).to.match(/call `project-id.dataset-id.df_osc_.*`\(\)/i); - expect(procedureSql).to.include("EXCEPTION WHEN ERROR THEN"); - expect(procedureSql).to.match(/drop procedure if exists `project-id.dataset-id.df_osc_.*`/i); + const procedureSql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_fail.sql", "utf8"); + expect(procedureSql).to.equal(expectedSql.trim()); }); test("generates procedure for EXTEND strategy", () => { @@ -63,23 +58,9 @@ suite("ExecutionSql with 'onSchemaChange'", () => { onSchemaChange: dataform.OnSchemaChange.EXTEND }; const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); - const procedureSql = tasks.build()[0].statement; - - expect(procedureSql).to.match( - /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i - ); - expect(procedureSql).to.include( - `"Column removals are not allowed when on_schema_change = 'EXTEND'. 
Removed columns: %T"` - ); - expect(procedureSql).to.include( - `SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")` - ); - expect(procedureSql).to.include( - `"INSERT INTO \`project-id.dataset-id.incremental_on_schema_change\` (" || dataform_columns_list || ") " ||` - ); - expect(procedureSql).to.include( - `"""select 1 as id, 'a' as field1, 'new' as field2"""` - ); + const procedureSql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_extend.sql", "utf8"); + expect(procedureSql).to.equal(expectedSql.trim()); }); test("generates procedure for SYNCHRONIZE strategy", () => { @@ -89,48 +70,9 @@ suite("ExecutionSql with 'onSchemaChange'", () => { uniqueKey: ["id"] }; const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); - const procedureSql = tasks.build()[0].statement; - - expect(procedureSql).to.match( - /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i - ); - expect(procedureSql).to.include( - `SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")` - ); - expect(procedureSql).to.include( - `SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")` - ); - expect(procedureSql).to.include( - `"MERGE \`project-id.dataset-id.incremental_on_schema_change\` T " ||` - ); - expect(procedureSql).to.include( - `"ON T.\`id\` = S.\`id\` " ||` - ); - expect(procedureSql).to.include( - `"""select 1 as id, 'a' as field1, 'new' as field2"""` - ); - }); - - test("SYNCHRONIZE strategy prevents dropping unique keys", () => { - const tableWithExtraField = { - ...baseTable, - onSchemaChange: dataform.OnSchemaChange.SYNCHRONIZE, - uniqueKey: ["field_to_be_removed"] - }; - const tasks = executionSql.publishTasks( - tableWithExtraField, - { fullRefresh: false }, - { - ...tableMetadata, - fields: [ - { 
name: "field_to_be_removed", primitive: dataform.Field.Primitive.STRING } - ] - } - ); - const procedureSql = tasks.build()[0].statement; - expect(procedureSql).to.include( - `"Cannot drop columns %T as they are part of the unique key for table` - ); + const procedureSql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_synchronize.sql", "utf8"); + expect(procedureSql).to.equal(expectedSql.trim()); }); test("generates simple merge for IGNORE strategy", () => { @@ -140,8 +82,8 @@ suite("ExecutionSql with 'onSchemaChange'", () => { uniqueKey: ["id"] }; const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); - const mergeSql = tasks.build()[0].statement; - expect(mergeSql).to.include("merge `project-id.dataset-id.incremental_on_schema_change` T"); - expect(mergeSql).to.not.include("create or replace procedure"); + const procedureSql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8"); + expect(procedureSql).to.equal(expectedSql.trim()); }); }); diff --git a/cli/api/goldens/on_schema_change_extend.sql b/cli/api/goldens/on_schema_change_extend.sql new file mode 100644 index 000000000..21f9558a8 --- /dev/null +++ b/cli/api/goldens/on_schema_change_extend.sql @@ -0,0 +1,78 @@ +CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`() +OPTIONS(strict_mode=false) +BEGIN + +-- Create empty table to extract schema of new query. 
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS ( + SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0 +); + + +-- Compare schemas +DECLARE dataform_columns ARRAY; +DECLARE temp_table_columns ARRAY>; +DECLARE columns_added ARRAY>; +DECLARE columns_removed ARRAY; + +SET dataform_columns = ( + SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) + FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` + WHERE table_name = 'incremental_on_schema_change' +); + +SET temp_table_columns = ( + SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) + FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` + WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty' +); + +SET columns_added = ( + SELECT IFNULL(ARRAY_AGG(column_info), []) + FROM UNNEST(temp_table_columns) AS column_info + WHERE column_info.column_name NOT IN UNNEST(dataform_columns) +); +SET columns_removed = ( + SELECT IFNULL(ARRAY_AGG(column_name), []) + FROM UNNEST(dataform_columns) AS column_name + WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col) +); + + +-- Apply schema change strategy (EXTEND). +IF ARRAY_LENGTH(columns_removed) > 0 THEN + RAISE USING MESSAGE = FORMAT( + "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T", + columns_removed + ); +END IF; + +IF ARRAY_LENGTH(columns_added) > 0 THEN + EXECUTE IMMEDIATE ( + "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " || + ( + SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ") + FROM UNNEST(columns_added) AS column_info + ) + ); +END IF; + + + +-- Cleanup temporary tables. 
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+insert into `project-id.dataset-id.incremental_on_schema_change`
+(`id`,`field1`)
+select `id`,`field1`
+from (select 1 as id, 'a' as field1, 'new' as field2) as insertions
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_fail.sql b/cli/api/goldens/on_schema_change_fail.sql
new file mode 100644
index 000000000..ea8d90542
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_fail.sql
@@ -0,0 +1,69 @@
+CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
+OPTIONS(strict_mode=false)
+BEGIN
+
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
+  SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
+);
+
+
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);
+
+
+-- Apply schema change strategy (FAIL).
+IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T",
+    columns_added,
+    columns_removed
+  );
+END IF;
+
+
+
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+insert into `project-id.dataset-id.incremental_on_schema_change`
+(`id`,`field1`)
+select `id`,`field1`
+from (select 1 as id, 'a' as field1, 'new' as field2) as insertions
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_ignore.sql b/cli/api/goldens/on_schema_change_ignore.sql
new file mode 100644
index 000000000..0f9c016ba
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_ignore.sql
@@ -0,0 +1,9 @@
+merge `project-id.dataset-id.incremental_on_schema_change` T
+using (select 1 as id, 'a' as field1, 'new' as field2
+) S
+on T.id = S.id
+
+when matched then
+  update set `id` = S.id,`field1` = S.field1
+when not matched then
+  insert (`id`,`field1`) values (`id`,`field1`)
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_synchronize.sql b/cli/api/goldens/on_schema_change_synchronize.sql
new file mode 100644
index 000000000..bcfc63eac
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_synchronize.sql
@@ -0,0 +1,98 @@
+CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
+OPTIONS(strict_mode=false)
+BEGIN
+
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
+  SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
+);
+
+
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);
+
+
+-- Apply schema change strategy (SYNCHRONIZE).
+DECLARE invalid_removed_columns ARRAY<STRING>;
+SET invalid_removed_columns = (
+  SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(["id"])
+);
+
+IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Cannot drop columns %T as they are part of the unique key for table `project-id.dataset-id.incremental_on_schema_change`",
+    invalid_removed_columns
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_removed) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
+    (
+      SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")
+      FROM UNNEST(columns_removed) AS col
+    )
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_added) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
+    (
+      SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
+      FROM UNNEST(columns_added) AS column_info
+    )
+  );
+END IF;
+
+
+
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+merge `project-id.dataset-id.incremental_on_schema_change` T
+using (select 1 as id, 'a' as field1, 'new' as field2
+) S
+on T.id = S.id
+
+when matched then
+  update set `id` = S.id,`field1` = S.field1
+when not matched then
+  insert (`id`,`field1`) values (`id`,`field1`)
\ No newline at end of file
diff --git a/cli/index_run_e2e_test.ts b/cli/index_run_e2e_test.ts
index ba675834a..a91c59c90 100644
--- a/cli/index_run_e2e_test.ts
+++ b/cli/index_run_e2e_test.ts
@@ -489,4 +489,169 @@ SELECT 1 as id
       });
     });
   });
+
+  suite("onSchemaChange", ({ beforeEach }) => {
+    const projectDir = tmpDirFixture.createNewTmpDir();
+    const uniqueDataset = `dataform_e2e_osc_${Math.random().toString(36).substring(7)}`;
+
+    beforeEach("setup test project", async () => {
+      const npmCacheDir = tmpDirFixture.createNewTmpDir();
+      const workflowSettingsPath = path.join(projectDir, "workflow_settings.yaml");
+      const packageJsonPath = path.join(projectDir, "package.json");
+
+      await getProcessResult(
+        execFile(nodePath, [cliEntryPointPath, "init", projectDir, DEFAULT_DATABASE, DEFAULT_LOCATION])
+      );
+
+      const workflowSettings = dataform.WorkflowSettings.create(
+        loadYaml(fs.readFileSync(workflowSettingsPath, "utf8"))
+      );
+      delete workflowSettings.dataformCoreVersion;
+      workflowSettings.defaultDataset = uniqueDataset;
+      fs.writeFileSync(workflowSettingsPath, dumpYaml(workflowSettings));
+
+      fs.writeFileSync(
+        packageJsonPath,
+        `{
+  "dependencies":{
+    "@dataform/core": "${version}"
+  }
+}`
+      );
+      await getProcessResult(
+        execFile(npmPath, [
+          "install",
+          "--prefix",
+          projectDir,
+          "--cache",
+          npmCacheDir,
+          corePackageTarPath
+        ])
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "setup_table.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "setup_table.sqlx"),
+        `
+config {
+  type: "operations"
+}
+CREATE OR REPLACE TABLE \`\${dataform.projectConfig.defaultDatabase}.\${dataform.projectConfig.defaultSchema}.example_incremental\` AS SELECT 1 AS id, 'old' AS field1
+`
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "example_incremental.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "example_incremental.sqlx"),
+        `
+config {
+  type: "incremental",
+  onSchemaChange: "EXTEND"
+}
+SELECT 1 as id, 'new' as field1, 'new2' as field2
+`
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "teardown_schema.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "teardown_schema.sqlx"),
+        `
+config {
+  type: "operations"
+}
+DROP SCHEMA IF EXISTS \`\${dataform.projectConfig.defaultDatabase}.\${dataform.projectConfig.defaultSchema}\` CASCADE
+`
+      );
+    });
+
+    test("generates dynamic SQL for EXTEND when table exists in BigQuery", async () => {
+      try {
+        // Run setup operation to create the table in BigQuery.
+        // Dataform will automatically create the uniqueDataset schema.
+        const setupResult = await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--actions=setup_table"
+          ])
+        );
+        // Fail here if setup did not succeed: the assertions below rely on the table existing.
+        expect(setupResult.exitCode).equals(0);
+
+        // Run the incremental table in dry-run mode.
+        // Dataform will detect the table exists and generate the dynamic procedural SQL.
+        const runResult = await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--dry-run",
+            "--json",
+            "--actions=example_incremental"
+          ])
+        );
+
+        expect(runResult.exitCode).equals(0);
+        const executionGraph = JSON.parse(runResult.stdout);
+        const statement = executionGraph.actions[0].tasks[0].statement;
+
+        const expectedRunResult = {
+          projectConfig: {
+            warehouse: "bigquery",
+            defaultSchema: uniqueDataset,
+            assertionSchema: "dataform_assertions",
+            defaultDatabase: DEFAULT_DATABASE,
+            defaultLocation: DEFAULT_LOCATION
+          },
+          runConfig: {
+            actions: ["example_incremental"],
+            fullRefresh: false
+          },
+          actions: [
+            {
+              fileName: "definitions/example_incremental.sqlx",
+              hermeticity: "NON_HERMETIC",
+              tableType: "incremental",
+              target: {
+                database: DEFAULT_DATABASE,
+                name: "example_incremental",
+                schema: uniqueDataset
+              },
+              tasks: [
+                {
+                  statement,
+                  type: "statement"
+                }
+              ],
+              type: "table"
+            }
+          ],
+          warehouseState: executionGraph.warehouseState
+        };
+
+        expect(executionGraph).deep.equals(expectedRunResult);
+        expect(statement).to.include("CREATE OR REPLACE PROCEDURE");
+        expect(statement).to.include("Column removals are not allowed when on_schema_change = 'EXTEND'.");
+        expect(statement).to.include("ALTER TABLE");
+        expect(statement).to.include("ADD COLUMN IF NOT EXISTS");
+      } finally {
+        // Teardown the schema completely, regardless of test success or failure.
+        await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--actions=teardown_schema"
+          ])
+        );
+      }
+    });
+  });
 });