diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java
index 3fd46523fde..174986de992 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java
@@ -19,7 +19,6 @@
 import ai.rapids.cudf.DType;
 import ai.rapids.cudf.DeviceMemoryBuffer;
 import com.nvidia.spark.rapids.format.ColumnMeta;
-import com.nvidia.spark.rapids.format.SubBufferMeta;
 import com.nvidia.spark.rapids.format.TableMeta;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.BinaryType;
@@ -55,32 +54,6 @@ public static boolean isBatchCompressed(ColumnarBatch batch) {
     }
   }
 
-  public static long[] getUncompressedColumnSizes(ColumnarBatch batch) {
-    GpuCompressedColumnVector compressedVector = (GpuCompressedColumnVector)batch.column(0);
-    TableMeta tableMeta = compressedVector.getTableMeta();
-    assert (tableMeta.columnMetasLength() == batch.numCols());
-    int numCols = tableMeta.columnMetasLength();
-    ColumnMeta columnMeta = new ColumnMeta();
-    SubBufferMeta subBufferMetaObj = new SubBufferMeta();
-    long[] sizes = new long[numCols];
-    for (int i = 0; i < numCols; i++) {
-      tableMeta.columnMetas(columnMeta, i);
-      SubBufferMeta subBuffer = columnMeta.data(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-      subBuffer = columnMeta.offsets(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-      subBuffer = columnMeta.validity(subBufferMetaObj);
-      if (subBuffer != null) {
-        sizes[i] += subBuffer.length();
-      }
-    }
-    return sizes;
-  }
-
   /**
    * This should only ever be called from an assertion.
    */
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index d303170bae5..2cebd4e6d1d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -18,9 +18,8 @@ package com.nvidia.spark.rapids
 
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{BufferType, Cuda, NvtxColor, Table}
+import ai.rapids.cudf.{Cuda, NvtxColor, Table}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
-import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -127,11 +126,13 @@ object RequireSingleBatch extends CoalesceGoal {
   override def toString: String = "RequireSingleBatch"
 }
 
-case class TargetSize(override val targetSizeBytes: Long) extends CoalesceGoal
+case class TargetSize(override val targetSizeBytes: Long) extends CoalesceGoal {
+  require(targetSizeBytes <= Integer.MAX_VALUE,
+    "Target cannot exceed 2GB without checks for cudf row count limit")
+}
 
 abstract class AbstractGpuCoalesceIterator(
     iter: Iterator[ColumnarBatch],
-    schema: StructType,
     goal: CoalesceGoal,
     numInputRows: SQLMetric,
     numInputBatches: SQLMetric,
@@ -163,11 +164,6 @@ abstract class AbstractGpuCoalesceIterator(
    */
  protected def popOnDeck(): ColumnarBatch
 
-  /** We need to track the sizes of string columns to make sure we don't exceed 2GB */
-  private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
-    .filter(_._1.dataType == DataTypes.StringType)
-    .map(_._2)
-
   /** Optional row limit */
   var batchRowLimit: Int = 0
 
@@ -214,13 +210,6 @@ abstract class AbstractGpuCoalesceIterator(
    */
   def addBatchToConcat(batch: ColumnarBatch): Unit
 
-  /**
-   * Calculate (or estimate) the size of each column in a batch in bytes.
-   *
-   * @return Array of column sizes in bytes
-   */
-  def getColumnSizes(batch: ColumnarBatch): Array[Long]
-
   /**
    * Called after all of the batches have been added in.
    *
@@ -236,19 +225,22 @@ abstract class AbstractGpuCoalesceIterator(
   /**
    * Gets the size in bytes of the data buffer for a given column
    */
-  def getColumnDataSize(cb: ColumnarBatch, index: Int, defaultSize: Long): Long = {
-    cb.column(index) match {
-      case g: GpuColumnVector =>
-        val buff = g.getBase.getDeviceBufferFor(BufferType.DATA)
-        if (buff == null) 0 else buff.getLength
-      case h: RapidsHostColumnVector =>
-        val buff = h.getBase.getHostBufferFor(BufferType.DATA)
-        if (buff == null) 0 else buff.getLength
-      case g: GpuCompressedColumnVector =>
-        val columnMeta = g.getTableMeta.columnMetas(index)
-        columnMeta.data().length()
-      case _ =>
-        defaultSize
+  def getBatchDataSize(cb: ColumnarBatch): Long = {
+    if (cb.numCols() > 0) {
+      cb.column(0) match {
+        case g: GpuColumnVectorFromBuffer =>
+          g.getBuffer.getLength
+        case _: GpuColumnVector =>
+          (0 until cb.numCols()).map {
+            i => cb.column(i).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
+          }.sum
+        case g: GpuCompressedColumnVector =>
+          g.getBuffer.getLength
+        case g =>
+          throw new IllegalStateException(s"Unexpected column type: $g")
+      }
+    } else {
+      0
     }
   }
 
@@ -270,18 +262,12 @@ abstract class AbstractGpuCoalesceIterator(
     try {
       var numRows: Long = 0 // to avoid overflows
       var numBytes: Long = 0
-      var columnSizes: Array[Long] = schema.fields.indices.map(_ => 0L).toArray
-      var stringColumnSizes: Array[Long] = stringFieldIndices.map(_ => 0L)
 
       // check if there is a batch "on deck" from a previous call to next()
      if (hasOnDeck) {
         val batch = popOnDeck()
         numRows += batch.numRows()
-        columnSizes = getColumnSizes(batch)
-        numBytes += columnSizes.sum
-        stringColumnSizes = stringFieldIndices.map(i => getColumnDataSize(batch, i, columnSizes(i)))
-          .zip(stringColumnSizes)
-          .map(pair => pair._1 + pair._2)
+        numBytes += getBatchDataSize(batch)
         addBatch(batch)
       }
 
@@ -294,25 +280,12 @@ abstract class AbstractGpuCoalesceIterator(
         // filter out empty batches
         if (nextRows > 0) {
           numInputRows += nextRows
-          val nextColumnSizes = getColumnSizes(cb)
-          val nextBytes = nextColumnSizes.sum
+          val nextBytes = getBatchDataSize(cb)
 
           // calculate the new sizes based on this input batch being added to the current
           // output batch
           val wouldBeRows = numRows + nextRows
           val wouldBeBytes = numBytes + nextBytes
-          val wouldBeColumnSizes = columnSizes.zip(nextColumnSizes).map(pair => pair._1 + pair._2)
-
-          // CuDF has a hard limit on the size of string data in a column so we check to make
-          // sure that the string columns each use no more than Int.MaxValue bytes. This check is
-          // overly cautious because the calculated size includes the offset bytes. When nested
-          // types are supported, this logic will need to be enhanced to take offset and validity
-          // buffers into account since they could account for a larger percentage of overall
-          // memory usage.
-          val wouldBeStringColumnSizes =
-            stringFieldIndices.map(i => getColumnDataSize(cb, i, wouldBeColumnSizes(i)))
-              .zip(stringColumnSizes)
-              .map(pair => pair._1 + pair._2)
 
           if (wouldBeRows > Int.MaxValue) {
             if (goal == RequireSingleBatch) {
@@ -324,21 +297,15 @@ abstract class AbstractGpuCoalesceIterator(
           } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
             saveOnDeck(cb)
           } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
-            saveOnDeck(cb)
-          } else if (wouldBeStringColumnSizes.exists(size => size > Int.MaxValue)) {
-            if (goal == RequireSingleBatch) {
-              throw new IllegalStateException("A single batch is required for this operation," +
-                s" but cuDF only supports ${Int.MaxValue} bytes in a single string column." +
-                s" At least ${wouldBeStringColumnSizes.max} are in a single column in this" +
-                s" partition. Please try increasing your partition count.")
-            }
+            // There are no explicit checks for the concatenate result exceeding the cudf 2^31
+            // row count limit for any column. We are relying on cudf's concatenate to throw
+            // an exception if this occurs and limiting performance-oriented goals to under
+            // 2GB data total to avoid hitting that error.
             saveOnDeck(cb)
           } else {
             addBatch(cb)
             numRows = wouldBeRows
             numBytes = wouldBeBytes
-            columnSizes = wouldBeColumnSizes
-            stringColumnSizes = wouldBeStringColumnSizes
           }
         } else {
           cb.close()
@@ -386,7 +353,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -429,40 +395,6 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
     }
   }
 
-  private def getUncompressedColumnSizes(tableMeta: TableMeta): Array[Long] = {
-    val numCols = tableMeta.columnMetasLength
-    val columnMeta = new ColumnMeta
-    val subBufferMetaObj = new SubBufferMeta
-    val sizes = new Array[Long](numCols)
-    (0 until numCols).foreach { i =>
-      tableMeta.columnMetas(columnMeta, i)
-      var subBuffer = columnMeta.data(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-      subBuffer = columnMeta.offsets(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-      subBuffer = columnMeta.validity(subBufferMetaObj)
-      if (subBuffer != null) {
-        sizes(i) += subBuffer.length
-      }
-    }
-    sizes
-  }
-
-  override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
-    if (!isBatchCompressed(cb)) {
-      GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
-    } else {
-      val compressedVector = cb.column(0).asInstanceOf[GpuCompressedColumnVector]
-      val tableMeta = compressedVector.getTableMeta
-      require(tableMeta.columnMetasLength == cb.numCols)
-      getUncompressedColumnSizes(tableMeta)
-    }
-  }
-
   override def concatAllAndPutOnGPU(): ColumnarBatch = {
     decompressBatches()
     val tmp = batches.toArray
@@ -539,7 +471,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -562,14 +493,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
   override def addBatchToConcat(batch: ColumnarBatch): Unit =
     batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
 
-  override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
-    if (!GpuCompressedColumnVector.isBatchCompressed(cb)) {
-      GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize)
-    } else {
-      GpuCompressedColumnVector.getUncompressedColumnSizes(cb)
-    }
-  }
-
   private[this] var codec: TableCompressionCodec = _
 
   private[this] def popAllDecompressed(): Array[ColumnarBatch] = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index 00ef2e64b71..8f36537233a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -160,7 +160,6 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     peakDevMemory: SQLMetric,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
-    schema,
     goal,
     numInputRows,
     numInputBatches,
@@ -205,8 +204,8 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     totalRows += rows
   }
 
-  override def getColumnSizes(batch: ColumnarBatch): Array[Long] = {
-    schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batch.numRows())).toArray
+  override def getBatchDataSize(batch: ColumnarBatch): Long = {
+    schema.fields.indices.map(GpuBatchUtils.estimateGpuMemory(schema, _, batch.numRows())).sum
   }
 
   override def concatAllAndPutOnGPU(): ColumnarBatch = {
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
index af0d0482992..623e3f729bc 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.format.CodecType
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.rapids.metrics.source.MockTaskContext
-import org.apache.spark.sql.types.{DataType, DataTypes, LongType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
@@ -58,75 +58,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     })
   }
 
-  test("limit batches by string size") {
-    // TODO figure out a better way to deal with the Rmm Event handler
-    // because we cannot do this multiple times without issues.
-
-    // If this test is run on it's own this is needed.
-    // RapidsBufferCatalog.init(new RapidsConf(new HashMap[String, String]()))
-
-    val schema = new StructType(Array(
-      StructField("a", DataTypes.DoubleType),
-      StructField("b", DataTypes.StringType)
-    ))
-
-    // create input with 2 rows where the combined string length is > Integer.MAX_VALUE
-    val input = new BatchIterator(schema, rowCount = 2)
-
-    val numInputRows = createMetric()
-    val numInputBatches = createMetric()
-    val numOutputRows = createMetric()
-    val numOutputBatches = createMetric()
-    val collectTime = createMetric()
-    val concatTime = createMetric()
-    val totalTime = createMetric()
-    val peakDevMemory = createMetric()
-
-    val it = new GpuCoalesceIterator(input,
-      schema,
-      TargetSize(Long.MaxValue),
-      0,
-      numInputRows,
-      numInputBatches,
-      numOutputRows,
-      numOutputBatches,
-      collectTime,
-      concatTime,
-      totalTime,
-      peakDevMemory,
-      opName = "opname"
-    ) {
-      // override for this test so we can mock the response to make it look the strings are large
-      override def getColumnSizes(cb: ColumnarBatch): Array[Long] = Array(64, Int.MaxValue)
-
-      override def getColumnDataSize(cb: ColumnarBatch, index: Int, default: Long): Long =
-        index match {
-          case 0 => 64L
-          case 1 => ((Int.MaxValue / 4) * 3).toLong
-          case _ => default
-        }
-    }
-
-    while (it.hasNext) {
-      val batch = it.next()
-      batch.close()
-    }
-
-    assert(numInputBatches.value == 2)
-    assert(numOutputBatches.value == 2)
-  }
-
-  private def createMetric() = new SQLMetric("sum")
-
-  class BatchIterator(schema: StructType, var rowCount: Int) extends Iterator[ColumnarBatch] {
-    override def hasNext: Boolean = {
-      val hasNext = rowCount > 0
-      rowCount -= 1
-      hasNext
-    }
-    override def next(): ColumnarBatch = FuzzerUtils.createColumnarBatch(schema, 3, 64)
-  }
-
   test("require single batch") {
     val conf = makeBatchedBytes(1)