Skip to content

Commit

Permalink
[FLINK-23165][python] Add StreamExecutionEnvironment#registerSlotShar…
Browse files Browse the repository at this point in the history
…ingGroup to PyFlink and Scala

This closes apache#16405.
  • Loading branch information
SteNicholas authored and HuangXingBo committed Jul 7, 2021
1 parent 4e9fa33 commit 4bcdfa2
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 16 deletions.
4 changes: 3 additions & 1 deletion flink-python/pyflink/datastream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
CoFlatMapFunction, ReduceFunction, RuntimeContext,
KeySelector, FilterFunction, Partitioner, SourceFunction,
SinkFunction)
from pyflink.datastream.slot_sharing_group import SlotSharingGroup
from pyflink.datastream.state_backend import (StateBackend, MemoryStateBackend, FsStateBackend,
RocksDBStateBackend, CustomStateBackend,
PredefinedOptions, HashMapStateBackend,
Expand Down Expand Up @@ -127,5 +128,6 @@
'WindowAssigner',
'MergingWindowAssigner',
'TriggerResult',
'Trigger'
'Trigger',
'SlotSharingGroup'
]
25 changes: 18 additions & 7 deletions flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Callable, Union, List, cast

from pyflink.common import typeinfo, ExecutionConfig, Row
from pyflink.datastream.slot_sharing_group import SlotSharingGroup
from pyflink.datastream.window import TimeWindowSerializer, CountWindowSerializer, WindowAssigner, \
Trigger, WindowOperationDescriptor
from pyflink.common.typeinfo import RowTypeInfo, Types, TypeInformation, _from_java_type
Expand Down Expand Up @@ -207,7 +208,7 @@ def disable_chaining(self) -> 'DataStream':
self._j_data_stream.disableChaining()
return self

def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) -> 'DataStream':
"""
Sets the slot sharing group of this operation. Parallel instances of operations that are in
the same slot sharing group will be co-located in the same TaskManager slot, if possible.
Expand All @@ -218,10 +219,14 @@ def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
Initially an operation is in the default slot sharing group. An operation can be put into
the default group explicitly by setting the slot sharing group to 'default'.
:param slot_sharing_group: The slot sharing group name.
:param slot_sharing_group: The slot sharing group name, or a SlotSharingGroup object that
contains the name and its resource spec.
:return: This operator.
"""
self._j_data_stream.slotSharingGroup(slot_sharing_group)
if isinstance(slot_sharing_group, SlotSharingGroup):
self._j_data_stream.slotSharingGroup(slot_sharing_group.get_java_slot_sharing_group())
else:
self._j_data_stream.slotSharingGroup(slot_sharing_group)
return self

def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
Expand Down Expand Up @@ -784,7 +789,8 @@ def disable_chaining(self) -> 'DataStreamSink':
self._j_data_stream_sink.disableChaining()
return self

def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStreamSink':
def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) \
-> 'DataStreamSink':
"""
Sets the slot sharing group of this operation. Parallel instances of operations that are in
the same slot sharing group will be co-located in the same TaskManager slot, if possible.
Expand All @@ -795,10 +801,15 @@ def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStreamSink':
Initially an operation is in the default slot sharing group. An operation can be put into
the default group explicitly by setting the slot sharing group to 'default'.
:param slot_sharing_group: The slot sharing group name.
:param slot_sharing_group: The slot sharing group name, or a SlotSharingGroup object that
contains the name and its resource spec.
:return: This operator.
"""
self._j_data_stream_sink.slotSharingGroup(slot_sharing_group)
if isinstance(slot_sharing_group, SlotSharingGroup):
self._j_data_stream_sink.slotSharingGroup(
slot_sharing_group.get_java_slot_sharing_group())
else:
self._j_data_stream_sink.slotSharingGroup(slot_sharing_group)
return self


Expand Down Expand Up @@ -1109,7 +1120,7 @@ def start_new_chain(self) -> 'DataStream':
def disable_chaining(self) -> 'DataStream':
raise Exception("Disable chaining for KeyedStream is not supported.")

def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) -> 'DataStream':
raise Exception("Setting slot sharing group for KeyedStream is not supported.")


Expand Down
293 changes: 293 additions & 0 deletions flink-python/pyflink/datastream/slot_sharing_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

__all__ = ['MemorySize', 'SlotSharingGroup']

from typing import Optional

from pyflink.java_gateway import get_gateway


