Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cli/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ ts_library(
"@npm//@types/yargs",
"@npm//chokidar",
"@npm//glob",
"@npm//parse-duration",
"@npm//readline-sync",
"@npm//untildify",
"@npm//yargs",
Expand Down
1 change: 1 addition & 0 deletions cli/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ts_library(
"@npm//deepmerge",
"@npm//fs-extra",
"@npm//glob",
"@npm//google-auth-library",
"@npm//google-sql-syntax-ts",
"@npm//js-beautify",
"@npm//js-yaml",
Expand Down
61 changes: 42 additions & 19 deletions cli/api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery";
import { GoogleAuth, Impersonated } from "google-auth-library";
import Long from "long";
import { PromisePoolExecutor } from "promise-pool-executor";

import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to break lint checks

Step #1: ERROR: /workspace/cli/api/dbadapters/bigquery.ts:4:1 - Import sources within a group must be alphabetized.
Step #1: ERROR: /workspace/cli/api/dbadapters/bigquery.ts:5:1 - Imports from this module are not allowed in this group.  The expected groups (in order) are: external, internal.

import { collectEvaluationQueries, QueryOrAction } from "df/cli/api/dbadapters/execution_sql";
import { IBigQueryError, IDbAdapter, IDbClient, IExecutionResult, OnCancel } from "df/cli/api/dbadapters/index";
import { parseBigqueryEvalError } from "df/cli/api/utils/error_parsing";
Expand All @@ -10,7 +10,9 @@ import { coerceAsError } from "df/common/errors/errors";
import { retry } from "df/common/promises";
import { dataform } from "df/protos/ts";

const GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"];
const IMPERSONATION_GOOGLE_SCOPES = [GOOGLE_CLOUD_PLATFORM_SCOPE, ...EXTRA_GOOGLE_SCOPES];

const BIGQUERY_DATE_RELATED_FIELDS = [
"BigQueryDate",
Expand Down Expand Up @@ -104,8 +106,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
try {
await this.pool
.addSingleTask({
generator: () =>
this.getClient().query({
generator: async () =>
(await this.getClient()).query({
useLegacySql: false,
query,
dryRun: true
Expand All @@ -130,7 +132,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
}

public async tables(): Promise<dataform.ITarget[]> {
const datasets = await this.getClient().getDatasets({ autoPaginate: true, maxResults: 1000 });
const client = await this.getClient();
const datasets = await client.getDatasets({ autoPaginate: true, maxResults: 1000 });
const tables = await Promise.all(
datasets[0].map(dataset => dataset.getTables({ autoPaginate: true, maxResults: 1000 }))
);
Expand Down Expand Up @@ -219,7 +222,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
}

public async schemas(database: string): Promise<string[]> {
const data = await this.getClient(database).getDatasets();
const data = await (await this.getClient(database)).getDatasets();
return data[0].map(dataset => dataset.id);
}

Expand All @@ -239,7 +242,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
metadata.schema.fields
);

await this.getClient(target.database)
await (await this.getClient(target.database))
.dataset(target.schema)
.table(target.name)
.setMetadata({
Expand All @@ -251,7 +254,7 @@ export class BigQueryDbAdapter implements IDbAdapter {

private async getMetadata(target: dataform.ITarget): Promise<TableMetadata> {
try {
const table = await this.getClient(target.database)
const table = await (await this.getClient(target.database))
.dataset(target.schema)
.table(target.name)
.getMetadata();
Expand All @@ -266,19 +269,38 @@ export class BigQueryDbAdapter implements IDbAdapter {
}
}

private getClient(projectId?: string) {
private async getClient(projectId?: string) {
Comment thread
kolina marked this conversation as resolved.
projectId = projectId || this.bigQueryCredentials.projectId;
if (!this.clients.has(projectId)) {
this.clients.set(
const clientConfig: any = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BigQueryOptions instead of any?

projectId,
new BigQuery({
scopes: EXTRA_GOOGLE_SCOPES,
Comment on lines 276 to +277
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are projectId and scopes here used by the auth library if you overwrite authClient? If they're not used in this case, I'd only set them in the else branch below.

location: this.bigQueryCredentials.location
};

if (this.bigQueryCredentials.impersonateServiceAccount) {
// For impersonation, create an Impersonated credential directly
const sourceAuth = new GoogleAuth({
projectId,
scopes: EXTRA_GOOGLE_SCOPES,
location: this.bigQueryCredentials.location,
scopes: IMPERSONATION_GOOGLE_SCOPES,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment explaining why we pass a different set of scopes with impersonation than without.

credentials:
this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials)
})
);
this.bigQueryCredentials.credentials &&
JSON.parse(this.bigQueryCredentials.credentials)
});

const authClient = await sourceAuth.getClient();

clientConfig.authClient = new Impersonated({
sourceClient: authClient,
targetPrincipal: this.bigQueryCredentials.impersonateServiceAccount,
targetScopes: IMPERSONATION_GOOGLE_SCOPES
});
} else {
clientConfig.credentials =
this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials);
}

this.clients.set(projectId, new BigQuery(clientConfig));
}
return this.clients.get(projectId);
}
Expand All @@ -290,12 +312,12 @@ export class BigQueryDbAdapter implements IDbAdapter {
byteLimit?: number,
location?: string
) {
const results = await new Promise<any[]>((resolve, reject) => {
const results = await new Promise<any[]>(async (resolve, reject) => {
const allRows = new LimitedResultSet({
rowLimit,
byteLimit
});
const stream = this.getClient().createQueryStream({
const stream = (await this.getClient()).createQueryStream({
query,
params,
location
Expand Down Expand Up @@ -332,7 +354,8 @@ export class BigQueryDbAdapter implements IDbAdapter {
return retry(
async () => {
try {
const job = await this.getClient().createQueryJob({
const client = await this.getClient();
const job = await client.createQueryJob({
useLegacySql: false,
jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""),
query,
Expand Down
23 changes: 19 additions & 4 deletions cli/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as chokidar from "chokidar";
import * as fs from "fs";
import * as glob from "glob";
import parseDuration from "parse-duration";
import * as path from "path";
import yargs from "yargs";

Expand All @@ -28,6 +27,7 @@ import {
actuallyResolve,
assertPathExists,
compiledGraphHasErrors,
parseCliDuration,
promptForIcebergConfig,
} from "df/cli/util";
import { createYargsCli, INamedOption } from "df/cli/yargswrapper";
Expand Down Expand Up @@ -174,7 +174,7 @@ const timeoutOption: INamedOption<yargs.Options> = {
type: "string",
default: null,
coerce: (rawTimeoutString: string | null) =>
rawTimeoutString ? parseDuration(rawTimeoutString) : null
rawTimeoutString ? parseCliDuration(rawTimeoutString) : null
}
};

Expand Down Expand Up @@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption<yargs.Options> = {
}
};

// CLI flag `--impersonate-service-account` (string). It is listed among the options of the
// `test` and `run` commands; when a value is supplied, the command handlers copy it onto the
// credentials object as `impersonateServiceAccount` before the BigQuery adapter is used.
const impersonateServiceAccountOption: INamedOption<yargs.Options> = {
name: "impersonate-service-account",
option: {
describe: "Service account email to impersonate during authentication.",
type: "string"
}
};
const quietCompileOption: INamedOption<yargs.Options> = {
name: "quiet",
option: {
Expand Down Expand Up @@ -503,7 +510,7 @@ export function runCli() {
format: `test [${projectDirMustExistOption.name}]`,
description: "Run the dataform project's unit tests.",
positionalOptions: [projectDirMustExistOption],
options: [credentialsOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions],
options: [credentialsOption, impersonateServiceAccountOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions],
processFn: async argv => {
print("Compiling...\n");
const compiledGraph = await compile({
Expand All @@ -519,6 +526,10 @@ export function runCli() {
const readCredentials = credentials.read(
getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name])
);
if (argv[impersonateServiceAccountOption.name]) {
(readCredentials as any).impersonateServiceAccount =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend dataform.IBigQuery with your new option to avoid dynamic casts breaking static typing?

argv[impersonateServiceAccountOption.name];
}

if (!compiledGraph.tests.length) {
printError("No unit tests found.");
Expand Down Expand Up @@ -563,10 +574,10 @@ export function runCli() {
},
actionsOption,
credentialsOption,
impersonateServiceAccountOption,
fullRefreshOption,
includeDepsOption,
includeDependentsOption,
credentialsOption,
jsonOutputOption,
timeoutOption,
tagsOption,
Expand Down Expand Up @@ -599,6 +610,10 @@ export function runCli() {
const readCredentials = credentials.read(
getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name])
);
if (argv[impersonateServiceAccountOption.name]) {
(readCredentials as any).impersonateServiceAccount =
argv[impersonateServiceAccountOption.name];
}

const dbadapter = new BigQueryDbAdapter(readCredentials);
const executionGraph = await build(
Expand Down
68 changes: 68 additions & 0 deletions cli/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,74 @@ export function formatBytesInHumanReadableFormat(bytes: number): string {
return `${value} ${units[i]}`;
}

const DURATION_UNITS_IN_MILLIS: { [unit: string]: number } = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please elaborate on the effort it would take to avoid this and upgrade the parse-duration dependency?

ms: 1,
msec: 1,
msecs: 1,
millisecond: 1,
milliseconds: 1,
s: 1000,
sec: 1000,
secs: 1000,
second: 1000,
seconds: 1000,
m: 60 * 1000,
min: 60 * 1000,
mins: 60 * 1000,
minute: 60 * 1000,
minutes: 60 * 1000,
h: 60 * 60 * 1000,
hr: 60 * 60 * 1000,
hrs: 60 * 60 * 1000,
hour: 60 * 60 * 1000,
hours: 60 * 60 * 1000,
d: 24 * 60 * 60 * 1000,
day: 24 * 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000,
w: 7 * 24 * 60 * 60 * 1000,
week: 7 * 24 * 60 * 60 * 1000,
weeks: 7 * 24 * 60 * 60 * 1000
};

/**
 * Parses a CLI duration string into a number of milliseconds.
 *
 * Accepted forms (case-insensitive, surrounding whitespace ignored):
 *  - a bare number such as "1500" or "-2.5", returned as-is (milliseconds);
 *  - one or more `<number><unit>` terms such as "1h30m", "1.5m" or "1 week 2 days",
 *    where whitespace may appear between a number and its unit and between terms,
 *    and each unit must be a key of DURATION_UNITS_IN_MILLIS. Terms are summed.
 *
 * @param rawDuration the user-supplied duration text.
 * @returns the duration in milliseconds.
 * @throws Error "Duration cannot be empty." when the input is empty or only whitespace.
 * @throws Error "Invalid duration: ..." when the input is not a bare number or a sequence of terms.
 * @throws Error "Unsupported duration unit: ..." when a term uses an unknown unit.
 */
export function parseCliDuration(rawDuration: string): number {
  const normalizedDuration = rawDuration?.trim().toLowerCase();
  if (!normalizedDuration) {
    throw new Error("Duration cannot be empty.");
  }

  // The input is scanned by hand rather than with a number regex containing a quantified
  // group; lint reports that regex shape as an "Unsafe Regular Expression".
  const length = normalizedDuration.length;
  const isDigit = (ch: string): boolean => ch >= "0" && ch <= "9";
  const isUnitLetter = (ch: string): boolean => ch >= "a" && ch <= "z";
  const isWhitespace = (ch: string): boolean => ch !== "" && ch.trim() === "";
  // Returns the first index at or after `from` whose character is not accepted.
  const skipWhile = (from: number, accepts: (ch: string) => boolean): number => {
    let index = from;
    while (index < length && accepts(normalizedDuration.charAt(index))) {
      index++;
    }
    return index;
  };
  const invalidDuration = (): Error => new Error(`Invalid duration: ${rawDuration}`);

  let totalDurationMillis = 0;
  let cursor = 0;

  // The text is trimmed, so whenever `cursor < length` at least one non-whitespace
  // character remains and must start a new `<number><unit>` term.
  while (cursor < length) {
    const numberStart = skipWhile(cursor, isWhitespace);
    const signChar = normalizedDuration.charAt(numberStart);
    const digitsStart = signChar === "+" || signChar === "-" ? numberStart + 1 : numberStart;

    let numberEnd = skipWhile(digitsStart, isDigit);
    if (numberEnd === digitsStart) {
      throw invalidDuration();
    }
    // A fractional part only counts when at least one digit follows the dot.
    if (normalizedDuration.charAt(numberEnd) === ".") {
      const fractionEnd = skipWhile(numberEnd + 1, isDigit);
      if (fractionEnd > numberEnd + 1) {
        numberEnd = fractionEnd;
      }
    }
    const durationValue = Number(normalizedDuration.slice(numberStart, numberEnd));

    const unitStart = skipWhile(numberEnd, isWhitespace);
    const unitEnd = skipWhile(unitStart, isUnitLetter);
    if (unitEnd === unitStart) {
      // A number without a unit is only valid when it is the entire input.
      if (numberStart === 0 && numberEnd === length) {
        return durationValue;
      }
      throw invalidDuration();
    }

    const durationUnit = normalizedDuration.slice(unitStart, unitEnd);
    // Own-property check: a plain index lookup would also find inherited members such as
    // "constructor", which are not numbers and would turn the total into NaN.
    const unitMillis = Object.prototype.hasOwnProperty.call(DURATION_UNITS_IN_MILLIS, durationUnit)
      ? DURATION_UNITS_IN_MILLIS[durationUnit]
      : undefined;
    if (unitMillis === undefined) {
      throw new Error(`Unsupported duration unit: ${durationUnit}`);
    }

    totalDurationMillis += durationValue * unitMillis;
    cursor = unitEnd;
  }

  return totalDurationMillis;
}

/**
* Handles prompting and validation for defaultBucketName, defaultTableFolderRoot
* and defaultTableFolderSubpath if the user provides the --iceberg flag when
Expand Down
25 changes: 25 additions & 0 deletions cli/util_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { expect } from "chai";
import {
formatBytesInHumanReadableFormat,
formatExecutionSuffix,
parseCliDuration,
validateIcebergConfigBucketName,
validateIcebergConfigTableFolderRoot,
validateIcebergConfigTableFolderSubpath,
Expand Down Expand Up @@ -35,6 +36,30 @@ suite('format bytes in human readable format', () => {
});
});

// Unit tests for parseCliDuration: bare-number input, single-unit input,
// compound/fractional input, and the error raised for each kind of bad input.
suite("parse cli duration", () => {
  test("parses numeric durations as milliseconds", () => {
    const parsedMillis = parseCliDuration("1500");
    expect(parsedMillis).equals(1500);
  });

  test("parses single-unit durations", () => {
    const singleUnitCases: Array<[string, number]> = [
      ["1s", 1000],
      ["10m", 600000],
      ["2 hours", 7200000]
    ];
    singleUnitCases.forEach(([input, expectedMillis]) => {
      expect(parseCliDuration(input)).equals(expectedMillis);
    });
  });

  test("parses compound and fractional durations", () => {
    const compoundCases: Array<[string, number]> = [
      ["1h30m", 5400000],
      ["1.5m", 90000],
      ["1 week 2 days", 777600000]
    ];
    compoundCases.forEach(([input, expectedMillis]) => {
      expect(parseCliDuration(input)).equals(expectedMillis);
    });
  });

  test("rejects invalid durations", () => {
    const rejectedCases: Array<[string, string]> = [
      ["", "Duration cannot be empty."],
      ["tomorrow", "Invalid duration: tomorrow"],
      ["1fortnight", "Unsupported duration unit: fortnight"]
    ];
    rejectedCases.forEach(([input, expectedMessage]) => {
      expect(() => parseCliDuration(input)).to.throw(expectedMessage);
    });
  });
});

suite('Iceberg Config Validation', () => {
suite('validateIcebergConfigBucketName', () => {
test('valid bucket names do not throw errors', () => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"estraverse": "^5.1.0",
"fs-extra": "^9.0.0",
"glob": "^10.5.0",
"google-auth-library": "~8.9.0",
"google-sql-syntax-ts": "^1.0.3",
"js-beautify": "^1.10.2",
"js-yaml": "^4.1.1",
Expand All @@ -45,7 +46,6 @@
"minimist": "^1.2.6",
"moo": "^0.5.0",
"object-sizeof": "^1.6.1",
"parse-duration": "^1.0.0",
"prettier": "^1.14.2",
"promise-pool-executor": "^1.1.1",
"protobufjs": "^7.2.5",
Expand Down
2 changes: 1 addition & 1 deletion packages/@dataform/cli/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ externals = [
"deepmerge",
"fs-extra",
"glob",
"google-auth-library",
"google-sql-syntax-ts",
"js-beautify",
"js-yaml",
"moo",
"object-sizeof",
"parse-duration",
"promise-pool-executor",
"protobufjs",
"readline-sync",
Expand Down
2 changes: 2 additions & 0 deletions protos/profiles.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ message BigQuery {
string credentials = 3;
// Options are listed here: https://cloud.google.com/bigquery/docs/locations
string location = 4;
// Service account email to impersonate during authentication
string impersonate_service_account = 5;

reserved 2;
}
Loading
Loading