Feature: Length Row Level Results #465

Merged: 7 commits, Apr 13, 2023

Changes from 1 commit
Modify AnalyzerOptions to use NullBehavior enum
eycho-am committed Apr 12, 2023
commit f6fb87467bd16fd10f344baab3270a6d77288171
pom.xml (1 addition, 1 deletion)
@@ -6,7 +6,7 @@

   <groupId>com.amazon.deequ</groupId>
   <artifactId>deequ</artifactId>
-  <version>2.0.0-spark-3.1</version>
+  <version>2.0.3-spark-3.3</version>

   <properties>
     <maven.compiler.source>1.8</maven.compiler.source>
src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala (14 additions, 7 deletions)
@@ -17,6 +17,7 @@
 package com.amazon.deequ.analyzers

 import com.amazon.deequ.analyzers.Analyzers._
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.runners._
 import com.amazon.deequ.metrics.FullColumn
 import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn

@@ -255,12 +256,17 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo
   }
 }

-case class AnalyzerOptions(convertNull: Boolean) {
-  def getConvertNull(): Boolean = {
-    convertNull
+case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) {
+  def getNullBehavior(): NullBehavior = {
+    nullBehavior
   }
 }

+object NullBehavior extends Enumeration {
+  type NullBehavior = Value
+  val Ignore, Empty, Fail = Value
+}
+
 /** Base class for analyzers that compute ratios of matching predicates */
 abstract class PredicateMatchingAnalyzer(
   name: String,

Contributor commented on `val Ignore, Empty, Fail = Value`:
> What does Empty mean for numbers? If it is only applicable for Strings, should we call it EmptyString?
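A note on the Empty question above: in this commit the option is only consumed by the string length analyzers (MaxLength and MinLength below). As a minimal, hedged sketch, here is what the three values are intended to do to a length expression, written as plain Spark Column expressions; the column name `att1` is an illustrative assumption, not part of the change:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, length, lit, when}

val c: Column = col("att1") // hypothetical nullable string column

// Ignore (default): nulls stay null and drop out of min/max aggregations.
val ignoreLength: Column = length(c).cast("double")

// Empty: treat null as the empty string, i.e. row-level length 0.0.
val emptyLength: Column = length(when(c.isNull, lit("")).otherwise(c)).cast("double")

// Fail: substitute a sentinel for null rows (Double.MaxValue in MaxLength,
// Double.MinValue in MinLength) so the row-level assertion fails there.
val failLengthForMax: Column = when(c.isNull, lit(Double.MaxValue))
  .otherwise(length(c).cast("double"))
```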
@@ -461,15 +467,16 @@ private[deequ] object Analyzers {

     if (columns.size == 1) Entity.Column else Entity.Mutlicolumn
   }

-  def conditionalSelectionForLength(selection: Column, where: Option[String], replaceWith: Double): Column = {
+  def conditionalSelection(selection: String, where: Option[String]): Column = {
+    conditionalSelection(col(selection), where)
+  }
+
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Any): Column = {
     val conditionColumn = where.map { expression => expr(expression) }
     conditionColumn
       .map { condition => when(condition, replaceWith).otherwise(selection) }
       .getOrElse(selection)
   }
-  def conditionalSelection(selection: String, where: Option[String]): Column = {
-    conditionalSelection(col(selection), where)
-  }

   def conditionalSelection(selection: Column, condition: Option[String]): Column = {
     val conditionColumn = condition.map { expression => expr(expression) }

Contributor commented on `val conditionColumn = where.map { expression => expr(expression) }` (thread marked resolved):
> nit: where.map(expr)
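On the nit above: `expr` already has type `String => Column`, so the lambda can be passed point-free. A rough standalone sketch of the new overload's mechanics, using hypothetical names outside of deequ:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{expr, when}

// The two formulations the reviewer is comparing; both yield Option[Column].
def verbose(where: Option[String]): Option[Column] = where.map { expression => expr(expression) }
def pointFree(where: Option[String]): Option[Column] = where.map(expr)

// The replaceWith overload in plain terms: where the condition holds for a
// row, substitute the replacement value; otherwise keep the original column.
def replaceWhere(selection: Column, where: Option[String], replaceWith: Any): Column =
  where.map(expr)
    .map(condition => when(condition, replaceWith).otherwise(selection))
    .getOrElse(selection)
```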
src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala (17 additions, 13 deletions)
@@ -17,30 +17,28 @@
 package com.amazon.deequ.analyzers

 import com.amazon.deequ.analyzers.Analyzers._
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.Preconditions.hasColumn
 import com.amazon.deequ.analyzers.Preconditions.isString
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.functions.length
 import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.types.DoubleType
-import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType

 case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
   extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column)
   with FilterableAnalyzer {

   override def aggregationFunctions(): Seq[Column] = {
-    max(criterion(false)) :: Nil
+    max(criterion(getNullBehavior())) :: Nil
   }

   override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = {
-    val convertNull: Boolean = analyzerOptions
-      .map { options => options.getConvertNull() }
-      .getOrElse(false)
     ifNoNullsIn(result, offset) { _ =>
-      MaxState(result.getDouble(offset), Some(criterion(convertNull)))
+      MaxState(result.getDouble(offset), Some(criterion(getNullBehavior())))
     }
   }

@@ -50,13 +48,19 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio

   override def filterCondition: Option[String] = where

-  private def criterion(convertNull: Boolean): Column = {
-    if (convertNull) {
-      val colLengths: Column = length(conditionalSelection(column, where)).cast(IntegerType)
-      conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MAX_VALUE)
-    } else {
-      length(conditionalSelection(column, where)).cast(DoubleType)
+  private def criterion(nullBehavior: NullBehavior): Column = {
+    nullBehavior match {
+      case NullBehavior.Fail =>
+        val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
+        conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MaxValue)
+      case NullBehavior.Empty =>
+        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType)
+      case _ => length(conditionalSelection(column, where)).cast(DoubleType)
     }
   }
+
+  private def getNullBehavior(): NullBehavior = {
+    analyzerOptions
+      .map { options => options.getNullBehavior() }
+      .getOrElse(NullBehavior.Ignore)
+  }
 }

Contributor commented on `private def getNullBehavior(): NullBehavior = {`:
> nit: The () can be removed. In idiomatic Scala, an empty-paren method signals that the method performs a side effect, such as calling a service or logging. Here we are just extracting data from an object, so the () can be dropped.
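A hedged end-to-end sketch of the Fail behavior at the check level, using the API exactly as the tests below exercise it; the toy data and session setup are assumptions, not part of the change:

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.{AnalyzerOptions, NullBehavior}
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// One null row: under Fail its length is evaluated as Double.MaxValue,
// so the _ <= 3 assertion fails on that row.
val data = Seq(Some("a"), Some("abc"), None).toDF("item")

val check = new Check(CheckLevel.Error, "max length with Fail")
  .hasMaxLength("item", _ <= 3, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))

val result = new VerificationSuite().onData(data).addCheck(check).run()
```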
src/main/scala/com/amazon/deequ/analyzers/MinLength.scala (20 additions, 14 deletions)
@@ -17,31 +17,29 @@
 package com.amazon.deequ.analyzers

 import com.amazon.deequ.analyzers.Analyzers._
+import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.Preconditions.hasColumn
 import com.amazon.deequ.analyzers.Preconditions.isString
 import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.functions.length
 import org.apache.spark.sql.functions.min
 import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.IntegerType

 case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
   extends StandardScanShareableAnalyzer[MinState]("MinLength", column)
   with FilterableAnalyzer {

   override def aggregationFunctions(): Seq[Column] = {
-    min(criterion(false)) :: Nil
+    min(criterion(getNullBehavior())) :: Nil
   }

   override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = {
-    val convertNull: Boolean = analyzerOptions
-      .map { options => options.getConvertNull() }
-      .getOrElse(false)
     ifNoNullsIn(result, offset) { _ =>
-      MinState(result.getDouble(offset), Some(criterion(convertNull)))
+      MinState(result.getDouble(offset), Some(criterion(getNullBehavior())))
     }
   }

@@ -52,12 +50,20 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio

   override def filterCondition: Option[String] = where

   @VisibleForTesting
-  private[deequ] def criterion(convertNull: Boolean): Column = {
-    if (convertNull) {
-      val colLengths: Column = length(conditionalSelection(column, where)).cast(IntegerType)
-      conditionalSelectionForLength(colLengths, Option(s"${column} IS NULL"), Integer.MIN_VALUE)
-    } else {
-      length(conditionalSelection(column, where)).cast(DoubleType)
+  private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
+    nullBehavior match {
+      case NullBehavior.Fail =>
+        val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
+        conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MinValue)
+      case NullBehavior.Empty =>
+        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType)
+      case _ => length(conditionalSelection(column, where)).cast(DoubleType)
    }
  }
+
+  private def getNullBehavior(): NullBehavior = {
+    analyzerOptions
+      .map { options => options.getNullBehavior() }
+      .getOrElse(NullBehavior.Ignore)
+  }
 }

Contributor commented on `@VisibleForTesting`:
> Is this needed if we are using the package-private modifier and have the test in the appropriate package?
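For contrast, a minimal sketch of Empty on MinLength, under the same assumed setup as the MaxLength sketch above: the null row counts as length 0.0 rather than a sentinel, so its outcome depends only on the assertion's bound:

```scala
// Reuses session, imports, and `data` from the previous sketch.
val minCheck = new Check(CheckLevel.Error, "min length with Empty")
  // The null row is treated as "" (length 0.0): _ >= 1 fails on it,
  // while an assertion like _ >= 0 would pass.
  .hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty)))

val minResult = new VerificationSuite().onData(data).addCheck(minCheck).run()
```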
src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala (22 additions, 32 deletions)
@@ -168,9 +168,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec

       val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7)
       val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item")
       val minLength = new Check(CheckLevel.Error, "rule4")
-        .hasMinLength("item", _ >= 3, analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
+        .hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
       val maxLength = new Check(CheckLevel.Error, "rule5")
-        .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
+        .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
       val expectedColumn1 = isComplete.description
       val expectedColumn2 = completeness.description
       val expectedColumn3 = minLength.description

@@ -202,19 +202,19 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec

       assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2))

       val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0))
-      assert(Seq(false, false, true, true, true, true).sameElements(rowLevel3))
+      assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3))

       val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0))
       assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4))
     }

"generate a result that contains row-level results for dates" in withSparkSession { session =>
val data = getDateDf(session)
"generate a result that contains length row-level results with nullBehavior fail" in withSparkSession { session =>
val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session)

val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("date", _ >= 10,
analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("date", _ <= 10,
analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
val minLength = new Check(CheckLevel.Error, "rule1")
.hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
val maxLength = new Check(CheckLevel.Error, "rule2")
.hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
val expectedColumn1 = minLength.description
val expectedColumn2 = maxLength.description

Expand All @@ -224,7 +224,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec

val result: VerificationResult = suite.run()

assert(result.status == CheckStatus.Success)
assert(result.status == CheckStatus.Error)

val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)

Expand All @@ -235,31 +235,27 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec


val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0))
assert(Seq(false, true, false, false, false).sameElements(rowLevel1))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1))

val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0))
assert(Seq(false, true, false, false, false).sameElements(rowLevel2))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel2))
}

"generate a result that contains row-level results for length" in withSparkSession { session =>
"generate a result that contains length row-level results with nullBehavior empty" in withSparkSession { session =>
val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session)

val minLength = new Check(CheckLevel.Error, "rule1").hasMinLength("item", _ >= 3)
val maxLength = new Check(CheckLevel.Error, "rule2").hasMaxLength("item", _ < 1)
val isLengthMin = new Check(CheckLevel.Error, "rule3")
.hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
val isLengthMax = new Check(CheckLevel.Error, "rule4")
.hasMaxLength("att2", _ <= 1, analyzerOptions = Option(AnalyzerOptions(convertNull = true)))
// null should fail since length 0 is not >= 1
val minLength = new Check(CheckLevel.Error, "rule1")
.hasMinLength("att2", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty)))
// nulls should succeed since length 0 is < 2
val maxLength = new Check(CheckLevel.Error, "rule2")
.hasMaxLength("att2", _ < 2, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Empty)))
val expectedColumn1 = minLength.description
val expectedColumn2 = maxLength.description
val expectedColumn3 = isLengthMin.description
val expectedColumn4 = isLengthMax.description

val suite = new VerificationSuite().onData(data)
.addCheck(minLength)
.addCheck(maxLength)
.addCheck(isLengthMin)
.addCheck(isLengthMax)

val result: VerificationResult = suite.run()

Expand All @@ -269,21 +265,15 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec

resultData.show()
val expectedColumns: Set[String] =
data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4
data.columns.toSet + expectedColumn1 + expectedColumn2
assert(resultData.columns.toSet == expectedColumns)


val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getBoolean(0))
assert(Seq(false, false, true, true, true, true).sameElements(rowLevel1))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel1))

val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getBoolean(0))
assert(Seq(false, false, false, false, false, false).sameElements(rowLevel2))

val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getBoolean(0))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel3))

val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel4))
assert(Seq(true, true, true, true, true, true).sameElements(rowLevel2))
}

"accept analysis config for mandatory analysis" in withSparkSession { sparkSession =>
Expand Down
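One detail the tests rely on without spelling it out: `VerificationResult.rowLevelResultsAsDataFrame` appends one boolean column per check, named by the check's description, which is why the assertions select `minLength.description` and read booleans. A short sketch, reusing the assumed objects from the sketches above:

```scala
import com.amazon.deequ.VerificationResult

// Appends a boolean column per check, named after check.description.
val rowLevel = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
rowLevel.show()

// Per-row outcome of one check, as the tests read it.
val outcomes: Array[Boolean] =
  rowLevel.select(check.description).collect().map(_.getBoolean(0))
```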
src/test/scala/com/amazon/deequ/analyzers/MaxLengthTest.scala (3 additions, 2 deletions)
@@ -53,12 +53,13 @@ class MaxLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with

       val data = getEmptyColumnDataDf(session)

-      val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows
+      // It's null in two rows
+      val addressLength = MaxLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
       val state: Option[MaxState] = addressLength.computeStateFrom(data)
       val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)

       data.withColumn("new", metric.fullColumn.get)
-        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Integer.MAX_VALUE, 1.0, Integer.MAX_VALUE, 1.0)
+        .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MaxValue, 1.0, Double.MaxValue, 1.0)
     }

     "return row-level results for blank strings" in withSparkSession { session =>
src/test/scala/com/amazon/deequ/analyzers/MinLengthTest.scala

@@ -55,11 +55,13 @@ class MinLengthTest extends AnyWordSpec with Matchers with SparkContextSpec with

       val data = getEmptyColumnDataDf(session)

-      val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(true))) // It's null in two rows
+      // It's null in two rows
+      val addressLength = MinLength("att3", analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
       val state: Option[MinState] = addressLength.computeStateFrom(data)
       val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state)

       data.withColumn("new", metric.fullColumn.get)
         .collect().map(_.getAs[Double]("new")) shouldBe Seq(1.0, 1.0, Double.MinValue, 1.0, Double.MinValue, 1.0)
     }

     "return row-level results for blank strings" in withSparkSession { session =>