class MemorySize(object):
    """
    MemorySize is a representation of a number of bytes, viewable in different units.

    Instances wrap a Java ``org.apache.flink.configuration.MemorySize`` object; unit conversion,
    comparison, hashing and string formatting are all delegated to that object.
    """

    def __init__(self, j_memory_size=None, bytes_size: int = None):
        """
        Wraps an existing Java MemorySize, or creates a new one from a number of bytes.

        :param j_memory_size: The Java MemorySize object to wrap. When it is given,
                              ``bytes_size`` is ignored.
        :param bytes_size: The size in bytes handed to the Java MemorySize constructor when
                           ``j_memory_size`` is None.
        """
        if j_memory_size is None:
            # The gateway is only touched when a new Java object really has to be created.
            j_memory_size = get_gateway().jvm \
                .org.apache.flink.configuration.MemorySize(bytes_size)
        self._j_memory_size = j_memory_size

    @staticmethod
    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
        """
        Creates a MemorySize from a number of Mebibytes.

        :param mebi_bytes: The size in Mebibytes.
        :return: The MemorySize wrapping the result of the Java ``MemorySize.ofMebiBytes``.
        """
        return MemorySize(
            get_gateway().jvm.org.apache.flink.configuration.MemorySize.ofMebiBytes(mebi_bytes))

    def get_bytes(self) -> int:
        """
        Gets the memory size in bytes.

        :return: The memory size in bytes.
        """
        return self._j_memory_size.getBytes()

    def get_kibi_bytes(self) -> int:
        """
        Gets the memory size in Kibibytes (= 1024 bytes).

        :return: The memory size in Kibibytes.
        """
        return self._j_memory_size.getKibiBytes()

    def get_mebi_bytes(self) -> int:
        """
        Gets the memory size in Mebibytes (= 1024 Kibibytes).

        :return: The memory size in Mebibytes.
        """
        return self._j_memory_size.getMebiBytes()

    def get_gibi_bytes(self) -> int:
        """
        Gets the memory size in Gibibytes (= 1024 Mebibytes).

        :return: The memory size in Gibibytes.
        """
        return self._j_memory_size.getGibiBytes()

    def get_tebi_bytes(self) -> int:
        """
        Gets the memory size in Tebibytes (= 1024 Gibibytes).

        :return: The memory size in Tebibytes.
        """
        return self._j_memory_size.getTebiBytes()

    def get_java_memory_size(self):
        """
        Gets the Java MemorySize object.

        :return: The Java MemorySize object.
        """
        return self._j_memory_size

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self._j_memory_size == other._j_memory_size

    def __hash__(self):
        return self._j_memory_size.hashCode()

    def __lt__(self, other: 'MemorySize'):
        if not isinstance(other, MemorySize):
            raise Exception("Does not support comparison with non-MemorySize %s" % other)

        # compareTo only promises a negative number for "less than", not exactly -1.
        return self._j_memory_size.compareTo(other._j_memory_size) < 0

    def __le__(self, other: 'MemorySize'):
        # "less than or equal" is the union of both relations; requiring both at once (as an
        # ``and`` would) can never hold. A non-MemorySize operand is rejected by __lt__.
        return self.__eq__(other) or self.__lt__(other)

    def __str__(self):
        return self._j_memory_size.toString()


