-
Notifications
You must be signed in to change notification settings - Fork 533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Length Row Level Results #465
Changes from 5 commits
e14a32f
e4d6d3e
4d76899
f170a3b
f6fb874
bdbd294
296965e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,16 +17,23 @@ | |
package com.amazon.deequ.analyzers | ||
|
||
import com.amazon.deequ.analyzers.Analyzers._ | ||
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} | ||
import org.apache.spark.sql.functions._ | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} | ||
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior | ||
import com.amazon.deequ.analyzers.runners._ | ||
import com.amazon.deequ.metrics.FullColumn | ||
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn | ||
import com.amazon.deequ.metrics.DoubleMetric | ||
import com.amazon.deequ.metrics.Entity | ||
import com.amazon.deequ.metrics.Metric | ||
import org.apache.spark.sql.functions._ | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.Column | ||
import org.apache.spark.sql.DataFrame | ||
import org.apache.spark.sql.Row | ||
import org.apache.spark.sql.SparkSession | ||
|
||
import scala.language.existentials | ||
import scala.util.{Failure, Success} | ||
import scala.util.Failure | ||
import scala.util.Success | ||
|
||
/** | ||
* A state (sufficient statistic) computed from data, from which we can compute a metric. | ||
|
@@ -249,6 +256,17 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo | |
} | ||
} | ||
|
||
case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) { | ||
def getNullBehavior(): NullBehavior = { | ||
nullBehavior | ||
} | ||
} | ||
|
||
object NullBehavior extends Enumeration { | ||
type NullBehavior = Value | ||
val Ignore, Empty, Fail = Value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||
} | ||
|
||
/** Base class for analyzers that compute ratios of matching predicates */ | ||
abstract class PredicateMatchingAnalyzer( | ||
name: String, | ||
|
@@ -453,6 +471,13 @@ private[deequ] object Analyzers { | |
conditionalSelection(col(selection), where) | ||
} | ||
|
||
def conditionalSelection(selection: Column, where: Option[String], replaceWith: Any): Column = { | ||
eycho-am marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val conditionColumn = where.map { expression => expr(expression) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
conditionColumn | ||
.map { condition => when(condition, replaceWith).otherwise(selection) } | ||
.getOrElse(selection) | ||
} | ||
|
||
def conditionalSelection(selection: Column, condition: Option[String]): Column = { | ||
val conditionColumn = condition.map { expression => expr(expression) } | ||
conditionalSelectionFromColumns(selection, conditionColumn) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,22 +17,28 @@ | |
package com.amazon.deequ.analyzers | ||
|
||
import com.amazon.deequ.analyzers.Analyzers._ | ||
import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString} | ||
import org.apache.spark.sql.functions.{length, max} | ||
import org.apache.spark.sql.types.{DoubleType, StructType} | ||
import org.apache.spark.sql.{Column, Row} | ||
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior | ||
import com.amazon.deequ.analyzers.Preconditions.hasColumn | ||
import com.amazon.deequ.analyzers.Preconditions.isString | ||
import org.apache.spark.sql.Column | ||
import org.apache.spark.sql.Row | ||
import org.apache.spark.sql.functions.col | ||
import org.apache.spark.sql.functions.length | ||
import org.apache.spark.sql.functions.max | ||
import org.apache.spark.sql.types.DoubleType | ||
import org.apache.spark.sql.types.StructType | ||
|
||
case class MaxLength(column: String, where: Option[String] = None) | ||
case class MaxLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) | ||
extends StandardScanShareableAnalyzer[MaxState]("MaxLength", column) | ||
with FilterableAnalyzer { | ||
|
||
override def aggregationFunctions(): Seq[Column] = { | ||
max(criterion) :: Nil | ||
max(criterion(getNullBehavior())) :: Nil | ||
} | ||
|
||
override def fromAggregationResult(result: Row, offset: Int): Option[MaxState] = { | ||
ifNoNullsIn(result, offset) { _ => | ||
MaxState(result.getDouble(offset), Some(criterion)) | ||
MaxState(result.getDouble(offset), Some(criterion(getNullBehavior()))) | ||
} | ||
} | ||
|
||
|
@@ -42,5 +48,19 @@ case class MaxLength(column: String, where: Option[String] = None) | |
|
||
override def filterCondition: Option[String] = where | ||
|
||
private def criterion: Column = length(conditionalSelection(column, where)).cast(DoubleType) | ||
private def criterion(nullBehavior: NullBehavior): Column = { | ||
nullBehavior match { | ||
case NullBehavior.Fail => | ||
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) | ||
conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MaxValue) | ||
case NullBehavior.Empty => | ||
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) | ||
case _ => length(conditionalSelection(column, where)).cast(DoubleType) | ||
} | ||
} | ||
private def getNullBehavior(): NullBehavior = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: The |
||
analyzerOptions | ||
.map { options => options.getNullBehavior() } | ||
.getOrElse(NullBehavior.Ignore) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,22 +17,29 @@ | |
package com.amazon.deequ.analyzers | ||
|
||
import com.amazon.deequ.analyzers.Analyzers._ | ||
import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isString} | ||
import org.apache.spark.sql.functions.{length, min} | ||
import org.apache.spark.sql.types.{DoubleType, StructType} | ||
import org.apache.spark.sql.{Column, Row} | ||
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior | ||
import com.amazon.deequ.analyzers.Preconditions.hasColumn | ||
import com.amazon.deequ.analyzers.Preconditions.isString | ||
import com.google.common.annotations.VisibleForTesting | ||
import org.apache.spark.sql.Column | ||
import org.apache.spark.sql.Row | ||
import org.apache.spark.sql.functions.col | ||
import org.apache.spark.sql.functions.length | ||
import org.apache.spark.sql.functions.min | ||
import org.apache.spark.sql.types.DoubleType | ||
import org.apache.spark.sql.types.StructType | ||
|
||
case class MinLength(column: String, where: Option[String] = None) | ||
case class MinLength(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) | ||
extends StandardScanShareableAnalyzer[MinState]("MinLength", column) | ||
with FilterableAnalyzer { | ||
|
||
override def aggregationFunctions(): Seq[Column] = { | ||
min(length(conditionalSelection(column, where))).cast(DoubleType) :: Nil | ||
min(criterion(getNullBehavior())) :: Nil | ||
} | ||
|
||
override def fromAggregationResult(result: Row, offset: Int): Option[MinState] = { | ||
ifNoNullsIn(result, offset) { _ => | ||
MinState(result.getDouble(offset)) | ||
MinState(result.getDouble(offset), Some(criterion(getNullBehavior()))) | ||
} | ||
} | ||
|
||
|
@@ -41,4 +48,22 @@ case class MinLength(column: String, where: Option[String] = None) | |
} | ||
|
||
override def filterCondition: Option[String] = where | ||
|
||
@VisibleForTesting | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this needed if we are using package private modifier and have the test in the appropriate package? |
||
private[deequ] def criterion(nullBehavior: NullBehavior): Column = { | ||
nullBehavior match { | ||
case NullBehavior.Fail => | ||
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType) | ||
conditionalSelection(colLengths, Option(s"${column} IS NULL"), Double.MinValue) | ||
case NullBehavior.Empty => | ||
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), "")).cast(DoubleType) | ||
case _ => length(conditionalSelection(column, where)).cast(DoubleType) | ||
} | ||
} | ||
|
||
private def getNullBehavior(): NullBehavior = { | ||
analyzerOptions | ||
.map { options => options.getNullBehavior() } | ||
.getOrElse(NullBehavior.Ignore) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not required.