From 8d3f53be2db40e70ea550e64db014e59ef3e4b58 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Wed, 14 Jun 2023 20:39:57 +0000 Subject: [PATCH 01/13] Add beginning of support for left anti joins --- dask_sql/physical/rel/logical/join.py | 25 ++++++++++++++++++------- tests/integration/test_join.py | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index 3aa3774d2..01fe50d35 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -46,6 +46,7 @@ class DaskJoinPlugin(BaseRelPlugin): "RIGHT": "right", "FULL": "outer", "LEFTSEMI": "inner", # TODO: Need research here! This is likely not a true inner join + "LEFTANTI": "leftanti", } def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer: @@ -227,7 +228,7 @@ def _join_on_columns( [~df_lhs_renamed.iloc[:, index].isna() for index in lhs_on], ) df_lhs_renamed = df_lhs_renamed[df_lhs_filter] - if join_type in ["inner", "left"]: + if join_type in ["inner", "left", "leftanti"]: df_rhs_filter = reduce( operator.and_, [~df_rhs_renamed.iloc[:, index].isna() for index in rhs_on], @@ -256,12 +257,22 @@ def _join_on_columns( "For more information refer to https://github.com/dask/dask/issues/9851" " and https://github.com/dask/dask/issues/9870" ) - df = df_lhs_with_tmp.merge( - df_rhs_with_tmp, - on=added_columns, - how=join_type, - broadcast=broadcast, - ).drop(columns=added_columns) + if join_type == "leftanti": + df = df_lhs_with_tmp.merge( + df_rhs_with_tmp, + on=added_columns, + how="left", + broadcast=broadcast, + indicator=True, + ).drop(columns=added_columns) + df = df[df["_merge"] == "left_only"].drop(columns=["_merge"]) + else: + df = df_lhs_with_tmp.merge( + df_rhs_with_tmp, + on=added_columns, + how=join_type, + broadcast=broadcast, + ).drop(columns=added_columns) return df diff --git a/tests/integration/test_join.py 
b/tests/integration/test_join.py index 3b131541c..b2aba91b8 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -86,6 +86,27 @@ def test_join_left(c): assert_eq(return_df, expected_df, check_index=False) +def test_join_left_anti(c): + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.c + FROM user_table_2 AS lhs + LEFT ANTI JOIN user_table_1 AS rhs + ON lhs.user_id = rhs.user_id + """ + ) + expected_df = pd.DataFrame( + { + # That is strange. Unfortunately, it seems dask fills in the + # missing rows with NaN, not with NA... + "user_id": [4], + "c": [4], + } + ) + + assert_eq(return_df, expected_df, check_index=False) + + def test_join_right(c): return_df = c.sql( """ From 4a8d9bf563f8ef1376d8908c04291d21f0d843ab Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Thu, 22 Jun 2023 16:30:41 +0000 Subject: [PATCH 02/13] Update for gpu --- dask_sql/physical/rel/logical/join.py | 39 ++++++++++++++------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index 01fe50d35..9179fb458 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -45,7 +45,7 @@ class DaskJoinPlugin(BaseRelPlugin): "LEFT": "left", "RIGHT": "right", "FULL": "outer", - "LEFTSEMI": "inner", # TODO: Need research here! 
This is likely not a true inner join + "LEFTSEMI": "leftsemi", "LEFTANTI": "leftanti", } @@ -179,6 +179,7 @@ def merge_single_partitions(lhs_partition, rhs_partition): # and to rename them like the rel specifies row_type = rel.getRowType() field_specifications = [str(f) for f in row_type.getFieldNames()] + print(field_specifications) cc = cc.rename( { @@ -228,7 +229,7 @@ def _join_on_columns( [~df_lhs_renamed.iloc[:, index].isna() for index in lhs_on], ) df_lhs_renamed = df_lhs_renamed[df_lhs_filter] - if join_type in ["inner", "left", "leftanti"]: + if join_type in ["inner", "left", "leftanti", "leftsemi"]: df_rhs_filter = reduce( operator.and_, [~df_rhs_renamed.iloc[:, index].isna() for index in rhs_on], @@ -257,22 +258,24 @@ def _join_on_columns( "For more information refer to https://github.com/dask/dask/issues/9851" " and https://github.com/dask/dask/issues/9870" ) - if join_type == "leftanti": - df = df_lhs_with_tmp.merge( - df_rhs_with_tmp, - on=added_columns, - how="left", - broadcast=broadcast, - indicator=True, - ).drop(columns=added_columns) - df = df[df["_merge"] == "left_only"].drop(columns=["_merge"]) - else: - df = df_lhs_with_tmp.merge( - df_rhs_with_tmp, - on=added_columns, - how=join_type, - broadcast=broadcast, - ).drop(columns=added_columns) + # if join_type == "leftanti": + # df = df_lhs_with_tmp.merge( + # df_rhs_with_tmp, + # on=added_columns, + # how="left", + # broadcast=broadcast, + # indicator=True, + # ).drop(columns=added_columns) + # df = df[df["_merge"] == "left_only"].drop(columns=["_merge"]) + # elif join_type == "leftsemi": + # df = + + df = df_lhs_with_tmp.merge( + df_rhs_with_tmp, + on=added_columns, + how=join_type, + broadcast=broadcast, + ).drop(columns=added_columns) return df From 4129154fa5363629657f662619606daf75a61acc Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Thu, 22 Jun 2023 17:47:02 +0000 Subject: [PATCH 03/13] Add leftsemi test --- tests/integration/test_join.py | 21 +++++++++++++++++++++ 1 file changed, 21 
insertions(+) diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index b2aba91b8..2a7df370e 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -107,6 +107,27 @@ def test_join_left_anti(c): assert_eq(return_df, expected_df, check_index=False) +def test_join_left_semi(c): + return_df = c.sql( + """ + SELECT lhs.user_id, lhs.c + FROM user_table_2 AS lhs + LEFT SEMI JOIN user_table_1 AS rhs + ON lhs.user_id = rhs.user_id + """ + ) + expected_df = pd.DataFrame( + { + # That is strange. Unfortunately, it seems dask fills in the + # missing rows with NaN, not with NA... + "user_id": [1, 1, 2], + "c": [1, 2, 3], + } + ) + + assert_eq(return_df, expected_df, check_index=False) + + def test_join_right(c): return_df = c.sql( """ From 41632c80bfb09f78133ea418b5cd5c0d6da5e2d2 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Thu, 22 Jun 2023 18:31:32 +0000 Subject: [PATCH 04/13] Workaround column mismatch and update tests for gpu --- dask_sql/physical/rel/base.py | 4 ++- dask_sql/physical/rel/logical/join.py | 25 +++++++++------- tests/integration/test_join.py | 42 ++++++++++++++++----------- 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 520f14e6d..f646eb3dd 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -30,7 +30,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> dd.DataFra @staticmethod def fix_column_to_row_type( - cc: ColumnContainer, row_type: "RelDataType" + cc: ColumnContainer, row_type: "RelDataType", join_type: str = None ) -> ColumnContainer: """ Make sure that the given column container @@ -39,6 +39,8 @@ def fix_column_to_row_type( and will just "blindly" rename the columns. 
""" field_names = [str(x) for x in row_type.getFieldNames()] + if join_type in ("leftsemi", "leftanti"): + field_names = field_names[: len(cc.columns)] logger.debug(f"Renaming {cc.columns} to {field_names}") cc = cc.rename_handle_duplicates( diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index 9179fb458..306bd588b 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -171,15 +171,19 @@ def merge_single_partitions(lhs_partition, rhs_partition): # 6. So the next step is to make sure # we have the correct column order (and to remove the temporary join columns) - correct_column_order = list(df_lhs_renamed.columns) + list( - df_rhs_renamed.columns - ) + if join_type in ("leftsemi", "leftanti"): + correct_column_order = list(df_lhs_renamed.columns) + else: + correct_column_order = list(df_lhs_renamed.columns) + list( + df_rhs_renamed.columns + ) cc = ColumnContainer(df.columns).limit_to(correct_column_order) # and to rename them like the rel specifies row_type = rel.getRowType() field_specifications = [str(f) for f in row_type.getFieldNames()] - print(field_specifications) + if join_type in ("leftsemi", "leftanti"): + field_specifications = field_specifications[: len(cc.columns)] cc = cc.rename( { @@ -187,7 +191,7 @@ def merge_single_partitions(lhs_partition, rhs_partition): for from_col, to_col in zip(cc.columns, field_specifications) } ) - cc = self.fix_column_to_row_type(cc, row_type) + cc = self.fix_column_to_row_type(cc, row_type, join_type) dc = DataContainer(df, cc) # 7. 
Last but not least we apply any filters by and-chaining together the filters @@ -204,7 +208,8 @@ def merge_single_partitions(lhs_partition, rhs_partition): df = filter_or_scalar(df, filter_condition) dc = DataContainer(df, cc) - dc = self.fix_dtype_to_row_type(dc, rel.getRowType()) + if join_type not in ("leftsemi", "leftanti"): + dc = self.fix_dtype_to_row_type(dc, rel.getRowType()) # # Rename underlying DataFrame column names back to their original values before returning # df = dc.assign() # dc = DataContainer(df, ColumnContainer(cc.columns)) @@ -258,7 +263,7 @@ def _join_on_columns( "For more information refer to https://github.com/dask/dask/issues/9851" " and https://github.com/dask/dask/issues/9870" ) - # if join_type == "leftanti": + # if join_type == "leftanti" and : # df = df_lhs_with_tmp.merge( # df_rhs_with_tmp, # on=added_columns, @@ -266,10 +271,8 @@ def _join_on_columns( # broadcast=broadcast, # indicator=True, # ).drop(columns=added_columns) - # df = df[df["_merge"] == "left_only"].drop(columns=["_merge"]) - # elif join_type == "leftsemi": - # df = - + # df = df[df["_merge"] == "left_only"].drop(columns=["_merge"],errors="ignore") + # else: df = df_lhs_with_tmp.merge( df_rhs_with_tmp, on=added_columns, diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 2a7df370e..1169bc947 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -86,42 +86,50 @@ def test_join_left(c): assert_eq(return_df, expected_df, check_index=False) -def test_join_left_anti(c): +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_join_left_anti(c, gpu): + df1 = pd.DataFrame({"id": [1, 1, 2, 4], "a": ["a", "b", "c", "d"]}) + df2 = pd.DataFrame({"id": [2, 1, 2, 3], "b": ["c", "c", "a", "c"]}) + c.create_table("df_1", df1, gpu=gpu) + c.create_table("df_2", df2, gpu=gpu) + return_df = c.sql( """ - SELECT lhs.user_id, lhs.c - FROM user_table_2 AS lhs - LEFT ANTI JOIN user_table_1 AS 
rhs - ON lhs.user_id = rhs.user_id + SELECT lhs.id, lhs.a + FROM df_1 AS lhs + LEFT ANTI JOIN df_2 AS rhs + ON lhs.id = rhs.id """ ) expected_df = pd.DataFrame( { - # That is strange. Unfortunately, it seems dask fills in the - # missing rows with NaN, not with NA... - "user_id": [4], - "c": [4], + "id": [4], + "a": ["d"], } ) assert_eq(return_df, expected_df, check_index=False) +@pytest.mark.gpu def test_join_left_semi(c): + df1 = pd.DataFrame({"id": [1, 1, 2, 4], "a": ["a", "b", "c", "d"]}) + df2 = pd.DataFrame({"id": [2, 1, 2, 3], "b": ["c", "c", "a", "c"]}) + c.create_table("df_1", df1, gpu=True) + c.create_table("df_2", df2, gpu=True) + return_df = c.sql( """ - SELECT lhs.user_id, lhs.c - FROM user_table_2 AS lhs - LEFT SEMI JOIN user_table_1 AS rhs - ON lhs.user_id = rhs.user_id + SELECT lhs.id, lhs.a + FROM df_1 AS lhs + LEFT SEMI JOIN df_2 AS rhs + ON lhs.id = rhs.id """ ) expected_df = pd.DataFrame( { - # That is strange. Unfortunately, it seems dask fills in the - # missing rows with NaN, not with NA... 
- "user_id": [1, 1, 2], - "c": [1, 2, 3], + "id": [1, 1, 2], + "a": ["a", "b", "c"], } ) From 99c1d0112ebcf93463d2e3dc3bb83a5890044db7 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Thu, 22 Jun 2023 21:01:15 +0000 Subject: [PATCH 05/13] Add cpu implementation of leftanti --- dask_sql/physical/rel/logical/join.py | 39 ++++++++++++++++----------- tests/integration/test_join.py | 6 +++-- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index 306bd588b..c31297329 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -14,6 +14,7 @@ from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rel.logical.filter import filter_or_scalar from dask_sql.physical.rex import RexConverter +from dask_sql.utils import is_cudf_type if TYPE_CHECKING: import dask_sql @@ -75,6 +76,9 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai join_type = join.getJoinType() join_type = self.JOIN_TYPE_MAPPING[str(join_type)] + # TODO: update with correct implementation of leftsemi + if join_type == "leftsemi" and not is_cudf_type(df_lhs_renamed): + join_type = "inner" # 3. The join condition can have two forms, that we can understand # (a) a = b @@ -208,6 +212,7 @@ def merge_single_partitions(lhs_partition, rhs_partition): df = filter_or_scalar(df, filter_condition) dc = DataContainer(df, cc) + # TODO: Debug this... 
if join_type not in ("leftsemi", "leftanti"): dc = self.fix_dtype_to_row_type(dc, rel.getRowType()) # # Rename underlying DataFrame column names back to their original values before returning @@ -263,22 +268,24 @@ def _join_on_columns( "For more information refer to https://github.com/dask/dask/issues/9851" " and https://github.com/dask/dask/issues/9870" ) - # if join_type == "leftanti" and : - # df = df_lhs_with_tmp.merge( - # df_rhs_with_tmp, - # on=added_columns, - # how="left", - # broadcast=broadcast, - # indicator=True, - # ).drop(columns=added_columns) - # df = df[df["_merge"] == "left_only"].drop(columns=["_merge"],errors="ignore") - # else: - df = df_lhs_with_tmp.merge( - df_rhs_with_tmp, - on=added_columns, - how=join_type, - broadcast=broadcast, - ).drop(columns=added_columns) + if join_type == "leftanti" and not is_cudf_type(df_lhs_with_tmp): + df = df_lhs_with_tmp.merge( + df_rhs_with_tmp, + on=added_columns, + how="left", + broadcast=broadcast, + indicator=True, + ).drop(columns=added_columns) + df = df[df["_merge"] == "left_only"].drop( + columns=["_merge"] + list(df_rhs_with_tmp.columns), errors="ignore" + ) + else: + df = df_lhs_with_tmp.merge( + df_rhs_with_tmp, + on=added_columns, + how=join_type, + broadcast=broadcast, + ).drop(columns=added_columns) return df diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 1169bc947..7546b44ce 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -101,9 +101,10 @@ def test_join_left_anti(c, gpu): ON lhs.id = rhs.id """ ) + # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "id": [4], + "lhs.id": [4], "a": ["d"], } ) @@ -126,9 +127,10 @@ def test_join_left_semi(c): ON lhs.id = rhs.id """ ) + # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "id": [1, 1, 2], + "lhs.id": [1, 1, 2], "a": ["a", "b", "c"], } ) From 6d24824644a2774ad22e718df82e71c86dfdbe64 Mon Sep 17 00:00:00 
2001 From: Chris Jarrett Date: Thu, 22 Jun 2023 22:17:04 +0000 Subject: [PATCH 06/13] Update xfail queries --- tests/unit/test_queries.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_queries.py b/tests/unit/test_queries.py index b32e9530f..67120df82 100644 --- a/tests/unit/test_queries.py +++ b/tests/unit/test_queries.py @@ -36,7 +36,6 @@ 77, 80, 86, - 87, 88, 89, 92, From 5fd0d9c117dbc6eb3493c3ed425165717d9d6456 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 26 Jun 2023 00:41:48 +0000 Subject: [PATCH 07/13] Fix column renaming --- dask_sql/physical/rel/base.py | 11 ++++++++--- dask_sql/physical/rel/logical/join.py | 4 +--- tests/integration/test_join.py | 6 ++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index f646eb3dd..04c809671 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -86,7 +86,9 @@ def assert_inputs( return [RelConverter.convert(input_rel, context) for input_rel in input_rels] @staticmethod - def fix_dtype_to_row_type(dc: DataContainer, row_type: "RelDataType"): + def fix_dtype_to_row_type( + dc: DataContainer, row_type: "RelDataType", join_type: str = None + ): """ Fix the dtype of the given data container (or: the df within it) to the data type given as argument. 
@@ -100,9 +102,12 @@ def fix_dtype_to_row_type(dc: DataContainer, row_type: "RelDataType"): df = dc.df cc = dc.column_container + field_list = row_type.getFieldList() + if join_type in ("leftsemi", "leftanti"): + field_list = field_list[: len(cc.columns)] + field_types = { - str(field.getQualifiedName()): field.getType() - for field in row_type.getFieldList() + str(field.getQualifiedName()): field.getType() for field in field_list } for field_name, field_type in field_types.items(): diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index c31297329..b9fd7c879 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -212,9 +212,7 @@ def merge_single_partitions(lhs_partition, rhs_partition): df = filter_or_scalar(df, filter_condition) dc = DataContainer(df, cc) - # TODO: Debug this... - if join_type not in ("leftsemi", "leftanti"): - dc = self.fix_dtype_to_row_type(dc, rel.getRowType()) + dc = self.fix_dtype_to_row_type(dc, rel.getRowType(), join_type) # # Rename underlying DataFrame column names back to their original values before returning # df = dc.assign() # dc = DataContainer(df, ColumnContainer(cc.columns)) diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 7546b44ce..1169bc947 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -101,10 +101,9 @@ def test_join_left_anti(c, gpu): ON lhs.id = rhs.id """ ) - # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "lhs.id": [4], + "id": [4], "a": ["d"], } ) @@ -127,10 +126,9 @@ def test_join_left_semi(c): ON lhs.id = rhs.id """ ) - # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "lhs.id": [1, 1, 2], + "id": [1, 1, 2], "a": ["a", "b", "c"], } ) From b3da2b79cf12e7c9aff06de618c4e941c9b3bcaf Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 26 Jun 2023 01:39:41 +0000 Subject: [PATCH 08/13] 
Reverse test changes --- tests/integration/test_join.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 1169bc947..7546b44ce 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -101,9 +101,10 @@ def test_join_left_anti(c, gpu): ON lhs.id = rhs.id """ ) + # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "id": [4], + "lhs.id": [4], "a": ["d"], } ) @@ -126,9 +127,10 @@ def test_join_left_semi(c): ON lhs.id = rhs.id """ ) + # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "id": [1, 1, 2], + "lhs.id": [1, 1, 2], "a": ["a", "b", "c"], } ) From 9bae4e078ce0cc711e0a49e740ef1bc1a013bc27 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 27 Jun 2023 13:42:04 -0700 Subject: [PATCH 09/13] Limit select_names in _compute_table_from_rel --- dask_sql/context.py | 11 +++++++---- tests/integration/test_join.py | 6 ++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index 0b6f8faf8..17c6d0055 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -845,15 +845,19 @@ def _get_ral(self, sql): def _compute_table_from_rel(self, rel: "LogicalPlan", return_futures: bool = True): dc = RelConverter.convert(rel, context=self) - # Optimization might remove some alias projects. Make sure to keep them here. - select_names = [field for field in rel.getRowType().getFieldList()] - if rel.get_current_node_type() == "Explain": return dc if dc is None: return + # Optimization might remove some alias projects. Make sure to keep them here. + select_names = [field for field in rel.getRowType().getFieldList()] + if select_names: + cc = dc.column_container + + select_names = select_names[: len(cc.columns)] + # Use FQ name if not unique and simple name if it is unique. 
If a join contains the same column # names the output col is prepended with the fully qualified column name field_counts = Counter([field.getName() for field in select_names]) @@ -864,7 +868,6 @@ def _compute_table_from_rel(self, rel: "LogicalPlan", return_futures: bool = Tru for field in select_names ] - cc = dc.column_container cc = cc.rename( { df_col: select_name diff --git a/tests/integration/test_join.py b/tests/integration/test_join.py index 7546b44ce..1169bc947 100644 --- a/tests/integration/test_join.py +++ b/tests/integration/test_join.py @@ -101,10 +101,9 @@ def test_join_left_anti(c, gpu): ON lhs.id = rhs.id """ ) - # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "lhs.id": [4], + "id": [4], "a": ["d"], } ) @@ -127,10 +126,9 @@ def test_join_left_semi(c): ON lhs.id = rhs.id """ ) - # TODO: Figure out why this returns lhs.id instead of id expected_df = pd.DataFrame( { - "lhs.id": [1, 1, 2], + "id": [1, 1, 2], "a": ["a", "b", "c"], } ) From 21690db7e38a6333158ea1040452f8c6a13de360 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Wed, 28 Jun 2023 17:29:59 +0000 Subject: [PATCH 10/13] Address reviews --- dask_sql/physical/rel/base.py | 4 ++-- dask_sql/physical/rel/logical/join.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 04c809671..11897e974 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -30,7 +30,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> dd.DataFra @staticmethod def fix_column_to_row_type( - cc: ColumnContainer, row_type: "RelDataType", join_type: str = None + cc: ColumnContainer, row_type: "RelDataType", join_type: str | None = None ) -> ColumnContainer: """ Make sure that the given column container @@ -87,7 +87,7 @@ def assert_inputs( @staticmethod def fix_dtype_to_row_type( - dc: DataContainer, row_type: "RelDataType", join_type: str = None + dc: 
DataContainer, row_type: "RelDataType", join_type: str | None = None ): """ Fix the dtype of the given data container (or: the df within it) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index b9fd7c879..c1c904af6 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -76,7 +76,8 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai join_type = join.getJoinType() join_type = self.JOIN_TYPE_MAPPING[str(join_type)] - # TODO: update with correct implementation of leftsemi + # TODO: update with correct implementation of leftsemi for CPU + # https://github.com/dask-contrib/dask-sql/issues/1190 if join_type == "leftsemi" and not is_cudf_type(df_lhs_renamed): join_type = "inner" From 448d5de52f3a56cb13f59acce3fe92df0b8b7e70 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Wed, 28 Jun 2023 19:11:09 +0000 Subject: [PATCH 11/13] Revert type change --- dask_sql/physical/rel/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 11897e974..04c809671 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -30,7 +30,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> dd.DataFra @staticmethod def fix_column_to_row_type( - cc: ColumnContainer, row_type: "RelDataType", join_type: str | None = None + cc: ColumnContainer, row_type: "RelDataType", join_type: str = None ) -> ColumnContainer: """ Make sure that the given column container @@ -87,7 +87,7 @@ def assert_inputs( @staticmethod def fix_dtype_to_row_type( - dc: DataContainer, row_type: "RelDataType", join_type: str | None = None + dc: DataContainer, row_type: "RelDataType", join_type: str = None ): """ Fix the dtype of the given data container (or: the df within it) From f6dba77f1829f1365ed3af5b94dfbffd35625b54 Mon Sep 17 00:00:00 2001 From: ChrisJar Date: Fri, 30 Jun 2023 
12:20:31 -0700 Subject: [PATCH 12/13] Update type annotations Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> --- dask_sql/physical/rel/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 04c809671..2dc40fab9 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -30,7 +30,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> dd.DataFra @staticmethod def fix_column_to_row_type( - cc: ColumnContainer, row_type: "RelDataType", join_type: str = None + cc: ColumnContainer, row_type: "RelDataType", join_type: Optional[str] = None ) -> ColumnContainer: """ Make sure that the given column container @@ -87,7 +87,7 @@ def assert_inputs( @staticmethod def fix_dtype_to_row_type( - dc: DataContainer, row_type: "RelDataType", join_type: str = None + dc: DataContainer, row_type: "RelDataType", join_type: Optional[str] = None ): """ Fix the dtype of the given data container (or: the df within it) From 5ab4b0cfb1df799310d641e2e61ae0af8600c6cd Mon Sep 17 00:00:00 2001 From: ChrisJar Date: Fri, 30 Jun 2023 12:30:23 -0700 Subject: [PATCH 13/13] Update imports in base.py --- dask_sql/physical/rel/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 2dc40fab9..a1f378197 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -1,5 +1,5 @@ import logging -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, List, Optional import dask.dataframe as dd