Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-7746] Fix type errors and enable checks for apache_beam.dataframe.* #11632

Merged
merged 9 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions sdks/python/apache_beam/dataframe/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@
from __future__ import absolute_import

import inspect
from typing import TYPE_CHECKING
from typing import Any
robertwb marked this conversation as resolved.
Show resolved Hide resolved
from typing import Dict
from typing import Tuple
from typing import Union

from apache_beam import pvalue
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import transforms

if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
import pandas


# TODO: Or should this be called as_dataframe?
def to_dataframe(
Expand All @@ -49,9 +58,9 @@ def to_dataframe(

# TODO: Or should this be called from_dataframe?
def to_pcollection(
*dataframes, # type: Tuple[frame_base.DeferredFrame]
*dataframes, # type: frame_base.DeferredFrame
**kwargs):
# type: (...) -> Union[pvalue.PCollection, Tuple[pvalue.PCollection]]
# type: (...) -> Union[pvalue.PCollection, Tuple[pvalue.PCollection, ...]]

"""Converts one or more deferred dataframe-like objects back to a PCollection.

Expand All @@ -67,18 +76,23 @@ def to_pcollection(
if label is None:
# Attempt to come up with a reasonable, stable label by retrieving the name
# of these variables in the calling context.
previous_frame = inspect.currentframe().f_back
current_frame = inspect.currentframe()
if current_frame is None:
label = 'ToDataframe(...)'

else:
previous_frame = current_frame.f_back

def name(obj):
for key, value in previous_frame.f_locals.items():
if obj is value:
return key
for key, value in previous_frame.f_globals.items():
if obj is value:
return key
return '...'
def name(obj):
for key, value in previous_frame.f_locals.items():
if obj is value:
return key
for key, value in previous_frame.f_globals.items():
if obj is value:
return key
return '...'

label = 'ToDataframe(%s)' % ', '.join(name(e) for e in dataframes)
label = 'ToDataframe(%s)' % ', '.join(name(e) for e in dataframes)

def extract_input(placeholder):
if not isinstance(placeholder._reference, pvalue.PCollection):
Expand All @@ -91,7 +105,8 @@ def extract_input(placeholder):
results = {p: extract_input(p)
for p in placeholders
} | label >> transforms._DataframeExpressionsTransform(
dict((ix, df._expr) for ix, df in enumerate(dataframes)))
dict((ix, df._expr) for ix, df in enumerate(
dataframes))) # type: Dict[Any, pvalue.PCollection]
if len(results) == 1 and not always_return_tuple:
return results[0]
else:
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/dataframe/doctests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import contextlib
import doctest
import re
from typing import Any
from typing import Dict
from typing import List

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -136,7 +139,7 @@ class _InMemoryResultRecorder(object):
"""

# Class-level value to survive pickling.
_ALL_RESULTS = {}
_ALL_RESULTS = {} # type: Dict[str, List[Any]]

def __init__(self):
self._id = id(self)
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import absolute_import

import inspect
from typing import Dict

import pandas as pd

Expand All @@ -25,7 +26,7 @@

class DeferredFrame(object):

_pandas_type_map = {}
_pandas_type_map = {} # type: Dict[type, type]

def __init__(self, expr):
self._expr = expr
Expand Down
26 changes: 23 additions & 3 deletions sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,28 @@

from __future__ import absolute_import

from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import Mapping
from typing import Tuple
from typing import TypeVar
from typing import Union

import pandas as pd

import apache_beam as beam
from apache_beam import transforms
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frames # pylint: disable=unused-import

if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
from apache_beam.pvalue import PCollection

T = TypeVar('T')


class DataframeTransform(transforms.PTransform):
"""A PTransform for applying function that takes and returns dataframes
Expand Down Expand Up @@ -82,8 +97,8 @@ def expand(self, inputs):

def _apply_deferred_ops(
self,
inputs, # type: Dict[PlaceholderExpr, PCollection]
outputs, # type: Dict[Any, Expression]
inputs, # type: Dict[expressions.Expression, PCollection]
outputs, # type: Dict[Any, expressions.Expression]
): # -> Dict[Any, PCollection]
"""Construct a Beam graph that evaluates a set of expressions on a set of
input PCollections.
Expand Down Expand Up @@ -248,7 +263,12 @@ def _dict_union(dicts):
return result


def _flatten(valueish, root=()):
def _flatten(
valueish, # type: Union[T, List[T], Tuple[T], Dict[Any, T]]
root=(), # type: Tuple[Any, ...]
):
# type: (...) -> Mapping[Tuple[Any, ...], T]

"""Given a nested structure of dicts, tuples, and lists, return a flat
dictionary where the values are the leafs and the keys are the "paths" to
these leaves.
Expand Down
3 changes: 0 additions & 3 deletions sdks/python/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ ignore_errors = true
[mypy-apache_beam.coders.*]
ignore_errors = true

[mypy-apache_beam.dataframe.*]
ignore_errors = true

[mypy-apache_beam.io.*]
ignore_errors = true

Expand Down