From e14a32f9b86ba7a77f893bc279e2a4780449b27b Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Tue, 28 Mar 2023 09:49:59 -0400 Subject: [PATCH 1/7] Add row level results for MinLength --- pom.xml | 2 +- .../amazon/deequ/analyzers/MinLength.scala | 6 +- .../com/amazon/deequ/analyzers/Minimum.scala | 6 +- .../amazon/deequ/constraints/Constraint.scala | 8 ++- .../amazon/deequ/VerificationSuiteTest.scala | 45 ++++++++++++- .../amazon/deequ/analyzers/AnalysisTest.scala | 10 ++- .../deequ/analyzers/MinLengthTest.scala | 67 +++++++++++++++++++ .../deequ/analyzers/StateProviderTest.scala | 20 +++++- 8 files changed, 154 insertions(+), 10 deletions(-) create mode 100644 src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala diff --git a/pom.xml b/pom.xml index 4346ddf61..ab9a21d37 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazon.deequ deequ - 2.0.3-spark-3.3 + 2.0.1-spark-3.1 1.8 diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index dc123003a..6c2f24452 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -18,6 +18,7 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString} +import com.google.common.annotations.VisibleForTesting import org.apache.spark.sql.functions.{length, min} import org.apache.spark.sql.types.{DoubleType, StructType} import org.apache.spark.sql.{Column, Row} @@ -32,7 +33,7 @@ case class MinLength(column: String, where: Option[String] = None) override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { ifNoNullsIn(result, offset) { _ => - MinState(result.getDouble(offset)) + MinState(result.getDouble(offset), Some(criterion)) } } @@ -41,4 +42,7 @@ case class MinLength(column: String, where: Option[String] = None) } override def filterCondition: Option[String] = where + + @VisibleForTesting + private[deequ] def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala index 68ef926cf..6a3376ddd 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala @@ -21,11 +21,13 @@ import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.functions.min import org.apache.spark.sql.types.{DoubleType, StructType} import Analyzers._ +import com.amazon.deequ.metrics.FullColumn -case class MinState(minValue: Double) extends DoubleValuedState[MinState] { +case class MinState(minValue: Double, override val fullColumn: Option[Column] = None) + extends DoubleValuedState[MinState] with FullColumn { override def sum(other: MinState): MinState = { - MinState(math.min(minValue, other.minValue)) + MinState(math.min(minValue, other.minValue), sum(fullColumn, other.fullColumn)) } override def metricValue(): Double = { diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index 0afff24f6..6a8052f65 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -462,7 +462,13 @@ object Constraint { val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion, hint = hint) - new 
NamedConstraint(constraint, s"MinLengthConstraint($minLength)") + val sparkAssertion = org.apache.spark.sql.functions.udf(assertion) + + new RowLevelAssertedConstraint( + constraint, + s"MinLengthConstraint($minLength)", + s"ColumnLength-$column", + sparkAssertion) } /** diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 3a342f2f6..f3167485e 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -167,7 +167,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val isComplete = new Check(CheckLevel.Error, "rule1").isComplete("att1") val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") - val minLength = new Check(CheckLevel.Error, "rule4").hasMaxLength("item", _ <= 3) + val minLength = new Check(CheckLevel.Error, "rule4").hasMinLength("item", _ <= 3) val maxLength = new Check(CheckLevel.Error, "rule5").hasMaxLength("item", _ > 1) val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description @@ -206,6 +206,49 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec assert(Seq(false, true, true, true, true, true).sameElements(rowLevel4)) } + "generate a result that contains row-level results for length" in withSparkSession { session => + val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session) + + val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ <= 3) + val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ > 1) + val isLengthMin = new Check(CheckLevel.Error, "rule3").hasMinLength("att1", _ <= 1) + val isLengthMax = new Check(CheckLevel.Error, "rule4").hasMaxLength("att1", _ >= 1) + val expectedColumn1 = minLength.description + val expectedColumn2 = maxLength.description + val expectedColumn3 = isLengthMin.description + val expectedColumn4 = isLengthMax.description + + val suite = new VerificationSuite().onData(data) + .addCheck(minLength) + .addCheck(maxLength) + .addCheck(isLengthMin) + .addCheck(isLengthMax) + + val result: VerificationResult = suite.run() + + assert(result.status == CheckStatus.Success) + + val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data) + + resultData.show() + val expectedColumns: Set[String] = + data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 + assert(resultData.columns.toSet == expectedColumns) + + + val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) + assert(Seq(true, true, true, false, false, false).sameElements(rowLevel1)) + + val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) + assert(Seq(false, true, true, true, true, true).sameElements(rowLevel2)) + + val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Boolean](0)) + assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3)) + + val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getAs[Boolean](0)) + assert(Seq(true, true, true, true, true, true).sameElements(rowLevel4)) + } + "accept analysis config for mandatory analysis" in withSparkSession { sparkSession => import sparkSession.implicits._ diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala 
b/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala index cd1b8fd4d..72945e422 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/AnalysisTest.scala @@ -158,8 +158,14 @@ class AnalysisTest extends AnyWordSpec with Matchers with SparkContextSpec with fullColumn.isDefined shouldBe true } - resultMetrics should contain(DoubleMetric(Entity.Column, "MinLength", "att1", - Success(0.0))) + val minLengthMetric = resultMetrics.tail.head + inside (minLengthMetric) { case DoubleMetric(entity, name, instance, value, fullColumn) => + entity shouldBe Entity.Column + name shouldBe "MinLength" + instance shouldBe "att1" + value shouldBe Success(0.0) + fullColumn.isDefined shouldBe true + } } "return the proper exception for non existing columns" in withSparkSession { sparkSession => diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala new file mode 100644 index 000000000..96b4e3fb4 --- /dev/null +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -0,0 +1,67 @@ +/** + * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + + +package com.amazon.deequ.analyzers + +import com.amazon.deequ.SparkContextSpec +import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.FullColumn +import com.amazon.deequ.utils.FixtureSupport +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with FixtureSupport { + + "MinLength" should { + "return row-level results for non-null columns" in withSparkSession { session => + + val data = getDfWithStringColumns(session) + + val countryLength = MinLength("Country") // It's "India" in every row + val state: Option[MinState] = countryLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = countryLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0) + } + + "return row-level results for null columns" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + val addressLength = MinLength("att3") // It's null in two rows + val state: Option[MinState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) + } + + "return row-level results for blank strings" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + val addressLength = MinLength("att1") // It's empty strings + val state: Option[MinState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", 
metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + } + } + +} diff --git a/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala b/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala index 0c76bc095..d1e9c7086 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/StateProviderTest.scala @@ -87,13 +87,13 @@ class StateProviderTest extends AnyWordSpec assertCorrectlyRestoresState[SumState](provider, provider, Sum("price"), data) assertCorrectlyRestoresState[MeanState](provider, provider, Mean("price"), data) - assertCorrectlyRestoresState[MinState](provider, provider, Minimum("price"), data) + assertCorrectlyRestoresMinState(provider, provider, Minimum("price"), data) assertCorrectlyRestoresMaxState(provider, provider, Maximum("price"), data) assertCorrectlyRestoresState[StandardDeviationState](provider, provider, StandardDeviation("price"), data) assertCorrectlyRestoresMaxState(provider, provider, MaxLength("att1"), data) - assertCorrectlyRestoresState[MinState](provider, provider, MinLength("att1"), data) + assertCorrectlyRestoresMinState(provider, provider, MinLength("att1"), data) assertCorrectlyRestoresState[DataTypeHistogram](provider, provider, DataType("item"), data) assertCorrectlyRestoresStateForHLL(provider, provider, ApproxCountDistinct("att1"), data) @@ -204,6 +204,22 @@ class StateProviderTest extends AnyWordSpec assert(MaxState(expectedState.maxValue, None) == restoredState.get) } + def assertCorrectlyRestoresMinState(persister: StatePersister, + loader: StateLoader, + analyzer: Analyzer[MinState, _], + data: DataFrame): Unit = { + + val stateResult = analyzer.computeStateFrom(data) + assert(stateResult.isDefined) + val expectedState = stateResult.get + + persister.persist[MinState](analyzer, expectedState) + val restoredState = loader.load[MinState](analyzer) + + assert(restoredState.isDefined) + assert(MinState(expectedState.minValue, None) == restoredState.get) + } + def assertCorrectlyRestoresState[S <: State[S]]( persister: StatePersister, loader: StateLoader, From e4d6d3e84e336e8188891e2a3868fae891e738c8 Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Mon, 10 Apr 2023 11:40:17 -0400 Subject: [PATCH 2/7] Modify MinLength and MaxLength to add option to convert null values --- .../com/amazon/deequ/VerificationResult.scala | 9 +++--- .../com/amazon/deequ/analyzers/Analyzer.scala | 22 ++++++++++---- .../amazon/deequ/analyzers/MaxLength.scala | 30 +++++++++++++------ .../amazon/deequ/analyzers/MinLength.scala | 29 ++++++++++++------ .../scala/com/amazon/deequ/checks/Check.scala | 10 ++++--- .../amazon/deequ/constraints/Constraint.scala | 10 ++++--- .../amazon/deequ/VerificationSuiteTest.scala | 26 +++++++++------- .../deequ/analyzers/MaxLengthTest.scala | 12 ++++++++ .../deequ/analyzers/MinLengthTest.scala | 12 ++++++++ 9 files changed, 114 insertions(+), 46 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala index c4c4ea869..e0a328aa8 100644 --- a/src/main/scala/com/amazon/deequ/VerificationResult.scala +++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala @@ -18,17 +18,18 @@ package com.amazon.deequ import com.amazon.deequ.analyzers.Analyzer import com.amazon.deequ.analyzers.runners.AnalyzerContext -import com.amazon.deequ.checks.{Check, CheckResult, CheckStatus} -import com.amazon.deequ.constraints.AnalysisBasedConstraint 
+import com.amazon.deequ.checks.Check +import com.amazon.deequ.checks.CheckResult +import com.amazon.deequ.checks.CheckStatus import com.amazon.deequ.constraints.ConstraintResult -import com.amazon.deequ.constraints.NamedConstraint import com.amazon.deequ.constraints.RowLevelAssertedConstraint import com.amazon.deequ.constraints.RowLevelConstraint import com.amazon.deequ.metrics.FullColumn import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository.SimpleResultSerde import org.apache.spark.sql.Column -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.SparkSession /** * The result returned from the VerificationSuite diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index eae17459f..e317049a2 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -17,16 +17,22 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ -import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} import com.amazon.deequ.analyzers.runners._ import com.amazon.deequ.metrics.FullColumn import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn +import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.Entity +import com.amazon.deequ.metrics.Metric +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.Column +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession import scala.language.existentials -import scala.util.{Failure, Success} +import scala.util.Failure +import scala.util.Success /** * A state (sufficient statistic) computed from data, from which we can compute a metric. 
@@ -449,6 +455,12 @@ private[deequ] object Analyzers { if (columns.size == 1) Entity.Column else Entity.Mutlicolumn } + def conditionalSelectionForLength(selection: Column, where: Option[String], replaceWith: Double): Column = { + val conditionColumn = where.map { expression => expr(expression) } + conditionColumn + .map { condition => when(condition, replaceWith).otherwise(selection) } + .getOrElse(selection) + } def conditionalSelection(selection: String, where: Option[String]): Column = { conditionalSelection(col(selection), where) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index 8aa00451e..d4d3dfb84 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -17,22 +17,26 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ -import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString} -import org.apache.spark.sql.functions.{length, max} -import org.apache.spark.sql.types.{DoubleType, StructType} -import org.apache.spark.sql.{Column, Row} - -case class MaxLength(column: String, where: Option[String] = None) +import com.amazon.deequ.analyzers.Preconditions.hasColumn +import com.amazon.deequ.analyzers.Preconditions.isString +import org.apache.spark.sql.Column +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.length +import org.apache.spark.sql.functions.max +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.StructType + +case class MaxLength(column: String, where: Option[String] = None, convertNull: Boolean = false) extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column) with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - max(criterion) :: Nil + max(criterion(false)) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = { ifNoNullsIn(result, offset) { _ => - MaxState(result.getDouble(offset), Some(criterion)) + MaxState(result.getDouble(offset), Some(criterion(convertNull))) } } @@ -42,5 +46,13 @@ case class MaxLength(column: String, where: Option[String] = None) override def filterCondition: Option[String] = where - private def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType) + private def criterion(convertNull: Boolean): Column = { + if (convertNull) { + val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) + conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Double.MinValue) + } else { + length(conditionalSelection(column, where)).cast(DoubleType) + } + } + } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index 6c2f24452..a86698797 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -17,23 +17,27 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ -import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString} +import com.amazon.deequ.analyzers.Preconditions.hasColumn +import com.amazon.deequ.analyzers.Preconditions.isString import com.google.common.annotations.VisibleForTesting -import org.apache.spark.sql.functions.{length, min} -import org.apache.spark.sql.types.{DoubleType, StructType} -import org.apache.spark.sql.{Column, Row} - -case class MinLength(column: 
String, where: Option[String] = None) +import org.apache.spark.sql.functions.length +import org.apache.spark.sql.functions.min +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.Column +import org.apache.spark.sql.Row + +case class MinLength(column: String, where: Option[String] = None, convertNull: Boolean = false) extends StandardScanShareableAnalyzer[MinState]("MinLength", column) with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - min(length(conditionalSelection(column, where))).cast(DoubleType) :: Nil + min(criterion(false)) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { ifNoNullsIn(result, offset) { _ => - MinState(result.getDouble(offset), Some(criterion)) + MinState(result.getDouble(offset), Some(criterion(convertNull))) } } @@ -44,5 +48,12 @@ case class MinLength(column: String, where: Option[String] = None) override def filterCondition: Option[String] = where @VisibleForTesting - private[deequ] def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType) + private[deequ] def criterion(convertNull: Boolean): Column = { + if (convertNull) { + val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) + conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Double.MaxValue) + } else { + length(conditionalSelection(column, where)).cast(DoubleType) + } + } } diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 500326f52..0e5f02d6c 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -516,10 +516,11 @@ case class Check( def hasMinLength( column: String, assertion: Double => Boolean, - hint: Option[String] = None) + hint: Option[String] = None, + convertNull: Boolean = false) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint) } + addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint, convertNull) } } /** @@ -533,10 +534,11 @@ case class Check( def hasMaxLength( column: String, assertion: Double => Boolean, - hint: Option[String] = None) + hint: Option[String] = None, + convertNull: Boolean = false) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint) } + addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint, convertNull) } } /** diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index 6a8052f65..dc338ebe4 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -426,10 +426,11 @@ object Constraint { column: String, assertion: Double => Boolean, where: Option[String] = None, - hint: Option[String] = None) + hint: Option[String] = None, + convertNull: Boolean = false) : Constraint = { - val maxLength = MaxLength(column, where) + val maxLength = MaxLength(column, where, convertNull) val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion, hint = hint) @@ -454,10 +455,11 @@ object Constraint { column: String, assertion: Double => Boolean, where: Option[String] = None, - hint: Option[String] = None) + hint: Option[String] 
= None, + convertNull: Boolean = false) : Constraint = { - val minLength = MinLength(column, where) + val minLength = MinLength(column, where, convertNull) val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion, hint = hint) diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index f3167485e..b0fb9812c 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -26,9 +26,9 @@ import com.amazon.deequ.constraints.Constraint import com.amazon.deequ.io.DfsUtils import com.amazon.deequ.metrics.DoubleMetric import com.amazon.deequ.metrics.Entity -import com.amazon.deequ.repository.memory.InMemoryMetricsRepository import com.amazon.deequ.repository.MetricsRepository import com.amazon.deequ.repository.ResultKey +import com.amazon.deequ.repository.memory.InMemoryMetricsRepository import com.amazon.deequ.utils.CollectionUtils.SeqExtensions import com.amazon.deequ.utils.FixtureSupport import com.amazon.deequ.utils.TempFileUtils @@ -167,8 +167,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val isComplete = new Check(CheckLevel.Error, "rule1").isComplete("att1") val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") - val minLength = new Check(CheckLevel.Error, "rule4").hasMinLength("item", _ <= 3) - val maxLength = new Check(CheckLevel.Error, "rule5").hasMaxLength("item", _ > 1) + val minLength = new Check(CheckLevel.Error, "rule4") + .hasMinLength("item", _ <= 3, convertNull = true) + val maxLength = new Check(CheckLevel.Error, "rule5") + .hasMaxLength("item", _ > 1, convertNull = true) val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description val expectedColumn3 = minLength.description @@ -199,10 +201,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2)) - val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Boolean](0)) + val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) assert(Seq(true, true, true, false, false, false).sameElements(rowLevel3)) - val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getAs[Boolean](0)) + val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) assert(Seq(false, true, true, true, true, true).sameElements(rowLevel4)) } @@ -211,8 +213,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ <= 3) val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ > 1) - val isLengthMin = new Check(CheckLevel.Error, "rule3").hasMinLength("att1", _ <= 1) - val isLengthMax = new Check(CheckLevel.Error, "rule4").hasMaxLength("att1", _ >= 1) + val isLengthMin = new Check(CheckLevel.Error, "rule3") + .hasMinLength("att2", _ <= 1, convertNull = true) + val isLengthMax = new Check(CheckLevel.Error, "rule4") + .hasMaxLength("att2", _ >= 1, convertNull = true) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description val expectedColumn3 = isLengthMin.description 
@@ -242,11 +246,11 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) assert(Seq(false, true, true, true, true, true).sameElements(rowLevel2)) - val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Boolean](0)) - assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3)) + val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel3)) - val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getAs[Boolean](0)) - assert(Seq(true, true, true, true, true, true).sameElements(rowLevel4)) + val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel4)) } "accept analysis config for mandatory analysis" in withSparkSession { sparkSession => diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index b3f9ae96c..1f42942dc 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -49,6 +49,18 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) } + "return row-level results for null columns with convertNull option" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + val addressLength = MaxLength("att3", convertNull = true) // It's null in two rows + val state: Option[MaxState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0) + } + "return row-level results for blank strings" in withSparkSession { session => val data = getEmptyColumnDataDf(session) diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index 96b4e3fb4..f899984d8 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -51,6 +51,18 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) } + "return row-level results for null columns with convertNull option" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + val addressLength = MinLength("att3", convertNull = true) // It's null in two rows + val state: Option[MinState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0) + } + "return row-level results for blank strings" in withSparkSession { session => val data = getEmptyColumnDataDf(session) From 4d76899c8df054b0840a2c5a75fb8f3825d2fbf4 Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Mon, 10 Apr 2023 16:29:27 -0400 Subject: [PATCH 3/7] Use AnalyzerOptions Case Class for the converting null option 
--- pom.xml | 2 +- .../scala/com/amazon/deequ/analyzers/Analyzer.scala | 6 ++++++ .../com/amazon/deequ/analyzers/MaxLength.scala | 5 ++++- .../com/amazon/deequ/analyzers/MinLength.scala | 5 ++++- src/main/scala/com/amazon/deequ/checks/Check.scala | 13 +++++++------ .../com/amazon/deequ/constraints/Constraint.scala | 8 ++++---- .../com/amazon/deequ/VerificationSuiteTest.scala | 8 ++++---- .../com/amazon/deequ/analyzers/MaxLengthTest.scala | 2 +- .../com/amazon/deequ/analyzers/MinLengthTest.scala | 2 +- .../deequ/repository/AnalysisResultSerdeTest.scala | 8 ++++---- 10 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index ab9a21d37..4346ddf61 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazon.deequ deequ - 2.0.1-spark-3.1 + 2.0.3-spark-3.3 1.8 diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index e317049a2..65bb8e984 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -255,6 +255,12 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo } } +case class AnalyzerOptions(convertNull: Boolean) { + def getConvertNull(): Boolean = { + convertNull + } +} + /** Base class for analyzers that compute ratios of matching predicates */ abstract class PredicateMatchingAnalyzer( name: String, diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index d4d3dfb84..ff48fc3bb 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.functions.max import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.types.StructType -case class MaxLength(column: String, where: Option[String] = None, convertNull: Boolean = false) +case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column) with FilterableAnalyzer { @@ -35,6 +35,9 @@ case class MaxLength(column: String, where: Option[String] = None, convertNull: } override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = { + val convertNull: Boolean = analyzerOptions + .map { options => options.getConvertNull() } + .getOrElse(false) ifNoNullsIn(result, offset) { _ => MaxState(result.getDouble(offset), Some(criterion(convertNull))) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index a86698797..90a99b305 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Column import org.apache.spark.sql.Row -case class MinLength(column: String, where: Option[String] = None, convertNull: Boolean = false) +case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) extends StandardScanShareableAnalyzer[MinState]("MinLength", column) with FilterableAnalyzer { @@ -36,6 +36,9 @@ case class MinLength(column: String, where: Option[String] = None, convertNull: } override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { + val convertNull: Boolean = analyzerOptions + .map { 
options => options.getConvertNull() } + .getOrElse(false) ifNoNullsIn(result, offset) { _ => MinState(result.getDouble(offset), Some(criterion(convertNull))) } diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 0e5f02d6c..856abed81 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -16,16 +16,17 @@ package com.amazon.deequ.checks +import com.amazon.deequ.analyzers.AnalyzerOptions import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint} import com.amazon.deequ.analyzers.runners.AnalyzerContext -import com.amazon.deequ.analyzers.{Analyzer, Histogram, Patterns, State, KLLParameters} +import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State} import com.amazon.deequ.constraints.Constraint._ import com.amazon.deequ.constraints._ import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric} import com.amazon.deequ.repository.MetricsRepository import org.apache.spark.sql.expressions.UserDefinedFunction import com.amazon.deequ.anomalydetection.HistoryUtils -import com.amazon.deequ.checks.ColumnCondition.{isEachNotNull, isAnyNotNull} +import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull} import scala.util.matching.Regex @@ -517,10 +518,10 @@ case class Check( column: String, assertion: Double => Boolean, hint: Option[String] = None, - convertNull: Boolean = false) + analyzerOptions: Option[AnalyzerOptions] = None) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint, convertNull) } + addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint, analyzerOptions) } } /** @@ -535,10 +536,10 @@ case class Check( column: String, assertion: Double => Boolean, hint: Option[String] = None, - convertNull: Boolean = false) + analyzerOptions: Option[AnalyzerOptions] = None) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint, convertNull) } + addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint, analyzerOptions) } } /** diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index dc338ebe4..1ccf2ce41 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -427,10 +427,10 @@ object Constraint { assertion: Double => Boolean, where: Option[String] = None, hint: Option[String] = None, - convertNull: Boolean = false) + analyzerOptions: Option[AnalyzerOptions] = None) : Constraint = { - val maxLength = MaxLength(column, where, convertNull) + val maxLength = MaxLength(column, where, analyzerOptions) val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion, hint = hint) @@ -456,10 +456,10 @@ object Constraint { assertion: Double => Boolean, where: Option[String] = None, hint: Option[String] = None, - convertNull: Boolean = false) + analyzerOptions: Option[AnalyzerOptions] = None) : Constraint = { - val minLength = MinLength(column, where, convertNull) + val minLength = MinLength(column, where, analyzerOptions) val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion, hint = hint) diff --git 
a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index b0fb9812c..42ae7d3e1 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -168,9 +168,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") val minLength = new Check(CheckLevel.Error, "rule4") - .hasMinLength("item", _ <= 3, convertNull = true) + .hasMinLength("item", _ <= 3, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val maxLength = new Check(CheckLevel.Error, "rule5") - .hasMaxLength("item", _ > 1, convertNull = true) + .hasMaxLength("item", _ > 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description val expectedColumn3 = minLength.description @@ -214,9 +214,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ <= 3) val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ > 1) val isLengthMin = new Check(CheckLevel.Error, "rule3") - .hasMinLength("att2", _ <= 1, convertNull = true) + .hasMinLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val isLengthMax = new Check(CheckLevel.Error, "rule4") - .hasMaxLength("att2", _ >= 1, convertNull = true) + .hasMaxLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description val expectedColumn3 = isLengthMin.description diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index 1f42942dc..c6e6aa510 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -53,7 +53,7 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val data = getEmptyColumnDataDf(session) - val addressLength = MaxLength("att3", convertNull = true) // It's null in two rows + val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows val state: Option[MaxState] = addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index f899984d8..951fbe82e 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -55,7 +55,7 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val data = getEmptyColumnDataDf(session) - val addressLength = MinLength("att3", convertNull = true) // It's null in two rows + val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows val state: Option[MinState] = addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) diff --git 
a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala index 61cb40e41..aeec94994 100644 --- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala @@ -228,10 +228,6 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec |"instance":"att2","name":"Completeness","value":1.0}, |{"dataset_date":1507975810,"entity":"Column","region":"EU", |"instance":"att1","name":"Completeness","value":1.0}, - |{"dataset_date":1507975810,"entity":"Column","region":"EU", - |"instance":"att1","name":"MinLength","value":1.0}, - |{"dataset_date":1507975810,"entity":"Column","region":"EU", - |"instance":"att1","name":"MaxLength","value":1.0}, |{"dataset_date":1507975810,"entity":"Mutlicolumn","region":"EU", |"instance":"att1,att2","name":"MutualInformation","value":0.5623351446188083}, |{"dataset_date":1507975810,"entity":"Dataset","region":"EU", @@ -241,6 +237,10 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec |{"dataset_date":1507975810,"entity":"Column","region":"EU", |"instance":"att1","name":"Distinctness","value":0.5}, |{"dataset_date":1507975810,"entity":"Column","region":"EU", + |"instance":"att1","name":"MinLength","value":1.0}, + |{"dataset_date":1507975810,"entity":"Column","region":"EU", + |"instance":"att1","name":"MaxLength","value":1.0}, + |{"dataset_date":1507975810,"entity":"Column","region":"EU", |"instance":"att2","name":"Uniqueness","value":0.25}]""" .stripMargin.replaceAll("\n", "") From f170a3b40cedbb1862139313fbe9ab8ba5b2cc8c Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Tue, 11 Apr 2023 17:32:35 -0400 Subject: [PATCH 4/7] Revise how nulls are converted for min and max length --- pom.xml | 2 +- .../amazon/deequ/analyzers/MaxLength.scala | 5 +- .../amazon/deequ/analyzers/MinLength.scala | 5 +- .../amazon/deequ/VerificationSuiteTest.scala | 55 +++++++++++++++---- .../deequ/analyzers/MaxLengthTest.scala | 2 +- .../deequ/analyzers/MinLengthTest.scala | 1 - .../amazon/deequ/utils/FixtureSupport.scala | 11 ++++ 7 files changed, 63 insertions(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index 4346ddf61..889aa8795 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazon.deequ deequ - 2.0.3-spark-3.3 + 2.0.0-spark-3.1 1.8 diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index ff48fc3bb..e87f9bd9b 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.functions.length import org.apache.spark.sql.functions.max import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) @@ -51,8 +52,8 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio private def criterion(convertNull: Boolean): Column = { if (convertNull) { - val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) - conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Double.MinValue) + val colLengths: Column = length(conditionalSelection(column, where)).cast(IntegerType) + 
conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MAX_VALUE) } else { length(conditionalSelection(column, where)).cast(DoubleType) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index 90a99b305..74cd67b95 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Column import org.apache.spark.sql.Row +import org.apache.spark.sql.types.IntegerType case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) extends StandardScanShareableAnalyzer[MinState]("MinLength", column) @@ -53,8 +54,8 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio @VisibleForTesting private[deequ] def criterion(convertNull: Boolean): Column = { if (convertNull) { - val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) - conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Double.MaxValue) + val colLengths: Column = length(conditionalSelection(column, where)).cast(IntegerType) + conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MIN_VALUE) } else { length(conditionalSelection(column, where)).cast(DoubleType) } diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 42ae7d3e1..2b073182e 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -168,9 +168,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") val minLength = new Check(CheckLevel.Error, "rule4") - .hasMinLength("item", _ <= 3, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMinLength("item", _ >= 3, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val maxLength = new Check(CheckLevel.Error, "rule5") - .hasMaxLength("item", _ > 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description val expectedColumn3 = minLength.description @@ -202,21 +202,54 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2)) val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) - assert(Seq(true, true, true, false, false, false).sameElements(rowLevel3)) + assert(Seq(false, false, true, true, true, true).sameElements(rowLevel3)) val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) - assert(Seq(false, true, true, true, true, true).sameElements(rowLevel4)) + assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4)) + } + + "generate a result that contains row-level results for dates" in withSparkSession { session => + val data = getDateDf(session) + + val minLength = new Check(CheckLevel.Error, 
"rule1").hasMinLength("date", _ >= 10, + analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("date", _ <= 10, + analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + val expectedColumn1 = minLength.description + val expectedColumn2 = maxLength.description + + val suite = new VerificationSuite().onData(data) + .addCheck(minLength) + .addCheck(maxLength) + + val result: VerificationResult = suite.run() + + assert(result.status == CheckStatus.Success) + + val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data) + + resultData.show() + val expectedColumns: Set[String] = + data.columns.toSet + expectedColumn1 + expectedColumn2 + assert(resultData.columns.toSet == expectedColumns) + + + val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) + assert(Seq(false, true, false, false, false).sameElements(rowLevel1)) + + val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) + assert(Seq(false, true, false, false, false).sameElements(rowLevel2)) } "generate a result that contains row-level results for length" in withSparkSession { session => val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session) - val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ <= 3) - val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ > 1) + val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ >= 3) + val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ < 1) val isLengthMin = new Check(CheckLevel.Error, "rule3") - .hasMinLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val isLengthMax = new Check(CheckLevel.Error, "rule4") - .hasMaxLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description val expectedColumn3 = isLengthMin.description @@ -230,7 +263,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val result: VerificationResult = suite.run() - assert(result.status == CheckStatus.Success) + assert(result.status == CheckStatus.Error) val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data) @@ -241,10 +274,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) - assert(Seq(true, true, true, false, false, false).sameElements(rowLevel1)) + assert(Seq(false, false, true, true, true, true).sameElements(rowLevel1)) val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) - assert(Seq(false, true, true, true, true, true).sameElements(rowLevel2)) + assert(Seq(false, false, false, false, false, false).sameElements(rowLevel2)) val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) assert(Seq(true, true, false, true, false, true).sameElements(rowLevel3)) diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index c6e6aa510..4f3288651 100644 --- 
a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -58,7 +58,7 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) data.withColumn("new", metric.fullColumn.get) - .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Integer.MAX_VALUE, 1.0, Integer.MAX_VALUE, 1.0) } "return row-level results for blank strings" in withSparkSession { session => diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index 951fbe82e..02dca7356 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -60,7 +60,6 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) data.withColumn("new", metric.fullColumn.get) - .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0) } "return row-level results for blank strings" in withSparkSession { session => diff --git a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala index da54f6e5e..073f699ed 100644 --- a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala +++ b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala @@ -170,6 +170,17 @@ trait FixtureSupport { ).toDF("item", "att1", "att2") } + def getDateDf(sparkSession: SparkSession): DataFrame = { + import sparkSession.implicits._ + + Seq( + (100, "Furniture", "Product 1", 25, null), + (101, "Cosmetics", "Product 2", 20, "2022-01-05"), + (102, "Furniture", "Product 3", 30, null), + (103, "Electronics", "Product 4", 10, null), + (104, "Electronics", "Product 5", 50, null) + ).toDF("id", "product", "product_id", "units", "date") + } def getDfCompleteAndInCompleteColumnsDelta(sparkSession: SparkSession): DataFrame = { import sparkSession.implicits._ From f6fb87467bd16fd10f344baab3270a6d77288171 Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Wed, 12 Apr 2023 12:05:06 -0400 Subject: [PATCH 5/7] Modify AnalyzerOptions to use NullBehavior enum --- pom.xml | 2 +- .../com/amazon/deequ/analyzers/Analyzer.scala | 21 +++++--- .../amazon/deequ/analyzers/MaxLength.scala | 30 ++++++----- .../amazon/deequ/analyzers/MinLength.scala | 34 +++++++----- .../amazon/deequ/VerificationSuiteTest.scala | 54 ++++++++----------- .../deequ/analyzers/MaxLengthTest.scala | 5 +- .../deequ/analyzers/MinLengthTest.scala | 4 +- 7 files changed, 80 insertions(+), 70 deletions(-) diff --git a/pom.xml b/pom.xml index 889aa8795..4346ddf61 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazon.deequ deequ - 2.0.0-spark-3.1 + 2.0.3-spark-3.3 1.8 diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index 65bb8e984..e3654e1a8 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -17,6 +17,7 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ +import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.runners._ 
import com.amazon.deequ.metrics.FullColumn import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn @@ -255,12 +256,17 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo } } -case class AnalyzerOptions(convertNull: Boolean) { - def getConvertNull(): Boolean = { - convertNull +case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) { + def getNullBehavior(): NullBehavior = { + nullBehavior } } +object NullBehavior extends Enumeration { + type NullBehavior = Value + val Ignore, Empty, Fail = Value +} + /** Base class for analyzers that compute ratios of matching predicates */ abstract class PredicateMatchingAnalyzer( name: String, @@ -461,15 +467,16 @@ private[deequ] object Analyzers { if (columns.size == 1) Entity.Column else Entity.Mutlicolumn } - def conditionalSelectionForLength(selection: Column, where: Option[String], replaceWith: Double): Column = { + def conditionalSelection(selection: String, where: Option[String]): Column = { + conditionalSelection(col(selection), where) + } + + def conditionalSelection(selection: Column, where: Option[String], replaceWith: Any): Column = { val conditionColumn = where.map { expression => expr(expression) } conditionColumn .map { condition => when(condition, replaceWith).otherwise(selection) } .getOrElse(selection) } - def conditionalSelection(selection: String, where: Option[String]): Column = { - conditionalSelection(col(selection), where) - } def conditionalSelection(selection: Column, condition: Option[String]): Column = { val conditionColumn = condition.map { expression => expr(expression) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index e87f9bd9b..be0d3f479 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -17,14 +17,15 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ +import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.Preconditions.hasColumn import com.amazon.deequ.analyzers.Preconditions.isString import org.apache.spark.sql.Column import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.length import org.apache.spark.sql.functions.max import org.apache.spark.sql.types.DoubleType -import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) @@ -32,15 +33,12 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - max(criterion(false)) :: Nil + max(criterion(getNullBehavior())) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = { - val convertNull: Boolean = analyzerOptions - .map { options => options.getConvertNull() } - .getOrElse(false) ifNoNullsIn(result, offset) { _ => - MaxState(result.getDouble(offset), Some(criterion(convertNull))) + MaxState(result.getDouble(offset), Some(criterion(getNullBehavior()))) } } @@ -50,13 +48,19 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio override def filterCondition: Option[String] = where - private def criterion(convertNull: Boolean): Column = { - if (convertNull) { - val colLengths: Column = 
length(conditionalSelection(column, where)).cast(IntegerType) - conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MAX_VALUE) - } else { - length(conditionalSelection(column, where)).cast(DoubleType) + private def criterion(nullBehavior: NullBehavior): Column = { + nullBehavior match { + case NullBehavior.Fail => + val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) + conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MaxValue) + case NullBehavior.Empty => + length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) + case _ => length(conditionalSelection(column, where)).cast(DoubleType) } } - + private def getNullBehavior(): NullBehavior = { + analyzerOptions + .map { options => options.getNullBehavior() } + .getOrElse(NullBehavior.Ignore) + } } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index 74cd67b95..c8d6406ba 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -17,31 +17,29 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ +import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.Preconditions.hasColumn import com.amazon.deequ.analyzers.Preconditions.isString import com.google.common.annotations.VisibleForTesting +import org.apache.spark.sql.Column +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.length import org.apache.spark.sql.functions.min import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.Column -import org.apache.spark.sql.Row -import org.apache.spark.sql.types.IntegerType case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) extends StandardScanShareableAnalyzer[MinState]("MinLength", column) with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - min(criterion(false)) :: Nil + min(criterion(getNullBehavior())) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { - val convertNull: Boolean = analyzerOptions - .map { options => options.getConvertNull() } - .getOrElse(false) ifNoNullsIn(result, offset) { _ => - MinState(result.getDouble(offset), Some(criterion(convertNull))) + MinState(result.getDouble(offset), Some(criterion(getNullBehavior()))) } } @@ -52,12 +50,20 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio override def filterCondition: Option[String] = where @VisibleForTesting - private[deequ] def criterion(convertNull: Boolean): Column = { - if (convertNull) { - val colLengths: Column = length(conditionalSelection(column, where)).cast(IntegerType) - conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MIN_VALUE) - } else { - length(conditionalSelection(column, where)).cast(DoubleType) + private[deequ] def criterion(nullBehavior: NullBehavior): Column = { + nullBehavior match { + case NullBehavior.Fail => + val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) + conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MinValue) + case NullBehavior.Empty => + length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) + case 
_ => length(conditionalSelection(column, where)).cast(DoubleType) } } + + private def getNullBehavior(): NullBehavior = { + analyzerOptions + .map { options => options.getNullBehavior() } + .getOrElse(NullBehavior.Ignore) + } } diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 2b073182e..b97b847c5 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -168,9 +168,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") val minLength = new Check(CheckLevel.Error, "rule4") - .hasMinLength("item", _ >= 3, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val maxLength = new Check(CheckLevel.Error, "rule5") - .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description val expectedColumn3 = minLength.description @@ -202,19 +202,19 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2)) val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) - assert(Seq(false, false, true, true, true, true).sameElements(rowLevel3)) + assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3)) val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4)) } - "generate a result that contains row-level results for dates" in withSparkSession { session => - val data = getDateDf(session) + "generate a result that contains length row-level results with nullBehavior fail" in withSparkSession { session => + val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session) - val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("date", _ >= 10, - analyzerOptions = Option(AnalyzerOptions(convertNull = true))) - val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("date", _ <= 10, - analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + val minLength = new Check(CheckLevel.Error, "rule1") + .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) + val maxLength = new Check(CheckLevel.Error, "rule2") + .hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description @@ -224,7 +224,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val result: VerificationResult = suite.run() - assert(result.status == CheckStatus.Success) + assert(result.status == CheckStatus.Error) val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data) @@ -235,31 +235,27 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) - assert(Seq(false, true, false, 
false, false).sameElements(rowLevel1)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1)) val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) - assert(Seq(false, true, false, false, false).sameElements(rowLevel2)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2)) } - "generate a result that contains row-level results for length" in withSparkSession { session => + "generate a result that contains length row-level results with nullBehavior empty" in withSparkSession { session => val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session) - val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ >= 3) - val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ < 1) - val isLengthMin = new Check(CheckLevel.Error, "rule3") - .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) - val isLengthMax = new Check(CheckLevel.Error, "rule4") - .hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true))) + // null should fail since length 0 is not >= 1 + val minLength = new Check(CheckLevel.Error, "rule1") + .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + // nulls should succeed since length 0 is < 2 + val maxLength = new Check(CheckLevel.Error, "rule2") + .hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description - val expectedColumn3 = isLengthMin.description - val expectedColumn4 = isLengthMax.description val suite = new VerificationSuite().onData(data) .addCheck(minLength) .addCheck(maxLength) - .addCheck(isLengthMin) - .addCheck(isLengthMax) val result: VerificationResult = suite.run() @@ -269,21 +265,15 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec resultData.show() val expectedColumns: Set[String] = - data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 + data.columns.toSet + expectedColumn1 + expectedColumn2 assert(resultData.columns.toSet == expectedColumns) val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) - assert(Seq(false, false, true, true, true, true).sameElements(rowLevel1)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1)) val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0)) - assert(Seq(false, false, false, false, false, false).sameElements(rowLevel2)) - - val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0)) - assert(Seq(true, true, false, true, false, true).sameElements(rowLevel3)) - - val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) - assert(Seq(true, true, false, true, false, true).sameElements(rowLevel4)) + assert(Seq(true, true, true, true, true, true).sameElements(rowLevel2)) } "accept analysis config for mandatory analysis" in withSparkSession { sparkSession => diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index 4f3288651..28a2afc65 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -53,12 +53,13 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val 
data = getEmptyColumnDataDf(session) - val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows + // It's null in two rows + val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val state: Option[MaxState] = addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) data.withColumn("new", metric.fullColumn.get) - .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Integer.MAX_VALUE, 1.0, Integer.MAX_VALUE, 1.0) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0) } "return row-level results for blank strings" in withSparkSession { session => diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index 02dca7356..3b61a63d6 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -55,11 +55,13 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val data = getEmptyColumnDataDf(session) - val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows + // It's null in two rows + val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val state: Option[MinState] = addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0) } "return row-level results for blank strings" in withSparkSession { session => From bdbd29409812db4c4809a409d7961254f9900bf7 Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Thu, 13 Apr 2023 15:17:28 -0400 Subject: [PATCH 6/7] Explicit typing for conditionalSelection and extra tests for Min&MaxLength --- .../com/amazon/deequ/analyzers/Analyzer.scala | 9 ++++++++- .../com/amazon/deequ/analyzers/MaxLength.scala | 4 ++-- .../com/amazon/deequ/analyzers/MinLength.scala | 4 ++-- .../amazon/deequ/analyzers/MaxLengthTest.scala | 15 ++++++++++++++- .../amazon/deequ/analyzers/MinLengthTest.scala | 15 ++++++++++++++- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index e3654e1a8..dd3b10f6b 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -471,7 +471,14 @@ private[deequ] object Analyzers { conditionalSelection(col(selection), where) } - def conditionalSelection(selection: Column, where: Option[String], replaceWith: Any): Column = { + def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = { + val conditionColumn = where.map { expression => expr(expression) } + conditionColumn + .map { condition => when(condition, replaceWith).otherwise(selection) } + .getOrElse(selection) + } + + def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = { val conditionColumn = where.map { expression => expr(expression) } conditionColumn .map { condition => when(condition, replaceWith).otherwise(selection) } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala 
b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index be0d3f479..a358db97a 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -52,9 +52,9 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio nullBehavior match { case NullBehavior.Fail => val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) - conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MaxValue) + conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue) case NullBehavior.Empty => - length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) + length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType) case _ => length(conditionalSelection(column, where)).cast(DoubleType) } } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index c8d6406ba..40c0f6524 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -54,9 +54,9 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio nullBehavior match { case NullBehavior.Fail => val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) - conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MinValue) + conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue) case NullBehavior.Empty => - length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) + length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType) case _ => length(conditionalSelection(column, where)).cast(DoubleType) } } diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index 28a2afc65..8536a6c1e 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -49,7 +49,7 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) } - "return row-level results for null columns with convertNull option" in withSparkSession { session => + "return row-level results for null columns with NullBehavior fail option" in withSparkSession { session => val data = getEmptyColumnDataDf(session) @@ -62,6 +62,19 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0) } + "return row-level results for null columns with NullBehavior empty option" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + // It's null in two rows + val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + val state: Option[MaxState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) + } + "return row-level results for blank strings" in withSparkSession { session 
=> val data = getEmptyColumnDataDf(session) diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index 3b61a63d6..71690371b 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -51,7 +51,7 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) } - "return row-level results for null columns with convertNull option" in withSparkSession { session => + "return row-level results for null columns with NullBehavior fail option" in withSparkSession { session => val data = getEmptyColumnDataDf(session) @@ -64,6 +64,19 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0) } + "return row-level results for null columns with NullBehavior empty option" in withSparkSession { session => + + val data = getEmptyColumnDataDf(session) + + // It's null in two rows + val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + val state: Option[MinState] = addressLength.computeStateFrom(data) + val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) + + data.withColumn("new", metric.fullColumn.get) + .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, 0.0, 1.0, 0.0, 1.0) + } + "return row-level results for blank strings" in withSparkSession { session => val data = getEmptyColumnDataDf(session) From 296965e8dcf2a9c0920ca399034bde23ba6dac91 Mon Sep 17 00:00:00 2001 From: Edward Cho Date: Thu, 13 Apr 2023 16:09:39 -0400 Subject: [PATCH 7/7] Styling fixes --- .../scala/com/amazon/deequ/analyzers/Analyzer.scala | 12 ++++-------- .../scala/com/amazon/deequ/analyzers/MaxLength.scala | 10 +++++----- .../scala/com/amazon/deequ/analyzers/MinLength.scala | 12 +++++------- .../com/amazon/deequ/VerificationSuiteTest.scala | 5 ++--- .../com/amazon/deequ/analyzers/MaxLengthTest.scala | 2 +- .../com/amazon/deequ/analyzers/MinLengthTest.scala | 2 +- 6 files changed, 18 insertions(+), 25 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index dd3b10f6b..1dd0bf248 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -256,15 +256,11 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo } } -case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) { - def getNullBehavior(): NullBehavior = { - nullBehavior - } -} +case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) object NullBehavior extends Enumeration { type NullBehavior = Value - val Ignore, Empty, Fail = Value + val Ignore, EmptyString, Fail = Value } /** Base class for analyzers that compute ratios of matching predicates */ @@ -472,14 +468,14 @@ private[deequ] object Analyzers { } def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = { - val conditionColumn = where.map { expression => expr(expression) } + val conditionColumn = where.map(expr) conditionColumn .map { condition => when(condition, replaceWith).otherwise(selection) } .getOrElse(selection) } def conditionalSelection(selection: 
Column, where: Option[String], replaceWith: String): Column = { - val conditionColumn = where.map { expression => expr(expression) } + val conditionColumn = where.map(expr) conditionColumn .map { condition => when(condition, replaceWith).otherwise(selection) } .getOrElse(selection) diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala index a358db97a..2ad9a8ab6 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala @@ -33,12 +33,12 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - max(criterion(getNullBehavior())) :: Nil + max(criterion(getNullBehavior)) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = { ifNoNullsIn(result, offset) { _ => - MaxState(result.getDouble(offset), Some(criterion(getNullBehavior()))) + MaxState(result.getDouble(offset), Some(criterion(getNullBehavior))) } } @@ -53,14 +53,14 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio case NullBehavior.Fail => val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue) - case NullBehavior.Empty => + case NullBehavior.EmptyString => length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType) case _ => length(conditionalSelection(column, where)).cast(DoubleType) } } - private def getNullBehavior(): NullBehavior = { + private def getNullBehavior: NullBehavior = { analyzerOptions - .map { options => options.getNullBehavior() } + .map { options => options.nullBehavior } .getOrElse(NullBehavior.Ignore) } } diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala index 40c0f6524..f2c2849a8 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala @@ -20,7 +20,6 @@ import com.amazon.deequ.analyzers.Analyzers._ import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.Preconditions.hasColumn import com.amazon.deequ.analyzers.Preconditions.isString -import com.google.common.annotations.VisibleForTesting import org.apache.spark.sql.Column import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col @@ -34,12 +33,12 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio with FilterableAnalyzer { override def aggregationFunctions(): Seq[Column] = { - min(criterion(getNullBehavior())) :: Nil + min(criterion(getNullBehavior)) :: Nil } override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { ifNoNullsIn(result, offset) { _ => - MinState(result.getDouble(offset), Some(criterion(getNullBehavior()))) + MinState(result.getDouble(offset), Some(criterion(getNullBehavior))) } } @@ -49,21 +48,20 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio override def filterCondition: Option[String] = where - @VisibleForTesting private[deequ] def criterion(nullBehavior: NullBehavior): Column = { nullBehavior match { case NullBehavior.Fail => val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) conditionalSelection(colLengths, 
Option(s"${column} IS NULL"), replaceWith = Double.MinValue) - case NullBehavior.Empty => + case NullBehavior.EmptyString => length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType) case _ => length(conditionalSelection(column, where)).cast(DoubleType) } } - private def getNullBehavior(): NullBehavior = { + private def getNullBehavior: NullBehavior = { analyzerOptions - .map { options => options.getNullBehavior() } + .map { options => options.nullBehavior } .getOrElse(NullBehavior.Ignore) } } diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index b97b847c5..8f92e6066 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -233,7 +233,6 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec data.columns.toSet + expectedColumn1 + expectedColumn2 assert(resultData.columns.toSet == expectedColumns) - val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0)) assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1)) @@ -246,10 +245,10 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec // null should fail since length 0 is not >= 1 val minLength = new Check(CheckLevel.Error, "rule1") - .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString))) // nulls should succeed since length 0 is < 2 val maxLength = new Check(CheckLevel.Error, "rule2") - .hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + .hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString))) val expectedColumn1 = minLength.description val expectedColumn2 = maxLength.description diff --git a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala index 8536a6c1e..1c8d67471 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala @@ -67,7 +67,7 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val data = getEmptyColumnDataDf(session) // It's null in two rows - val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString))) val state: Option[MaxState] = addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) diff --git a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala index 71690371b..b9d706a8a 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala @@ -69,7 +69,7 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with val data = getEmptyColumnDataDf(session) // It's null in two rows - val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty))) + val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString))) val state: Option[MinState] = 
addressLength.computeStateFrom(data) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)
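
Note (not part of the patches): a minimal usage sketch of the NullBehavior option this series introduces, pieced together from the VerificationSuiteTest changes above. The Check, AnalyzerOptions and VerificationResult calls mirror the test code in patches 5-7; the wrapper function, its name, and the `df`/`session` arguments (a DataFrame with a nullable string column "att2") are invented for illustration only.

    import com.amazon.deequ.{VerificationResult, VerificationSuite}
    import com.amazon.deequ.analyzers.{AnalyzerOptions, NullBehavior}
    import com.amazon.deequ.checks.{Check, CheckLevel}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Hypothetical helper showing how the new analyzerOptions parameter is threaded
    // through length checks and surfaced as row-level results.
    def lengthChecksSketch(session: SparkSession, df: DataFrame): DataFrame = {
      // NullBehavior.Fail: rows with a NULL "att2" fail the min-length rule.
      val minLength = new Check(CheckLevel.Error, "rule1")
        .hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))

      // NullBehavior.EmptyString: NULLs are treated as length 0, so they pass "< 2".
      val maxLength = new Check(CheckLevel.Error, "rule2")
        .hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.EmptyString)))

      val result: VerificationResult = new VerificationSuite()
        .onData(df)
        .addCheck(minLength)
        .addCheck(maxLength)
        .run()

      // Appends one boolean column per check to the input rows.
      VerificationResult.rowLevelResultsAsDataFrame(session, result, df)
    }

As the MinLengthTest and MaxLengthTest changes show, the row-level length column substitutes Double.MinValue / Double.MaxValue for NULLs under NullBehavior.Fail (forcing those rows to fail the assertion) and 0.0 under NullBehavior.EmptyString, while the default Ignore behavior leaves the length computation unchanged.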