Refactor batch coalesce to be based solely on batch data size (#1133)
* Refactor batch coalesce to be based solely on batch data size

Signed-off-by: Jason Lowe <[email protected]>

* Add TargetSize limit check and comments

Signed-off-by: Jason Lowe <[email protected]>
jlowe committed Dec 2, 2020
1 parent 0d092e0 commit ffcfaa1
Showing 4 changed files with 31 additions and 205 deletions.
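
Editor's note: the heart of this change is in the second file below, where per-column size arrays (and a dedicated string-size limit) are replaced by a single per-batch byte count compared against the goal's target size. A minimal sketch of that policy in isolation, with hypothetical names (planOutputBatches, dataSize) standing in for the plugin's real iterator and getBatchDataSize:

    import scala.collection.mutable.ArrayBuffer

    // Sketch (not the plugin's code): the size-based coalesce policy this
    // commit adopts. Input batches accumulate into one output batch until
    // adding the next batch would exceed the byte target; a lone oversized
    // batch still passes through on its own.
    def planOutputBatches(inputSizes: Seq[Long], targetSizeBytes: Long): Seq[Seq[Long]] = {
      val groups = ArrayBuffer(ArrayBuffer.empty[Long])
      var currentBytes = 0L
      for (dataSize <- inputSizes) {
        if (currentBytes > 0 && currentBytes + dataSize > targetSizeBytes) {
          groups += ArrayBuffer.empty[Long] // close the current output batch
          currentBytes = 0L
        }
        groups.last += dataSize
        currentBytes += dataSize
      }
      // e.g. planOutputBatches(Seq(600L, 600L, 100L, 900L), 1000L)
      //      == Seq(Seq(600), Seq(600, 100), Seq(900))
      groups.map(_.toSeq).toSeq
    }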
==== changed file 1 of 4 ====
@@ -19,7 +19,6 @@
 import ai.rapids.cudf.DType;
 import ai.rapids.cudf.DeviceMemoryBuffer;
 import com.nvidia.spark.rapids.format.ColumnMeta;
-import com.nvidia.spark.rapids.format.SubBufferMeta;
 import com.nvidia.spark.rapids.format.TableMeta;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.BinaryType;
@@ -55,32 +54,6 @@ public static boolean isBatchCompressed(ColumnarBatch batch) {
     }
   }
 
-  public static long[] getUncompressedColumnSizes(ColumnarBatch batch) {
-    GpuCompressedColumnVector compressedVector = (GpuCompressedColumnVector)batch.column(0);
-    TableMeta tableMeta = compressedVector.getTableMeta();
-    assert (tableMeta.columnMetasLength() == batch.numCols());
-    int numCols = tableMeta.columnMetasLength();
-    ColumnMeta columnMeta = new ColumnMeta();
-    SubBufferMeta subBufferMetaObj = new SubBufferMeta();
-    long[] sizes = new long[numCols];
-    for (int i = 0; i < numCols; i++) {
-      tableMeta.columnMetas(columnMeta, i);
-      SubBufferMeta subBuffer = columnMeta.data(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-      subBuffer = columnMeta.offsets(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-      subBuffer = columnMeta.validity(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-    }
-    return sizes;
-  }
-
   /**
    * This should only ever be called from an assertion.
    */
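Editor's note: the Java helper removed above recovered per-column uncompressed sizes by walking the flatbuffer TableMeta (data, offsets, and validity sub-buffers). It needs no replacement: as the Scala changes below show, a compressed batch is now sized by the length of its single underlying device buffer. A hedged sketch of the simpler measurement, using only accessors that appear in this diff (compressedBatchDataSize is a hypothetical name):

    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Sketch: size a compressed batch by its one device buffer instead of
    // summing per-column sub-buffer metadata. Assumes a non-empty batch.
    def compressedBatchDataSize(cb: ColumnarBatch): Long =
      cb.column(0) match {
        case c: GpuCompressedColumnVector => c.getBuffer.getLength
        case _ => 0L
      }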
==== changed file 2 of 4 ====
@@ -18,9 +18,8 @@ package com.nvidia.spark.rapids
 
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{BufferType, Cuda, NvtxColor, Table}
+import ai.rapids.cudf.{Cuda, NvtxColor, Table}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
-import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -127,11 +126,13 @@ object RequireSingleBatch extends CoalesceGoal {
   override def toString: String = "RequireSingleBatch"
 }
 
-case class TargetSize(override val targetSizeBytes: Long) extends CoalesceGoal
+case class TargetSize(override val targetSizeBytes: Long) extends CoalesceGoal {
+  require(targetSizeBytes <= Integer.MAX_VALUE,
+    "Target cannot exceed 2GB without checks for cudf row count limit")
+}
 
 abstract class AbstractGpuCoalesceIterator(
     iter: Iterator[ColumnarBatch],
-    schema: StructType,
     goal: CoalesceGoal,
     numInputRows: SQLMetric,
     numInputBatches: SQLMetric,
@@ -163,11 +164,6 @@
    */
   protected def popOnDeck(): ColumnarBatch
 
-  /** We need to track the sizes of string columns to make sure we don't exceed 2GB */
-  private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
-    .filter(_._1.dataType == DataTypes.StringType)
-    .map(_._2)
-
   /** Optional row limit */
   var batchRowLimit: Int = 0
 
@@ -214,13 +210,6 @@
    */
   def addBatchToConcat(batch: ColumnarBatch): Unit
 
-  /**
-   * Calculate (or estimate) the size of each column in a batch in bytes.
-   *
-   * @return Array of column sizes in bytes
-   */
-  def getColumnSizes(batch: ColumnarBatch): Array[Long]
-
   /**
    * Called after all of the batches have been added in.
    *
@@ -236,19 +225,22 @@
   /**
    * Gets the size in bytes of the data buffer for a given column
    */
-  def getColumnDataSize(cb: ColumnarBatch, index: Int, defaultSize: Long): Long = {
-    cb.column(index) match {
-      case g: GpuColumnVector =>
-        val buff = g.getBase.getDeviceBufferFor(BufferType.DATA)
-        if (buff == null) 0 else buff.getLength
-      case h: RapidsHostColumnVector =>
-        val buff = h.getBase.getHostBufferFor(BufferType.DATA)
-        if (buff == null) 0 else buff.getLength
-      case g: GpuCompressedColumnVector =>
-        val columnMeta = g.getTableMeta.columnMetas(index)
-        columnMeta.data().length()
-      case _ =>
-        defaultSize
-    }
+  def getBatchDataSize(cb: ColumnarBatch): Long = {
+    if (cb.numCols() > 0) {
+      cb.column(0) match {
+        case g: GpuColumnVectorFromBuffer =>
+          g.getBuffer.getLength
+        case _: GpuColumnVector =>
+          (0 until cb.numCols()).map {
+            i => cb.column(i).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
+          }.sum
+        case g: GpuCompressedColumnVector =>
+          g.getBuffer.getLength
+        case g =>
+          throw new IllegalStateException(s"Unexpected column type: $g")
+      }
+    } else {
+      0
+    }
   }
 
@@ -270,18 +262,12 @@
     try {
       var numRows: Long = 0 // to avoid overflows
      var numBytes: Long = 0
-      var columnSizes: Array[Long] = schema.fields.indices.map(_ => 0L).toArray
-      var stringColumnSizes: Array[Long] = stringFieldIndices.map(_ => 0L)
 
       // check if there is a batch "on deck" from a previous call to next()
       if (hasOnDeck) {
         val batch = popOnDeck()
         numRows += batch.numRows()
-        columnSizes = getColumnSizes(batch)
-        numBytes += columnSizes.sum
-        stringColumnSizes = stringFieldIndices.map(i => getColumnDataSize(batch, i, columnSizes(i)))
-          .zip(stringColumnSizes)
-          .map(pair => pair._1 + pair._2)
+        numBytes += getBatchDataSize(batch)
         addBatch(batch)
       }
 
@@ -294,25 +280,12 @@
         // filter out empty batches
         if (nextRows > 0) {
           numInputRows += nextRows
-          val nextColumnSizes = getColumnSizes(cb)
-          val nextBytes = nextColumnSizes.sum
+          val nextBytes = getBatchDataSize(cb)
 
           // calculate the new sizes based on this input batch being added to the current
           // output batch
           val wouldBeRows = numRows + nextRows
           val wouldBeBytes = numBytes + nextBytes
-          val wouldBeColumnSizes = columnSizes.zip(nextColumnSizes).map(pair => pair._1 + pair._2)
-
-          // CuDF has a hard limit on the size of string data in a column so we check to make
-          // sure that the string columns each use no more than Int.MaxValue bytes. This check is
-          // overly cautious because the calculated size includes the offset bytes. When nested
-          // types are supported, this logic will need to be enhanced to take offset and validity
-          // buffers into account since they could account for a larger percentage of overall
-          // memory usage.
-          val wouldBeStringColumnSizes =
-            stringFieldIndices.map(i => getColumnDataSize(cb, i, wouldBeColumnSizes(i)))
-              .zip(stringColumnSizes)
-              .map(pair => pair._1 + pair._2)
 
           if (wouldBeRows > Int.MaxValue) {
             if (goal == RequireSingleBatch) {
@@ -324,21 +297,15 @@
           } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
             saveOnDeck(cb)
           } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
-            saveOnDeck(cb)
-          } else if (wouldBeStringColumnSizes.exists(size => size > Int.MaxValue)) {
-            if (goal == RequireSingleBatch) {
-              throw new IllegalStateException("A single batch is required for this operation," +
-                s" but cuDF only supports ${Int.MaxValue} bytes in a single string column." +
-                s" At least ${wouldBeStringColumnSizes.max} are in a single column in this" +
-                s" partition. Please try increasing your partition count.")
-            }
+            // There are no explicit checks for the concatenate result exceeding the cudf 2^31
+            // row count limit for any column. We are relying on cudf's concatenate to throw
+            // an exception if this occurs and limiting performance-oriented goals to under
+            // 2GB data total to avoid hitting that error.
             saveOnDeck(cb)
           } else {
             addBatch(cb)
             numRows = wouldBeRows
             numBytes = wouldBeBytes
-            columnSizes = wouldBeColumnSizes
-            stringColumnSizes = wouldBeStringColumnSizes
           }
         } else {
           cb.close()
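
Editor's note: saveOnDeck/popOnDeck, visible in the hunks above, implement a one-batch lookahead: a batch that would overflow the current output is parked and becomes the first input of the next output batch. A small sketch of that flow under simplified assumptions (batches reduced to their byte sizes; all names hypothetical):

    import scala.collection.mutable.ArrayBuffer

    // Sketch of the "on deck" flow, not the plugin's code.
    var onDeck: Option[Long] = None

    def nextOutputBatch(inputs: Iterator[Long], targetSizeBytes: Long): Seq[Long] = {
      val group = ArrayBuffer.empty[Long]
      var bytes = 0L
      // a previously parked batch leads the new output batch
      onDeck.foreach { b => group += b; bytes += b }
      onDeck = None
      while (onDeck.isEmpty && inputs.hasNext) {
        val b = inputs.next()
        if (bytes > 0 && bytes + b > targetSizeBytes) onDeck = Some(b) // park it
        else { group += b; bytes += b }
      }
      group.toSeq
    }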
@@ -386,7 +353,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -429,40 +395,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
     }
   }
-
-  private def getUncompressedColumnSizes(tableMeta: TableMeta): Array[Long] = {
-    val numCols = tableMeta.columnMetasLength
-    val columnMeta = new ColumnMeta
-    val subBufferMetaObj = new SubBufferMeta
-    val sizes = new Array[Long](numCols)
-    (0 until numCols).foreach { i =>
-      tableMeta.columnMetas(columnMeta, i)
-      var subBuffer = columnMeta.data(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-      subBuffer = columnMeta.offsets(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-      subBuffer = columnMeta.validity(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-    }
-    sizes
-  }
-
-  override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
-    if (!isBatchCompressed(cb)) {
-      GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
-    } else {
-      val compressedVector = cb.column(0).asInstanceOf[GpuCompressedColumnVector]
-      val tableMeta = compressedVector.getTableMeta
-      require(tableMeta.columnMetasLength == cb.numCols)
-      getUncompressedColumnSizes(tableMeta)
-    }
-  }
 
   override def concatAllAndPutOnGPU(): ColumnarBatch = {
     decompressBatches()
     val tmp = batches.toArray
@@ -539,7 +471,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -562,14 +493,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
   override def addBatchToConcat(batch: ColumnarBatch): Unit =
     batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
 
-  override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
-    if (!GpuCompressedColumnVector.isBatchCompressed(cb)) {
-      GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
-    } else {
-      GpuCompressedColumnVector.getUncompressedColumnSizes(cb)
-    }
-  }
-
   private[this] var codec: TableCompressionCodec = _
 
   private[this] def popAllDecompressed(): Array[ColumnarBatch] = {
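Editor's note: with the require added to TargetSize, an oversized goal now fails at construction time instead of surfacing later as a cudf error. An illustrative check, not from the commit (require throws IllegalArgumentException):

    val oneGiB = TargetSize(1024L * 1024 * 1024)                  // accepted
    val fourGiB = scala.util.Try(TargetSize(4096L * 1024 * 1024)) // rejected by require()
    assert(fourGiB.isFailure)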
==== changed file 3 of 4 ====
@@ -160,7 +160,6 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -205,8 +204,8 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     totalRows += rows
   }
 
-  override def getColumnSizes(batch: ColumnarBatch): Array[Long] = {
-    schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batch.numRows())).toArray
+  override def getBatchDataSize(batch: ColumnarBatch): Long = {
+    schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batch.numRows())).sum
   }
 
   override def concatAllAndPutOnGPU(): ColumnarBatch = {
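Editor's note: unlike the GPU-side iterators, HostToGpuCoalesceIterator has no device buffers to measure, so it still estimates from the schema and row count; only the return shape changes from a per-column array to a single sum. A rough sketch of that kind of estimate for fixed-width columns only, under simplified assumptions (the real GpuBatchUtils.estimateGpuMemory also handles strings and other variable-width data):

    import org.apache.spark.sql.types.StructType

    // Sketch: schema-based GPU memory estimate for fixed-width columns.
    // Data bytes per column plus a simplified one-bit-per-row validity buffer.
    def estimateBatchGpuMemory(schema: StructType, rows: Long): Long =
      schema.fields.map { field =>
        val dataBytes = field.dataType.defaultSize.toLong * rows
        val validityBytes = (rows + 7) / 8
        dataBytes + validityBytes
      }.sum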
==== changed file 4 of 4 ====
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.format.CodecType
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.rapids.metrics.source.MockTaskContext
-import org.apache.spark.sql.types.{DataType, DataTypes, LongType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
@@ -58,75 +58,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     })
   }
 
-  test("limit batches by string size") {
-    // TODO figure out a better way to deal with the Rmm Event handler
-    // because we cannot do this multiple times without issues.
-
-    // If this test is run on it's own this is needed.
-    // RapidsBufferCatalog.init(new RapidsConf(new HashMap[String, String]()))
-
-    val schema = new StructType(Array(
-      StructField("a", DataTypes.DoubleType),
-      StructField("b", DataTypes.StringType)
-    ))
-
-    // create input with 2 rows where the combined string length is > Integer.MAX_VALUE
-    val input = new BatchIterator(schema, rowCount = 2)
-
-    val numInputRows = createMetric()
-    val numInputBatches = createMetric()
-    val numOutputRows = createMetric()
-    val numOutputBatches = createMetric()
-    val collectTime = createMetric()
-    val concatTime = createMetric()
-    val totalTime = createMetric()
-    val peakDevMemory = createMetric()
-
-    val it = new GpuCoalesceIterator(input,
-      schema,
-      TargetSize(Long.MaxValue),
-      0,
-      numInputRows,
-      numInputBatches,
-      numOutputRows,
-      numOutputBatches,
-      collectTime,
-      concatTime,
-      totalTime,
-      peakDevMemory,
-      opName = "opname"
-    ) {
-      // override for this test so we can mock the response to make it look the strings are large
-      override def getColumnSizes(cb: ColumnarBatch): Array[Long] = Array(64, Int.MaxValue)
-
-      override def getColumnDataSize(cb: ColumnarBatch, index: Int, default: Long): Long =
-        index match {
-          case 0 => 64L
-          case 1 => ((Int.MaxValue / 4) * 3).toLong
-          case _ => default
-        }
-    }
-
-    while (it.hasNext) {
-      val batch = it.next()
-      batch.close()
-    }
-
-    assert(numInputBatches.value == 2)
-    assert(numOutputBatches.value == 2)
-  }
-
-  private def createMetric() = new SQLMetric("sum")
-
-  class BatchIterator(schema: StructType, var rowCount: Int) extends Iterator[ColumnarBatch] {
-    override def hasNext: Boolean = {
-      val hasNext = rowCount > 0
-      rowCount -= 1
-      hasNext
-    }
-    override def next(): ColumnarBatch = FuzzerUtils.createColumnarBatch(schema, 3, 64)
-  }
-
   test("require single batch") {
 
     val conf = makeBatchedBytes(1)
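Editor's note: the deleted test relied on mocking getColumnSizes/getColumnDataSize, which no longer exist, so it has nothing left to exercise; protection now comes from the 2GB cap on TargetSize plus cudf's own checks during concatenation. A hedged sketch of a test one could write for the new guard (hypothetical, not part of this commit; assumes the suite's ScalaTest style):

      test("TargetSize rejects targets beyond Int.MaxValue") {
        // The require() added in this commit should fail fast at construction.
        assertThrows[IllegalArgumentException] {
          TargetSize(Int.MaxValue.toLong + 1)
        }
        assert(TargetSize(Int.MaxValue.toLong).targetSizeBytes == Int.MaxValue.toLong)
      }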
