Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
{% macro get_column_obj_and_monitors(model_relation, column_name, monitors=none) %}

{% set column_obj_and_monitors = [] %}
{% set column_objects = adapter.get_columns_in_relation(model_relation) %}

{% if target.type == 'bigquery' %}
{% set expanded = [] %}
{% for col in column_objects %}
{% do expanded.append(col) %}
{% if col.fields | length > 0 %}
{# `BigQueryColumn.flatten()` discards ancestor modes, so a
NULLABLE leaf under a REPEATED ancestor still satisfies
`leaf.mode != 'REPEATED'`. Build the set of safe leaf names
via an ancestor-aware walker and filter `flatten()` against
it. #}
{% set safe_names = elementary.bq_safe_leaf_names(col) %}
{% for leaf in col.flatten() %}
{% if leaf.name in safe_names %}
{% do expanded.append(leaf) %}
{% endif %}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{% endfor %}
{% endif %}
{% endfor %}
{% set column_objects = expanded %}
{% endif %}

{% for column_obj in column_objects %}
{% if column_obj.name.strip('"') | lower == column_name.strip('"') | lower %}
{% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %}
{% set column_monitors = elementary.column_monitors_by_type(
elementary.get_column_data_type(column_obj), monitors
elementary.get_column_data_type(wrapped), monitors
) %}
{% set column_item = {"column": column_obj, "monitors": column_monitors} %}
{% set column_item = {"column": wrapped, "monitors": column_monitors} %}
{{ return(column_item) }}
{% endif %}
{% endfor %}
Expand All @@ -22,10 +44,11 @@
{% set column_objects = adapter.get_columns_in_relation(model_relation) %}

{% for column_obj in column_objects %}
{% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %}
{% set column_monitors = elementary.column_monitors_by_type(
elementary.get_column_data_type(column_obj), monitors
elementary.get_column_data_type(wrapped), monitors
) %}
{% set column_item = {"column": column_obj, "monitors": column_monitors} %}
{% set column_item = {"column": wrapped, "monitors": column_monitors} %}
{% do column_obj_and_monitors.append(column_item) %}
{% endfor %}

Expand Down
127 changes: 118 additions & 9 deletions macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
{%- set timestamp_column = metric_properties.timestamp_column %}
{% set prefixed_dimensions = [] %}
{% for dimension_column in dimensions %}
{% do prefixed_dimensions.append("dimension_" ~ dimension_column) %}
{% do prefixed_dimensions.append(
"dimension_" ~ elementary.bq_safe_alias(dimension_column)
) %}
{% endfor %}

{% set metric_types = [] %}
Expand Down Expand Up @@ -53,7 +55,7 @@
),
filtered_monitored_table as (
select
{{ column_obj.quoted }},
{{ column_obj.quoted }} as {{ column_obj.safe_alias }},
{%- if dimensions -%}
{{
elementary.select_dimensions_columns(
Expand All @@ -78,7 +80,7 @@
{%- else %}
filtered_monitored_table as (
select
{{ column_obj.quoted }},
{{ column_obj.quoted }} as {{ column_obj.safe_alias }},
{%- if dimensions -%}
{{
elementary.select_dimensions_columns(
Expand All @@ -94,7 +96,7 @@
column_metrics as (

{%- if column_metrics %}
{%- set column = column_obj.quoted -%}
{%- set column = column_obj.safe_alias -%}
select
{%- if timestamp_column %}
edr_bucket_start as bucket_start, edr_bucket_end as bucket_end,
Expand Down Expand Up @@ -341,17 +343,124 @@
{% endif %}
{% endmacro %}

{# Render a comma-separated SELECT list for the given dimension columns.

   Args:
     dimension_columns: list of dimension identifiers / expressions (strings).
     as_prefix: optional alias prefix. When non-empty, every item is emitted as
       `<source> as <as_prefix>_<alias_suffix>`; when empty, the item is
       emitted as-is with no alias.

   With a prefix:
     - <source> is segment-quoted via `bq_segment_quote` only for dotted names
       on BigQuery (nested STRUCT paths); otherwise it is the raw value.
     - <alias_suffix> always goes through `bq_safe_alias`, which is a no-op for
       dot-free names. This keeps the alias identical to the
       `"dimension_" ~ bq_safe_alias(...)` names that
       `column_monitoring_query` builds for the same dimensions, on every
       adapter.

   Returns the rendered select list as a string. #}
{% macro select_dimensions_columns(dimension_columns, as_prefix="") %}
    {% set select_statements %}
        {%- for column in dimension_columns -%}
            {%- if as_prefix -%}
                {%- set _is_nested_bq = (target.type == 'bigquery' and '.' in column) -%}
                {%- set _source = elementary.bq_segment_quote(column) if _is_nested_bq else column -%}
                {%- set _alias_suffix = elementary.bq_safe_alias(column) -%}
                {{ _source }}{{ " as " ~ as_prefix ~ "_" ~ _alias_suffix }}
            {%- else -%}
                {{ column }}
            {%- endif -%}
            {#- Emit the separator exactly once per non-final item. -#}
            {%- if not loop.last -%}{{ ", " }}{%- endif -%}
        {%- endfor -%}
    {% endset %}
    {{ return(select_statements) }}
{% endmacro %}


{# ---------------------------------------------------------------------- #}
{# BigQuery STRUCT nested-field helpers. #}
{# ---------------------------------------------------------------------- #}

{# Segment-quote a (possibly dotted) identifier for BigQuery.

   Args:
     name: identifier string, optionally a dotted path such as
       `user.address.city`.

   On BigQuery every dot-separated segment is wrapped in backticks and the
   segments are re-joined with `.`; a dot-free name is a single segment and
   is simply backtick-quoted. Backticks already surrounding a segment are
   stripped first, so an input that is already quoted is returned unchanged
   instead of being double-quoted into invalid SQL.

   On any other adapter the name is returned unchanged. #}
{% macro bq_segment_quote(name) %}
    {%- if target.type == 'bigquery' -%}
        {%- set parts = [] -%}
        {#- split('.') on a dot-free name yields one segment, so a single loop
            covers both the nested and the flat case. -#}
        {%- for seg in name.split('.') -%}
            {%- do parts.append('`' ~ seg.strip('`') ~ '`') -%}
        {%- endfor -%}
        {{- parts | join('.') -}}
    {%- else -%}
        {{- name -}}
    {%- endif -%}
{% endmacro %}

{# Turn a (possibly dotted) identifier into a dot-free string usable as a SQL
   alias: every `.` becomes `__`. A name without dots comes back unchanged. #}
{% macro bq_safe_alias(name) %}
    {{- (name | string).split('.') | join('__') -}}
{% endmacro %}

{# Wrap a Column / BigQueryColumn in a plain dict that carries both the SQL
   reference to the column and an alias that is safe to project in a CTE.

   Args:
     column_obj: adapter column object; `.name`, `.column`, `.quoted`,
       `.dtype`, `.data_type` are read, and `.fields` when it is defined.

   Returns a dict with keys:
     name, column, dtype, data_type : copied from column_obj.
     quoted     : for a dotted name on BigQuery, each segment backtick-quoted
                  and joined with `.`; otherwise column_obj.quoted.
     safe_alias : for a dotted name on BigQuery, the name with `.` replaced by
                  `__`; otherwise column_obj.quoted.
     fields     : column_obj.fields, or [] when the attribute is not defined.

   For non-nested columns `safe_alias` deliberately equals `quoted`: the
   monitoring query projects `<quoted> as <safe_alias>` and then references
   the column through `safe_alias`, so an unquoted alias would break
   identifiers that need quoting (reserved words, case-sensitive or
   special-character names). #}
{% macro wrap_column_for_struct_support(column_obj) %}
    {%- set name = column_obj.name -%}
    {%- if target.type == 'bigquery' and '.' in name -%}
        {%- set quoted_segments = [] -%}
        {%- for seg in name.split('.') -%}
            {%- do quoted_segments.append('`' ~ seg ~ '`') -%}
        {%- endfor -%}
        {%- set quoted = quoted_segments | join('.') -%}
        {%- set safe_alias = name | replace('.', '__') -%}
    {%- else -%}
        {%- set quoted = column_obj.quoted -%}
        {#- Keep the adapter's quoting on the alias so the projected column
            resolves exactly like the original quoted reference. -#}
        {%- set safe_alias = column_obj.quoted -%}
    {%- endif -%}
    {#- `fields` is not defined on every adapter's column class; fall back to
        an empty list rather than propagating an undefined value. -#}
    {%- set fields = column_obj.fields if column_obj.fields is defined else [] -%}
    {{ return({
        'name': name,
        'column': column_obj.column,
        'quoted': quoted,
        'safe_alias': safe_alias,
        'dtype': column_obj.dtype,
        'data_type': column_obj.data_type,
        'fields': fields,
    }) }}
{% endmacro %}

{# Collect the dotted names of the leaves under a BigQuery STRUCT column that
   can be projected directly, i.e. without UNNEST.

   Args:
     column_obj: top-level column; `.mode`, `.fields` and `.column` are read.

   Returns a list of dotted leaf names. The list is empty when the column
   itself is REPEATED, or when it has no (or no defined) `fields`. Otherwise
   each child is handed to `_bq_walk_collect`, starting from a path holding
   only the column's own name and with no REPEATED ancestor recorded. #}
{% macro bq_safe_leaf_names(column_obj) %}
    {%- set collected = [] -%}
    {%- set root_is_repeated = (column_obj.mode == 'REPEATED') -%}
    {%- set has_children = (column_obj.fields is defined) and (column_obj.fields | length > 0) -%}
    {%- if has_children and not root_is_repeated -%}
        {%- set root_path = [column_obj.column] -%}
        {%- for nested_field in column_obj.fields -%}
            {%- do elementary._bq_walk_collect(nested_field, root_path, false, collected) -%}
        {%- endfor -%}
    {%- endif -%}
    {{ return(collected) }}
{% endmacro %}

{# Recursive helper for `bq_safe_leaf_names`: walks one schema-field subtree
   and appends the dotted path of every leaf that has no REPEATED node on its
   path (the leaf included) to `safe_names`.

   Args:
     field: schema field node; `.name`, `.mode` and `.fields` are read
       (presumably a google.cloud.bigquery.SchemaField - confirm at caller).
     prefix: list of path segments leading to `field` (not mutated).
     has_repeated_ancestor: true when a node above `field` is REPEATED.
     safe_names: output list, mutated in place.

   Nothing is returned; results are delivered through `safe_names`.

   Once a REPEATED node is met (an ancestor or `field` itself) no leaf below
   it can qualify, so the walk stops there instead of visiting the rest of
   the subtree. The collected names are the same as with a full traversal. #}
{% macro _bq_walk_collect(field, prefix, has_repeated_ancestor, safe_names) %}
    {%- if not has_repeated_ancestor and field.mode != 'REPEATED' -%}
        {%- set new_prefix = prefix + [field.name] -%}
        {%- if field.fields | length == 0 -%}
            {%- do safe_names.append(new_prefix | join('.')) -%}
        {%- else -%}
            {%- for child in field.fields -%}
                {#- Reaching this point means no REPEATED node lies on the path
                    so far, hence `false` for the child. -#}
                {%- do elementary._bq_walk_collect(
                    child, new_prefix, false, safe_names
                ) -%}
            {%- endfor -%}
        {%- endif -%}
    {%- endif -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
elementary.relation_to_full_name(monitored_table_relation)
) %}
{% set dimensions_string = elementary.join_list(dimensions, "; ") %}

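{# Segment-quote dimension expressions for BigQuery so nested struct paths
(e.g. user.address.city) compile correctly. Non-BigQuery adapters: no-op.
Note that on BigQuery `bq_segment_quote` also backtick-quotes dot-free names,
so each dimension is treated as an identifier there, not as a free-form SQL
expression. #}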
{% set _sql_dimensions = [] %}
{% for d in dimensions %}
{% do _sql_dimensions.append(elementary.bq_segment_quote(d)) %}
{% endfor %}
{% set concat_dimensions_sql_expression = elementary.list_concat_with_separator(
dimensions, "; "
_sql_dimensions, "; "
) %}

{% set timestamp_column = metric_properties.timestamp_column %}
{%- set data_monitoring_metrics_relation = elementary.get_elementary_relation(
"data_monitoring_metrics"
Expand Down Expand Up @@ -83,25 +91,21 @@
sum(metric_value) as total_metric_value
from all_dimension_metrics
group by dimension_value
{# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #}
having sum(metric_value) > 0
),

{# Create buckets for each previous dimension value #}
dimensions_buckets as (
select edr_bucket_start, edr_bucket_end, dimension_value
from training_set_dimensions
left join
buckets
on (
buckets.joiner = training_set_dimensions.joiner
{# This makes sure we dont create empty buckets for dimensions before their first appearance #}
and edr_bucket_end >= dimension_min_bucket_end
)
where dimension_value is not null
),

{# Calculating the row count for the value of each dimension #}
row_count_values as (
select
edr_bucket_start,
Expand All @@ -124,8 +128,6 @@
dimension_value
),

{# Merging between the row count and the dimensions buckets #}
{# This way we make sure that if a dimension has no rows, it will get a metric with value 0 #}
fill_empty_buckets_row_count_values as (
select
dimensions_buckets.edr_bucket_start,
Expand All @@ -145,7 +147,6 @@
)
),

{# We union so new buckets added in this run will be included (were filtered by the join we did on 'fill_empty_buckets_row_count_values') #}
union_row_count_values as (
select distinct *
from
Expand Down Expand Up @@ -204,7 +205,6 @@

{% else %}

{# Get all of the dimension anomaly metrics that were created for the test until this run #}
all_dimension_metrics as (
select bucket_end, dimension_value, metric_value
from {{ data_monitoring_metrics_relation }}
Expand All @@ -219,11 +219,9 @@
select distinct dimension_value, sum(metric_value) as total_metric_value
from all_dimension_metrics
group by dimension_value
{# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #}
having sum(metric_value) > 0
),

{# Calculating the row count for the value of each dimension #}
row_count_values as (
select
{{
Expand All @@ -240,7 +238,6 @@
group by dimension_value
),

{# This way we make sure that if a dimension has no rows, it will get a metric with value 0 #}
fill_empty_dimensions_row_count_values as (
select
{{
Expand All @@ -256,7 +253,6 @@
not in (select distinct dimension_value from row_count_values)
),

{# Union between current row count for each dimension, and the "hydrated" metrics of the test until this run #}
row_count as (
select *
from row_count_values
Expand Down
Loading