Skip to content

Commit

Permalink
[FLINK-11863][table-runtime-blink] Introduce channel to read and writ…
Browse files Browse the repository at this point in the history
…e compressed data (apache#7944)
  • Loading branch information
KurtYoung committed Mar 9, 2019
1 parent 2345bbe commit 3ed490f
Show file tree
Hide file tree
Showing 13 changed files with 1,352 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.disk.iomanager;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.AbstractPagedInputView;

import java.io.IOException;
import java.util.List;

/**
* A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
* {@link FileIOChannel}, making it effectively a data input stream. The view reads it data
* in blocks from the underlying channel. The view can only read data that
* has been written by a {@link ChannelWriterOutputView}, due to block formatting.
*/
public abstract class AbstractChannelReaderInputView extends AbstractPagedInputView {

public AbstractChannelReaderInputView(int headerLength) {
super(headerLength);
}

/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
public abstract List<MemorySegment> close() throws IOException;

/**
* Get the underlying channel.
*/
public abstract FileIOChannel getChannel();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.disk.iomanager;

import org.apache.flink.runtime.memory.AbstractPagedOutputView;

import java.io.IOException;

/**
* A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel},
* making it effectively a data output stream. The view writes it data in blocks to the underlying
* channel.
*/
public abstract class AbstractChannelWriterOutputView extends AbstractPagedOutputView {

public AbstractChannelWriterOutputView(int segmentSize, int headerLength) {
super(segmentSize, headerLength);
}

/**
* Get the underlying channel.
*/
public abstract FileIOChannel getChannel();

/**
* Closes this OutputView, closing the underlying writer
*
* @return the number of bytes in last memory segment.
*/
public abstract int close() throws IOException;

/**
* Gets the number of blocks used by this view.
*/
public abstract int getBlockCount();

/**
* Get output bytes.
*/
public abstract long getNumBytes() throws IOException;

/**
* Get output compressed bytes, return num bytes if there is no compression.
*/
public abstract long getNumCompressedBytes() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,50 @@
* stream. The view reads it data in blocks from the underlying channel. The view can only read data that
* has been written by a {@link ChannelWriterOutputView}, due to block formatting.
*/
public class ChannelReaderInputView extends AbstractPagedInputView {
public class ChannelReaderInputView extends AbstractChannelReaderInputView {

protected final BlockChannelReader<MemorySegment> reader; // the block reader that reads memory segments

protected int numRequestsRemaining; // the number of block requests remaining

private final int numSegments; // the number of memory segment the view works with

private final ArrayList<MemorySegment> freeMem; // memory gathered once the work is done

private boolean inLastBlock; // flag indicating whether the view is already in the last block

private boolean closed; // flag indicating whether the reader is closed

// --------------------------------------------------------------------------------------------

/**
* Creates a new channel reader that reads from the given channel until the last block
* (as marked by a {@link ChannelWriterOutputView}) is found.
*
*
* @param reader The reader that reads the data from disk back into memory.
* @param memory A list of memory segments that the reader uses for reading the data in. If the
* list contains more than one segment, the reader will asynchronously pre-fetch
* blocks ahead.
* @param waitForFirstBlock A flag indicating weather this constructor call should block
* until the first block has returned from the asynchronous I/O reader.
*
*
* @throws IOException Thrown, if the read requests for the first blocks fail to be
* served by the reader.
*/
public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, boolean waitForFirstBlock)
throws IOException
{
public ChannelReaderInputView(
BlockChannelReader<MemorySegment> reader,
List<MemorySegment> memory,
boolean waitForFirstBlock) throws IOException {
this(reader, memory, -1, waitForFirstBlock);
}

/**
* Creates a new channel reader that reads from the given channel, expecting a specified
* number of blocks in the channel.
* <p>
* WARNING: The reader will lock if the number of blocks given here is actually lower than
* the actual number of blocks in the channel.
*
*
* @param reader The reader that reads the data from disk back into memory.
* @param memory A list of memory segments that the reader uses for reading the data in. If the
* list contains more than one segment, the reader will asynchronously pre-fetch
Expand All @@ -85,23 +86,24 @@ public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<Mem
* beyond the channel size.
* @param waitForFirstBlock A flag indicating weather this constructor call should block
* until the first block has returned from the asynchronous I/O reader.
*
*
* @throws IOException Thrown, if the read requests for the first blocks fail to be
* served by the reader.
*/
public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
int numBlocks, boolean waitForFirstBlock)
throws IOException
{
public ChannelReaderInputView(
BlockChannelReader<MemorySegment> reader,
List<MemorySegment> memory,
int numBlocks,
boolean waitForFirstBlock) throws IOException {
this(reader, memory, numBlocks, ChannelWriterOutputView.HEADER_LENGTH, waitForFirstBlock);
}

/**
* Non public constructor to allow subclasses to use this input view with different headers.
* <p>
* WARNING: The reader will lock if the number of blocks given here is actually lower than
* the actual number of blocks in the channel.
*
*
* @param reader The reader that reads the data from disk back into memory.
* @param memory A list of memory segments that the reader uses for reading the data in. If the
* list contains more than one segment, the reader will asynchronously pre-fetch
Expand All @@ -114,15 +116,17 @@ public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<Mem
* so any subclass changing the header length should override that methods as well.
* @param waitForFirstBlock A flag indicating weather this constructor call should block
* until the first block has returned from the asynchronous I/O reader.
*
*
* @throws IOException
*/
ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
int numBlocks, int headerLen, boolean waitForFirstBlock)
throws IOException
{
ChannelReaderInputView(
BlockChannelReader<MemorySegment> reader,
List<MemorySegment> memory,
int numBlocks,
int headerLen,
boolean waitForFirstBlock) throws IOException {
super(headerLen);

if (reader == null || memory == null) {
throw new NullPointerException();
}
Expand All @@ -132,44 +136,45 @@ public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<Mem
if (numBlocks < 1 && numBlocks != -1) {
throw new IllegalArgumentException("The number of blocks must be a positive number, or -1, if unknown.");
}

this.reader = reader;
this.numRequestsRemaining = numBlocks;
this.numSegments = memory.size();
this.freeMem = new ArrayList<MemorySegment>(this.numSegments);

for (int i = 0; i < memory.size(); i++) {
sendReadRequest(memory.get(i));
}

if (waitForFirstBlock) {
advance();
}
}

public void waitForFirstBlock() throws IOException
{
if (getCurrentSegment() == null) {
advance();
}
}

public boolean isClosed() {
return this.closed;
}

/**
* Closes this InputView, closing the underlying reader and returning all memory segments.
*
*
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
public List<MemorySegment> close() throws IOException {
@Override
public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
this.closed = true;

// re-collect all memory segments
ArrayList<MemorySegment> list = this.freeMem;
final MemorySegment current = getCurrentSegment();
Expand All @@ -192,7 +197,12 @@ public List<MemorySegment> close() throws IOException {
}
return list;
}


@Override
public FileIOChannel getChannel() {
return reader;
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
Expand All @@ -203,12 +213,12 @@ public List<MemorySegment> close() throws IOException {
* adds the segment to the readers return queue, which thereby effectively collects all memory segments.
* Secondly, the method fetches the next non-consumed segment
* returned by the reader. If no further segments are available, this method thrown an {@link EOFException}.
*
*
* @param current The memory segment used for the next request.
* @return The memory segment to read from next.
*
*
* @throws EOFException Thrown, if no further segments are available.
* @throws IOException Thrown, if an I/O error occurred while reading
* @throws IOException Thrown, if an I/O error occurred while reading
* @see AbstractPagedInputView#nextSegment(org.apache.flink.core.memory.MemorySegment)
*/
@Override
Expand All @@ -217,16 +227,16 @@ protected MemorySegment nextSegment(MemorySegment current) throws IOException {
if (this.inLastBlock) {
throw new EOFException();
}

// send a request first. if we have only a single segment, this same segment will be the one obtained in
// the next lines
if (current != null) {
sendReadRequest(current);
}

// get the next segment
final MemorySegment seg = this.reader.getNextReturnedBlock();

// check the header
if (seg.getShort(0) != ChannelWriterOutputView.HEADER_MAGIC_NUMBER) {
throw new IOException("The current block does not belong to a ChannelWriterOutputView / " +
Expand All @@ -237,20 +247,20 @@ protected MemorySegment nextSegment(MemorySegment current) throws IOException {
this.numRequestsRemaining = 0;
this.inLastBlock = true;
}

return seg;
}


@Override
protected int getLimitForSegment(MemorySegment segment) {
return segment.getInt(ChannelWriterOutputView.HEAD_BLOCK_LENGTH_OFFSET);
}

/**
* Sends a new read requests, if further requests remain. Otherwise, this method adds the segment
* directly to the readers return queue.
*
*
* @param seg The segment to use for the read request.
* @throws IOException Thrown, if the reader is in error.
*/
Expand Down
Loading

0 comments on commit 3ed490f

Please sign in to comment.