
ESQL: add Arrow dataframes output format #109873

Merged 14 commits into elastic:main on Jul 3, 2024

Conversation

swallez
Member

@swallez swallez commented Jun 18, 2024

Adds support for Apache Arrow's streaming format as a response for ES|QL. It triggers based on the Accept header or the format request parameter.

Arrow has implementations in every mainstream language and is a backend of the Python Pandas library, which is extremely popular among data scientists and data analysts. Arrow's streaming format has also become the de facto standard for dataframe interchange. It is an efficient binary format that allows zero-cost deserialization by adding data access wrappers on top of memory buffers received from the network.

This PR builds on the experiment made by @nik9000 in PR #104877

Features/limitations

  • all ES|QL data types are supported
  • multi-valued fields are not supported
  • fields of type _source are output as JSON text in a varchar array. In a future iteration we may want to offer the choice of the more efficient CBOR and SMILE formats.

Technical details

Arrow comes with its own memory management to handle vectors with direct memory, reference counting, etc. We don't want to use this as it conflicts with Elasticsearch's own memory management.

We therefore use the Arrow library only for the metadata objects describing the dataframe schema and the structure of the streaming format. The Arrow vector data is produced directly from ES|QL blocks.

Future work

Although it's already quite full-featured, there are some improvements to be made beyond the limitations outlined above:

  • we should be able to speed things up significantly by copying the ES|QL vector's memory buffer directly instead of reading each position in a loop
  • ES|QL creates small pages when there are some null values. We should aggregate small pages into larger Arrow record batches to remove the overhead of batch headers.

Tests

The unit tests cover all data types and random combinations of fields and sparse/dense/empty vectors. In addition to that, some manual tests have been done in Python, Rust and Java using the standard Arrow libraries.

Below is a Python example taken from PR #104877. The last 3 lines are where the Arrow format is used: read the Arrow stream, wrap it in a Pandas dataframe, and plot the data.

import base64
import urllib3
import pyarrow as pa
import matplotlib.pyplot as plt
import pandas as pd
plt.close("all")
timeout = urllib3.Timeout(connect=2.0, read=300.0)
http = urllib3.PoolManager(timeout=timeout)

def basic(login, password):
  encoded = base64.b64encode((login + ':' + password).encode()).decode()
  return "Basic %s" % encoded

resp = http.request(
  "POST",
  "http://localhost:9200/_query?format=arrow",
  headers = {
    "authorization": basic("elastic", "password")
  },
  json = {
    "query": """
      FROM nyc_taxis 
    | WHERE trip_distance > 0 AND trip_distance < 100 AND fare_amount > 1
    | STATS min=MIN(fare_amount), avg=AVG(fare_amount), max=MAX(fare_amount) BY pickup_datetime=DATE_TRUNC(1 DAY, pickup_datetime)
    | EVAL LOG10(min), LOG10(max), LOG10(avg)
    | DROP min, max, avg
    | SORT pickup_datetime
    | LIMIT 100000"""
  })

if resp.status != 200:
  print("failed: %s" % resp.data)
else:
  with pa.ipc.open_stream(resp.data) as reader:
    df = reader.read_pandas()
    df.plot.line(x="pickup_datetime")

@swallez swallez requested a review from a team as a code owner June 18, 2024 14:50
@elasticsearchmachine elasticsearchmachine added v8.15.0 needs:triage Requires assignment of a team area label external-contributor Pull request authored by a developer outside the Elasticsearch team labels Jun 18, 2024
@swallez swallez added :Analytics/ES|QL AKA ESQL and removed needs:triage Requires assignment of a team area label external-contributor Pull request authored by a developer outside the Elasticsearch team v8.15.0 labels Jun 18, 2024
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Jun 18, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@nik9000
Copy link
Member

nik9000 commented Jun 21, 2024

This has been on my review list for a few days. I've just been on vacation a bunch of days. It's still on the list. Sorry for the delay!

Member

@nik9000 nik9000 left a comment


I left a few comments, but I think the most important merge blockers are:

  1. Some kind of integration test. I guess it'd be something that uses the arrow ipc library directly in java to read and write some things.
  2. Only allowing this in SNAPSHOT builds. It's super paranoid behavior, but I think it's warranted.

