Use DataFusionError instead of ArrowError in SendableRecordBatchStream #5101

Merged: 2 commits merged on Jan 30, 2023

comphead
Contributor

Which issue does this PR close?

Closes #5039.

Rationale for this change

See #5039

What changes are included in this PR?

Replace ArrowError with DataFusionError in DF context

Are these changes tested?

Yes

Are there any user-facing changes?

Users will get DataFusionError::ArrowError instead of arrow::ArrowError in DataFusion contexts
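For downstream code, the practical effect is roughly the following. This is a minimal sketch with stand-in types, not the real arrow or datafusion crates: the `ArrowError` and `DataFusionError` enums below are simplified assumptions that mirror only the wrapping relationship described above, and `next_batch` and `describe` are hypothetical helpers.

```rust
// Stand-in for arrow's error type (assumption: reduced to one variant).
#[derive(Debug, PartialEq)]
enum ArrowError {
    ComputeError(String),
}

// Stand-in for DataFusion's error type (assumption: reduced to two variants).
#[derive(Debug, PartialEq)]
enum DataFusionError {
    ArrowError(ArrowError),
    Execution(String),
}

// The conversion that lets an Arrow failure travel inside a DataFusionError.
impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

// Hypothetical stream item producer: items now carry DataFusionError,
// even when the underlying failure came from an Arrow kernel.
fn next_batch(fail: bool) -> Result<usize, DataFusionError> {
    if fail {
        let arrow_result: Result<usize, ArrowError> =
            Err(ArrowError::ComputeError("overflow".to_string()));
        // `?` wraps the ArrowError via the From impl above.
        Ok(arrow_result?)
    } else {
        Ok(3)
    }
}

// Callers match on the outer DataFusionError and, if needed, on the
// wrapped ArrowError inside it.
fn describe(result: Result<usize, DataFusionError>) -> String {
    match result {
        Ok(rows) => format!("{} rows", rows),
        Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg))) => {
            format!("arrow compute error: {}", msg)
        }
        Err(DataFusionError::Execution(msg)) => format!("execution error: {}", msg),
    }
}
```

Code that previously matched directly on an Arrow error from a stream would, under this sketch, match one level deeper instead.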

@github-actions bot added the core label (Core datafusion crate) on Jan 28, 2023
@comphead
Contributor Author

@tustvold please check whenever you have time, the code is rotting fast :)

Contributor

@tustvold tustvold left a comment

This looks good to me, mostly just some suggestions to use ? instead of map_err(Into::into) in a couple of places.

Given the intrusiveness of this change, I think we should leave this open for a few days in case there are any objections.

@alamb @andygrove
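The `?` suggestion relies on the fact that `?` already applies `From` to the error before returning it, so an explicit `map_err(Into::into)` in front of it is redundant. A minimal sketch, assuming stand-in `ArrowError` and `DataFusionError` types (not the real crates) and hypothetical helpers `parse_column`, `collect_verbose`, `collect_concise`, and `first_value`:

```rust
// Stand-in error types (assumption: simplified, not the real arrow/datafusion types).
#[derive(Debug, PartialEq)]
struct ArrowError(String);

#[derive(Debug, PartialEq)]
enum DataFusionError {
    ArrowError(ArrowError),
}

impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

// Hypothetical per-column step that fails with ArrowError.
fn parse_column(raw: &str) -> Result<i64, ArrowError> {
    raw.parse::<i64>()
        .map_err(|e| ArrowError(format!("cannot parse {:?}: {}", raw, e)))
}

// Before: convert explicitly, then use `?`.
fn collect_verbose(raw: &[&str]) -> Result<Vec<i64>, DataFusionError> {
    let cols = raw
        .iter()
        .map(|r| parse_column(r))
        .collect::<Result<Vec<_>, ArrowError>>()
        .map_err(Into::<DataFusionError>::into)?;
    Ok(cols)
}

// After: `?` performs the From<ArrowError> conversion on its own.
fn collect_concise(raw: &[&str]) -> Result<Vec<i64>, DataFusionError> {
    let cols = raw
        .iter()
        .map(|r| parse_column(r))
        .collect::<Result<Vec<_>, ArrowError>>()?;
    Ok(cols)
}

// In tail position, `Ok(expr?)` plays the same role as `expr.map_err(Into::into)`.
fn first_value(raw: &[&str]) -> Result<i64, DataFusionError> {
    Ok(parse_column(raw[0])?)
}
```

Both collectors return the same values and the same wrapped error; the concise form just leaves the conversion to `?`.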

@@ -400,12 +399,9 @@ impl NestedLoopJoinStream {
let mut left_indices_builder = UInt64Builder::new();
let mut right_indices_builder = UInt32Builder::new();
let left_right_indices = match indices_result {
Err(_) => {
// TODO why the type of result stream is `Result<T, ArrowError>`, and not the `DataFusionError`
Contributor

😆

Comment on lines 1033 to 1034
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?;
Contributor

Suggested change
- .collect::<Result<Vec<_>, ArrowError>>()
- .map_err(Into::<DataFusionError>::into)?;
+ .collect::<Result<Vec<_>, ArrowError>>()?;

Comment on lines 1047 to 1048
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?
Contributor

Suggested change
- .collect::<Result<Vec<_>, ArrowError>>()
- .map_err(Into::<DataFusionError>::into)?
+ .collect::<Result<Vec<_>, ArrowError>>()?

@@ -821,7 +820,7 @@ pub(crate) fn build_batch_from_indices(
};
columns.push(array);
}
- RecordBatch::try_new(Arc::new(schema.clone()), columns)
+ RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into)
Contributor

Suggested change
- RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into)
+ Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)

.and_then(|b| {
self.pc_projector
.project(b, partition_values)
.map_err(|e| ArrowError::ExternalError(e.into()))
Contributor

Is this needed? project already returns ArrowError AFAICT.

Contributor Author

This is project from DataFusion; it returns DataFusionError.
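To illustrate why a closure like the one in the snippet under discussion is needed when a DataFusion-side error has to cross an Arrow-typed boundary, here is a minimal sketch. It assumes stand-in types rather than the real crates: the `ExternalError` variant mirrors the boxed-error shape used in the snippet, while `project` and `project_as_arrow` are hypothetical functions operating on plain vectors instead of record batches.

```rust
use std::error::Error;
use std::fmt;

// Stand-in for DataFusion's error type (assumption: one variant only).
#[derive(Debug)]
enum DataFusionError {
    Execution(String),
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionError::Execution(msg) => write!(f, "Execution error: {}", msg),
        }
    }
}

impl Error for DataFusionError {}

// Stand-in for arrow's error type, keeping only the boxed external variant.
#[derive(Debug)]
enum ArrowError {
    ExternalError(Box<dyn Error + Send + Sync>),
}

// Hypothetical projector on the DataFusion side: it reports DataFusionError.
fn project(batch: Vec<i32>, partition_values: &[i32]) -> Result<Vec<i32>, DataFusionError> {
    if partition_values.is_empty() {
        return Err(DataFusionError::Execution("no partition values".to_string()));
    }
    let mut out = batch;
    out.extend_from_slice(partition_values);
    Ok(out)
}

// When the caller must return ArrowError, the DataFusionError is boxed
// into the external variant; `e.into()` builds the Box<dyn Error + Send + Sync>.
fn project_as_arrow(batch: Vec<i32>, partition_values: &[i32]) -> Result<Vec<i32>, ArrowError> {
    project(batch, partition_values).map_err(|e| ArrowError::ExternalError(e.into()))
}
```

In this sketch the wrapper exists only because the two error types differ; if the caller returned `DataFusionError` directly, a plain `?` would be enough.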

.and_then(|array| {
Ok(as_boolean_array(&array)?)
// apply filter array to record batch
- .and_then(|filter_array| filter_record_batch(batch, filter_array))
+ .and_then(|filter_array| {
+ filter_record_batch(batch, filter_array).map_err(Into::into)
Contributor

Suggested change
- filter_record_batch(batch, filter_array).map_err(Into::into)
+ Ok(filter_record_batch(batch, filter_array)?)

Comment on lines 843 to 844
)
.map_err(Into::<DataFusionError>::into)?;
Contributor

Suggested change
- )
- .map_err(Into::<DataFusionError>::into)?;
+ )?;

@@ -462,7 +461,7 @@ impl SortedPartitionByBoundedWindowStream {
if let Some(columns_to_show) = columns_to_show {
let n_generated = columns_to_show[0].len();
self.prune_state(n_generated)?;
- RecordBatch::try_new(schema, columns_to_show)
+ RecordBatch::try_new(schema, columns_to_show).map_err(Into::into)
Contributor

Suggested change
- RecordBatch::try_new(schema, columns_to_show).map_err(Into::into)
+ Ok(RecordBatch::try_new(schema, columns_to_show)?)

Comment on lines 361 to 362
.collect::<Result<Vec<ArrayRef>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?;
Contributor

Suggested change
- .collect::<Result<Vec<ArrayRef>, ArrowError>>()
- .map_err(Into::<DataFusionError>::into)?;
+ .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;


// combine with the original cols
// note the setup of window aggregates is that they newly calculated window
// expression results are always appended to the columns
let mut batch_columns = batch.columns().to_vec();
// calculate window cols
batch_columns.extend_from_slice(&columns);
- RecordBatch::try_new(self.schema.clone(), batch_columns)
+ RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into)
Contributor

Suggested change
- RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into)
+ Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)

@tustvold added the api change label (Changes the API exposed to users of the crate) on Jan 28, 2023
@tustvold changed the title from "Replace ArrowError with DataFusionError in DF context" to "Use DataFusionError instead of ArrowError in SendableRecordBatchStream" on Jan 28, 2023
@comphead
Contributor Author

Thanks @tustvold for your review. Fixed all the comments

@jackwener
Member

cc @liukun4515, you may also be interested in this.

@alamb
Contributor

alamb commented Jan 30, 2023

Also @yahoNanJing and the Ballista team, this may result in churn

@tustvold merged commit 74b05fa into apache:master on Jan 30, 2023
@ursabot

ursabot commented Jan 30, 2023

Benchmark runs are scheduled for baseline = 9c8bdfe and contender = 74b05fa. 74b05fa is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Labels
api change (Changes the API exposed to users of the crate), core (Core datafusion crate)
Development

Successfully merging this pull request may close these issues.

Use DataFusionError in SendableRecordBatchStream
5 participants