Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions integration_tests/tests/test_dimension_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject):
}
for superhero in ["Superman", "Superman", "Superman", "Spiderman"]
]

data += [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
Expand All @@ -98,12 +99,19 @@ def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject):
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
assert test_result["status"] == "fail"

# Dimension anomalies only stores anomalous rows (unlike other anomaly tests) - so we should only get 1 row with the problematic value
anomaly_test_points = get_latest_anomaly_test_points(dbt_project, test_id)
assert len(anomaly_test_points) == 1
assert anomaly_test_points[0]["is_anomalous"]
assert anomaly_test_points[0]["dimension"] == "superhero"
assert anomaly_test_points[0]["dimension_value"] == "Superman"

# Only dimension values with anomalies are stored in the test points
dimension_values = set([x["dimension_value"] for x in anomaly_test_points])

superman_anomaly_test_points = [
x for x in anomaly_test_points if x["dimension_value"] == "Superman"
]

assert len(dimension_values) == 1
assert "Superman" in dimension_values
assert len(anomaly_test_points) == len(superman_anomaly_test_points)
assert any(x["is_anomalous"] for x in superman_anomaly_test_points)


# Anomalies currently not supported on ClickHouse
Expand Down
34 changes: 17 additions & 17 deletions macros/edr/materializations/test/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@
{% macro handle_dbt_test(flattened_test, materialization_macro) %}
{% set result = materialization_macro() %}
{% set sample_limit = elementary.get_config_var('test_sample_row_count') %}

{% set disable_test_samples = false %}
{% if "meta" in flattened_test and "disable_test_samples" in flattened_test["meta"] %}
{% set disable_test_samples = flattened_test["meta"]["disable_test_samples"] %}
{% endif %}

{% if disable_test_samples %}
{% set sample_limit = 0 %}
{% elif elementary.is_pii_table(flattened_test) %}
{% set sample_limit = 0 %}
{% elif elementary.should_disable_sampling_for_pii(flattened_test) %}
{% set sample_limit = 0 %}
{% endif %}

{% set result_rows = elementary.query_test_result_rows(sample_limit=sample_limit, ignore_passed_tests=true) %}
{% set elementary_test_results_row = elementary.get_dbt_test_result_row(flattened_test, result_rows) %}
{% do elementary.cache_elementary_test_results_rows([elementary_test_results_row]) %}
Expand Down Expand Up @@ -125,7 +125,7 @@
{% do elementary.debug_log("Skipping sample query because the test passed.") %}
{% do return([]) %}
{% endif %}

