Skip to content

Commit

Permalink
[FLINK-9087] [runtime] change the method signature of RecordWriter.br…
Browse files Browse the repository at this point in the history
…oadcastEvent() from BufferConsumer to void

This closes apache#5802.
  • Loading branch information
trionesadam authored and zentol committed Apr 11, 2018
1 parent 4742c34 commit 0a5a64a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
}
}

public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
public void broadcastEvent(AbstractEvent event) throws IOException {
try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
RecordSerializer<T> serializer = serializers[targetChannel];
Expand All @@ -164,7 +164,6 @@ public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
if (flushAlways) {
flushAll();
}
return eventBufferConsumer;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,23 @@ public void testBroadcastEventBufferReferenceCounting() throws Exception {
new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
RecordWriter<?> writer = new RecordWriter<>(partition);

BufferConsumer bufferConsumer = writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

// Verify added to all queues
assertEquals(1, queues[0].size());
assertEquals(1, queues[1].size());

// get references to buffer consumers (copies from the original event buffer consumer)
BufferConsumer bufferConsumer1 = queues[0].getFirst();
BufferConsumer bufferConsumer2 = queues[1].getFirst();

// process all collected events (recycles the buffer)
for (int i = 0; i < queues.length; i++) {
assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
}

assertTrue(bufferConsumer.isRecycled());
assertTrue(bufferConsumer1.isRecycled());
assertTrue(bufferConsumer2.isRecycled());
}

/**
Expand Down

0 comments on commit 0a5a64a

Please sign in to comment.