Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add parquet chunked writing ability for list columns #6831

Merged
merged 18 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- PR #6817 Add support for scatter() on lists-of-struct columns
- PR #6805 Implement `cudf::detail::copy_if` for `decimal32` and `decimal64`
- PR #6619 Improve Dockerfile
- PR #6831 Add parquet chunked writing ability for list columns

## Improvements

Expand Down
16 changes: 13 additions & 3 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,25 @@ struct table_metadata {
};

/**
* @brief Derived class of table_metadata which includes nullability information per column of
* input.
* @brief Derived class of table_metadata which includes flattened nullability information of input.
*
* This information is used as an optimization for chunked writes. If the caller leaves
* column_nullable uninitialized, the writer code will assume the worst case : that all columns are
* nullable.
*
* If the column_nullable field is not empty, it is expected that it has a length equal to the
* number of columns in the table being written.
* number of columns in the flattened table being written.
*
 * Flattening refers to the flattening of nested columns. For list columns, the nullability vector
 * contains one entry per nesting level (the list itself plus each nested level). e.g. for a table
 * of three columns of types: {int, list<double>, float}, the nullability vector contains the values:
*
* |Index| Nullability of |
* |-----|----------------------------------------|
* | 0 | int column |
* | 1 | Level 0 of list column (list itself) |
* | 2 | Level 1 of list column (double values) |
* | 3 | float column |
*
* In the case where column nullability is known, pass `true` if the corresponding column could
* contain nulls in one or more subtables to be written, otherwise `false`.
Expand Down
61 changes: 47 additions & 14 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,9 @@ __global__ void __launch_bounds__(1024) gpuGatherPages(EncColumnChunk *chunks, c
*
* Similarly we merge up all the way till level 0 offsets
*/
dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)
dremel_data get_dremel_data(column_view h_col,
std::vector<bool> const &level_nullability,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(h_col.type().id() == type_id::LIST,
"Can only get rep/def levels for LIST type column");
Expand Down Expand Up @@ -1699,28 +1701,37 @@ dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)
size_t max_vals_size = 0;
std::vector<column_view> nesting_levels;
std::vector<uint8_t> def_at_level;
size_type level = 0;
auto add_def_at_level = [&](size_type level) {
auto is_level_nullable =
curr_col.nullable() or (not level_nullability.empty() and level_nullability[level]);
def_at_level.push_back(is_level_nullable ? 2 : 1);
};
while (curr_col.type().id() == type_id::LIST) {
nesting_levels.push_back(curr_col);
def_at_level.push_back(curr_col.nullable() ? 2 : 1);
add_def_at_level(level);
auto lcv = lists_column_view(curr_col);
max_vals_size += lcv.offsets().size();
curr_col = lcv.child();
level++;
}
// One more entry for leaf col
def_at_level.push_back(curr_col.nullable() ? 2 : 1);
add_def_at_level(level);
max_vals_size += curr_col.size();

// Add one more value at the end so that we can have the max def level
def_at_level.push_back(0);
thrust::exclusive_scan(
thrust::host, def_at_level.begin(), def_at_level.end(), def_at_level.begin());

// Sliced list column views only have offsets applied to top level. Get offsets for each level.
hostdevice_vector<size_type> column_offsets(nesting_levels.size() + 1, stream);
hostdevice_vector<size_type> column_ends(nesting_levels.size() + 1, stream);
rmm::device_uvector<size_type> d_column_offsets(nesting_levels.size() + 1, stream);
rmm::device_uvector<size_type> d_column_ends(nesting_levels.size() + 1, stream);

auto d_col = column_device_view::create(h_col, stream);
cudf::detail::device_single_thread(
[offset_at_level = column_offsets.device_ptr(),
end_idx_at_level = column_ends.device_ptr(),
[offset_at_level = d_column_offsets.data(),
end_idx_at_level = d_column_ends.data(),
col = *d_col] __device__() {
auto curr_col = col;
size_type off = curr_col.offset();
Expand All @@ -1741,8 +1752,20 @@ dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)
},
stream);

