Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cli/api/BUILD
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
load("//tools:ts_library.bzl", "ts_library")
load("//testing:index.bzl", "ts_test_suite")
load("//tools:node_modules.bzl", "node_modules")
load("//tools:ts_library.bzl", "ts_library")

package(default_visibility = ["//visibility:public"])

Expand Down Expand Up @@ -59,14 +59,15 @@ ts_test_suite(
name = "tests",
srcs = [
"utils_test.ts",
"execution_sql_test.ts",
],
data = [
":node_modules",
"//packages/@dataform/core:package_tar",
"//test_credentials:bigquery.json",
"@nodejs//:node",
"@nodejs//:npm",
],
] + glob(["goldens/**"]),
deps = [
"//cli/api",
"//core",
Expand Down
251 changes: 232 additions & 19 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export class ExecutionSql {

constructor(
private readonly project: dataform.IProjectConfig,
private readonly dataformCoreVersion: string
private readonly dataformCoreVersion: string,
private readonly uniqueIdGenerator: () => string = () => Math.random().toString(36).substring(2)
) {
this.CompilationSql = new CompilationSql(project, dataformCoreVersion);
}
Expand Down Expand Up @@ -121,6 +122,10 @@ from (${query}) as insertions`;
return this.CompilationSql.resolveTarget(target);
}

public getIncrementalQuery(table: dataform.ITable): string {
return this.where(table.incrementalQuery || table.query, table.where);
}

public publishTasks(
table: dataform.ITable,
runConfig: dataform.IRunConfig,
Expand All @@ -141,23 +146,34 @@ from (${query}) as insertions`;
if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.where(table.incrementalQuery || table.query, table.where),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.where(table.incrementalQuery || table.query, table.where)
)
)
);
const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE;
switch (onSchemaChange) {
case dataform.OnSchemaChange.FAIL:
case dataform.OnSchemaChange.EXTEND:
case dataform.OnSchemaChange.SYNCHRONIZE:
this.buildIncrementalSchemaChangeTasks(tasks, table);
// Fall through to run the static DML after the procedure alters the schema
case dataform.OnSchemaChange.IGNORE:
default:
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.getIncrementalQuery(table),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.getIncrementalQuery(table)
)
)
);
break;
}
}
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
Expand Down Expand Up @@ -186,6 +202,203 @@ from (${query}) as insertions`;
return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`;
}

private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) {
const uniqueId = this.uniqueIdGenerator();

const emptyTempTableTarget = {
...table.target,
name: `${table.target.name}_df_temp_${uniqueId}_empty`
};

const procedureName = this.createProcedureName(table.target, uniqueId);
const procedureBody = this.incrementalSchemaChangeBody(
table,
this.resolveTarget(table.target),
emptyTempTableTarget
);

const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}()
OPTIONS(strict_mode=false)
BEGIN
${procedureBody}
END;`;

const callProcedureSql = this.safeCallAndDropProcedure(
procedureName,
this.resolveTarget(emptyTempTableTarget)
);
tasks.add(Task.statement(createProcedureSql));
tasks.add(Task.statement(callProcedureSql));
}

private createProcedureName(target: dataform.ITarget, uniqueId: string): string {
return this.resolveTarget({
...target,
name: `df_osc_${uniqueId}`
});
}

private safeCallAndDropProcedure(
procedureName: string,
emptyTempTableName: string
): string {
return `
BEGIN
CALL ${procedureName}();
EXCEPTION WHEN ERROR THEN
DROP TABLE IF EXISTS ${emptyTempTableName};
DROP PROCEDURE IF EXISTS ${procedureName};
RAISE;
END;
DROP PROCEDURE IF EXISTS ${procedureName};`;
}

private createEmptyTempTableSql(emptyTempTableName: string, query: string): string {
return `
-- Create empty table to extract schema of new query.
CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
SELECT * FROM (${query}) AS insertions LIMIT 0
);`;
}

private compareSchemasSql(
target: dataform.ITarget,
emptyTempTableTarget: dataform.ITarget
): string {
return `
-- Compare schemas
DECLARE dataform_columns ARRAY<STRING>;
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_removed ARRAY<STRING>;

