Added test and removed dead code for Sanity Checker dealing with maps with same key (#153)
leahmcguire committed Oct 16, 2018
1 parent b1aec92 commit a35108b
Showing 4 changed files with 106 additions and 107 deletions.
@@ -274,18 +274,6 @@ class SanityChecker(uid: String = UID[SanityChecker])
.toMap[String, Double]
val numCorrIndices = corrIndices.length

// build a map from indicator group name to any null indicator it may have
val nullGroups = for {
col <- metaCols
if col.isNullIndicator
grouping <- col.grouping
} yield (grouping, (col, col.index))

nullGroups.groupBy(_._1).foreach {
case (group, cols) =>
require(cols.length == 1, s"Vector column $group has multiple null indicator fields: $cols")
}

def maxByParent(seq: Seq[(String, Double)]) = seq.groupBy(_._1).map{ case(k, v) =>
// Filter out the NaNs because max(3.4, NaN) = NaN, and we still want to keep the largest correlation
k -> v.filterNot(_._2.isNaN).foldLeft(0.0)((a, b) => math.max(a, math.abs(b._2)))
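
The NaN filter above matters because scala.math.max propagates NaN. A minimal, self-contained sketch of the difference (illustrative values, not from the codebase):

// Folding with math.max over unfiltered correlations poisons the whole result
val corrs = Seq(0.3, Double.NaN, -0.8)
val naive = corrs.foldLeft(0.0)((a, b) => math.max(a, math.abs(b)))    // NaN
val cleaned = corrs.filterNot(_.isNaN)
  .foldLeft(0.0)((a, b) => math.max(a, math.abs(b)))                   // 0.8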
@@ -329,7 +317,7 @@ class SanityChecker(uid: String = UID[SanityChecker])
// inside cramersVParent first and then check without keys for removal. This will over-remove features (e.g.
// an entire map), but should only affect custom map vectorizers that don't set indicator groups on columns.
def getParentValue(col: OpVectorColumnMetadata, check1: Map[String, Double], check2: Map[String, Double]) =
col.parentNamesWithMapKeys().flatMap( k => check1.get(k).orElse(check2.get(k)) ).reduceOption(_ max _)
col.parentNamesWithMapKeys().flatMap( k => check1.get(k).orElse(check2.get(k)) ).reduceOption(_ max _)

val featuresStats = metaCols.map {
col =>
@@ -387,7 +375,7 @@ class SanityChecker(uid: String = UID[SanityChecker])

// Calculate groups to remove separately. This is for more complicated checks where you can't determine whether
// to remove a feature from a single column stats (eg. associate rule confidence/support check)
val groupByGroups = stats.groupBy(_.column.flatMap(_.grouping))
val groupByGroups = stats.groupBy(_.column.flatMap(_.featureGroup()))
val ruleConfGroupsToDrop = groupByGroups.toSeq.flatMap{
case (Some(group), colStats) =>
val colsToRemove = colStats.filter(f =>
@@ -457,63 +445,72 @@ class SanityChecker(uid: String = UID[SanityChecker])
val distinctLabels = contingencyData.count()
val stats =
if (isDefined(categoricalLabel) && $(categoricalLabel) || distinctLabels < min(100.0, sampleSize * 0.1)) {
val contingencyWithKeys = contingencyData.collect()
val contingency = contingencyWithKeys.sortBy(_._1).map { case (_, vector) => vector }

logInfo("Label is assumed to be categorical since either categoricalLabel = true or " +
"number of distinct labels < count * 0.1")

// Only perform Cramer's V calculation on columns that have a grouping and indicatorValue defined (right
// now, the only things that will have indicatorValue defined and grouping be None are numeric maps)
val columnsWithIndicator = columnMeta.filter(f => f.grouping.isDefined && f.indicatorValue.isDefined)
val colIndicesByGrouping =
columnsWithIndicator
.map { meta => meta.grouping.get -> meta }
.groupBy(_._1)
// Keep track of the group, column name, column index, and whether the parent was a MultiPickList or not
.map { case (group, cols) => (group, cols.map(_._2.makeColName()), cols.map(_._2.index),
cols.exists(_._2.hasParentOfSubType[MultiPickList]))
val contingencyWithKeys = contingencyData.collect()
val contingency = contingencyWithKeys.sortBy(_._1).map { case (_, vector) => vector }

logInfo("Label is assumed to be categorical since either categoricalLabel = true or " +
"number of distinct labels < count * 0.1")

// Only perform Cramer's V calculation on columns that have a grouping and indicatorValue defined (right
// now, the only things that will have indicatorValue defined and grouping be None are numeric maps)
val columnsWithIndicator = columnMeta.filter(f => f.grouping.isDefined && f.indicatorValue.isDefined)
val colIndicesByGrouping =
columnsWithIndicator
.map { meta => meta.featureGroup().get -> meta }
.groupBy(_._1)
// Keep track of the group, column name, column index, and whether the parent was a MultiPickList or not
.map{ case (group, cols) =>
val repeats = cols.map(c => (c._2.indicatorValue, c._2.index)).groupBy(_._1)
.collect{ case (_, seq) if seq.length > 1 => seq.tail.map(_._2) } // only first used in stats
.flatten.toSet
val colsCleaned = cols.map(_._2).filterNot(c => repeats.contains(c.index))
(group, colsCleaned.map(_.makeColName()), colsCleaned.map(_.index),
colsCleaned.exists(_.hasParentOfSubType[MultiPickList]))
}

colIndicesByGrouping.map {
case (group, colNames, valueIndices, isMpl) =>
val groupContingency =
if (valueIndices.length == 1) {
// parentFeatureNames only has a single indicator column, construct the other from label sums
contingency.flatMap(features => {
val indicatorSum = valueIndices.map(features.apply)
indicatorSum ++ indicatorSum.map(features.last - _)
})
} else contingency.flatMap { features => valueIndices.map(features.apply) }

// columns are label value, rows are feature value
val contingencyMatrix = if (valueIndices.length == 1) {
new DenseMatrix(2, groupContingency.length / 2, groupContingency)
}
else new DenseMatrix(valueIndices.length, groupContingency.length / valueIndices.length, groupContingency)

val cStats =
if (isMpl) {
val labelCounts = contingency.map(_.last)
OpStatistics.contingencyStatsFromMultiPickList(contingencyMatrix, labelCounts)
} else OpStatistics.contingencyStats(contingencyMatrix)

CategoricalGroupStats(
group = group,
categoricalFeatures = colNames.toArray,
contingencyMatrix = cStats.contingencyMatrix,
pointwiseMutualInfo = cStats.pointwiseMutualInfo,
cramersV = cStats.chiSquaredResults.cramersV,
mutualInfo = cStats.mutualInfo,
maxRuleConfidences = cStats.confidenceResults.maxConfidences,
supports = cStats.confidenceResults.supports
)
}.toArray
} else {
logInfo(s"Label is assumed to be continuous since number of distinct labels = $distinctLabels" +
s"which is greater than 10% the size of the sample $sampleSize skipping calculation of Cramer's V")
Array.empty[CategoricalGroupStats]
}
colIndicesByGrouping.map {
case (group, colNames, valueIndices, isMpl) =>
val groupContingency =
if (valueIndices.length == 1) {
// parentFeatureNames only has a single indicator column, construct the other from label sums
contingency.flatMap(features => {
val indicatorSum = valueIndices.map(features.apply)
indicatorSum ++ indicatorSum.map(features.last - _)
})
} else {
contingency.flatMap { features => valueIndices.map(features.apply) }
}

// columns are label value, rows are feature value
val contingencyMatrix =
if (valueIndices.length == 1) {
new DenseMatrix(2, groupContingency.length / 2, groupContingency)
} else {
new DenseMatrix(valueIndices.length, groupContingency.length / valueIndices.length, groupContingency)
}

val cStats =
if (isMpl) {
val labelCounts = contingency.map(_.last)
OpStatistics.contingencyStatsFromMultiPickList(contingencyMatrix, labelCounts)
} else OpStatistics.contingencyStats(contingencyMatrix)

CategoricalGroupStats(
group = group,
categoricalFeatures = colNames.toArray,
contingencyMatrix = cStats.contingencyMatrix,
pointwiseMutualInfo = cStats.pointwiseMutualInfo,
cramersV = cStats.chiSquaredResults.cramersV,
mutualInfo = cStats.mutualInfo,
maxRuleConfidences = cStats.confidenceResults.maxConfidences,
supports = cStats.confidenceResults.supports
)
}.toArray
} else {
logInfo(s"Label is assumed to be continuous since number of distinct labels = $distinctLabels" +
s"which is greater than 10% the size of the sample $sampleSize skipping calculation of Cramer's V")
Array.empty[CategoricalGroupStats]
}
contingencyData.unpersist(blocking = false)
stats
}
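
A hedged sketch of the single-indicator branch above, with made-up counts (this assumes Spark's column-major DenseMatrix constructor; the actual import in the file may differ):

import org.apache.spark.mllib.linalg.DenseMatrix

// For each label value we know (indicator count, label total); the "absent" row
// is derived as total - count, mirroring indicatorSum.map(features.last - _)
val perLabel = Array((3.0, 10.0), (7.0, 20.0))
val groupContingency = perLabel.flatMap { case (present, total) => Array(present, total - present) }
val contingencyMatrix = new DenseMatrix(2, groupContingency.length / 2, groupContingency)
// Column-major, so rows are feature present/absent and columns are label values:
// 3.0   7.0
// 7.0  13.0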
@@ -616,10 +613,10 @@ class SanityChecker(uid: String = UID[SanityChecker])

// Exclude feature vector entries coming from hashed text features if requested
val localVectorRowsForCorr = vectorRows.map {
case (v: OldDenseVector) =>
case v: OldDenseVector =>
val res = localCorrIndices.map(v.apply)
OldVectors.dense(res)
case (v: OldSparseVector) => {
case v: OldSparseVector => {
val res = new ArrayBuffer[(Int, Double)]()
v.foreachActive((i, v) => if (localCorrIndices.contains(i)) res += localCorrIndices.indexOf(i) -> v)
OldVectors.sparse(localCorrIndices.length, res).compressed
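
A self-contained sketch of the sparse branch (illustrative indices; OldVectors is the mllib Vectors alias used in this file):

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import scala.collection.mutable.ArrayBuffer

val localCorrIndices = Array(0, 2, 5)
val vec = OldVectors.sparse(6, Array(0, 3, 5), Array(1.0, 4.0, 9.0))
val res = new ArrayBuffer[(Int, Double)]()
// Keep only active entries at correlation indices, remapped to their new positions
vec.foreachActive((i, value) => if (localCorrIndices.contains(i)) res += localCorrIndices.indexOf(i) -> value)
val subset = OldVectors.sparse(localCorrIndices.length, res).compressed
// subset holds the values at original indices 0 and 5, now at positions 0 and 2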
@@ -812,9 +809,9 @@ private[op] case class ColumnStatistics
maxRuleConfidences.zip(supports).collectFirst {
case (conf, sup) if (conf > maxRuleConfidence && sup > minRequiredRuleSupport) =>
s"Max association rule confidence $conf is above threshold of $maxRuleConfidence and support $sup is " +
s"above the required support threshold of $minRequiredRuleSupport"
s"above the required support threshold of $minRequiredRuleSupport"
},
column.flatMap(_.grouping).filter(removedGroups.contains(_)).map(ig =>
column.flatMap(_.featureGroup()).filter(removedGroups.contains(_)).map(ig =>
s"other feature in indicator group $ig flagged for removal via rule confidence checks"
)
).flatten
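
A minimal sketch of the rule-confidence reason above (made-up thresholds and values): the first (confidence, support) pair exceeding both thresholds yields a removal reason.

val maxRuleConfidence = 0.9
val minRequiredRuleSupport = 0.1
val maxRuleConfidences = Seq(0.5, 0.95)
val supports = Seq(0.2, 0.3)
val reason = maxRuleConfidences.zip(supports).collectFirst {
  case (conf, sup) if conf > maxRuleConfidence && sup > minRequiredRuleSupport =>
    s"Max association rule confidence $conf is above threshold of $maxRuleConfidence " +
      s"and support $sup is above the required support threshold of $minRequiredRuleSupport"
}
// reason is defined thanks to the second pair (0.95, 0.3)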
@@ -104,40 +104,6 @@ class BadFeatureZooTest extends FlatSpec with TestSparkContext with Logging {
retrieved.names.length - 2
}

ignore should "Group groupings separately for transformations computed on same feature" in {
val ageData: Seq[Real] = RandomReal.uniform[Real](minValue = 0.0, maxValue = 20.0)
.withProbabilityOfEmpty(0.5).limit(200) ++ RandomReal.uniform[Real](minValue = 40.0, maxValue = 70.0)
.withProbabilityOfEmpty(0.0).limit(100)
val (rawDF, rawAge) = TestFeatureBuilder("age", ageData)
val labelTransformer = new UnaryLambdaTransformer[Real, RealNN](operationName = "labelFunc",
transformFn = p => p.value match {
case Some(x) if Some(x).get > 30.0 => RealNN(1.0)
case _ => RealNN(0.0)
}
)
val labelData = labelTransformer.setInput(rawAge).getOutput().asInstanceOf[Feature[RealNN]]
.copy(isResponse = true)
rawAge.bucketize(trackNulls = true,
splits = Array(Double.NegativeInfinity, 30.0, Double.PositiveInfinity),
splitInclusion = Inclusion.Right
)
val ageBuckets = rawAge.autoBucketize(labelData, trackNulls = true)
val genFeatureVector = Seq(ageBuckets,
rawAge.vectorize(fillValue = 0.0, fillWithMean = true, trackNulls = true)
).transmogrify()
val transformed = new OpWorkflow().setResultFeatures(genFeatureVector).transform(rawDF)
val metaCols = OpVectorMetadata(transformed.schema(genFeatureVector.name)).columns
val nullGroups = for {
col <- metaCols
if col.isNullIndicator
group <- col.grouping
} yield (group, (col, col.index))
nullGroups.groupBy(_._1).foreach {
case (group, cols) =>
require(cols.length == 1, s"Vector column $group has multiple null indicator fields: $cols")
}
}

ignore should "Compute The same Cramer's V value a categorical feature whether or not other categorical " +
"features are derived from the same parent feature" in {
/* Generate an age feature for which young ages imply label is 1, old ages imply label is 0 and an empty age
@@ -609,6 +609,36 @@ class SanityCheckerTest extends OpEstimatorSpec[OPVector, BinaryModel[RealNN, OP
featuresToDrop, featuresWithNaNCorr)
}

it should "not fail when maps have the same keys" in {
val mapData = textRawData.map{
case (i, t, tm) => (i, t, tm.value.toPickListMap, tm.value.toPickListMap,
tm.value.map{ case (k, v) => k -> math.random }.toRealMap)
}
val (mapDataFrame, id, target, plMap1, plMap2, doubleMap) = TestFeatureBuilder(
"id", "target", "textMap1", "textMap2", "doubleMap", mapData)
val targetResponse: FeatureLike[RealNN] = target.copy(isResponse = true)
val features = Seq(id, target, plMap1, plMap2, doubleMap).transmogrify()
val checked = targetResponse.sanityCheck(features, categoricalLabel = Option(true))
val output = new OpWorkflow().setResultFeatures(checked).transform(mapDataFrame)
output.select(checked.name).count() shouldBe 12
val meta = SanityCheckerSummary.fromMetadata(checked.originStage.getMetadata().getSummaryMetadata())
meta.dropped.size shouldBe 0
meta.categoricalStats.size shouldBe 10
meta.categoricalStats.foreach(_.contingencyMatrix("0").length shouldBe 2)
}

it should "produce the same statistics if the same transformation is applied twice" in {
val plMap = textMap.map[PickListMap](_.value.toPickListMap)
val features = Seq(id, target, plMap, plMap).transmogrify()
val checked = targetResponse.sanityCheck(features, categoricalLabel = Option(true))
val output = new OpWorkflow().setResultFeatures(checked).transform(textData)
output.select(checked.name).count() shouldBe 12
val meta = SanityCheckerSummary.fromMetadata(checked.originStage.getMetadata().getSummaryMetadata())
meta.dropped.size shouldBe 0
meta.categoricalStats.size shouldBe 4
meta.categoricalStats.foreach(_.contingencyMatrix("0").length shouldBe 2)
}
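
The second test exercises the new duplicate-column pruning in colIndicesByGrouping. A minimal sketch of that repeats logic, using stand-in (indicatorValue, index) tuples instead of real column metadata:

val cols = Seq(("red", 0), ("blue", 1), ("red", 2))
val repeats = cols.groupBy(_._1)
  .collect { case (_, seq) if seq.length > 1 => seq.tail.map(_._2) }
  .flatten.toSet // Set(2): only the first "red" column is kept for stats
val colsCleaned = cols.filterNot(c => repeats.contains(c._2))
// colsCleaned == Seq(("red", 0), ("blue", 1))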

private def validateEstimatorOutput(outputColName: String, model: BinaryModel[RealNN, OPVector, OPVector],
expectedFeaturesToDrop: Seq[String], label: String): Unit = {
val metadata = model.getMetadata()
@@ -151,6 +151,12 @@ case class OpVectorColumnMetadata // TODO make separate case classes extending t
if (hasParentOfSubType[OPMap[_]]) parentFeatureName.map(p => grouping.map(p + "_" + _).getOrElse(p))
else parentFeatureName

/**
* Get the feature grouping qualified by the parent feature name
* @return Optional string of feature grouping
*/
def featureGroup(): Option[String] = grouping.map(g => s"${parentFeatureName.mkString("_")}_$g")

}

object OpVectorColumnMetadata {
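
The new featureGroup() prefixes the grouping with the parent feature name, so columns from different map features whose grouping strings collide still land in distinct groups. A simplified sketch with a stand-in case class (not the real OpVectorColumnMetadata):

case class ColMeta(parentFeatureName: Seq[String], grouping: Option[String]) {
  def featureGroup(): Option[String] = grouping.map(g => s"${parentFeatureName.mkString("_")}_$g")
}

val a = ColMeta(Seq("textMap1"), Some("color"))
val b = ColMeta(Seq("textMap2"), Some("color"))
// a.grouping == b.grouping == Some("color"), but
// a.featureGroup() == Some("textMap1_color") and b.featureGroup() == Some("textMap2_color")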