What should we do about multivalued results, do you think? By the time we go to serialize these we're past the point of adding warning headers. At least for now, we could silently drop them. Or maybe log a warning in the server log.

docs/reference/esql/esql-rest.asciidoc (comment resolved)
throw e;
} finally {
if (output != null) {
// assert false : "failed to write arrow chunk";
Member


I think we should either drop the assert and the if statement entirely - close methods skip nulls because of code like this; there's so much code like this. Or we should uncomment the assert. I guess the question is: is it always a bug if we throw an exception while serializing? Does the recycler come from netty? If so, I think it can throw through no fault of our own.

Also! Are we sure we want to keep the catch Exception and log bit? I might have added that in the prototype, but I think it's not the normal way to do this - normally we'd just let it bubble up.

Member Author


Good catch. I cleaned it up in 3bb04d3 and only kept the Releasables.closeExpectNoException(output). Or should we remove that as well?

x-pack/plugin/esql/arrow/build.gradle (comment resolved)
// FIXME: are these parameters ok here?
// FIXME: should the resulting block be closed after use?
new NoopCircuitBreaker("esql-arrow"),
BigArrays.NON_RECYCLING_INSTANCE
Member


This is fine for a prototype but it's a blocker before we flip the switch. There are three choices I think are ok:

  1. Push the converter into the superclass and do it optionally, right as you dump into the wire. This is probably the best way because it involves the least copying and memory management. It might be the slowest because the converter will go megamorphic pretty quick. But it's shoveling a fair number of bits around so it's probably fine. Probably.
  2. Convert the whole block up front in a tight loop with memory management.
  3. Option 1 but with the generics reified out by hand - java doesn't do it for you so you need to do it with your own fingers. Or, in our case, we'd do it with stringtemplate. That's almost certainly the choice that's fastest at runtime. But I don't know that it matters for this.

Either way, this is fine for now.

Member Author


I don't know much about memory management in ES. Is there some explanation somewhere or some reference code I could get inspiration from?

Member


It's more memory management in the compute engine than the rest of ES. The short version is that you can use BlockFactory to make a builder like:

try (IntBlockBuilder builder = block.blockFactory().newIntBlockBuilder(block.getPositionCount())) {
  for... { builder.appendInt(i); }
  try (IntBlock block = builder.build()) {
    super.render(block);
  }
}

That'll account for the new block using the same circuit breakers as the original block. It could be fairly large.

Something like RoundIntEvaluator does something similar.

Member Author


Added 54fa11f to reuse the source block's factory and inherit its circuit breaker.

if (bytes.length != 0) {
bytes = valueConverter.apply(bytes, scratch);
}
builder.appendBytesRef(bytes);
Member


You are probably better off copying the bytes to a single BytesRefBuilder scratch and then passing that to the converter to modify in place. That way it doesn't have to allocate the result.

One interesting thing I've realized is that in most cases it's safe to just directly modify the returned bytes but not in all cases. Which is kind of lame.

@nik9000
Member

nik9000 commented Jun 27, 2024

Only allowing this in SNAPSHOT builds. It's super paranoid behavior, but I think it's warranted.

@swallez and I talked about this some offline and I'm convinced it's fine to allow this in non-snapshot builds. It's an extra layer of paranoia that in this case isn't buying us anything. It has really useful side effects when we are making inter-node communication changes, but we aren't doing that here.

@swallez
Member Author

swallez commented Jun 27, 2024

@nik9000 I addressed your comments. Regarding the main points:

Some kind of integration test. I guess it'd be something that uses the arrow ipc library directly in java to read and write some things.

In ArrowResponseTests.toArrowVectors() we serialize the ESQL response to a byte array and read it with Arrow's ArrowStreamReader. There is no network involved, but it seemed fine to me as we want to test serialization and deserialization with standard Arrow libraries.

Only allowing this in SNAPSHOT builds. It's super paranoid behavior, but I think it's warranted.

Since it's really peripheral (it's an output format users have to choose explicitly and involves no communication between cluster nodes), it seems overly paranoid to me 😉

What should we do about multivalued results, do you think? By the time we go to serialize these we're past the point of adding warning headers. At least for now, we could silently drop them. Or maybe log a warning in the server log.

I did not handle them explicitly and treated them as single-valued blocks. Looking at the implementation this is actually wrong, since we will flatten all values up to the size of the page. I'll see if I can add them quickly. Otherwise we can either:

  1. drop multivalued fields from the output as you suggest,
  2. fail early saying that they're not supported yet (it's experimental).