SET dataform_columns = (
SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
FROM \`${target.database}.${target.schema}.INFORMATION_SCHEMA.COLUMNS\`
WHERE table_name = '${target.name}'
);

SET temp_table_columns = (
SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
FROM \`${emptyTempTableTarget.database}.${emptyTempTableTarget.schema}.INFORMATION_SCHEMA.COLUMNS\`
WHERE table_name = '${emptyTempTableTarget.name}'
);

SET columns_added = (
SELECT IFNULL(ARRAY_AGG(column_info), [])
FROM UNNEST(temp_table_columns) AS column_info
WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
);
SET columns_removed = (
SELECT IFNULL(ARRAY_AGG(column_name), [])
FROM UNNEST(dataform_columns) AS column_name
WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
);`;
}

private applySchemaChangeStrategySql(
table: dataform.ITable,
qualifiedTargetTableName: string
): string {
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
let sql = `
-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just an informative comment added into the compiled SQL. In case users would want to inspect the query logs they can easily find which strategy was triggered. I can remove it if you find it unnecessary.


switch (onSchemaChange) {
case dataform.OnSchemaChange.FAIL:
sql += `
IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T",
columns_added,
columns_removed
);
END IF;
`;
break;
case dataform.OnSchemaChange.EXTEND:
sql += `
IF ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
columns_removed
);
END IF;

${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
`;
break;
case dataform.OnSchemaChange.SYNCHRONIZE:
const uniqueKeys = table.uniqueKey || [];
sql += `
DECLARE invalid_removed_columns ARRAY<STRING>;
SET invalid_removed_columns = (
SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(${JSON.stringify(uniqueKeys)})
);

IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Cannot drop columns %T as they are part of the unique key for table ${qualifiedTargetTableName}",
invalid_removed_columns
);
END IF;

IF ARRAY_LENGTH(columns_removed) > 0 THEN
EXECUTE IMMEDIATE (
"ALTER TABLE ${qualifiedTargetTableName} " ||
(
SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")
FROM UNNEST(columns_removed) AS col
)
);
END IF;

${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
`;
break;
}
return sql;
}

private alterTableAddColumnsSql(qualifiedTargetTableName: string): string {
return `IF ARRAY_LENGTH(columns_added) > 0 THEN
EXECUTE IMMEDIATE (
"ALTER TABLE ${qualifiedTargetTableName} " ||
(
SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
FROM UNNEST(columns_added) AS column_info
)
);
END IF;`;
}

private cleanupSql(emptyTempTableName: string): string {
return `
-- Cleanup temporary tables.
DROP TABLE IF EXISTS ${emptyTempTableName};
`;
}

private incrementalSchemaChangeBody(
table: dataform.ITable,
qualifiedTargetTableName: string,
emptyTempTableTarget: dataform.ITarget
): string {
const emptyTempTableName = this.resolveTarget(emptyTempTableTarget);
const query = this.getIncrementalQuery(table);
const statements: string[] = [
this.createEmptyTempTableSql(emptyTempTableName, query),
this.compareSchemasSql(
table.target,
emptyTempTableTarget
),
this.applySchemaChangeStrategySql(table, qualifiedTargetTableName),
this.cleanupSql(emptyTempTableName)
];

return statements.join("\n\n");
}

private createOrReplace(table: dataform.ITable) {
const options = [];
if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) {
Expand Down Expand Up @@ -218,7 +431,7 @@ from (${query}) as insertions`;
create or replace view ${this.resolveTarget(target)} as ${query}`;
}

private mergeInto(
private mergeInto(
target: dataform.ITarget,
columns: string[],
query: string,
Expand Down
Loading
Loading