Feature: Length Row Level Results (#465)
- Add row level results for MinLength in addition to MaxLength
- Add AnalyzerOptions case class with options to change null behavior (Ignore, EmptyString, Fail); a usage sketch follows the changed-files summary below
eycho-am authored and rdsharma26 committed Apr 16, 2024
1 parent 6486fb2 commit 9a972e3
Showing 14 changed files with 358 additions and 52 deletions.
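
For context on how the new option is meant to be used, here is a minimal sketch assuming the usual VerificationSuite flow; the data, column name, and assertions are invented for illustration, while AnalyzerOptions, NullBehavior, and the new analyzerOptions parameters come from the diff below.

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.{AnalyzerOptions, NullBehavior}
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.SparkSession

object LengthOptionsExample extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("length-row-level-results")
    .getOrCreate()
  import spark.implicits._

  // Hypothetical data: one row has a null name.
  val data = Seq(Some("alice"), Some("bob"), None).toDF("name")

  val check = Check(CheckLevel.Error, "length checks")
    // Treat a null name as an empty string, i.e. length 0.
    .hasMinLength("name", _ >= 1,
      analyzerOptions = Some(AnalyzerOptions(NullBehavior.EmptyString)))
    // Treat a null name as a failure: it is replaced by a sentinel length
    // that violates the assertion.
    .hasMaxLength("name", _ <= 10,
      analyzerOptions = Some(AnalyzerOptions(NullBehavior.Fail)))

  val result = VerificationSuite()
    .onData(data)
    .addCheck(check)
    .run()

  result.checkResults.foreach { case (_, checkResult) => println(checkResult.status) }
  spark.stop()
}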
9 changes: 5 additions & 4 deletions src/main/scala/com/amazon/deequ/VerificationResult.scala
@@ -18,17 +18,18 @@ package com.amazon.deequ

import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.checks.{Check, CheckResult, CheckStatus}
import com.amazon.deequ.constraints.AnalysisBasedConstraint
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckResult
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.constraints.ConstraintResult
import com.amazon.deequ.constraints.NamedConstraint
import com.amazon.deequ.constraints.RowLevelAssertedConstraint
import com.amazon.deequ.constraints.RowLevelConstraint
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.metrics.Metric
import com.amazon.deequ.repository.SimpleResultSerde
import org.apache.spark.sql.Column
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

/**
* The result returned from the VerificationSuite
38 changes: 33 additions & 5 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -17,16 +17,23 @@
package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.runners._
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import scala.language.existentials
import scala.util.{Failure, Success}
import scala.util.Failure
import scala.util.Success

/**
* A state (sufficient statistic) computed from data, from which we can compute a metric.
@@ -249,6 +256,13 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo
}
}

case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore)

object NullBehavior extends Enumeration {
type NullBehavior = Value
val Ignore, EmptyString, Fail = Value
}

/** Base class for analyzers that compute ratios of matching predicates */
abstract class PredicateMatchingAnalyzer(
name: String,
Expand Down Expand Up @@ -453,6 +467,20 @@ private[deequ] object Analyzers {
conditionalSelection(col(selection), where)
}

def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
val conditionColumn = where.map(expr)
conditionColumn
.map { condition => when(condition, replaceWith).otherwise(selection) }
.getOrElse(selection)
}

def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
val conditionColumn = where.map(expr)
conditionColumn
.map { condition => when(condition, replaceWith).otherwise(selection) }
.getOrElse(selection)
}

def conditionalSelection(selection: Column, condition: Option[String]): Column = {
val conditionColumn = condition.map { expression => expr(expression) }
conditionalSelectionFromColumns(selection, conditionColumn)
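
The two overloads added above are small helpers that swap in a replacement value wherever an optional SQL condition holds; the length analyzers below use them to substitute sentinel values for nulls. A standalone sketch of the same idea, with an invented column and condition:

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, expr, when}

object ConditionalSelectionSketch extends App {
  // Mirrors the overload in the diff: on rows where the optional SQL condition
  // evaluates to true, return replaceWith instead of the selected column.
  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column =
    where.map(expr)
      .map(condition => when(condition, replaceWith).otherwise(selection))
      .getOrElse(selection)

  val spark = SparkSession.builder().master("local[*]").appName("conditional-selection").getOrCreate()
  import spark.implicits._

  val df = Seq(Some("alice"), None).toDF("name")

  // Null names are replaced by the empty string before any downstream length calculation.
  df.select(conditionalSelection(col("name"), Some("name IS NULL"), replaceWith = "").as("name_or_empty"))
    .show()

  spark.stop()
}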
36 changes: 28 additions & 8 deletions src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
@@ -17,22 +17,28 @@
package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.sql.types.{DoubleType, StructType}
import org.apache.spark.sql.{Column, Row}
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.Preconditions.hasColumn
import com.amazon.deequ.analyzers.Preconditions.isString
import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.length
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructType

case class MaxLength(column: String, where: Option[String] = None)
case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column)
with FilterableAnalyzer {

override def aggregationFunctions(): Seq[Column] = {
max(criterion) :: Nil
max(criterion(getNullBehavior)) :: Nil
}

override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = {
ifNoNullsIn(result, offset) { _ =>
MaxState(result.getDouble(offset), Some(criterion))
MaxState(result.getDouble(offset), Some(criterion(getNullBehavior)))
}
}

@@ -42,5 +48,19 @@ case class MaxLength(column: String, where: Option[String] = None)

override def filterCondition: Option[String] = where

private def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType)
private def criterion(nullBehavior: NullBehavior): Column = {
nullBehavior match {
case NullBehavior.Fail =>
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue)
case NullBehavior.EmptyString =>
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
case _ => length(conditionalSelection(column, where)).cast(DoubleType)
}
}
private def getNullBehavior: NullBehavior = {
analyzerOptions
.map { options => options.nullBehavior }
.getOrElse(NullBehavior.Ignore)
}
}
37 changes: 30 additions & 7 deletions src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
@@ -17,22 +17,28 @@
package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString}
import org.apache.spark.sql.functions.{length, min}
import org.apache.spark.sql.types.{DoubleType, StructType}
import org.apache.spark.sql.{Column, Row}
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.Preconditions.hasColumn
import com.amazon.deequ.analyzers.Preconditions.isString
import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.length
import org.apache.spark.sql.functions.min
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructType

case class MinLength(column: String, where: Option[String] = None)
case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None)
extends StandardScanShareableAnalyzer[MinState]("MinLength", column)
with FilterableAnalyzer {

override def aggregationFunctions(): Seq[Column] = {
min(length(conditionalSelection(column, where))).cast(DoubleType) :: Nil
min(criterion(getNullBehavior)) :: Nil
}

override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = {
ifNoNullsIn(result, offset) { _ =>
MinState(result.getDouble(offset))
MinState(result.getDouble(offset), Some(criterion(getNullBehavior)))
}
}

@@ -41,4 +47,21 @@ case class MinLength(column: String, where: Option[String] = None)
}

override def filterCondition: Option[String] = where

private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
nullBehavior match {
case NullBehavior.Fail =>
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue)
case NullBehavior.EmptyString =>
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
case _ => length(conditionalSelection(column, where)).cast(DoubleType)
}
}

private def getNullBehavior: NullBehavior = {
analyzerOptions
.map { options => options.nullBehavior }
.getOrElse(NullBehavior.Ignore)
}
}
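
Taken together with the MaxLength change above, the row-level criterion resolves nulls according to the configured NullBehavior: Ignore leaves them null, EmptyString counts them as length 0.0, and Fail replaces them with Double.MaxValue for MaxLength and Double.MinValue for MinLength, so that any sensible length assertion fails on those rows. A small sketch of that mapping; the data is invented and the private criterion logic is restated inline rather than called:

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, expr, length, when}
import org.apache.spark.sql.types.DoubleType

object LengthNullBehaviorSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("length-null-behavior").getOrCreate()
  import spark.implicits._

  val df = Seq(Some("alice"), None).toDF("name")

  // Restates the Fail branch: the (null) length is replaced by a sentinel value.
  def replaceIfNull(lengths: Column, replaceWith: Double): Column =
    when(expr("name IS NULL"), replaceWith).otherwise(lengths)

  val rawLength = length(col("name")).cast(DoubleType)

  df.select(
      // EmptyString: a null name is treated as "", so its length is 0.0.
      length(when(expr("name IS NULL"), "").otherwise(col("name"))).cast(DoubleType).as("empty_string_length"),
      // Fail: sentinels guarantee the row violates the assertion.
      replaceIfNull(rawLength, Double.MaxValue).as("max_length_fail"),
      replaceIfNull(rawLength, Double.MinValue).as("min_length_fail"))
    .show(truncate = false)

  spark.stop()
}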
6 changes: 4 additions & 2 deletions src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
@@ -21,11 +21,13 @@ import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.min
import org.apache.spark.sql.types.{DoubleType, StructType}
import Analyzers._
import com.amazon.deequ.metrics.FullColumn

case class MinState(minValue: Double) extends DoubleValuedState[MinState] {
case class MinState(minValue: Double, override val fullColumn: Option[Column] = None)
extends DoubleValuedState[MinState] with FullColumn {

override def sum(other: MinState): MinState = {
MinState(math.min(minValue, other.minValue))
MinState(math.min(minValue, other.minValue), sum(fullColumn, other.fullColumn))
}

override def metricValue(): Double = {
15 changes: 9 additions & 6 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -16,16 +16,17 @@

package com.amazon.deequ.checks

import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.analyzers.{Analyzer, Histogram, Patterns, State, KLLParameters}
import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.constraints.Constraint._
import com.amazon.deequ.constraints._
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isEachNotNull, isAnyNotNull}
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}

import scala.util.matching.Regex

@@ -516,10 +517,11 @@ case class Check(
def hasMinLength(
column: String,
assertion: Double => Boolean,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: CheckWithLastConstraintFilterable = {

addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint) }
addFilterableConstraint { filter => minLengthConstraint(column, assertion, filter, hint, analyzerOptions) }
}

/**
@@ -533,10 +535,11 @@
def hasMaxLength(
column: String,
assertion: Double => Boolean,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: CheckWithLastConstraintFilterable = {

addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint) }
addFilterableConstraint { filter => maxLengthConstraint(column, assertion, filter, hint, analyzerOptions) }
}

/**
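
Because analyzerOptions defaults to None and the analyzers fall back to NullBehavior.Ignore, existing hasMinLength and hasMaxLength call sites keep their previous behavior; the new parameter is opt-in. A short sketch with a hypothetical column name:

import com.amazon.deequ.analyzers.{AnalyzerOptions, NullBehavior}
import com.amazon.deequ.checks.{Check, CheckLevel}

object OptInExample extends App {
  val base = Check(CheckLevel.Warning, "opt-in example")

  // Unchanged call: analyzerOptions defaults to None, so nulls are ignored as before.
  val legacy = base.hasMaxLength("name", _ <= 10)

  // Opt-in call: null values in "name" now surface as row-level failures.
  val strict = base.hasMaxLength("name", _ <= 10,
    analyzerOptions = Some(AnalyzerOptions(NullBehavior.Fail)))

  println(legacy.description)
  println(strict.description)
}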
18 changes: 13 additions & 5 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -426,10 +426,11 @@ object Constraint {
column: String,
assertion: Double => Boolean,
where: Option[String] = None,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: Constraint = {

val maxLength = MaxLength(column, where)
val maxLength = MaxLength(column, where, analyzerOptions)

val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
hint = hint)
@@ -454,15 +455,22 @@
column: String,
assertion: Double => Boolean,
where: Option[String] = None,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: Constraint = {

val minLength = MinLength(column, where)
val minLength = MinLength(column, where, analyzerOptions)

val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion,
hint = hint)

new NamedConstraint(constraint, s"MinLengthConstraint($minLength)")
val sparkAssertion = org.apache.spark.sql.functions.udf(assertion)

new RowLevelAssertedConstraint(
constraint,
s"MinLengthConstraint($minLength)",
s"ColumnLength-$column",
sparkAssertion)
}

/**
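
The notable change in minLengthConstraint is that the caller's Double => Boolean assertion is also lifted into a Spark UDF and handed to RowLevelAssertedConstraint, so the same predicate can be evaluated per row against the length column. A sketch of that lifting outside of deequ, with invented data and column name:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length, udf}
import org.apache.spark.sql.types.DoubleType

object RowLevelAssertionSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("row-level-assertion").getOrCreate()
  import spark.implicits._

  // The same kind of assertion a caller would pass to hasMinLength.
  val assertion: Double => Boolean = _ >= 3

  // Lifted into a Spark UDF, as the constraint factory now does.
  val sparkAssertion = udf(assertion)

  val df = Seq("alice", "bob", "x").toDF("name")

  // Per-row pass/fail of the assertion against the length criterion.
  df.select(col("name"), sparkAssertion(length(col("name")).cast(DoubleType)).as("min_length_ok"))
    .show()

  spark.stop()
}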