I'd rather go for option 2, which is less surprising for users (i.e. no "but where is my field?")

@swallez
Member Author

swallez commented Jun 27, 2024

In c7cd12d I added a check to fail hard with a meaningful message when we encounter a multivalued block (and associated test).

@nik9000
Member

nik9000 commented Jun 27, 2024

I'd rather go for option 2, which is less surprising for users (i.e. no "but where is my field?")

👍

The reason I'm keen on involving the network isn't really the network, it's to get a view of how this all looks from the user's perspective with netty involved. For example, an exception thrown there should come back as an error, but if we've already started the response it won't quite work right. Mostly I'm interested in the error cases and maybe like 1 success case.

As a follow-up I can try and adapt the csv-spec tests to use arrow, which gives us great coverage without having to maintain much more stuff.


// TODO could we "just" get the memory of the array and dump it?
int count = block.getPositionCount();
for (int i = 0; i < count; i++) {
Member


👍

@Override
public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
BytesRefBlock block = (BytesRefBlock) b;
BytesRefBlock transformed = transformValues(block);
Member


I think you'll need to close the returned transformed block. The usual way is try (BytesRefBlock transformed = transformValues(block)) { super.convert(...); }. That'll close it and decrement the memory tracking stuff.

Member


I'm going to try and write a test that fails without this.

@swallez
Member Author

swallez commented Jul 1, 2024

@nik9000 I added an integration test in 1b75af7 - is this the kind of integration test you're expecting? Are there some additional checks that need to be done regarding memory management?

@nik9000
Member

nik9000 commented Jul 1, 2024

@nik9000 I added an integration test in 1b75af7 - is this the kind of integration test you're expecting? Are there some additional checks that need to be done regarding memory management?

Bah, that's pretty much what I was going to add. I just didn't see it. Sorry. But, yeah, I do think it needs one that hits the types that need the transform.

I think a neat followup is to try and write a test that'll run the CSV tests fetching over arrow. But that's kind of finicky to get in, especially without multivalued fields. That'll get all the types and everything. But for now I think doing a test with some of the transformed types is good.

@swallez
Member Author

swallez commented Jul 2, 2024

Added integration tests for values and ip addresses (both are transformed on serialization) in 00aa730

Member

@nik9000 nik9000 left a comment


I have discovered a small 🐛. I'll push some changes to your test in a bit and maybe the bug fix. Checking.

builder.appendNull();
} else {
BytesRef bytes = block.getBytesRef(i, scratch);
if (bytes.length != 0) {
Member


Now that I think about it, I think this should also apply to empty BytesRef values. Let the converter decide what to do.

@nik9000
Member

nik9000 commented Jul 2, 2024

@swallez I figured out my issue. I was worried that the conversion code wasn't cleaning up the memory tracking. And it wasn't. But your integration test didn't catch it because it was missing an explicit check for memory tracking. Why that isn't part of every test, I'm not sure. But it isn't. I had to opt into it. And when I did, it started failing. So I added the tracking, just a try block, and broke the test up a bit so I could make sure it was just the transform.

Now I feel much better.

I'm going to give this one more read and then approve it.

@nik9000 nik9000 added the v8.15.0 label Jul 2, 2024
@elasticsearchmachine
Collaborator

Hi @swallez, I've created a changelog YAML for you.

@nik9000
Member

nik9000 commented Jul 2, 2024

@swallez I knew the change that broke the build for you so I pushed a fix.

@swallez swallez merged commit e78bdc9 into elastic:main Jul 3, 2024
15 checks passed
@nik9000
Member

nik9000 commented Jul 9, 2024

Pretty pictures you can make with pandas
[two pandas plot images attached]
