Skip to content

Commit

Permalink
Fix the strftime tz argument and make optional (dagster-io#21475)
Browse files Browse the repository at this point in the history
## Summary & Motivation

The check in this method for whether the timezone name on the datetime matched the `tz` that was passed in was incorrect: it was always false (the two names never compared as equal). To be fair, it is hard to write correctly, because there isn't a method on `tzinfo` or `datetime` that exposes the full timezone name that was used to create the `tzinfo`.

Ideally, `dst_safe_strftime` shouldn't be taking both a datetime and a tz; callers should just convert the datetime to the tz before calling the method. Instead of doing that larger refactor, I'm slipping in an optimization: many of the methods on TimeWindowPartitionDef call this strftime method with timestamps that were generated by the cron iterator. All datetimes from that iterator already use `self.timezone` for the `tzinfo`, so `tz` is now optional and those call sites pass `None` to use the timezone already on the datetime.

## How I Tested These Changes

bk
  • Loading branch information
sbquinlan authored May 10, 2024
1 parent cb2badb commit e075d66
Showing 1 changed file with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,20 @@
from .partition_key_range import PartitionKeyRange


def is_second_ambiguous_time(dt: datetime, tz: str):
def is_second_ambiguous_time(dt: datetime, tz: Optional[str]):
"""Returns if a datetime is the second instance of an ambiguous time in the given timezone due
to DST transitions.
"""
# UTC is never ambiguous
if tz.upper() == "UTC":
if tz is not None and tz.upper() == "UTC":
return False

# Ensure that the datetime is in the correct timezone
tzinfo = check.not_none(dt.tzinfo)
if tzinfo.tzname(None) != tz:

# pendulum has the non-standard "name" attribute on its tzinfo, check for it
# otherwise there is no standard way of getting the IANA timezone name from datetime/tzinfo
if tz is not None and getattr(tzinfo, "name", None) != tz:
dt = to_timezone(dt, tz)
tzinfo = check.not_none(dt.tzinfo)

Expand Down Expand Up @@ -109,24 +112,24 @@ def dst_safe_fmt(fmt: str) -> str:
return fmt + "%z"


def dst_safe_strftime(dt: datetime, tz: str, fmt: str, cron_schedule: str) -> str:
def dst_safe_strftime(dt: datetime, tz: Optional[str], fmt: str, cron_schedule: str) -> str:
"""A method for converting a datetime to a string which will append a suffix in cases where
the resulting timestamp would be ambiguous due to DST transitions.
"""
time_str = dt.strftime(fmt)
tz is Optional. None means use the timezone on the datetime object
"""
# if the format already includes a UTC offset, then we don't need to do anything
if fmt == dst_safe_fmt(fmt):
return time_str
if "%z" in fmt:
return dt.strftime(fmt)

# only need to handle ambiguous times for cron schedules which repeat every hour
if not cron_string_repeats_every_hour(cron_schedule):
return time_str
return dt.strftime(fmt)

# if the datetime is the second instance of an ambiguous time, then we append the UTC offset
if is_second_ambiguous_time(dt, tz):
return dt.strftime(dst_safe_fmt(fmt))
return time_str
return dt.strftime(fmt)


def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
Expand Down Expand Up @@ -422,10 +425,9 @@ def get_partition_keys_between_indexes(
or partitions_past_current_time < self.end_offset
):
if idx >= start_idx and idx < end_idx:
# datetimes from _iterate_time_windows have the correct tz so use None as an optimization
partition_keys.append(
dst_safe_strftime(
time_window.start, self.timezone, self.fmt, self.cron_schedule
)
dst_safe_strftime(time_window.start, None, self.fmt, self.cron_schedule)
)
if time_window.end.timestamp() > current_timestamp:
partitions_past_current_time += 1
Expand Down Expand Up @@ -455,10 +457,9 @@ def get_partition_keys(
time_window.end.timestamp() <= current_timestamp
or partitions_past_current_time < self.end_offset
):
# datetimes from _iterate_time_windows have the correct tz so use None as an optimization
partition_keys.append(
dst_safe_strftime(
time_window.start, self.timezone, self.fmt, self.cron_schedule
)
dst_safe_strftime(time_window.start, None, self.fmt, self.cron_schedule)
)

if time_window.end.timestamp() > current_timestamp:
Expand Down Expand Up @@ -529,7 +530,7 @@ def time_windows_for_partition_keys(
for partition_key in sorted_pks:
next_window = next(cur_windows_iterator)
if (
dst_safe_strftime(next_window.start, self.timezone, self.fmt, self.cron_schedule)
dst_safe_strftime(next_window.start, None, self.fmt, self.cron_schedule)
== partition_key
):
partition_key_time_windows.append(next_window)
Expand Down Expand Up @@ -585,7 +586,7 @@ def get_next_partition_key(
if start_time.timestamp() >= last_partition_window.end.timestamp():
return None
else:
return dst_safe_strftime(start_time, self.timezone, self.fmt, self.cron_schedule)
return dst_safe_strftime(start_time, None, self.fmt, self.cron_schedule)

def get_next_partition_window(
self, end_dt: datetime, current_time: Optional[datetime] = None, respect_bounds: bool = True
Expand Down

0 comments on commit e075d66

Please sign in to comment.