ESQL: add Arrow dataframes output format #109873
Conversation
Documentation preview:
Pinging @elastic/es-analytical-engine (Team:Analytics)
This has been on my review list for a few days; I've just been on vacation for a bunch of days. It's still on the list. Sorry for the delay!
I left a few comments, but I think the most important merge blockers are:
- Some kind of integration test. I guess it'd be something that uses the arrow ipc library directly in java to read and write some things.
- Only allowing this in SNAPSHOT builds. It's super paranoid behavior, but I think it's warranted.
What should we do about multivalued results, do you think? By the time we go to serialize these we're past the point of adding warning headers. At least for now, we could silently drop them. Or maybe log a warning in the server log.
```java
    throw e;
} finally {
    if (output != null) {
        // assert false : "failed to write arrow chunk";
```
I think we should either drop the assert entirely along with the `if` statement - `close` methods skip nulls because of code like this. So much code like this. Or we should uncomment it. I guess the question is - is it always a bug if we throw an exception while serializing? Does `recycler` come from netty? If so I think it can throw through no fault of our own.

Also! Are we sure we want to keep the `catch Exception` and `log` bit? I might have added that in the prototype, but I think it's not the normal way to do this - normally we'd just let it bubble up.
Good catch. I cleaned it up in 3bb04d3 and only kept the `Releasables.closeExpectNoException(output)`. Or should we remove that as well?
Resolved review threads:
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java
- x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ValueConversions.java
- x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java
- x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/BlockConverter.java
- x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/BlockConverter.java (outdated)
```java
// FIXME: are these parameters ok here?
// FIXME: should the resulting block be closed after use?
new NoopCircuitBreaker("esql-arrow"),
BigArrays.NON_RECYCLING_INSTANCE
```
This is fine for a prototype but it's a merge blocker. There are three choices I think that are ok:
- Push the converter into the superclass and do it optionally, right as you dump into the wire. This is probably the best way because it involves the least copying and memory management. It might be the slowest because the converter will go megamorphic pretty quick. But it's shoveling a fair number of bits around so it's probably fine. Probably.
- Convert the whole block up front in a tight loop with memory management.
- Option 1 but with the generics reified out by hand - Java doesn't do it for you so you need to do it with your own fingers. Or, in our case, we'd do it with StringTemplate. That's almost certainly the choice that's fastest at runtime. But I don't know that it matters for this.
Either way, this is fine for now.
I don't know much about memory management in ES. Is there some explanation somewhere or some reference code I could get inspiration from?
It's more memory management in the compute engine than the rest of ES. The short version is that you can use `BlockFactory` to make a builder like:
```java
try (IntBlockBuilder builder = block.blockFactory().newIntBlockBuilder(block.getPositionCount())) {
    for (int i = 0; i < block.getPositionCount(); i++) {
        builder.appendInt(block.getInt(i));
    }
    try (IntBlock converted = builder.build()) {
        super.render(converted);
    }
}
```
That'll account for the new block using the same circuit breakers as the original block. It could be fairly large.
Something like `RoundIntEvaluator` does something similar.
Added 54fa11f to reuse the source block's factory and inherit its circuit breaker.
```java
if (bytes.length != 0) {
    bytes = valueConverter.apply(bytes, scratch);
}
builder.appendBytesRef(bytes);
```
You are probably better off copying the bytes to a single `BytesRefBuilder` scratch and then passing that to the converter to modify in place. That way it doesn't have to allocate the result.

One interesting thing I've realized is that in most cases it's safe to just directly modify the returned `bytes`, but not in all cases. Which is kind of lame.
@swallez and I talked about this some offline and I'm convinced it's fine to allow this in non-snapshot builds. It's an extra layer of paranoia that in this case isn't buying us anything. It has really useful side effects when we are making inter-node communication changes, but we aren't doing that here.
@nik9000 I addressed your comments. Regarding the main points:

