Skip to content

Commit

Permalink
[FLINK-22152][python] Fix the bug of same timers are registered multiple times
Browse files Browse the repository at this point in the history

This closes apache#15575.
  • Loading branch information
HuangXingBo committed Apr 13, 2021
1 parent 92fbe7f commit 2d2a687
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ cdef class GroupWindowAggFunctionBase:
cdef list timers
cdef object timer
timers = []
for timer in self._internal_timer_service.timers:
for timer in self._internal_timer_service.timers.keys():
timers.append(timer)
self._internal_timer_service.timers.clear()
return timers

cpdef void close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,8 @@ def on_processing_time(self, timer: InternalTimer):
return result

def get_timers(self):
timers = []
for timer in self._internal_timer_service.timers:
timers.append(timer)
return timers
yield from self._internal_timer_service.timers.keys()
self._internal_timer_service.timers.clear()

def to_utc_timestamp_mills(self, epoch_mills):
if self._shift_timezone == "UTC":
Expand Down
33 changes: 21 additions & 12 deletions flink-python/pyflink/fn_execution/timerservice_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import collections
import time
from enum import Enum

from typing import List, Tuple

from pyflink.datastream import TimerService
from pyflink.datastream.timerservice import InternalTimer, K, N, InternalTimerService
from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
Expand All @@ -42,6 +40,16 @@ def get_key(self) -> K:
def get_namespace(self) -> N:
return self._namespace

def __hash__(self):
    """Return a hash derived from the timestamp, key and namespace.

    The timestamp is folded by XOR-ing it with itself shifted right by
    32 bits. The key hash and then the namespace hash are mixed in, each
    after multiplying the running value by 31. The key is converted to a
    tuple before hashing (presumably because it may be an unhashable
    sequence -- confirm against the callers that set the key).
    """
    folded_timestamp = int(self._timestamp ^ (self._timestamp >> 32))
    return 31 * (31 * folded_timestamp + hash(tuple(self._key))) + hash(self._namespace)

def __eq__(self, other):
    """Return whether ``other`` has the same class, timestamp, key and namespace.

    The class comparison runs first, so the private attributes of
    ``other`` are only read when it has exactly the same class as
    ``self``.
    """
    same_class = self.__class__ == other.__class__
    if not same_class:
        return same_class
    return (
        self._timestamp == other._timestamp
        and self._key == other._key
        and self._namespace == other._namespace
    )


class TimerOperandType(Enum):
REGISTER_EVENT_TIMER = 0
Expand All @@ -58,7 +66,7 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
def __init__(self, keyed_state_backend: RemoteKeyedStateBackend):
self._keyed_state_backend = keyed_state_backend
self._current_watermark = None
self.timers = [] # type: List[Tuple[TimerOperandType, InternalTimer]]
self.timers = collections.OrderedDict()

def current_processing_time(self):
return int(time.time() * 1000)
Expand All @@ -71,23 +79,24 @@ def advance_watermark(self, watermark: int):

def register_processing_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
self.timers.append(
(TimerOperandType.REGISTER_PROC_TIMER, InternalTimerImpl(t, current_key, namespace)))
timer = (TimerOperandType.REGISTER_PROC_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None

def register_event_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
self.timers.append(
(TimerOperandType.REGISTER_EVENT_TIMER, InternalTimerImpl(t, current_key, namespace)))
timer = (TimerOperandType.REGISTER_EVENT_TIMER,
InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None

def delete_processing_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
self.timers.append(
(TimerOperandType.DELETE_PROC_TIMER, InternalTimerImpl(t, current_key, namespace)))
timer = (TimerOperandType.DELETE_PROC_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None

def delete_event_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
self.timers.append(
(TimerOperandType.DELETE_EVENT_TIMER, InternalTimerImpl(t, current_key, namespace)))
timer = (TimerOperandType.DELETE_EVENT_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None


class TimerServiceImpl(TimerService):
Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/utils/input_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _emit_output(self, output_result):
if output_result:
yield from self._output_factory.from_normal_data(output_result)

yield from self._output_factory.from_timers(self._internal_timer_service.timers)
yield from self._output_factory.from_timers(self._internal_timer_service.timers.keys())

self._internal_timer_service.timers.clear()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
Expand Down Expand Up @@ -100,6 +101,9 @@ public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
*/
@VisibleForTesting final long allowedLateness;

/** The shift timeZone of the window. */
@VisibleForTesting final ZoneId shiftTimeZone;

/** The Infos of the Window. */
private FlinkFnApi.GroupWindow.WindowProperty[] namedProperties;

Expand All @@ -124,9 +128,6 @@ public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
/** Session Window gap. */
private long gap;

/** The shift timeZone of the window. */
protected final ZoneId shiftTimeZone;

/** For serializing the window in checkpoints. */
@VisibleForTesting transient TypeSerializer<W> windowSerializer;

Expand All @@ -143,6 +144,8 @@ public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>

private transient UpdatableRowData reuseTimerRowData;

private transient RowDataSerializer keySerializer;

public PythonStreamGroupWindowAggregateOperator(
Configuration config,
RowType inputType,
Expand Down Expand Up @@ -194,6 +197,7 @@ public void open() throws Exception {
reuseTimerData = new UpdatableRowData(GenericRowData.of(0, null, 0), 3);
reuseTimerRowData.setField(4, reuseTimerData);
keyLength = getKeyType().getFieldCount();
keySerializer = (RowDataSerializer) getKeySerializer();
super.open();
}

Expand All @@ -206,7 +210,6 @@ public void processElementInternal(RowData value) throws Exception {
baos.reset();
}

@SuppressWarnings("unchecked")
@Override
public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
byte[] rawUdfResult = resultTuple.f0;
Expand Down Expand Up @@ -241,8 +244,9 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
byte[] encodedNamespace = timerData.getBinary(3);
bais.setBuffer(encodedNamespace, 0, encodedNamespace.length);
window = windowSerializer.deserialize(baisWrapper);
BinaryRowData rowKey = keySerializer.toBinaryRow(key).copy();
synchronized (getKeyedStateBackend()) {
setCurrentKey(((RowDataSerializer) getKeySerializer()).toBinaryRow(key));
setCurrentKey(rowKey);

if (timerOperandType == REGISTER_EVENT_TIMER) {
internalTimerService.registerEventTimeTimer(
Expand All @@ -264,10 +268,6 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
}
}

protected long getShiftEpochMills(long utcTimestampMills) {
return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone);
}

@Override
public String getFunctionUrn() {
return STREAM_GROUP_WINDOW_AGGREGATE_URN;
Expand Down Expand Up @@ -348,6 +348,11 @@ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
emitTriggerTimerData(timer, REGISTER_PROCESSING_TIMER);
}

@VisibleForTesting
long getShiftEpochMills(long utcTimestampMills) {
return TimeWindowUtil.toEpochMills(utcTimestampMills, shiftTimeZone);
}

private void buildWindow(LogicalWindow window, PlannerNamedWindowProperty[] namedProperties) {
ValueLiteralExpression size = null;
ValueLiteralExpression slide = null;
Expand Down

0 comments on commit 2d2a687

Please sign in to comment.