Skip to content

Commit

Permalink
[streaming] Proper hash join added for window joins
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Feb 7, 2015
1 parent a07d59d commit 6d49d1d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.streaming.api.function.co;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
Expand All @@ -40,16 +43,35 @@ public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?>

@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
for (IN1 item1 : first) {
Object key1 = keySelector1.getKey(item1);

for (IN2 item2 : second) {
Object key2 = keySelector2.getKey(item2);
Map<Object, List<IN1>> map = build(first);

if (key1.equals(key2)) {
out.collect(joinFunction.join(item1, item2));
for (IN2 record : second) {
Object key = keySelector2.getKey(record);
List<IN1> match = map.get(key);
if (match != null) {
for (IN1 matching : match) {
out.collect(joinFunction.join(matching, record));
}
}
}

}

private Map<Object, List<IN1>> build(List<IN1> records) throws Exception {

Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();

for (IN1 record : records) {
Object key = keySelector1.getKey(record);
List<IN1> current = map.get(key);
if (current == null) {
current = new LinkedList<IN1>();
map.put(key, current);
}
current.add(record);
}

return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple1;
Expand Down Expand Up @@ -119,8 +120,10 @@ public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(

env.execute();

assertEquals(joinExpectedResults, joinResults);
assertEquals(crossExpectedResults, crossResults);
assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResults));
assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults),
new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResults));
}

private static class MyTimestamp<T> implements Timestamp<T> {
Expand Down

0 comments on commit 6d49d1d

Please sign in to comment.