Skip to content

Commit

Permalink
INSERT REPLACE WHERE with less than condition should be evaluated as …
Browse files Browse the repository at this point in the history
…less than

INSERT REPLACE WHERE with a less-than condition was evaluated as a less-than-or-equal condition. This was wrong and led to data being deleted that should not have been.

GitOrigin-RevId: f188a2be4d62f87a0711a93c4c75168bab98d8d9
  • Loading branch information
fred-db authored and vkorukanti committed Jan 26, 2023
1 parent f934153 commit 6e48267
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object DeltaSourceUtils {
expressions.GreaterThanOrEqual(
UnresolvedAttribute(attribute), expressions.Literal.create(value))
case sources.LessThan(attribute, value) =>
expressions.LessThanOrEqual(UnresolvedAttribute(attribute), expressions.Literal.create(value))
expressions.LessThan(UnresolvedAttribute(attribute), expressions.Literal.create(value))
case sources.LessThanOrEqual(attribute, value) =>
expressions.LessThanOrEqual(UnresolvedAttribute(attribute), expressions.Literal.create(value))
case sources.In(attribute, values) =>
Expand Down
15 changes: 14 additions & 1 deletion core/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{asc, expr, lit, map_values, struct}
import org.apache.spark.sql.functions.{asc, col, expr, lit, map_values, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
Expand Down Expand Up @@ -3047,4 +3047,17 @@ class DeltaNameColumnMappingSuite extends DeltaSuite
}
}
}

// Regression test: a strict less-than predicate passed to the DataFrame V2
// `overwrite` API must replace only rows strictly below the bound. The boundary
// row (id = 6) must survive the overwrite.
test("replaceWhere dataframe V2 API with less than predicate") {
  withTempDir { tableDir =>
    val tablePath = tableDir.toString

    // Seed the table with ids 0 through 9.
    val initialRows = spark.range(10).toDF()
    initialRows.write.format("delta").save(tablePath)

    // Replace every row matching `id < 6` with ids 0 through 3.
    val replacementRows = spark.range(start = 0, end = 4).toDF()
    replacementRows.writeTo(s"delta.`${tablePath}`").overwrite(col("id") < 6)

    // Expected contents: the untouched rows (id >= 6) plus the replacement rows.
    val actual = spark.read.load(tablePath)
    val expected = initialRows.filter(col("id") >= 6).union(replacementRows)
    checkAnswer(actual, expected)
  }
}
}

0 comments on commit 6e48267

Please sign in to comment.