{% set query %}
with test_results as (
{{ sql }}
Expand All @@ -137,23 +137,23 @@

{% macro get_columns_to_exclude_from_sampling(flattened_test) %}
  {# Collect the column names that must be censored out of test result samples. #}
  {% set excluded = [] %}

  {% if flattened_test %}
    {# PII-tagged columns on the parent model, when the PII guard is enabled. #}
    {% if elementary.get_config_var('disable_samples_on_pii_tags') %}
      {% for pii_column in elementary.get_pii_columns_from_parent_model(flattened_test) %}
        {% do excluded.append(pii_column) %}
      {% endfor %}
    {% endif %}

    {# The tested column itself, when its config disables sampling. #}
    {% if elementary.is_sampling_disabled_for_column(flattened_test) %}
      {% set tested_column = elementary.insensitive_get_dict_value(flattened_test, 'test_column_name') %}
      {% if tested_column and tested_column not in excluded %}
        {% do excluded.append(tested_column) %}
      {% endif %}
    {% endif %}
  {% endif %}

  {% do return(excluded) %}
{% endmacro %}

Expand All @@ -162,48 +162,48 @@
{% if not elementary.get_config_var('disable_samples_on_pii_tags') %}
{% do return(false) %}
{% endif %}

{% set pii_columns = elementary.get_pii_columns_from_parent_model(flattened_test) %}
{% if not pii_columns %}
{% do return(false) %}
{% endif %}

{# Get the compiled test query #}
{% set test_query = elementary.get_compiled_code(flattened_test) %}
{% set test_query_lower = test_query.lower() %}

{# Check if query uses * (select all columns) #}
{# Note: This is intentionally conservative and may over-censor in cases like
{# Note: This is intentionally conservative and may over-censor in cases like
"SELECT * FROM other_table" in CTEs, but it's better to be safe with PII data #}
{% if '*' in test_query_lower %}
{% do return(true) %}
{% endif %}

{# Check if any PII column appears in the test query #}
{% for pii_column in pii_columns %}
{% if pii_column.lower() in test_query_lower %}
{% do return(true) %}
{% endif %}
{% endfor %}

{% do return(false) %}
{% endmacro %}

{% macro is_sampling_disabled_for_column(flattened_test) %}
  {# Whether the tested column's config on the parent model sets `disable_test_samples`. #}
  {% set column_name = elementary.insensitive_get_dict_value(flattened_test, 'test_column_name') %}
  {% set parent_unique_id = elementary.insensitive_get_dict_value(flattened_test, 'parent_model_unique_id') %}

  {# A column-level setting only applies to column tests with a known parent model. #}
  {% if not column_name or not parent_unique_id %}
    {% do return(false) %}
  {% endif %}

  {% set parent_node = elementary.get_node(parent_unique_id) %}
  {% if not parent_node or not parent_node.get('columns') %}
    {% do return(false) %}
  {% endif %}

  {% set col_config = parent_node.get('columns', {}).get(column_name, {}).get('config', {}) %}
  {% do return(elementary.safe_get_with_default(col_config, 'disable_test_samples', false)) %}
{% endmacro %}

Expand Down
8 changes: 4 additions & 4 deletions macros/edr/tests/test_dimension_anomalies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
detection_period=detection_period,
training_period=training_period,
exclude_final_results=exclude_final_results) %}

{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand Down Expand Up @@ -71,11 +71,11 @@
{{ elementary.test_log('end', full_table_name) }}

{% set flattened_test = elementary.flatten_test(context["model"]) %}
{% set anomalous_rows_sql = elementary.get_anomaly_query(flatten_model) %}
{% set anomalous_dimension_rows_sql = elementary.get_anomaly_query_for_dimension_anomalies(flattened_test) %}
{% do elementary.store_metrics_table_in_cache() %}
{% do elementary.store_anomaly_test_results(flattened_test, anomalous_rows_sql) %}
{% do elementary.store_anomaly_test_results(flattened_test, anomalous_dimension_rows_sql) %}

{{ anomalous_rows_sql }}
{{ elementary.get_anomaly_query(flattened_test) }}

{% else %}

Expand Down
24 changes: 19 additions & 5 deletions macros/edr/tests/test_utils/get_anomaly_query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@
{{- return(query) -}}
{%- endmacro -%}

{#- Build the query that selects which rows to store for a dimension anomalies test.
    Only dimension values that have at least one anomalous data point are kept,
    but for those values every scored row is returned (anomalous or not). -#}
{%- macro get_anomaly_query_for_dimension_anomalies(flattened_test=none) -%}
{#- Inner query: the distinct dimension values flagged as anomalous. -#}
{%- set dimension_values_query -%}
select distinct dimension_value from ({{ elementary.get_read_anomaly_scores_query(flattened_test) }}) results
where is_anomalous = true
{%- endset -%}

{#- Outer query: all scored rows belonging to those anomalous dimension values. -#}
{% set dimension_anomalies_query -%}
select * from ({{ elementary.get_read_anomaly_scores_query(flattened_test) }}) results
where dimension_value in ({{ dimension_values_query }})
{%- endset -%}

{{- return(dimension_anomalies_query) -}}
{%- endmacro -%}

{% macro get_read_anomaly_scores_query(flattened_test=none) %}
{% if not flattened_test %}
{% set flattened_test = elementary.flatten_test(model) %}
Expand Down Expand Up @@ -71,15 +85,15 @@ case when
when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'spike' then
{{ elementary.lag('min_metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
when '{{ test_configuration.anomaly_direction }}' = 'spike' then metric_value
else min_metric_value
else min_metric_value
end as min_value,
case
when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' = 'drop' then
{{ elementary.lag('metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
when is_anomalous = TRUE and '{{ test_configuration.anomaly_direction }}' != 'drop' then
{{ elementary.lag('max_metric_value') }} over (partition by full_table_name, column_name, metric_name, dimension, dimension_value, bucket_seasonality order by bucket_end)
when '{{ test_configuration.anomaly_direction }}' = 'drop' then metric_value
else max_metric_value
else max_metric_value
end as max_value,
bucket_start as start_time,
bucket_end as end_time,
Expand Down Expand Up @@ -121,7 +135,7 @@ case when
{% set drop_filter %}
(metric_value < ((1 - {{ drop_failure_percent_threshold }}/100.0) * training_avg))
{% endset %}

{% if spike_failure_percent_threshold and drop_failure_percent_threshold and (anomaly_direction | lower) == 'both' %}
{{ spike_filter }} or {{ drop_filter }}
{% else %}
Expand All @@ -144,7 +158,7 @@ case when

{% macro fail_on_zero(fail_on_zero) %}
(
metric_value = 0 and
metric_value = 0 and
{% if fail_on_zero %}
1 = 1
{% else %}
Expand All @@ -163,7 +177,7 @@ case when
test_configuration.ignore_small_changes.spike_failure_percent_threshold,
test_configuration.ignore_small_changes.drop_failure_percent_threshold,
test_configuration.anomaly_direction
)
)
}}
)
))
Expand Down