Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented get_name in StatsLogger, updated Otel and StatsD #43340

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor: remove redundant checks, lift OTel length restriction, and …
…apply minor refinements
  • Loading branch information
ArshiaZr committed Nov 14, 2024
commit ed094ad283b072da50db4d7d9d01207c332cbed9
22 changes: 10 additions & 12 deletions airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,26 @@
)


ferruzzi marked this conversation as resolved.
Show resolved Hide resolved
def prepare_stat_with_tags(fn: T) -> T:
def prepare_metric_name_with_tags(fn: T) -> T:
"""Prepare tags and stat."""

@wraps(fn)
def wrapper(self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs):
stat = stat or ""

def wrapper(self, metric_name: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs):
if tags and self.metrics_tags:
valid_tags: dict[str, str] = {}
for k, v in tags.items():
if self.metric_tags_validator.test(k):
if all(c not in [",", "="] for c in f"{v}{k}"):
if ":" not in f"{v}{k}":
valid_tags[k] = v
else:
log.error("Dropping invalid tag: %s=%s.", k, v)
log.error("Dropping invalid tag: %s:%s.", k, v)
tags_list = [f"{key}:{value}" for key, value in valid_tags.items()]
else:
tags_list = []

kwargs["tags"] = tags_list

return fn(self, stat, *args, **kwargs)
return fn(self, metric_name, *args, **kwargs)

return cast(T, wrapper)

Expand All @@ -96,7 +94,7 @@ def __init__(
self.metrics_tags = metrics_tags
self.metric_tags_validator = metric_tags_validator

@prepare_stat_with_tags
@prepare_metric_name_with_tags
@validate_stat
def incr(
self,
Expand All @@ -112,7 +110,7 @@ def incr(
return self.dogstatsd.increment(metric=full_metric_name, value=count, tags=tags, sample_rate=rate)
return None

@prepare_stat_with_tags
@prepare_metric_name_with_tags
@validate_stat
def decr(
self,
Expand All @@ -128,7 +126,7 @@ def decr(
return self.dogstatsd.decrement(metric=full_metric_name, value=count, tags=tags, sample_rate=rate)
return None

@prepare_stat_with_tags
@prepare_metric_name_with_tags
@validate_stat
def gauge(
self,
Expand All @@ -145,7 +143,7 @@ def gauge(
return self.dogstatsd.gauge(metric=full_metric_name, value=value, tags=tags, sample_rate=rate)
return None

@prepare_stat_with_tags
@prepare_metric_name_with_tags
@validate_stat
def timing(
self,
Expand All @@ -166,7 +164,7 @@ def timing(
return self.dogstatsd.timing(metric=full_metric_name, value=dt, tags=tags)
return None

@prepare_stat_with_tags
@prepare_metric_name_with_tags
@validate_stat
def timer(
self,
Expand Down
2 changes: 0 additions & 2 deletions airflow/metrics/statsd_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ def prepare_metric_name_with_tags(fn: T) -> T:
def wrapper(
self, metric_name: str | None = None, tags: dict[str, str] | None = None
ArshiaZr marked this conversation as resolved.
Show resolved Hide resolved
ArshiaZr marked this conversation as resolved.
Show resolved Hide resolved
) -> Callable[[str], str]:
metric_name = metric_name or ""

if self.influxdb_tags_enabled and tags:
valid_tags: dict[str, str] = {}

Expand Down
17 changes: 1 addition & 16 deletions airflow/metrics/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,11 @@ def stat_name_otel_handler(
if not (isinstance(stat_name, str) and isinstance(stat_prefix, str)):
raise InvalidStatsNameException("Stat name and prefix must both be strings.")

if len(proposed_stat_name) > OTEL_NAME_MAX_LENGTH:
# If the name is in the exceptions list, do not fail it for being too long.
# It may still be deemed invalid for other reasons below.
for exemption in BACK_COMPAT_METRIC_NAMES:
Copy link
Contributor

@ferruzzi ferruzzi Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My IDE is currently borked so I can't double check this easily; is BACK_COMPAT_METRIC_NAMES used anywhere else? It's possible that can also be pruned out now if it was only ever used as a work-around for the name length bug.

if re2.match(exemption, stat_name):
# There is a back-compat exception for this name; proceed
name_length_exemption = True
matched_exemption = exemption.pattern
break
else:
raise InvalidStatsNameException(
f"Invalid stat name: {proposed_stat_name}. Please see "
f"https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax"
)

# `stat_name_default_handler` throws InvalidStatsNameException if the
# provided value is not valid or returns the value if it is. We don't
# need the return value but will make use of the validation checks. If
# no exception is thrown, then the proposed name meets OTel requirements.
stat_name_default_handler(proposed_stat_name, max_length=999 if name_length_exemption else max_length)
stat_name_default_handler(proposed_stat_name, max_length=999)

# This warning is down here instead of up above because the exemption only
# applies to the length and a name may still be invalid for other reasons.
Expand Down
12 changes: 2 additions & 10 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
_is_up_down_counter,
full_name,
)
from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning
from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES

INVALID_STAT_NAME_CASES = [
(None, "can not be None"),
(42, "is not a string"),
("X" * OTEL_NAME_MAX_LENGTH, "too long"),
("test/$tats", "contains invalid characters"),
]

Expand Down Expand Up @@ -100,8 +99,7 @@ def test_old_name_exception_works(self, caplog):
name = "task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit"
assert len(name) > OTEL_NAME_MAX_LENGTH

with pytest.warns(MetricNameLengthExemptionWarning):
self.stats.incr(name)
self.stats.incr(name)

self.meter.get_meter().create_counter.assert_called_once_with(
name=(full_name(name)[:OTEL_NAME_MAX_LENGTH])
Expand Down Expand Up @@ -388,12 +386,6 @@ def test_get_name_invalid_cases(self):
with pytest.raises(InvalidStatsNameException):
self.stats.get_name(invalid_name)

def test_get_name_too_long(self):
# Edge case: Name exceeds max length
long_name = "a" * (OTEL_NAME_MAX_LENGTH + 1)
with pytest.raises(InvalidStatsNameException, match="Invalid stat name.*Please see"):
self.stats.get_name(long_name)

def test_get_name_special_characters(self):
# Edge case: Name contains invalid special characters
invalid_name = "invalid@name!"
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def test_increment_counter(self):
def test_increment_counter_with_tags(self):
self.stats.incr(
"test_stats_run.delay",
tags={"key0": "0", "key1": "val1", "key2": "val2"},
tags={"key0": 0, "key1": "val1", "key2": "val2"},
)
self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key0=0,key1=val1", 1, 1)

Expand Down
Loading