Skip to content

Commit

Permalink
Revert "[FLINK-4154] [core] Correction of murmur hash breaks backward…
Browse files Browse the repository at this point in the history
…s compatibility"

This reverts commit 81cf229.

We had an incorrent implementation of Murmur hash in Flink 1.0. This
was fixed in 641a0d4 for Flink 1.1. Then we thought that we need to
revert this in order to ensure backwards compatability between Flink
1.0 and 1.1 savepoints (81cf22). Turns out, savepoint backwards
compatability is broken for other reasons, too. Therefore, we revert
81cf22 here, ending up with a correct implementation of Murmur hash
again.
  • Loading branch information
uce committed Aug 1, 2016
1 parent aed56ae commit 0f92a6b
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ protected void preSubmit() throws Exception {

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" +
"3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" +
"4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath);
compareResultsByLinesInMemory("4> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" +
"4> 1431162155\n" + "3> -1557280266\n" + "4> -1728529858\n" + "3> 1654374947\n" +
"3> -65105105\n" + "3> -518907128\n" + "4> -252332814\n", this.resultPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ public static int murmurHash(int code) {
code *= 0x1b873593;

code = Integer.rotateLeft(code, 13);
// By the MurmurHash algorithm the following should be "code = code * 5 + 0xe6546b64;" (see FLINK-3623)
// but correcting the algorithm is a breaking change (see FLINK-4154). The effect of the resulting skew
// increases with increased parallelism (see FLINK-4154).
code *= 0xe6546b64;
code = code * 5 + 0xe6546b64;

code ^= 4;
code ^= code >>> 16;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {

override def run(ctx: SourceContext[(Int, Int)]): Unit = {
0 until numElements foreach {
i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
// keys '1' and '2' hash to different buckets
i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public TupleSource(int numElements, int numKeys) {
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
// keys '1' and '2' hash to different buckets
Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
ctx.collect(result);
}
}
Expand Down

0 comments on commit 0f92a6b

Please sign in to comment.