column_offsets.device_to_host(stream, true);
column_ends.device_to_host(stream, true);
thrust::host_vector<size_type> column_offsets(nesting_levels.size() + 1);
CUDA_TRY(cudaMemcpyAsync(column_offsets.data(),
d_column_offsets.data(),
d_column_offsets.size() * sizeof(size_type),
cudaMemcpyDeviceToHost,
stream.value()));
thrust::host_vector<size_type> column_ends(nesting_levels.size() + 1);
CUDA_TRY(cudaMemcpyAsync(column_ends.data(),
d_column_ends.data(),
d_column_ends.size() * sizeof(size_type),
cudaMemcpyDeviceToHost,
                           stream.value()));

  stream.synchronize();

rmm::device_uvector<uint8_t> rep_level(max_vals_size, stream);
rmm::device_uvector<uint8_t> def_level(max_vals_size, stream);
Expand Down Expand Up @@ -1773,15 +1796,21 @@ dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)
thrust::make_counting_iterator(0),
[idx = empties_idx.data(),
mask = lcv.null_mask(),
level_nullable = level_nullability.empty() ? false : level_nullability[level],
curr_def_level = def_at_level[level]] __device__(auto i) {
return curr_def_level + ((mask && bit_is_set(mask, idx[i])) ? 1 : 0);
return curr_def_level +
((mask && bit_is_set(mask, idx[i]) or (!mask && level_nullable)) ? 1 : 0);
});

auto input_child_rep_it = thrust::make_constant_iterator(nesting_levels.size());
auto input_child_def_it = thrust::make_transform_iterator(
thrust::make_counting_iterator(column_offsets[level + 1]),
[mask = lcv.child().null_mask(), curr_def_level = def_at_level[level + 1]] __device__(
auto i) { return curr_def_level + ((mask && bit_is_set(mask, i)) ? 1 : 0); });
[mask = lcv.child().null_mask(),
level_nullable = level_nullability.empty() ? false : level_nullability[level + 1],
curr_def_level = def_at_level[level + 1]] __device__(auto i) {
return curr_def_level +
((mask && bit_is_set(mask, i) or (!mask && level_nullable)) ? 1 : 0);
});

// Zip the input and output value iterators so that merge operation is done only once
auto input_parent_zip_it =
Expand Down Expand Up @@ -1866,8 +1895,10 @@ dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)
thrust::make_counting_iterator(0),
[idx = empties_idx.data(),
mask = lcv.null_mask(),
level_nullable = level_nullability.empty() ? false : level_nullability[level],
curr_def_level = def_at_level[level]] __device__(auto i) {
return curr_def_level + ((mask && bit_is_set(mask, idx[i])) ? 1 : 0);
return curr_def_level +
((mask && bit_is_set(mask, idx[i]) or (!mask && level_nullable)) ? 1 : 0);
});

// Zip the input and output value iterators so that merge operation is done only once
Expand Down Expand Up @@ -1934,12 +1965,14 @@ dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream)

size_type leaf_col_offset = column_offsets[column_offsets.size() - 1];
size_type leaf_data_size = column_ends[column_ends.size() - 1] - leaf_col_offset;
uint8_t max_def_level = def_at_level.back() - 1;

return dremel_data{std::move(new_offsets),
std::move(rep_level),
std::move(def_level),
leaf_col_offset,
leaf_data_size};
leaf_data_size,
max_def_level};
}

/**
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ struct dremel_data {

size_type leaf_col_offset;
size_type leaf_data_size;
uint8_t max_def_level;
};

/**
Expand All @@ -415,11 +416,15 @@ struct dremel_data {
* def_level = { 1, 1, 1, 0, 1, 1}
* ```
* @param col Column of LIST type
 * @param level_nullability Pre-determined nullability at each list level. Empty means infer from
 * `col`.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return A struct containing dremel data
*/
dremel_data get_dremel_data(column_view h_col, rmm::cuda_stream_view stream);
dremel_data get_dremel_data(column_view h_col,
std::vector<bool> const &level_nullability = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Launches kernel for initializing encoder page fragments
Expand Down
Loading