From d45a775db6b003749ae355790130c7e13f589390 Mon Sep 17 00:00:00 2001 From: Thomas Langton <155970791+tlangton3@users.noreply.github.com> Date: Fri, 22 May 2026 11:33:26 +0100 Subject: [PATCH 1/2] Support BigQuery nested STRUCT fields in anomaly tests Allows column_anomalies and dimension_anomalies to reference nested STRUCT leaves on BigQuery (e.g. user.address.city) instead of only top-level columns. A single column-discovery wrapper segment-quotes nested references (`a`.`b`.`c`) and projects the monitored column with a dot-free CTE alias so the path survives into downstream aggregates. Non-nested columns and non-BigQuery adapters are byte-equivalent to today's behaviour. REPEATED ancestors are out of scope (would require UNNEST). test_all_columns_anomalies is unchanged - users opt in by passing column_name=user.address.city explicitly to avoid ballooning the test surface on wide STRUCT schemas. --- .../get_column_monitors.sql | 27 ++++-- .../column_monitoring_query.sql | 85 +++++++++++++++++-- .../dimension_monitoring_query.sql | 22 ++--- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql index c32dcf520..7403cac78 100644 --- a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql +++ b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql @@ -1,13 +1,29 @@ {% macro get_column_obj_and_monitors(model_relation, column_name, monitors=none) %} - {% set column_obj_and_monitors = [] %} {% set column_objects = adapter.get_columns_in_relation(model_relation) %} + + {% if target.type == 'bigquery' %} + {% set expanded = [] %} + {% for col in column_objects %} + {% do expanded.append(col) %} + {% if col.fields | length > 0 %} + {% for leaf in col.flatten() %} + {% if leaf.mode != 'REPEATED' %} + {% do expanded.append(leaf) %} + {% endif %} + {% endfor 
%} + {% endif %} + {% endfor %} + {% set column_objects = expanded %} + {% endif %} + {% for column_obj in column_objects %} {% if column_obj.name.strip('"') | lower == column_name.strip('"') | lower %} + {% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %} {% set column_monitors = elementary.column_monitors_by_type( - elementary.get_column_data_type(column_obj), monitors + elementary.get_column_data_type(wrapped), monitors ) %} - {% set column_item = {"column": column_obj, "monitors": column_monitors} %} + {% set column_item = {"column": wrapped, "monitors": column_monitors} %} {{ return(column_item) }} {% endif %} {% endfor %} @@ -22,10 +38,11 @@ {% set column_objects = adapter.get_columns_in_relation(model_relation) %} {% for column_obj in column_objects %} + {% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %} {% set column_monitors = elementary.column_monitors_by_type( - elementary.get_column_data_type(column_obj), monitors + elementary.get_column_data_type(wrapped), monitors ) %} - {% set column_item = {"column": column_obj, "monitors": column_monitors} %} + {% set column_item = {"column": wrapped, "monitors": column_monitors} %} {% do column_obj_and_monitors.append(column_item) %} {% endfor %} diff --git a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql index ecd1d3564..e9f26a34c 100644 --- a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql @@ -15,7 +15,9 @@ {%- set timestamp_column = metric_properties.timestamp_column %} {% set prefixed_dimensions = [] %} {% for dimension_column in dimensions %} - {% do prefixed_dimensions.append("dimension_" ~ dimension_column) %} + {% do prefixed_dimensions.append( + "dimension_" ~ elementary.bq_safe_alias(dimension_column) + ) %} {% endfor %} {% set metric_types = [] %} @@ -53,7 +55,7 @@ ), 
filtered_monitored_table as ( select - {{ column_obj.quoted }}, + {{ column_obj.quoted }} as {{ column_obj.safe_alias }}, {%- if dimensions -%} {{ elementary.select_dimensions_columns( @@ -78,7 +80,7 @@ {%- else %} filtered_monitored_table as ( select - {{ column_obj.quoted }}, + {{ column_obj.quoted }} as {{ column_obj.safe_alias }}, {%- if dimensions -%} {{ elementary.select_dimensions_columns( @@ -94,7 +96,7 @@ column_metrics as ( {%- if column_metrics %} - {%- set column = column_obj.quoted -%} + {%- set column = column_obj.safe_alias -%} select {%- if timestamp_column %} edr_bucket_start as bucket_start, edr_bucket_end as bucket_end, @@ -341,17 +343,82 @@ {% endif %} {% endmacro %} +{# Updated to segment-quote nested dimensions on BigQuery and sanitise the + alias suffix. Backward compatible for non-nested columns and non-BQ adapters. #} {% macro select_dimensions_columns(dimension_columns, as_prefix="") %} {% set select_statements %} {%- for column in dimension_columns -%} - {{ column }} {%- if as_prefix -%} - {{ " as " ~ as_prefix ~ "_" ~ column }} - {%- endif -%} - {%- if not loop.last -%} - {{ ", " }} + {%- set _is_nested_bq = (target.type == 'bigquery' and '.' in column) -%} + {%- set _source = elementary.bq_segment_quote(column) if _is_nested_bq else column -%} + {%- set _alias_suffix = elementary.bq_safe_alias(column) if _is_nested_bq else column -%} + {{ _source }}{{ " as " ~ as_prefix ~ "_" ~ _alias_suffix }} + {%- else -%} + {{ column }} {%- endif -%} + {%- if not loop.last -%}{{ ", " }}{%- endif -%} {%- endfor -%} {% endset %} {{ return(select_statements) }} {% endmacro %} + + +{# ---------------------------------------------------------------------- #} +{# BigQuery STRUCT nested-field helpers. #} +{# ---------------------------------------------------------------------- #} + +{# Segment-quote a (possibly dotted) identifier for BigQuery. + Returns `a`.`b`.`c` for dotted paths, `name` otherwise. 
+ For non-BigQuery adapters, returns the name unchanged (preserves existing + behaviour at all callsites). #} +{% macro bq_segment_quote(name) %} + {%- if target.type == 'bigquery' -%} + {%- if '.' in name -%} + {%- set parts = [] -%} + {%- for seg in name.split('.') -%} + {%- do parts.append('`' ~ seg ~ '`') -%} + {%- endfor -%} + {{ parts | join('.') }} + {%- else -%} + `{{ name }}` + {%- endif -%} + {%- else -%} + {{ name }} + {%- endif -%} +{% endmacro %} + +{# Convert a (possibly dotted) identifier into a dot-free alias safe to use + as a SQL identifier. No-op for names without dots. #} +{% macro bq_safe_alias(name) %} + {{- name | replace('.', '__') -}} +{% endmacro %} + +{# Wrap a Column / BigQueryColumn with a dict carrying both the SQL identifier + representation (.quoted, segment-quoted for nested) and a CTE-projection-safe + alias (.safe_alias, dot-free). For non-nested columns and non-BigQuery + adapters the wrapper mirrors the original Column's values, so downstream + consumers (which use only attribute / subscript access on column_obj) see + no behavioural difference. #} +{% macro wrap_column_for_struct_support(column_obj) %} + {%- set name = column_obj.name -%} + {%- if target.type == 'bigquery' and '.' 
in name -%} + {%- set quoted_segments = [] -%} + {%- for seg in name.split('.') -%} + {%- do quoted_segments.append('`' ~ seg ~ '`') -%} + {%- endfor -%} + {%- set quoted = quoted_segments | join('.') -%} + {%- set safe_alias = name | replace('.', '__') -%} + {%- else -%} + {%- set quoted = column_obj.quoted -%} + {%- set safe_alias = column_obj.column -%} + {%- endif -%} + {{ return({ + 'name': name, + 'column': column_obj.column, + 'quoted': quoted, + 'safe_alias': safe_alias, + 'dtype': column_obj.dtype, + 'data_type': column_obj.data_type, + 'fields': column_obj.fields, + }) }} +{% endmacro %} diff --git a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql index 3aa20fe19..2f7fe294b 100644 --- a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql @@ -12,9 +12,17 @@ elementary.relation_to_full_name(monitored_table_relation) ) %} {% set dimensions_string = elementary.join_list(dimensions, "; ") %} + + {# Segment-quote dimension expressions for BigQuery so nested struct paths + (e.g. user.address.city) compile correctly. Non-BQ: no-op; BQ non-nested names are backtick-quoted. 
#} + {% set _sql_dimensions = [] %} + {% for d in dimensions %} + {% do _sql_dimensions.append(elementary.bq_segment_quote(d)) %} + {% endfor %} {% set concat_dimensions_sql_expression = elementary.list_concat_with_separator( - dimensions, "; " + _sql_dimensions, "; " ) %} + {% set timestamp_column = metric_properties.timestamp_column %} {%- set data_monitoring_metrics_relation = elementary.get_elementary_relation( "data_monitoring_metrics" @@ -83,11 +91,9 @@ sum(metric_value) as total_metric_value from all_dimension_metrics group by dimension_value - {# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #} having sum(metric_value) > 0 ), - {# Create buckets for each previous dimension value #} dimensions_buckets as ( select edr_bucket_start, edr_bucket_end, dimension_value from training_set_dimensions @@ -95,13 +101,11 @@ buckets on ( buckets.joiner = training_set_dimensions.joiner - {# This makes sure we dont create empty buckets for dimensions before their first appearance #} and edr_bucket_end >= dimension_min_bucket_end ) where dimension_value is not null ), - {# Calculating the row count for the value of each dimension #} row_count_values as ( select edr_bucket_start, @@ -124,8 +128,6 @@ dimension_value ), - {# Merging between the row count and the dimensions buckets #} - {# This way we make sure that if a dimension has no rows, it will get a metric with value 0 #} fill_empty_buckets_row_count_values as ( select dimensions_buckets.edr_bucket_start, @@ -145,7 +147,6 @@ ) ), - {# We union so new buckets added in this run will be included (were filtered by the join we did on 'fill_empty_buckets_row_count_values') #} union_row_count_values as ( select distinct * from @@ -204,7 +205,6 @@ {% else %} - {# Get all of the dimension anomaly metrics that were created for the test until this run #} all_dimension_metrics as ( select bucket_end, dimension_value, metric_value from {{ data_monitoring_metrics_relation }} @@ 
-219,11 +219,9 @@ select distinct dimension_value, sum(metric_value) as total_metric_value from all_dimension_metrics group by dimension_value - {# Remove outdated dimension values (dimensions with all metrics of 0 in the range of the test time) #} having sum(metric_value) > 0 ), - {# Calculating the row count for the value of each dimension #} row_count_values as ( select {{ @@ -240,7 +238,6 @@ group by dimension_value ), - {# This way we make sure that if a dimension has no rows, it will get a metric with value 0 #} fill_empty_dimensions_row_count_values as ( select {{ @@ -256,7 +253,6 @@ not in (select distinct dimension_value from row_count_values) ), - {# Union between current row count for each dimension, and the "hydrated" metrics of the test until this run #} row_count as ( select * from row_count_values From 8c7b36ef29da05a21ada97a5efab4a188d78cc34 Mon Sep 17 00:00:00 2001 From: Thomas Langton <155970791+tlangton3@users.noreply.github.com> Date: Fri, 22 May 2026 15:14:41 +0100 Subject: [PATCH 2/2] Fix REPEATED ancestor handling and non-BigQuery wrapper safety Address CodeRabbit findings: 1. `BigQueryColumn.flatten()` discards ancestor modes, so a NULLABLE leaf under a REPEATED ancestor still satisfied the previous `leaf.mode != 'REPEATED'` filter. Add `bq_safe_leaf_names` + `_bq_walk_collect`, an ancestor-aware walker that returns only leaves with no REPEATED ancestor in their path. Filter `flatten()` output against this set. 2. `wrap_column_for_struct_support` unconditionally read `column_obj.fields`, which raised on non-BigQuery adapters (base `Column` lacks `fields`). Guard with `column_obj.fields is defined` and default to an empty list, so the wrapper is safe on Snowflake, Postgres, Redshift, etc. 
--- .../get_column_monitors.sql | 8 +++- .../column_monitoring_query.sql | 44 ++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql index 7403cac78..ed5efe49f 100644 --- a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql +++ b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql @@ -7,8 +7,14 @@ {% for col in column_objects %} {% do expanded.append(col) %} {% if col.fields | length > 0 %} + {# `BigQueryColumn.flatten()` discards ancestor modes, so a + NULLABLE leaf under a REPEATED ancestor still satisfies + `leaf.mode != 'REPEATED'`. Build the set of safe leaf names + via an ancestor-aware walker and filter `flatten()` against + it. #} + {% set safe_names = elementary.bq_safe_leaf_names(col) %} {% for leaf in col.flatten() %} - {% if leaf.mode != 'REPEATED' %} + {% if leaf.name in safe_names %} {% do expanded.append(leaf) %} {% endif %} {% endfor %} diff --git a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql index e9f26a34c..e2c1696d3 100644 --- a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql @@ -412,6 +412,10 @@ {%- set quoted = column_obj.quoted -%} {%- set safe_alias = column_obj.column -%} {%- endif -%} + {# `fields` only exists on BigQueryColumn; guard so non-BigQuery + adapters (Snowflake, Postgres, Redshift, ...) don't trip on the + attribute access. 
#} + {%- set fields = column_obj.fields if column_obj.fields is defined else [] -%} {{ return({ 'name': name, 'column': column_obj.column, @@ -419,6 +423,44 @@ 'safe_alias': safe_alias, 'dtype': column_obj.dtype, 'data_type': column_obj.data_type, - 'fields': column_obj.fields, + 'fields': fields, }) }} {% endmacro %} + +{# Walk a BigQuery STRUCT tree and collect dotted leaf names that are safe to + monitor without UNNEST — i.e. no REPEATED ancestor anywhere in the path, + and the leaf itself is not REPEATED. `BigQueryColumn.flatten()` returns leaf + columns with the leaf's own mode but discards ancestor modes, so this walker + is the source of truth for "which leaves can we project directly?". #} +{% macro bq_safe_leaf_names(column_obj) %} + {%- set safe_names = [] -%} + {%- if column_obj.mode != 'REPEATED' + and column_obj.fields is defined + and column_obj.fields | length > 0 -%} + {%- for child in column_obj.fields -%} + {%- do elementary._bq_walk_collect( + child, [column_obj.column], false, safe_names + ) -%} + {%- endfor -%} + {%- endif -%} + {{ return(safe_names) }} +{% endmacro %} + +{# Recursive helper: walks a google.cloud.bigquery.SchemaField subtree, + propagating whether any ancestor was REPEATED. Append safe leaf names to + `safe_names`. #} +{% macro _bq_walk_collect(field, prefix, has_repeated_ancestor, safe_names) %} + {%- set new_prefix = prefix + [field.name] -%} + {%- if field.fields | length == 0 -%} + {%- if not has_repeated_ancestor and field.mode != 'REPEATED' -%} + {%- do safe_names.append(new_prefix | join('.')) -%} + {%- endif -%} + {%- else -%} + {%- set new_has_repeated = has_repeated_ancestor or (field.mode == 'REPEATED') -%} + {%- for child in field.fields -%} + {%- do elementary._bq_walk_collect( + child, new_prefix, new_has_repeated, safe_names + ) -%} + {%- endfor -%} + {%- endif -%} +{% endmacro %}