class SlotSharingGroup(object):
    """
    Describe the name and the different resource components of a slot sharing group.

    Instances wrap a Java SlotSharingGroup object and are normally created through
    :func:`SlotSharingGroup.builder`.
    """

    def __init__(self, j_slot_sharing_group):
        """
        Wraps the given Java SlotSharingGroup object.

        :param j_slot_sharing_group: The Java SlotSharingGroup object to wrap.
        """
        self._j_slot_sharing_group = j_slot_sharing_group

    def get_name(self) -> str:
        """
        Gets the name of this SlotSharingGroup.

        :return: The name of the SlotSharingGroup.
        """
        return self._j_slot_sharing_group.getName()

    def get_managed_memory(self) -> Optional[MemorySize]:
        """
        Gets the task managed memory for this SlotSharingGroup.

        :return: The task managed memory of the SlotSharingGroup, or None if the Java side
                 reports no value.
        """
        managed_memory = self._j_slot_sharing_group.getManagedMemory()
        return MemorySize(managed_memory.get()) if managed_memory.isPresent() else None

    def get_task_heap_memory(self) -> Optional[MemorySize]:
        """
        Gets the task heap memory for this SlotSharingGroup.

        :return: The task heap memory of the SlotSharingGroup, or None if the Java side reports
                 no value.
        """
        task_heap_memory = self._j_slot_sharing_group.getTaskHeapMemory()
        return MemorySize(task_heap_memory.get()) if task_heap_memory.isPresent() else None

    def get_task_off_heap_memory(self) -> Optional[MemorySize]:
        """
        Gets the task off-heap memory for this SlotSharingGroup.

        :return: The task off-heap memory of the SlotSharingGroup, or None if the Java side
                 reports no value.
        """
        task_off_heap_memory = self._j_slot_sharing_group.getTaskOffHeapMemory()
        return MemorySize(task_off_heap_memory.get()) if task_off_heap_memory.isPresent() else None

    def get_cpu_cores(self) -> Optional[float]:
        """
        Gets the CPU cores for this SlotSharingGroup.

        :return: The CPU cores of the SlotSharingGroup, or None if the Java side reports no
                 value.
        """
        cpu_cores = self._j_slot_sharing_group.getCpuCores()
        return cpu_cores.get() if cpu_cores.isPresent() else None

    def get_external_resources(self) -> dict:
        """
        Gets the external resource from this SlotSharingGroup.

        :return: User specified resources of the SlotSharingGroup, copied into a Python dict.
        """
        return dict(self._j_slot_sharing_group.getExternalResources())

    def get_java_slot_sharing_group(self):
        """
        Gets the Java SlotSharingGroup object.

        :return: The Java SlotSharingGroup object.
        """
        return self._j_slot_sharing_group

    @staticmethod
    def builder(name: str) -> 'SlotSharingGroup.Builder':
        """
        Gets the Builder with the given name for this SlotSharingGroup.

        :param name: The name of the SlotSharingGroup.
        :return: The builder for the SlotSharingGroup.
        """
        # The annotation is qualified with the outer class so that the forward reference can
        # be resolved from module scope, matching the annotations on the Builder methods.
        return SlotSharingGroup.Builder(
            get_gateway().jvm.org.apache.flink.api.common.operators.SlotSharingGroup.newBuilder(
                name))

    def __eq__(self, other):
        return isinstance(other, self.__class__) and \
            self._j_slot_sharing_group == other._j_slot_sharing_group

    def __hash__(self):
        return self._j_slot_sharing_group.hashCode()

    class Builder(object):
        """
        Builder for the SlotSharingGroup.

        Every setter forwards to the wrapped Java builder and returns this object so that
        calls can be chained.
        """

        def __init__(self, j_builder):
            """
            Wraps the given Java builder.

            :param j_builder: The Java SlotSharingGroup builder to delegate to.
            """
            self._j_builder = j_builder

        def set_cpu_cores(self, cpu_cores: float) -> 'SlotSharingGroup.Builder':
            """
            Sets the CPU cores for this SlotSharingGroup.

            :param cpu_cores: The CPU cores of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setCpuCores(cpu_cores)
            return self

        def set_task_heap_memory(self, task_heap_memory: MemorySize) -> 'SlotSharingGroup.Builder':
            """
            Sets the task heap memory for this SlotSharingGroup.

            :param task_heap_memory: The task heap memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setTaskHeapMemory(task_heap_memory.get_java_memory_size())
            return self

        def set_task_heap_memory_mb(self, task_heap_memory_mb: int) -> 'SlotSharingGroup.Builder':
            """
            Sets the task heap memory for this SlotSharingGroup in MB.

            :param task_heap_memory_mb: The task heap memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setTaskHeapMemoryMB(task_heap_memory_mb)
            return self

        def set_task_off_heap_memory(self, task_off_heap_memory: MemorySize) \
                -> 'SlotSharingGroup.Builder':
            """
            Sets the task off-heap memory for this SlotSharingGroup.

            :param task_off_heap_memory: The task off-heap memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setTaskOffHeapMemory(task_off_heap_memory.get_java_memory_size())
            return self

        def set_task_off_heap_memory_mb(self, task_off_heap_memory_mb: int) \
                -> 'SlotSharingGroup.Builder':
            """
            Sets the task off-heap memory for this SlotSharingGroup in MB.

            :param task_off_heap_memory_mb: The task off-heap memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setTaskOffHeapMemoryMB(task_off_heap_memory_mb)
            return self

        def set_managed_memory(self, managed_memory: MemorySize) -> 'SlotSharingGroup.Builder':
            """
            Sets the task managed memory for this SlotSharingGroup.

            :param managed_memory: The task managed memory of the SlotSharingGroup.
            :return: This object.
            """
            self._j_builder.setManagedMemory(managed_memory.get_java_memory_size())
            return self

        def set_managed_memory_mb(self, managed_memory_mb: int) -> 'SlotSharingGroup.Builder':
            """
            Sets the task managed memory for this SlotSharingGroup in MB.

            :param managed_memory_mb: The task managed memory of the SlotSharingGroup in MB.
            :return: This object.
            """
            self._j_builder.setManagedMemoryMB(managed_memory_mb)
            return self

        def set_external_resource(self, name: str, value: float) -> 'SlotSharingGroup.Builder':
            """
            Adds the given external resource. The old value with the same resource name will be
            replaced if present.

            :param name: The resource name of the given external resource.
            :param value: The value of the given external resource.
            :return: This object.
            """
            self._j_builder.setExternalResource(name, value)
            return self

        def build(self) -> 'SlotSharingGroup':
            """
            Builds the SlotSharingGroup.

            :return: The SlotSharingGroup object.
            """
            return SlotSharingGroup(j_slot_sharing_group=self._j_builder.build())
Loading

0 comments on commit 4bcdfa2

Please sign in to comment.