Error on warnings emitted in dask-sql/dask (#1261)
* Error on dask warnings

* Fix dask shuffle method in conftest.py

* Nullable column normalization in compat testing

* Error on warnings emitted in dask_sql

* Fix deprecated use of np.bool8

* Remove deprecated uses of is_datetime64tz_dtype

* Ignore resource warnings emitted by cross-joins

* Drop deprecated uses of logger.warn

* Filter divide by zero warnings on test_coalesce

* Commit list of failures to date

* Resolve sqlite and rex failures

* Ignore int/float warning in test_intersect

* Ignore single-machine scheduler warnings

* Add meta to apply in test_describe_model

* Resolve remaining errors

* Fix style checks

* Always use isocalendar().week

* Pin sklearn to <1.4

* Unpin sqlalchemy<2

* Refactor pyhive input/tests for sqlalchemy 2

* Use astype to normalize dtypes in _assert_query_gives_same_result

* Refine pd.NA normalization in _assert_query_gives_same_result

* Explicitly compute pandas result in test_join_reorder

* xfail tpot tests, unpin sklearn

* Linting

* Explicitly select group columns in window groupby-apply

* Replace deprecated pandas unit mappings

* Refactor eq_sqlite to avoid fillna downcasting

* Refactor boolean operations to avoid fillna downcasting

* Fix resulting failures in test_is_true|false

* Linting

* Switch to pd.to_datetime for pandas 1.4 compat

* Minor fixes to test assertions

* Move pytest ini to pyproject, add ignore for remaining emitted ResourceWarning

* Remove normalize_dask_result

* Introduce convert_nullable_columns to handle some mixed nan/NA cases, compute dask dataframe in compat tests

* Linting

* Use dask columns instead of postgres columns in _assert_query_gives_same_result
charlesbluca committed Feb 1, 2024
1 parent f2025c1 commit 6bb8cd4
Showing 17 changed files with 123 additions and 93 deletions.
2 changes: 1 addition & 1 deletion conftest.py
@@ -14,7 +14,7 @@ def pytest_addoption(parser):
def pytest_runtest_setup(item):
# TODO: get pyarrow strings and p2p shuffle working
dask.config.set({"dataframe.convert-string": False})
dask.config.set({"dataframe.shuffle.algorithm": "tasks"})
dask.config.set({"dataframe.shuffle.method": "tasks"})
if "gpu" in item.keywords:
if not item.config.getoption("--rungpu"):
pytest.skip("need --rungpu option to run")
2 changes: 1 addition & 1 deletion dask_sql/context.py
@@ -853,7 +853,7 @@ def _get_ral(self, sql):
except DFOptimizationException as oe:
# Use original plan and warn about inability to optimize plan
rel = nonOptimizedRel
logger.warn(str(oe))
logger.warning(str(oe))
else:
rel = nonOptimizedRel

14 changes: 7 additions & 7 deletions dask_sql/mappings.py
@@ -37,7 +37,7 @@
pd.UInt16Dtype(): SqlTypeName.SMALLINT,
np.uint8: SqlTypeName.TINYINT,
pd.UInt8Dtype(): SqlTypeName.TINYINT,
np.bool8: SqlTypeName.BOOLEAN,
np.bool_: SqlTypeName.BOOLEAN,
pd.BooleanDtype(): SqlTypeName.BOOLEAN,
str: SqlTypeName.VARCHAR,
np.object_: SqlTypeName.VARCHAR,
@@ -55,7 +55,7 @@
"SqlTypeName.INTEGER": np.int32,
"SqlTypeName.SMALLINT": np.int16,
"SqlTypeName.TINYINT": np.int8,
"SqlTypeName.BOOLEAN": np.bool8,
"SqlTypeName.BOOLEAN": np.bool_,
"SqlTypeName.VARCHAR": str,
"SqlTypeName.CHAR": str,
"SqlTypeName.NULL": type(None),
@@ -100,7 +100,7 @@ def python_to_sql_type(python_type) -> "DaskTypeMap":
if isinstance(python_type, np.dtype):
python_type = python_type.type

if pd.api.types.is_datetime64tz_dtype(python_type):
if isinstance(python_type, pd.DatetimeTZDtype):
return DaskTypeMap(
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
unit=str(python_type.unit),
@@ -277,8 +277,8 @@ def similar_type(lhs: type, rhs: type) -> bool:
is_object = pdt.is_object_dtype
is_string = pdt.is_string_dtype
is_dt_ns = pdt.is_datetime64_ns_dtype
is_dt_tz = lambda t: is_dt_ns(t) and pdt.is_datetime64tz_dtype(t)
is_dt_ntz = lambda t: is_dt_ns(t) and not pdt.is_datetime64tz_dtype(t)
is_dt_tz = lambda t: is_dt_ns(t) and isinstance(t, pd.DatetimeTZDtype)
is_dt_ntz = lambda t: is_dt_ns(t) and not isinstance(t, pd.DatetimeTZDtype)
is_td_ns = pdt.is_timedelta64_ns_dtype
is_bool = pdt.is_bool_dtype

@@ -334,8 +334,8 @@ def cast_column_to_type(col: dd.Series, expected_type: str):
pdt = pd.api.types

is_dt_ns = pdt.is_datetime64_ns_dtype
is_dt_tz = lambda t: is_dt_ns(t) and pdt.is_datetime64tz_dtype(t)
is_dt_ntz = lambda t: is_dt_ns(t) and not pdt.is_datetime64tz_dtype(t)
is_dt_tz = lambda t: is_dt_ns(t) and isinstance(t, pd.DatetimeTZDtype)
is_dt_ntz = lambda t: is_dt_ns(t) and not isinstance(t, pd.DatetimeTZDtype)

current_type = col.dtype

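A minimal sketch of the dtype check swapped in above (illustrative, not part of this diff), assuming pandas 2.x where pd.api.types.is_datetime64tz_dtype is deprecated:

import pandas as pd

tz_dtype = pd.DatetimeTZDtype(unit="ns", tz="UTC")
naive_dtype = pd.Series(pd.to_datetime(["2024-02-01"])).dtype

# The deprecated helper pd.api.types.is_datetime64tz_dtype(t) is replaced by a
# plain isinstance check against the extension dtype class.
assert isinstance(tz_dtype, pd.DatetimeTZDtype)
assert not isinstance(naive_dtype, pd.DatetimeTZDtype)
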
4 changes: 3 additions & 1 deletion dask_sql/physical/rel/logical/window.py
@@ -341,7 +341,9 @@ def _apply_window(
# TODO: That is a bit of a hack. We should really use the real column dtype
meta = df._meta.assign(**{col: 0.0 for col in newly_created_columns})

df = df.groupby(group_columns, dropna=False).apply(filled_map, meta=meta)
df = df.groupby(group_columns, dropna=False)[df.columns.tolist()].apply(
filled_map, meta=meta
)
logger.debug(
f"Having created a dataframe {LoggableDataFrame(df)} after windowing. Will now drop {temporary_columns}."
)
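A minimal standalone sketch of the groupby-apply pattern above (illustrative data, not part of this diff): selecting the columns explicitly after groupby silences pandas' deprecation warning about applying over grouping columns, and meta spells out the output schema so dask does not have to infer it:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"g": [1, 1, 2], "x": [1.0, 2.0, 3.0]})
ddf = dd.from_pandas(pdf, npartitions=1)

# Explicit column selection keeps the grouping column available inside the
# applied function without triggering the pandas warning; meta avoids the
# "no metadata provided" inference step and its UserWarning.
out = ddf.groupby("g", dropna=False)[ddf.columns.tolist()].apply(
    lambda part: part.assign(y=part["x"].cumsum()),
    meta={"g": "int64", "x": "float64", "y": "float64"},
)
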
16 changes: 8 additions & 8 deletions dask_sql/physical/rex/core/call.py
@@ -16,7 +16,7 @@
from dask.highlevelgraph import HighLevelGraph
from dask.utils import random_state_data

from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT, PANDAS_GT_200
from dask_sql._compat import DASK_CUDF_TODATETIME_SUPPORT
from dask_sql._datafusion_lib import SqlTypeName
from dask_sql.datacontainer import DataContainer
from dask_sql.mappings import (
@@ -311,7 +311,7 @@ def false_(
Returns false on nan.
"""
if is_frame(df):
return ~df.fillna(True)
return ~df.astype("boolean").fillna(True)

return not pd.isna(df) and df is not None and not np.isnan(df) and not bool(df)

@@ -331,7 +331,7 @@ def true_(
Returns false on nan.
"""
if is_frame(df):
return df.fillna(False)
return df.astype("boolean").fillna(False)

return not pd.isna(df) and df is not None and not np.isnan(df) and bool(df)

@@ -794,11 +794,11 @@ def _round_datetime(self, *operands):

unit_map = {
"DAY": "D",
"HOUR": "H",
"MINUTE": "T",
"SECOND": "S",
"HOUR": "h",
"MINUTE": "min",
"SECOND": "s",
"MICROSECOND": "U",
"MILLISECOND": "L",
"MILLISECOND": "ms",
}

try:
@@ -960,7 +960,7 @@ def date_part(self, what, df: SeriesOrScalar):
elif what in {"SECOND", "SECONDS"}:
return df.second
elif what in {"WEEK", "WEEKS"}:
return df.isocalendar().week if PANDAS_GT_200 else df.week
return df.isocalendar().week
elif what in {"YEAR", "YEARS"}:
return df.year
elif what == "DATE":
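A minimal sketch of the two pandas behaviours the changes above respond to (illustrative, not part of this diff), assuming pandas 2.x: fillna on non-boolean data relies on silent downcasting, which now warns, and the uppercase offset aliases "H", "T", "S" and "L" are deprecated in favour of "h", "min", "s" and "ms":

import numpy as np
import pandas as pd

s = pd.Series([1.0, 0.0, np.nan])

# Casting to the nullable "boolean" dtype first makes fillna a no-downcast
# operation, so the negation stays warning-free.
negated = ~s.astype("boolean").fillna(True)  # [False, True, False]

# Lowercase frequency aliases replace the deprecated uppercase ones.
ts = pd.Series(pd.to_datetime(["2024-02-01 12:34:56"]))
rounded = ts.dt.floor("min")  # 2024-02-01 12:34:00
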
2 changes: 1 addition & 1 deletion dask_sql/server/presto_jdbc.py
@@ -25,7 +25,7 @@ def create_meta_data(c: Context):
"""

if c is None:
logger.warn("Context None: jdbc meta data not created")
logger.warning("Context None: jdbc meta data not created")
return
catalog = ""
system_schema = "system_jdbc"
2 changes: 1 addition & 1 deletion docs/source/sql.rst
@@ -112,7 +112,7 @@ For this, it uses the following mapping:
+-----------------------+----------------+
| From Python Type | To SQL Type |
+=======================+================+
| ``np.bool8`` | ``BOOLEAN`` |
| ``np.bool_`` | ``BOOLEAN`` |
+-----------------------+----------------+
| ``np.datetime64`` | ``TIMESTAMP`` |
+-----------------------+----------------+
15 changes: 15 additions & 0 deletions pyproject.toml
@@ -89,3 +89,18 @@ locked = true

[tool.isort]
profile = "black"

[tool.pytest.ini_options]
markers = [
"gpu: marks tests that require GPUs (skipped by default, run with --rungpu)",
"queries: marks tests that run test queries (skipped by default, run with --runqueries)",
]
addopts = "-v -rsxfE --color=yes --cov dask_sql --cov-config=.coveragerc --cov-report=term-missing"
filterwarnings = [
"error:::dask_sql[.*]",
"error:::dask[.*]",
"ignore:Need to do a cross-join:ResourceWarning:dask_sql[.*]",
"ignore:Dask doesn't support Dask frames:ResourceWarning:dask_sql[.*]",
"ignore:Running on a single-machine scheduler:UserWarning:dask[.*]",
]
xfail_strict = true
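The entries above use Python's standard "action:message:category:module" warning-filter syntax; a rough Python equivalent of the first and third rules (illustrative only, not part of this diff) would be:

import warnings

# Escalate any warning raised from a module matching dask_sql[.*] to an error...
warnings.filterwarnings("error", module=r"dask_sql[.*]")
# ...while still ignoring the known cross-join ResourceWarning from dask_sql.
warnings.filterwarnings(
    "ignore",
    message="Need to do a cross-join",
    category=ResourceWarning,
    module=r"dask_sql[.*]",
)
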
11 changes: 0 additions & 11 deletions pytest.ini

This file was deleted.

21 changes: 9 additions & 12 deletions tests/integration/fixtures.py
@@ -8,7 +8,7 @@
from dask.datasets import timeseries as dd_timeseries
from dask.distributed import Client

from tests.utils import assert_eq
from tests.utils import assert_eq, convert_nullable_columns

try:
import cudf
@@ -333,20 +333,17 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs):

# allow that the names are different
# as expressions are handled differently
dask_result.columns = sql_result.columns
sql_result.columns = dask_result.columns

# replace all pd.NA scalars, which are resistent to
# check_dype=False and .astype()
dask_result = dask_result.replace({pd.NA: None})
sql_result = sql_result.convert_dtypes()
dask_result = dask_result.convert_dtypes()

if sort_columns:
sql_result = sql_result.sort_values(sort_columns)
dask_result = dask_result.sort_values(sort_columns)
convert_nullable_columns(sql_result)
convert_nullable_columns(dask_result)

sql_result = sql_result.reset_index(drop=True)
dask_result = dask_result.reset_index(drop=True)

assert_eq(sql_result, dask_result, check_dtype=False, **kwargs)
assert_eq(
sql_result, dask_result, check_dtype=False, check_index=False, **kwargs
)

return _assert_query_gives_same_result

38 changes: 14 additions & 24 deletions tests/integration/test_compatibility.py
@@ -19,41 +19,32 @@

from dask_sql import Context
from dask_sql.utils import ParsingException
from tests.utils import assert_eq
from tests.utils import assert_eq, convert_nullable_columns


def cast_datetime_to_string(df):
cols = df.select_dtypes(include=["datetime64[ns]"]).columns.tolist()

if not cols:
return df

for col in cols:
df[col] = df[col].dt.strftime("%Y-%m-%d %H:%M:%S")

return df


def eq_sqlite(sql, check_index=True, **dfs):
def eq_sqlite(sql, **dfs):
c = Context()
engine = sqlite3.connect(":memory:")

for name, df in dfs.items():
c.create_table(name, df)
df.to_sql(name, engine, index=False)

dask_result = c.sql(sql).reset_index(drop=True)
sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True)
dask_result = c.sql(sql).compute().convert_dtypes()
sqlite_result = pd.read_sql(sql, engine).convert_dtypes()

convert_nullable_columns(dask_result)
convert_nullable_columns(sqlite_result)

# casting to object to ensure equality with sql-lite
# which returns object dtype for datetime inputs
dask_result = cast_datetime_to_string(dask_result)
datetime_cols = dask_result.select_dtypes(
include=["datetime64[ns]"]
).columns.tolist()
for col in datetime_cols:
sqlite_result[col] = pd.to_datetime(sqlite_result[col])

# Make sure SQL and Dask use the same "NULL" value
dask_result = dask_result.fillna(np.NaN)
sqlite_result = sqlite_result.fillna(np.NaN)
sqlite_result = sqlite_result.astype(dask_result.dtypes)

assert_eq(dask_result, sqlite_result, check_dtype=False, check_index=check_index)
assert_eq(dask_result, sqlite_result, check_dtype=False, check_index=False)


def make_rand_df(size: int, **kwargs):
@@ -953,7 +944,6 @@ def test_union():
UNION ALL SELECT * FROM c
ORDER BY b NULLS FIRST, c NULLS FIRST
""",
check_index=False,
a=a,
b=b,
c=c,
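A minimal sketch of the normalization idea in the refactored eq_sqlite above (illustrative data, not part of this diff): SQLite returns datetimes as strings, so they are parsed back with pd.to_datetime, and convert_dtypes moves both frames onto pandas' nullable extension dtypes so NaN and None both compare as pd.NA:

import numpy as np
import pandas as pd

dask_like = pd.DataFrame({"a": [1.0, np.nan], "when": pd.to_datetime(["2024-02-01", None])})
sqlite_like = pd.DataFrame({"a": [1, None], "when": ["2024-02-01 00:00:00", None]})

# Parse the stringly-typed datetime column returned by SQLite.
sqlite_like["when"] = pd.to_datetime(sqlite_like["when"])

# Normalize both sides onto nullable dtypes (Int64, string, boolean, ...).
dask_like = dask_like.convert_dtypes()
sqlite_like = sqlite_like.convert_dtypes()

pd.testing.assert_frame_equal(dask_like, sqlite_like, check_dtype=False)
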
4 changes: 2 additions & 2 deletions tests/integration/test_groupby.py
@@ -171,7 +171,7 @@ def test_group_by_case(c):
)


def test_group_by_nan(c):
def test_group_by_nan(c, user_table_nan):
return_df = c.sql(
"""
SELECT
@@ -180,7 +180,7 @@
GROUP BY c
"""
)
expected_df = pd.DataFrame({"c": [3, float("nan"), 1]})
expected_df = user_table_nan.drop_duplicates(subset=["c"])

# we return nullable int dtype instead of float
assert_eq(return_df, expected_df, check_dtype=False)
3 changes: 3 additions & 0 deletions tests/integration/test_join.py
@@ -364,6 +364,9 @@ def test_conditional_join_with_limit(c):
assert_eq(actual_df, expected_df, check_index=False)


@pytest.mark.filterwarnings(
"ignore:You are merging on int and float:UserWarning:dask.dataframe.multi"
)
def test_intersect(c):

# Join df_simple against itself
4 changes: 3 additions & 1 deletion tests/integration/test_model.py
@@ -502,7 +502,9 @@ def test_describe_model(c):
.sort_index()
)
# test
result = c.sql("DESCRIBE MODEL ex_describe_model")["Params"].apply(lambda x: str(x))
result = c.sql("DESCRIBE MODEL ex_describe_model")["Params"].apply(
lambda x: str(x), meta=("Params", "object")
)

assert_eq(expected_series, result)

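A minimal sketch of why meta is passed to the apply above (illustrative frame, not part of this diff): without it, dask runs the function on a dummy partition to guess the output type and emits a UserWarning:

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"Params": [{"k": 1}, {"k": 2}]}), npartitions=1)

# meta=(name, dtype) declares the result as an object series named "Params",
# so dask skips the inference step and its "did not provide metadata" warning.
params = ddf["Params"].apply(lambda x: str(x), meta=("Params", "object"))
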