Since it's really peripheral (it's an output format users have to choose explicitly and involves no communication between cluster nodes), it seems overly paranoid to me 😉
I did not handle them for now and considered them as single-valued blocks. Looking at the implementation this is actually wrong, since we will flatten all values up to the size of the page. I'll see if I can add them quickly. Otherwise we can either:
I'd rather go for 2/, which is less surprising for users (i.e. no "but where is my field?")
In c7cd12d I added a check to fail hard with a meaningful message when we encounter a multivalued block (and associated test).
👍 The reason I'm keen on involving the network isn't really the network, it's to get a view of how this all looks from the user's perspective with netty involved. For example, throwing your exception there should make sure it comes back as an error, but if we've already started the response it won't quite work right. Mostly I'm interested in the error cases and maybe like 1 success case. As a follow-up I can try and adapt the csv-spec tests to use arrow, which gives us great coverage without having to maintain much more stuff.
```java
// TODO could we "just" get the memory of the array and dump it?
int count = block.getPositionCount();
for (int i = 0; i < count; i++) {
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
```java
@Override
public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
    BytesRefBlock block = (BytesRefBlock) b;
    BytesRefBlock transformed = transformValues(block);
```
I think you'll need to `close` the returned transformed block. The usual way is `try (BytesRefBlock transformed = transformValues(block)) { super.convert(...); }`. That'll close it and decrement the memory tracking stuff.
I'm going to try and write a test that fails without this.
Bah, that's pretty much what I was going to add. I just didn't see it, sorry. But, yeah, I do think it needs one that hits the types that need the transform. I think a neat followup is to try and write a test that'll run the CSV tests fetching over arrow. But that's kind of finicky to get in, especially without multivalued fields. That'll get all the types and everything. But for now I think doing a test with some of the transformed types is good.
Added integration tests for values and IP addresses (both are transformed on serialization) in 00aa730
I have discovered a small 🐛. I'll push some changes to your test in a bit and maybe the bug fix. Checking.
```java
    builder.appendNull();
} else {
    BytesRef bytes = block.getBytesRef(i, scratch);
    if (bytes.length != 0) {
```
Now that I think about it, I think this should also apply to empty `BytesRef` values. Let the converter decide what to do.
@swallez I figured out my issue. I was worried about that conversion code not cleaning up the memory tracking, and it wasn't. But your integration test didn't catch it because it was missing an explicit check for memory tracking. Now, why that isn't part of every test, I'm not sure. But it isn't. I had to opt in, and when I did it started failing. So I added the tracking. Now I feel much better. I'm going to give this one more read and then approve it.
Hi @swallez, I've created a changelog YAML for you.
@swallez I know which change broke the build for you, so I pushed a fix.
Adds support for Apache Arrow's streaming format as a response format for ES|QL. It triggers based on the `Accept` header or the `format` request parameter.

Arrow has implementations in every mainstream language and is a backend of the Python Pandas library, which is extremely popular among data scientists and data analysts. Arrow's streaming format has also become the de facto standard for dataframe interchange. It is an efficient binary format that allows zero-cost deserialization by adding data access wrappers on top of memory buffers received from the network.
This PR builds on the experiment made by @nik9000 in PR #104877
Features/limitations

- `_source` fields are output as JSON text in a `varchar` array. In a future iteration we may want to offer the choice of the more efficient CBOR and SMILE formats.

Technical details
Arrow comes with its own memory management to handle vectors with direct memory, reference counting, etc. We don't want to use this as it conflicts with Elasticsearch's own memory management.
We therefore use the Arrow library only for the metadata objects describing the dataframe schema and the structure of the streaming format. The Arrow vector data is produced directly from ES|QL blocks.
Future work
Although it's already quite featured, there are some improvements to be done, beyond the limitations outlined above:
- `null` values.
- We should aggregate small pages into larger Arrow record batches to remove the overhead of batch headers.

Tests
The unit tests cover all data types and random combinations of fields and sparse/dense/empty vectors. In addition to that, some manual tests have been done in Python, Rust and Java using the standard Arrow libraries.
Below is a Python example taken from PR #104877. The last 3 lines are where the Arrow format is used: read the Arrow stream, wrap it in a Pandas dataframe, and plot the data.