[SPARK-48354][SQL] JDBC Connectors predicate pushdown testing #46642
base: master
Conversation
LGTM
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.DockerTest

@DockerTest
remove this
Why did you put this in the docker-integration-tests folder? This seems generic to JDBC.
Should we add a way to filter out tests?
Well, it seemed like a fitting place, but it can be used for more than docker integration tests; what would be a fitting place for this trait?
Also, we could add some way to filter out tests here, but we already use `override def excluded` from SparkFunSuite, since suites implementing this trait extend it. Do we need another method for that in this trait?
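For context, the `excluded` mechanism being referred to works roughly like this. This is a minimal, self-contained sketch of the pattern, not Spark's actual `SparkFunSuite` implementation; the names `FunSuiteSketch` and `run` are illustrative:

```scala
// Minimal sketch of an "excluded tests" hook, in the spirit of
// SparkFunSuite's `override def excluded` (names here are illustrative).
abstract class FunSuiteSketch {
  private var registered = Vector.empty[(String, () => Unit)]

  // Subclasses override this to skip tests by name.
  protected def excluded: Seq[String] = Seq.empty

  protected def test(name: String)(body: => Unit): Unit =
    registered :+= ((name, () => body))

  // Runs every non-excluded test and returns the names that ran.
  def run(): Seq[String] = registered.collect {
    case (name, body) if !excluded.contains(name) => body(); name
  }
}

object Demo extends FunSuiteSketch {
  test("kept") {}
  test("skipped") {}
  override def excluded: Seq[String] = Seq("skipped")
}
```

With this pattern, a pushdown suite mixing in the shared trait can exclude individual inherited tests without the trait itself needing a second filtering mechanism.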
@@ -141,6 +160,9 @@ private case class MsSqlServerDialect() extends JdbcDialect {
    case ShortType if !SQLConf.get.legacyMsSqlServerNumericMappingEnabled =>
      Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
    case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.TINYINT))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("FLOAT", java.sql.Types.FLOAT))
    case _ if !SQLConf.get.legacyMsSqlServerNumericMappingEnabled => JdbcUtils.getCommonJDBCType(dt)
Why this?
If the question is about `!SQLConf.get.legacyMsSqlServerNumericMappingEnabled`: it is added here because this config exists (false by default), and when it is set, some type mappings shouldn't be supported (not sure which, or why this config exists). Without this check, some tests with this config would fail, because we would, for example, convert ShortType to SMALLINT even though we shouldn't.
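The effect of the legacy flag can be seen in a small, self-contained sketch of the dialect's mapping logic. This mirrors the shape of the diff above but is not Spark's actual API; `SparkType` and `mapType` are illustrative stand-ins:

```scala
// Sketch of the config-gated type mapping discussed above. When the legacy
// numeric-mapping flag is set, ShortType keeps its old behavior (no SMALLINT
// override); otherwise the new mapping applies. Names are illustrative.
object TypeMappingSketch {
  sealed trait SparkType
  case object ShortType extends SparkType
  case object LongType extends SparkType
  case object StringType extends SparkType

  def mapType(dt: SparkType, legacyNumericMapping: Boolean): Option[String] = dt match {
    case ShortType if !legacyNumericMapping => Some("SMALLINT")
    case LongType => Some("BIGINT")
    // In the real dialect this arm falls back to the common JDBC mapping.
    case _ => None
  }
}
```

So a test run with the legacy config enabled exercises the `None`/fallback path for ShortType instead of the SMALLINT mapping.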
@@ -155,7 +162,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
      getJDBCType(et).map(_.databaseTypeDefinition)
        .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
        .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
    case _ => None
    case LongType => Some(JdbcType("BIGINT", Types.BIGINT))
    case _ => JdbcUtils.getCommonJDBCType(dt)
The None code path returns this as well?
When you return None, I mean: should `JdbcUtils.getCommonJDBCType` be called?
Not sure if this is what you mean, but in visitCast we have `getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName)`, so if we return None, `JdbcUtils.getCommonJDBCType` won't be called there.
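The distinction between the two fallback shapes being discussed can be isolated in a small sketch. The function names here (`dialectType`, `commonType`, `visitCastStyle`, `withFallback`) are illustrative, not Spark's actual API:

```scala
// Sketch of the Option fallback chains under discussion. With the
// getOrElse style used in visitCast, a None from the dialect falls
// through to the plain type name and the common mapping is never
// consulted; with an explicit orElse, it is.
object FallbackSketch {
  // Dialect-specific mapping: only knows "long".
  def dialectType(t: String): Option[String] =
    if (t == "long") Some("BIGINT") else None

  // Stand-in for a common/default JDBC mapping.
  def commonType(t: String): Option[String] = Some(t.toUpperCase)

  // visitCast-style: None ends the chain at the raw type name.
  def visitCastStyle(t: String): String =
    dialectType(t).getOrElse(t) // commonType never called

  // Explicit fallback: None is redirected to the common mapping.
  def withFallback(t: String): Option[String] =
    dialectType(t).orElse(commonType(t))
}
```

This is why returning `JdbcUtils.getCommonJDBCType(dt)` from the dialect's catch-all arm changes behavior: it moves the fallback into the dialect itself instead of relying on callers to supply one.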
class MySqlPushdownIntegrationSuite
  extends DockerJDBCIntegrationSuite
  with V2JDBCPushdownTest {
Why don't you just extend the existing MySqlIntegrationSuite?
We could, but that would run all tests from `V2JDBCTest` as well. Not sure we want that?
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite, MsSQLServerDatabaseOnDocker}

class MsSqlServerPushdownIntegrationSuite
same here
Same answer as above
  assert(isAggregateRemoved(df))
  commonAssertionOnDataFrame(df)
}
Add DISTINCT test
test("DISTINCT aggregate push down") {
  val df = sql(
    s"SELECT COUNT(DISTINCT(num_col)) " +
    s"FROM `$catalog`.`$schema`.`$tablePrefix`")
  checkAnswer(df, Row(5))
  assert(isAggregateRemoved(df))
  commonAssertionOnDataFrame(df)
}
  assert(isAggregateRemoved(df))
  commonAssertionOnDataFrame(df)
}
Add SUM test
test("SUM aggregate push down") {
  val df = sql(
    s"SELECT SUM(num_col) " +
    s"FROM `$catalog`.`$schema`.`$tablePrefix`")
  checkAnswer(df, Row(4150))
  assert(isAggregateRemoved(df))
  commonAssertionOnDataFrame(df)
}
executeUpdate(
  s"""INSERT INTO "$schema"."${tablePrefix}_string_test" VALUES (0, ' forth ', 1000)""")

executeUpdate(s"""INSERT INTO "$schema"."$tablePrefix" VALUES (1, 'ab', 1000)""")
It might be good to insert one row with a null int column, to test the aggregate functions.
In Spark, nulls should be ignored; I am not sure how other databases handle nulls in aggregates.
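For reference, the expected Spark-side semantics (which match standard SQL) can be sketched with NULL modeled as None; `NullAggSketch` and its fields are illustrative names:

```scala
// Sketch of SQL aggregate null semantics: SUM and COUNT(col) skip NULLs,
// while COUNT(*) counts every row. NULL is modeled here as None.
object NullAggSketch {
  val numCol: Seq[Option[Int]] = Seq(Some(100), None, Some(50))

  val sum: Int = numCol.flatten.sum             // SUM(num_col): NULLs ignored
  val countCol: Int = numCol.count(_.isDefined) // COUNT(num_col): skips NULLs
  val countStar: Int = numCol.size              // COUNT(*): counts all rows
}
```

If a database under test deviated from this (e.g. counted NULLs in `COUNT(col)`), a pushed-down aggregate would return different results than Spark's own evaluation, which is exactly what such a test row would catch.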
What changes were proposed in this pull request?
In this PR, I add a new trait with tests for integration testing of JDBC connectors. I also propose changes to MsSqlServerDialect to support more filter pushdowns.
Why are the changes needed?
These changes are needed for better testing of JDBC connectors and for general improvement of pushdown capabilities.
Does this PR introduce any user-facing change?
No
How was this patch tested?
With added tests.
Was this patch authored or co-authored using generative AI tooling?
No