Refactor batch coalesce to be based solely on batch data size #1133

Merged
4 commits merged on Dec 2, 2020
Changes from 1 commit
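
For orientation, the diff below removes the per-column size tracking (`getColumnSizes`, `getColumnDataSize`, and the Java `getUncompressedColumnSizes` helper) along with the separate string-column byte check, and replaces them with a single whole-batch size from the new `getBatchDataSize`. Here is a minimal sketch of that size-only coalescing decision, using hypothetical stand-in names (`SimpleBatch`, `coalesce`, `targetSizeBytes`) rather than the plugin's classes:

```scala
// Minimal sketch of size-only coalescing; illustrative stand-ins, not plugin code.
final case class SimpleBatch(rows: Long, bytes: Long)

def coalesce(batches: Seq[SimpleBatch], targetSizeBytes: Long): Seq[Seq[SimpleBatch]] = {
  val out = scala.collection.mutable.ListBuffer.empty[List[SimpleBatch]]
  var current = List.empty[SimpleBatch]
  var rows = 0L
  var bytes = 0L
  for (b <- batches) {
    val wouldBeRows = rows + b.rows
    val wouldBeBytes = bytes + b.bytes
    // The decision keys off the row-count cap and the total batch data size;
    // the separate per-column (string) byte check from the old code is gone.
    if (current.nonEmpty && (wouldBeRows > Int.MaxValue || wouldBeBytes > targetSizeBytes)) {
      out += current.reverse   // close the output batch built so far and start a new one
      current = List(b)
      rows = b.rows
      bytes = b.bytes
    } else {
      current = b :: current   // keep accumulating toward the target size
      rows = wouldBeRows
      bytes = wouldBeBytes
    }
  }
  if (current.nonEmpty) out += current.reverse
  out.toList
}
```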
@@ -19,7 +19,6 @@
import ai.rapids.cudf.DType;
import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.ColumnMeta;
import com.nvidia.spark.rapids.format.SubBufferMeta;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
@@ -55,32 +54,6 @@ public static boolean isBatchCompressed(ColumnarBatch batch) {
}
}

public static long[] getUncompressedColumnSizes(ColumnarBatch batch) {
GpuCompressedColumnVector compressedVector = (GpuCompressedColumnVector)batch.column(0);
TableMeta tableMeta = compressedVector.getTableMeta();
assert (tableMeta.columnMetasLength() == batch.numCols());
int numCols = tableMeta.columnMetasLength();
ColumnMeta columnMeta = new ColumnMeta();
SubBufferMeta subBufferMetaObj = new SubBufferMeta();
long[] sizes = new long[numCols];
for (int i = 0; i < numCols; i++) {
tableMeta.columnMetas(columnMeta, i);
SubBufferMeta subBuffer = columnMeta.data(subBufferMetaObj);
if (subBuffer != null) {
sizes[i] += subBuffer.length();
}
subBuffer = columnMeta.offsets(subBufferMetaObj);
if (subBuffer != null) {
sizes[i] += subBuffer.length();
}
subBuffer = columnMeta.validity(subBufferMetaObj);
if (subBuffer != null) {
sizes[i] += subBuffer.length();
}
}
return sizes;
}
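
For context, the removed helper above walked the FlatBuffers metadata (ColumnMeta and its data/offsets/validity SubBufferMeta entries) to compute an uncompressed size per column; with batch-level sizing this bookkeeping is no longer needed. A hypothetical Scala model of that per-column accounting, with stand-in types in place of the FlatBuffers accessors:

```scala
// Hypothetical stand-ins for ColumnMeta/SubBufferMeta; each column's uncompressed
// size is its data, offsets, and validity sub-buffer lengths added together.
final case class SubBuffer(length: Long)
final case class ColumnMetaModel(
    data: Option[SubBuffer],
    offsets: Option[SubBuffer],
    validity: Option[SubBuffer])

def uncompressedColumnSizes(columns: Seq[ColumnMetaModel]): Array[Long] =
  columns.map { c =>
    Seq(c.data, c.offsets, c.validity).flatten.map(_.length).sum
  }.toArray
```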

/**
* This should only ever be called from an assertion.
*/
@@ -18,9 +18,8 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BufferType, Cuda, NvtxColor, Table}
import ai.rapids.cudf.{Cuda, NvtxColor, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -131,7 +130,6 @@ case class TargetSize(override val targetSizeBytes: Long) extends CoalesceGoal

abstract class AbstractGpuCoalesceIterator(
iter: Iterator[ColumnarBatch],
schema: StructType,
goal: CoalesceGoal,
numInputRows: SQLMetric,
numInputBatches: SQLMetric,
@@ -163,11 +161,6 @@ abstract class AbstractGpuCoalesceIterator(
*/
protected def popOnDeck(): ColumnarBatch

/** We need to track the sizes of string columns to make sure we don't exceed 2GB */
private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
.filter(_._1.dataType == DataTypes.StringType)
.map(_._2)

/** Optional row limit */
var batchRowLimit: Int = 0

@@ -214,13 +207,6 @@
*/
def addBatchToConcat(batch: ColumnarBatch): Unit

/**
* Calculate (or estimate) the size of each column in a batch in bytes.
*
* @return Array of column sizes in bytes
*/
def getColumnSizes(batch: ColumnarBatch): Array[Long]

/**
* Called after all of the batches have been added in.
*
@@ -236,19 +222,22 @@
/**
* Gets the size in bytes of the data buffer for a given column
*/
def getColumnDataSize(cb: ColumnarBatch, index: Int, defaultSize: Long): Long = {
cb.column(index) match {
case g: GpuColumnVector =>
val buff = g.getBase.getDeviceBufferFor(BufferType.DATA)
if (buff == null) 0 else buff.getLength
case h: RapidsHostColumnVector =>
val buff = h.getBase.getHostBufferFor(BufferType.DATA)
if (buff == null) 0 else buff.getLength
case g: GpuCompressedColumnVector =>
val columnMeta = g.getTableMeta.columnMetas(index)
columnMeta.data().length()
case _ =>
defaultSize
}
}

def getBatchDataSize(cb: ColumnarBatch): Long = {
if (cb.numCols() > 0) {
cb.column(0) match {
case g: GpuColumnVectorFromBuffer =>
g.getBuffer.getLength
case _: GpuColumnVector =>
(0 until cb.numCols()).map {
i => cb.column(i).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
}.sum
case g: GpuCompressedColumnVector =>
g.getBuffer.getLength
case g =>
throw new IllegalStateException(s"Unexpected column type: $g")
}
} else {
0
}
}
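
As a reading aid for the new method above: it reduces to three cases, the length of the single contiguous buffer behind a GpuColumnVectorFromBuffer, the summed device memory of plain GpuColumnVectors, and the compressed buffer length for GpuCompressedColumnVector. A rough, self-contained model of that fall-through, with hypothetical stand-in types rather than the real column classes:

```scala
// Illustrative model only; these case classes are hypothetical stand-ins for the
// ColumnarBatch/GpuColumnVector variants handled by getBatchDataSize.
sealed trait BatchLayout
final case class ContiguousTable(bufferLength: Long) extends BatchLayout      // one backing device buffer
final case class SeparateColumns(columnBytes: Seq[Long]) extends BatchLayout  // one buffer per column
final case class CompressedTable(compressedLength: Long) extends BatchLayout  // not yet decompressed

def batchDataSize(layout: BatchLayout): Long = layout match {
  case ContiguousTable(len)   => len        // whole-table buffer length
  case SeparateColumns(bytes) => bytes.sum  // sum of each column's device memory
  case CompressedTable(len)   => len        // size of the compressed buffer
}

// Example: a contiguous 64 MiB table vs. three separately allocated columns.
val example = Seq(
  ContiguousTable(64L << 20),
  SeparateColumns(Seq(8L << 20, 16L << 20, 1L << 20))
).map(batchDataSize)
```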

@@ -270,18 +259,12 @@
try {
var numRows: Long = 0 // to avoid overflows
var numBytes: Long = 0
var columnSizes: Array[Long] = schema.fields.indices.map(_ => 0L).toArray
var stringColumnSizes: Array[Long] = stringFieldIndices.map(_ => 0L)

// check if there is a batch "on deck" from a previous call to next()
if (hasOnDeck) {
val batch = popOnDeck()
numRows += batch.numRows()
columnSizes = getColumnSizes(batch)
numBytes += columnSizes.sum
stringColumnSizes = stringFieldIndices.map(i => getColumnDataSize(batch, i, columnSizes(i)))
.zip(stringColumnSizes)
.map(pair => pair._1 + pair._2)
numBytes += getBatchDataSize(batch)
addBatch(batch)
}

@@ -294,25 +277,12 @@
// filter out empty batches
if (nextRows > 0) {
numInputRows += nextRows
val nextColumnSizes = getColumnSizes(cb)
val nextBytes = nextColumnSizes.sum
val nextBytes = getBatchDataSize(cb)

// calculate the new sizes based on this input batch being added to the current
// output batch
val wouldBeRows = numRows + nextRows
val wouldBeBytes = numBytes + nextBytes
val wouldBeColumnSizes = columnSizes.zip(nextColumnSizes).map(pair => pair._1 + pair._2)

// CuDF has a hard limit on the size of string data in a column so we check to make
// sure that the string columns each use no more than Int.MaxValue bytes. This check is
// overly cautious because the calculated size includes the offset bytes. When nested
// types are supported, this logic will need to be enhanced to take offset and validity
// buffers into account since they could account for a larger percentage of overall
// memory usage.
val wouldBeStringColumnSizes =
stringFieldIndices.map(i => getColumnDataSize(cb, i, wouldBeColumnSizes(i)))
.zip(stringColumnSizes)
.map(pair => pair._1 + pair._2)

if (wouldBeRows > Int.MaxValue) {
if (goal == RequireSingleBatch) {
@@ -325,20 +295,10 @@
saveOnDeck(cb)
} else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
saveOnDeck(cb)
} else if (wouldBeStringColumnSizes.exists(size => size > Int.MaxValue)) {
if (goal == RequireSingleBatch) {
throw new IllegalStateException("A single batch is required for this operation," +
s" but cuDF only supports ${Int.MaxValue} bytes in a single string column." +
s" At least ${wouldBeStringColumnSizes.max} are in a single column in this" +
s" partition. Please try increasing your partition count.")
}
saveOnDeck(cb)
} else {
addBatch(cb)
numRows = wouldBeRows
numBytes = wouldBeBytes
columnSizes = wouldBeColumnSizes
stringColumnSizes = wouldBeStringColumnSizes
}
} else {
cb.close()
@@ -386,7 +346,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
peakDevMemory: SQLMetric,
opName: String)
extends AbstractGpuCoalesceIterator(iter,
schema,
goal,
numInputRows,
numInputBatches,
@@ -429,40 +388,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
}
}

private def getUncompressedColumnSizes(tableMeta: TableMeta): Array[Long] = {
val numCols = tableMeta.columnMetasLength
val columnMeta = new ColumnMeta
val subBufferMetaObj = new SubBufferMeta
val sizes = new Array[Long](numCols)
(0 until numCols).foreach { i =>
tableMeta.columnMetas(columnMeta, i)
var subBuffer = columnMeta.data(subBufferMetaObj)
if (subBuffer != null) {
sizes(i) += subBuffer.length
}
subBuffer = columnMeta.offsets(subBufferMetaObj)
if (subBuffer != null) {
sizes(i) += subBuffer.length
}
subBuffer = columnMeta.validity(subBufferMetaObj)
if (subBuffer != null) {
sizes(i) += subBuffer.length
}
}
sizes
}

override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
if (!isBatchCompressed(cb)) {
GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
} else {
val compressedVector = cb.column(0).asInstanceOf[GpuCompressedColumnVector]
val tableMeta = compressedVector.getTableMeta
require(tableMeta.columnMetasLength == cb.numCols)
getUncompressedColumnSizes(tableMeta)
}
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
decompressBatches()
val tmp = batches.toArray
@@ -539,7 +464,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
peakDevMemory: SQLMetric,
opName: String)
extends AbstractGpuCoalesceIterator(iter,
schema,
goal,
numInputRows,
numInputBatches,
@@ -562,14 +486,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
override def addBatchToConcat(batch: ColumnarBatch): Unit =
batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))

override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
if (!GpuCompressedColumnVector.isBatchCompressed(cb)) {
GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
} else {
GpuCompressedColumnVector.getUncompressedColumnSizes(cb)
}
}

private[this] var codec: TableCompressionCodec = _

private[this] def popAllDecompressed(): Array[ColumnarBatch] = {
@@ -117,8 +117,8 @@ object HostColumnarToGpu {
for (i <- 0 until rows) {
b.appendUTF8String(cv.getUTF8String(i).getBytes)
}
case (t, n) =>
throw new UnsupportedOperationException(s"Converting to GPU for ${t} is not currently " +
case (t, _) =>
throw new UnsupportedOperationException(s"Converting to GPU for $t is not currently " +
s"supported")
}
}
@@ -142,7 +142,6 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
peakDevMemory: SQLMetric,
opName: String)
extends AbstractGpuCoalesceIterator(iter,
schema,
goal,
numInputRows,
numInputBatches,
@@ -187,8 +186,8 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
totalRows += rows
}

override def getColumnSizes(batch: ColumnarBatch): Array[Long] = {
schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batchRowLimit)).toArray
}

override def getBatchDataSize(cb: ColumnarBatch): Long = {
schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batchRowLimit)).sum
}
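
A note on the host-side override above: there is nothing on the device to measure yet, so the size is estimated from the schema and the current row limit via GpuBatchUtils.estimateGpuMemory (signature as used in this diff). Below is a rough, standalone sketch of a schema-driven estimate of that general shape, with made-up per-type byte costs rather than the plugin's actual heuristics:

```scala
import org.apache.spark.sql.types._

// Hypothetical per-field estimator; the real GpuBatchUtils.estimateGpuMemory also
// accounts for validity buffers, offset buffers, and better string-size heuristics.
def estimateFieldBytes(field: StructField, rows: Long): Long = field.dataType match {
  case LongType | DoubleType   => 8L * rows
  case IntegerType | FloatType => 4L * rows
  case StringType              => (4L + 32L) * rows // offset entry plus an assumed 32-byte average
  case _                       => 16L * rows        // catch-all guess for other types
}

def estimateBatchBytes(schema: StructType, rows: Long): Long =
  schema.fields.map(estimateFieldBytes(_, rows)).sum

// e.g. roughly 44 KB for 1000 rows of (LongType, StringType)
val estimate = estimateBatchBytes(
  StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 1000L)
```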

override def concatAllAndPutOnGPU(): ColumnarBatch = {
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.format.CodecType

import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.metrics.source.MockTaskContext
import org.apache.spark.sql.types.{DataType, DataTypes, LongType, StructField, StructType}
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
@@ -58,75 +58,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
})
}

test("limit batches by string size") {
// TODO figure out a better way to deal with the Rmm Event handler
// because we cannot do this multiple times without issues.

// If this test is run on it's own this is needed.
// RapidsBufferCatalog.init(new RapidsConf(new HashMap[String, String]()))

val schema = new StructType(Array(
StructField("a", DataTypes.DoubleType),
StructField("b", DataTypes.StringType)
))

// create input with 2 rows where the combined string length is > Integer.MAX_VALUE
val input = new BatchIterator(schema, rowCount = 2)

val numInputRows = createMetric()
val numInputBatches = createMetric()
val numOutputRows = createMetric()
val numOutputBatches = createMetric()
val collectTime = createMetric()
val concatTime = createMetric()
val totalTime = createMetric()
val peakDevMemory = createMetric()

val it = new GpuCoalesceIterator(input,
schema,
TargetSize(Long.MaxValue),
0,
numInputRows,
numInputBatches,
numOutputRows,
numOutputBatches,
collectTime,
concatTime,
totalTime,
peakDevMemory,
opName = "opname"
) {
// override for this test so we can mock the response to make it look the strings are large
override def getColumnSizes(cb: ColumnarBatch): Array[Long] = Array(64, Int.MaxValue)

override def getColumnDataSize(cb: ColumnarBatch, index: Int, default: Long): Long =
index match {
case 0 => 64L
case 1 => ((Int.MaxValue / 4) * 3).toLong
case _ => default
}
}

while (it.hasNext) {
val batch = it.next()
batch.close()
}

assert(numInputBatches.value == 2)
assert(numOutputBatches.value == 2)
}

private def createMetric() = new SQLMetric("sum")

class BatchIterator(schema: StructType, var rowCount: Int) extends Iterator[ColumnarBatch] {
override def hasNext: Boolean = {
val hasNext = rowCount > 0
rowCount -= 1
hasNext
}
override def next(): ColumnarBatch = FuzzerUtils.createColumnarBatch(schema, 3, 64)
}

test("require single batch") {

val conf = makeBatchedBytes(1)