diff --git a/cli/api/BUILD b/cli/api/BUILD
index 352f3ef06..d3b4f578a 100644
--- a/cli/api/BUILD
+++ b/cli/api/BUILD
@@ -1,6 +1,6 @@
-load("//tools:ts_library.bzl", "ts_library")
 load("//testing:index.bzl", "ts_test_suite")
 load("//tools:node_modules.bzl", "node_modules")
+load("//tools:ts_library.bzl", "ts_library")
 
 package(default_visibility = ["//visibility:public"])
 
@@ -59,6 +59,7 @@ ts_test_suite(
     name = "tests",
     srcs = [
         "utils_test.ts",
+        "execution_sql_test.ts",
     ],
     data = [
         ":node_modules",
@@ -66,7 +67,7 @@ ts_test_suite(
         "//test_credentials:bigquery.json",
         "@nodejs//:node",
         "@nodejs//:npm",
-    ],
+    ] + glob(["goldens/**"]),
     deps = [
         "//cli/api",
         "//core",
diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts
index f226b9e91..33d1990e4 100644
--- a/cli/api/dbadapters/execution_sql.ts
+++ b/cli/api/dbadapters/execution_sql.ts
@@ -18,7 +18,8 @@ export class ExecutionSql {
 
   constructor(
     private readonly project: dataform.IProjectConfig,
-    private readonly dataformCoreVersion: string
+    private readonly dataformCoreVersion: string,
+    private readonly uniqueIdGenerator: () => string = () => Math.random().toString(36).substring(2)
   ) {
     this.CompilationSql = new CompilationSql(project, dataformCoreVersion);
   }
@@ -121,6 +122,14 @@ from (${query}) as insertions`;
     return this.CompilationSql.resolveTarget(target);
   }
 
+  /**
+   * Returns the query used for incremental runs: the table's incremental query (or its
+   * base query when none is set), passed through `this.where` with the table's `where`.
+   */
+  public getIncrementalQuery(table: dataform.ITable): string {
+    return this.where(table.incrementalQuery || table.query, table.where);
+  }
+
   public publishTasks(
     table: dataform.ITable,
     runConfig: dataform.IRunConfig,
@@ -141,23 +150,34 @@ from (${query}) as insertions`;
       if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) {
         tasks.add(Task.statement(this.createOrReplace(table)));
       } else {
-        tasks.add(
-          Task.statement(
-            table.uniqueKey && table.uniqueKey.length > 0
-              ? this.mergeInto(
-                  table.target,
-                  tableMetadata?.fields.map(f => f.name),
-                  this.where(table.incrementalQuery || table.query, table.where),
-                  table.uniqueKey,
-                  table.bigquery && table.bigquery.updatePartitionFilter
-                )
-              : this.insertInto(
-                  table.target,
-                  tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
-                  this.where(table.incrementalQuery || table.query, table.where)
-                )
-          )
-        );
+        // FAIL, EXTEND and SYNCHRONIZE first reconcile the target table's schema through a
+        // temporary stored procedure; IGNORE (or any unrecognised value) goes straight to
+        // the incremental DML below.
+        const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE;
+        if (
+          onSchemaChange === dataform.OnSchemaChange.FAIL ||
+          onSchemaChange === dataform.OnSchemaChange.EXTEND ||
+          onSchemaChange === dataform.OnSchemaChange.SYNCHRONIZE
+        ) {
+          this.buildIncrementalSchemaChangeTasks(tasks, table);
+        }
+        tasks.add(
+          Task.statement(
+            table.uniqueKey && table.uniqueKey.length > 0
+              ? this.mergeInto(
+                  table.target,
+                  tableMetadata?.fields.map(f => f.name),
+                  this.getIncrementalQuery(table),
+                  table.uniqueKey,
+                  table.bigquery && table.bigquery.updatePartitionFilter
+                )
+              : this.insertInto(
+                  table.target,
+                  tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
+                  this.getIncrementalQuery(table)
+                )
+          )
+        );
       }
     } else {
       tasks.add(Task.statement(this.createOrReplace(table)));
@@ -186,6 +206,238 @@ from (${query}) as insertions`;
     return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`;
   }
 
+  /**
+   * Adds two tasks that reconcile the target table's schema with the incremental query:
+   * one creating a uniquely named stored procedure, and one calling it and then dropping
+   * it (the temporary table and the procedure are also dropped if the call raises).
+   */
+  private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) {
+    const uniqueId = this.uniqueIdGenerator();
+
+    const emptyTempTableTarget = {
+      ...table.target,
+      name: `${table.target.name}_df_temp_${uniqueId}_empty`
+    };
+
+    const procedureName = this.createProcedureName(table.target, uniqueId);
+    const procedureBody = this.incrementalSchemaChangeBody(
+      table,
+      this.resolveTarget(table.target),
+      emptyTempTableTarget
+    );
+
+    const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}()
+OPTIONS(strict_mode=false)
+BEGIN
+${procedureBody}
+END;`;
+
+    const callProcedureSql = this.safeCallAndDropProcedure(
+      procedureName,
+      this.resolveTarget(emptyTempTableTarget)
+    );
+    tasks.add(Task.statement(createProcedureSql));
+    tasks.add(Task.statement(callProcedureSql));
+  }
+
+  /**
+   * Resolves a copy of `target` whose name is replaced by `df_osc_<uniqueId>`.
+   */
+  private createProcedureName(target: dataform.ITarget, uniqueId: string): string {
+    return this.resolveTarget({
+      ...target,
+      name: `df_osc_${uniqueId}`
+    });
+  }
+
+  /**
+   * Builds the script that calls the procedure and drops it afterwards; when the call
+   * raises, the temporary table and the procedure are dropped and the error is re-raised.
+   */
+  private safeCallAndDropProcedure(
+    procedureName: string,
+    emptyTempTableName: string
+  ): string {
+    return `
+BEGIN
+  CALL ${procedureName}();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS ${emptyTempTableName};
+  DROP PROCEDURE IF EXISTS ${procedureName};
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS ${procedureName};`;
+  }
+
+  /**
+   * Builds the statement materialising zero rows of `query` so its column schema can be read.
+   */
+  private createEmptyTempTableSql(emptyTempTableName: string, query: string): string {
+    return `
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
+  SELECT * FROM (${query}) AS insertions LIMIT 0
+);`;
+  }
+
+  /**
+   * Builds the statements that declare and populate `columns_added` and `columns_removed`
+   * by comparing INFORMATION_SCHEMA.COLUMNS of the target table with that of the empty
+   * temporary table.
+   */
+  private compareSchemasSql(
+    target: dataform.ITarget,
+    emptyTempTableTarget: dataform.ITarget
+  ): string {
+    return `
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM \`${target.database}.${target.schema}.INFORMATION_SCHEMA.COLUMNS\`
+  WHERE table_name = '${target.name}'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM \`${emptyTempTableTarget.database}.${emptyTempTableTarget.schema}.INFORMATION_SCHEMA.COLUMNS\`
+  WHERE table_name = '${emptyTempTableTarget.name}'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);`;
+  }
+
+  /**
+   * Builds the strategy-specific statements: FAIL raises on any difference, EXTEND raises on
+   * removed columns and adds new ones, SYNCHRONIZE drops removed columns (refusing unique
+   * key columns) and adds new ones. Any other strategy yields only the header comment.
+   */
+  private applySchemaChangeStrategySql(
+    table: dataform.ITable,
+    qualifiedTargetTableName: string
+  ): string {
+    const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE;
+    let sql = `
+-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;
+
+    switch (onSchemaChange) {
+      case dataform.OnSchemaChange.FAIL:
+        sql += `
+IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T",
+    columns_added,
+    columns_removed
+  );
+END IF;
+`;
+        break;
+      case dataform.OnSchemaChange.EXTEND:
+        sql += `
+IF ARRAY_LENGTH(columns_removed) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
+    columns_removed
+  );
+END IF;
+
+${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
+`;
+        break;
+      case dataform.OnSchemaChange.SYNCHRONIZE:
+        const uniqueKeys = table.uniqueKey || [];
+        sql += `
+DECLARE invalid_removed_columns ARRAY<STRING>;
+SET invalid_removed_columns = (
+  SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(${JSON.stringify(uniqueKeys)})
+);
+
+IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Cannot drop columns %T as they are part of the unique key for table ${qualifiedTargetTableName}",
+    invalid_removed_columns
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_removed) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE ${qualifiedTargetTableName} " ||
+    (
+      SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")
+      FROM UNNEST(columns_removed) AS col
+    )
+  );
+END IF;
+
+${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
+`;
+        break;
+    }
+    return sql;
+  }
+
+  /**
+   * Builds the dynamic `ALTER TABLE ... ADD COLUMN` statement covering `columns_added`.
+   */
+  private alterTableAddColumnsSql(qualifiedTargetTableName: string): string {
+    return `IF ARRAY_LENGTH(columns_added) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE ${qualifiedTargetTableName} " ||
+    (
+      SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
+      FROM UNNEST(columns_added) AS column_info
+    )
+  );
+END IF;`;
+  }
+
+  /**
+   * Builds the statement dropping the empty temporary table.
+   */
+  private cleanupSql(emptyTempTableName: string): string {
+    return `
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS ${emptyTempTableName};
+`;
+  }
+
+  /**
+   * Assembles the procedure body: create the empty temporary table, compare schemas, apply
+   * the configured strategy, then clean up.
+   */
+  private incrementalSchemaChangeBody(
+    table: dataform.ITable,
+    qualifiedTargetTableName: string,
+    emptyTempTableTarget: dataform.ITarget
+  ): string {
+    const emptyTempTableName = this.resolveTarget(emptyTempTableTarget);
+    const query = this.getIncrementalQuery(table);
+    const statements: string[] = [
+      this.createEmptyTempTableSql(emptyTempTableName, query),
+      this.compareSchemasSql(
+        table.target,
+        emptyTempTableTarget
+      ),
+      this.applySchemaChangeStrategySql(table, qualifiedTargetTableName),
+      this.cleanupSql(emptyTempTableName)
+    ];
+
+    return statements.join("\n\n");
+  }
+
   private createOrReplace(table: dataform.ITable) {
     const options = [];
     if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) {
diff --git a/cli/api/execution_sql_test.ts b/cli/api/execution_sql_test.ts
new file mode 100644
index 000000000..4a8cf5568
--- /dev/null
+++ b/cli/api/execution_sql_test.ts
@@ -0,0 +1,89 @@
+import { expect } from "chai";
+import * as fs from "fs-extra";
+
+import { ExecutionSql } from "df/cli/api/dbadapters/execution_sql";
+import { dataform } from "df/protos/ts";
+import { suite, test } from "df/testing";
+
+suite("ExecutionSql with 'onSchemaChange'", () => {
+  const executionSql = new ExecutionSql(
+    {
+      defaultDatabase: "project-id",
+      defaultSchema: "dataset-id"
+    },
+    "2.0.0",
+    () => "test_uuid"
+  );
+
+  const baseTable: dataform.ITable = {
+    type: "incremental",
+    enumType: dataform.TableType.INCREMENTAL,
+    target: {
+      database: "project-id",
+      schema: "dataset-id",
+      name: "incremental_on_schema_change"
+    },
+    query: "select 1 as id, 'a' as field1",
+    incrementalQuery: "select 1 as id, 'a' as field1, 'new' as field2"
+  };
+
+  const tableMetadata: dataform.ITableMetadata = {
+    type: dataform.TableMetadata.Type.TABLE,
+    fields: [
+      {
+        name: "id",
+        primitive: dataform.Field.Primitive.INTEGER
+      },
+      {
+        name: "field1",
+        primitive: dataform.Field.Primitive.STRING
+      }
+    ]
+  };
+
+  test("generates procedure for FAIL strategy", () => {
+    const table = {
+      ...baseTable,
+      onSchemaChange: dataform.OnSchemaChange.FAIL
+    };
+    const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
+    const procedureSql = tasks.build().map(t => t.statement).join("\n;\n");
+    const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_fail.sql", "utf8");
+    expect(procedureSql).to.equal(expectedSql.trim());
+  });
+
+  test("generates procedure for EXTEND strategy", () => {
+    const table = {
+      ...baseTable,
+      onSchemaChange: dataform.OnSchemaChange.EXTEND
+    };
+    const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
+    const procedureSql = tasks.build().map(t => t.statement).join("\n;\n");
+    const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_extend.sql", "utf8");
+    expect(procedureSql).to.equal(expectedSql.trim());
+  });
+
+  test("generates procedure for SYNCHRONIZE strategy", () => {
+    const table = {
+      ...baseTable,
+      onSchemaChange: dataform.OnSchemaChange.SYNCHRONIZE,
+      uniqueKey: ["id"]
+    };
+    const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
+    const procedureSql = tasks.build().map(t => t.statement).join("\n;\n");
+    const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_synchronize.sql", "utf8");
+    expect(procedureSql).to.equal(expectedSql.trim());
+  });
+
+  test("generates simple merge for IGNORE strategy", () => {
+    const table = {
+      ...baseTable,
+      onSchemaChange: dataform.OnSchemaChange.IGNORE,
+      uniqueKey: ["id"]
+    };
+    const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
+    const procedureSql = tasks.build().map(t => t.statement).join("\n;\n");
+    const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8");
+    expect(procedureSql).to.equal(expectedSql.trim());
+  });
+});
diff --git a/cli/api/goldens/on_schema_change_extend.sql b/cli/api/goldens/on_schema_change_extend.sql
new file mode 100644
index 000000000..21f9558a8
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_extend.sql
@@ -0,0 +1,78 @@
+CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
+OPTIONS(strict_mode=false)
+BEGIN
+
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
+  SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
+);
+
+
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);
+
+
+-- Apply schema change strategy (EXTEND).
+IF ARRAY_LENGTH(columns_removed) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
+    columns_removed
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_added) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
+    (
+      SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
+      FROM UNNEST(columns_added) AS column_info
+    )
+  );
+END IF;
+
+
+
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+insert into `project-id.dataset-id.incremental_on_schema_change`
+(`id`,`field1`)
+select `id`,`field1`
+from (select 1 as id, 'a' as field1, 'new' as field2) as insertions
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_fail.sql b/cli/api/goldens/on_schema_change_fail.sql
new file mode 100644
index 000000000..ea8d90542
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_fail.sql
@@ -0,0 +1,69 @@
+CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
+OPTIONS(strict_mode=false)
+BEGIN
+
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
+  SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
+);
+
+
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);
+
+
+-- Apply schema change strategy (FAIL).
+IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T",
+    columns_added,
+    columns_removed
+  );
+END IF;
+
+
+
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+insert into `project-id.dataset-id.incremental_on_schema_change`
+(`id`,`field1`)
+select `id`,`field1`
+from (select 1 as id, 'a' as field1, 'new' as field2) as insertions
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_ignore.sql b/cli/api/goldens/on_schema_change_ignore.sql
new file mode 100644
index 000000000..0f9c016ba
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_ignore.sql
@@ -0,0 +1,9 @@
+merge `project-id.dataset-id.incremental_on_schema_change` T
+using (select 1 as id, 'a' as field1, 'new' as field2
+) S
+on T.id = S.id
+
+when matched then
+  update set `id` = S.id,`field1` = S.field1
+when not matched then
+  insert (`id`,`field1`) values (`id`,`field1`)
\ No newline at end of file
diff --git a/cli/api/goldens/on_schema_change_synchronize.sql b/cli/api/goldens/on_schema_change_synchronize.sql
new file mode 100644
index 000000000..bcfc63eac
--- /dev/null
+++ b/cli/api/goldens/on_schema_change_synchronize.sql
@@ -0,0 +1,98 @@
+CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
+OPTIONS(strict_mode=false)
+BEGIN
+
+-- Create empty table to extract schema of new query.
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
+  SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
+);
+
+
+-- Compare schemas
+DECLARE dataform_columns ARRAY<STRING>;
+DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
+DECLARE columns_removed ARRAY<STRING>;
+
+SET dataform_columns = (
+  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change'
+);
+
+SET temp_table_columns = (
+  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
+  FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
+  WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
+);
+
+SET columns_added = (
+  SELECT IFNULL(ARRAY_AGG(column_info), [])
+  FROM UNNEST(temp_table_columns) AS column_info
+  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
+);
+SET columns_removed = (
+  SELECT IFNULL(ARRAY_AGG(column_name), [])
+  FROM UNNEST(dataform_columns) AS column_name
+  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
+);
+
+
+-- Apply schema change strategy (SYNCHRONIZE).
+DECLARE invalid_removed_columns ARRAY<STRING>;
+SET invalid_removed_columns = (
+  SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(["id"])
+);
+
+IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN
+  RAISE USING MESSAGE = FORMAT(
+    "Cannot drop columns %T as they are part of the unique key for table `project-id.dataset-id.incremental_on_schema_change`",
+    invalid_removed_columns
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_removed) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
+    (
+      SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")
+      FROM UNNEST(columns_removed) AS col
+    )
+  );
+END IF;
+
+IF ARRAY_LENGTH(columns_added) > 0 THEN
+  EXECUTE IMMEDIATE (
+    "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
+    (
+      SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
+      FROM UNNEST(columns_added) AS column_info
+    )
+  );
+END IF;
+
+
+
+-- Cleanup temporary tables.
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+
+END
+;
+BEGIN
+  CALL `project-id.dataset-id.df_osc_test_uuid`();
+EXCEPTION WHEN ERROR THEN
+  DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
+  DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
+  RAISE;
+END;
+DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
+;
+merge `project-id.dataset-id.incremental_on_schema_change` T
+using (select 1 as id, 'a' as field1, 'new' as field2
+) S
+on T.id = S.id
+
+when matched then
+  update set `id` = S.id,`field1` = S.field1
+when not matched then
+  insert (`id`,`field1`) values (`id`,`field1`)
\ No newline at end of file
diff --git a/cli/index_run_e2e_test.ts b/cli/index_run_e2e_test.ts
index ba675834a..a91c59c90 100644
--- a/cli/index_run_e2e_test.ts
+++ b/cli/index_run_e2e_test.ts
@@ -489,4 +489,167 @@ SELECT 1 as id
       });
     });
   });
+
+  suite("onSchemaChange", ({ beforeEach }) => {
+    const projectDir = tmpDirFixture.createNewTmpDir();
+    const uniqueDataset = `dataform_e2e_osc_${Math.random().toString(36).substring(7)}`;
+
+    beforeEach("setup test project", async () => {
+      const npmCacheDir = tmpDirFixture.createNewTmpDir();
+      const workflowSettingsPath = path.join(projectDir, "workflow_settings.yaml");
+      const packageJsonPath = path.join(projectDir, "package.json");
+
+      await getProcessResult(
+        execFile(nodePath, [cliEntryPointPath, "init", projectDir, DEFAULT_DATABASE, DEFAULT_LOCATION])
+      );
+
+      const workflowSettings = dataform.WorkflowSettings.create(
+        loadYaml(fs.readFileSync(workflowSettingsPath, "utf8"))
+      );
+      delete workflowSettings.dataformCoreVersion;
+      workflowSettings.defaultDataset = uniqueDataset;
+      fs.writeFileSync(workflowSettingsPath, dumpYaml(workflowSettings));
+
+      fs.writeFileSync(
+        packageJsonPath,
+        `{
+  "dependencies":{
+    "@dataform/core": "${version}"
+  }
+}`
+      );
+      await getProcessResult(
+        execFile(npmPath, [
+          "install",
+          "--prefix",
+          projectDir,
+          "--cache",
+          npmCacheDir,
+          corePackageTarPath
+        ])
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "setup_table.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "setup_table.sqlx"),
+        `
+config {
+  type: "operations"
+}
+CREATE OR REPLACE TABLE \`\${dataform.projectConfig.defaultDatabase}.\${dataform.projectConfig.defaultSchema}.example_incremental\` AS SELECT 1 AS id, 'old' AS field1
+`
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "example_incremental.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "example_incremental.sqlx"),
+        `
+config {
+  type: "incremental",
+  onSchemaChange: "EXTEND"
+}
+SELECT 1 as id, 'new' as field1, 'new2' as field2
+`
+      );
+
+      fs.ensureFileSync(path.join(projectDir, "definitions", "teardown_schema.sqlx"));
+      fs.writeFileSync(
+        path.join(projectDir, "definitions", "teardown_schema.sqlx"),
+        `
+config {
+  type: "operations"
+}
+DROP SCHEMA IF EXISTS \`\${dataform.projectConfig.defaultDatabase}.\${dataform.projectConfig.defaultSchema}\` CASCADE
+`
+      );
+    });
+
+    test("generates dynamic SQL for EXTEND when table exists in BigQuery", async () => {
+      try {
+        // Run setup operation to create the table in BigQuery.
+        // Dataform will automatically create the uniqueDataset schema.
+        await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--actions=setup_table"
+          ])
+        );
+
+        // Run the incremental table in dry-run mode.
+        // Dataform will detect the table exists and generate the dynamic procedural SQL.
+        const runResult = await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--dry-run",
+            "--json",
+            "--actions=example_incremental"
+          ])
+        );
+
+        expect(runResult.exitCode).equals(0);
+        const executionGraph = JSON.parse(runResult.stdout);
+        const statement = executionGraph.actions[0].tasks[0].statement;
+
+        const expectedRunResult = {
+          projectConfig: {
+            warehouse: "bigquery",
+            defaultSchema: uniqueDataset,
+            assertionSchema: "dataform_assertions",
+            defaultDatabase: DEFAULT_DATABASE,
+            defaultLocation: DEFAULT_LOCATION
+          },
+          runConfig: {
+            actions: ["example_incremental"],
+            fullRefresh: false
+          },
+          actions: [
+            {
+              fileName: "definitions/example_incremental.sqlx",
+              hermeticity: "NON_HERMETIC",
+              tableType: "incremental",
+              target: {
+                database: DEFAULT_DATABASE,
+                name: "example_incremental",
+                schema: uniqueDataset
+              },
+              tasks: [
+                {
+                  statement,
+                  type: "statement"
+                }
+              ],
+              type: "table"
+            }
+          ],
+          warehouseState: executionGraph.warehouseState
+        };
+
+        expect(executionGraph).deep.equals(expectedRunResult);
+        expect(statement).to.include("CREATE OR REPLACE PROCEDURE");
+        expect(statement).to.include("Column removals are not allowed when on_schema_change = 'EXTEND'.");
+        expect(statement).to.include("ALTER TABLE");
+        expect(statement).to.include("ADD COLUMN IF NOT EXISTS");
+      } finally {
+        // Teardown the schema completely, regardless of test success or failure.
+        await getProcessResult(
+          execFile(nodePath, [
+            cliEntryPointPath,
+            "run",
+            projectDir,
+            "--credentials",
+            CREDENTIALS_PATH,
+            "--actions=teardown_schema"
+          ])
+        );
+      }
+    });
+  });
 });