Skip to content

Commit

Permalink
[FLINK-21488][connectors/jdbc] Use JobID in XA global transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
mobuchowski authored and rkhachatryan committed Mar 11, 2021
1 parent d3b1531 commit 46546c4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@
package org.apache.flink.connector.jdbc.xa;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RuntimeContext;

import javax.transaction.xa.Xid;

import java.security.SecureRandom;
import java.util.Optional;

/**
* Generates {@link Xid} from:
*
* <ol>
* <li>To provide uniqueness over other jobs and apps, and other instances
* <li>of this job, gtrid consists of
* <li>job id (16 bytes)
* <li>subtask index (4 bytes)
* <li>checkpoint id (4 bytes)
* <li>subtask index
* <li>8 random bytes to provide uniqueness across other jobs and apps (generated at startup using
* {@link SecureRandom})
* <li>bqual consists of 4 random bytes (generated using {@link SecureRandom})
* </ol>
*
* <p>Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids
* could collide).
* <p>Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids will
* collide).
*/
@Internal
class SemanticXidGenerator implements XidGenerator {
Expand All @@ -46,20 +50,29 @@ class SemanticXidGenerator implements XidGenerator {

private static final int FORMAT_ID = 201;

private transient byte[] gtridBuffer; // globalTransactionId = checkpoint id (long)
private transient byte[] bqualBuffer; // branchQualifier = task index + random bytes
private transient byte[]
gtridBuffer; // globalTransactionId = job id + task index + checkpoint id
private transient byte[] bqualBuffer; // branchQualifier = random bytes

@Override
public void open() {
bqualBuffer = getRandomBytes(Long.BYTES);
gtridBuffer = new byte[Long.BYTES];
gtridBuffer = new byte[3 * Long.BYTES];
bqualBuffer = getRandomBytes(Integer.BYTES);
}

@Override
public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, 0);
// deliberately write only 4 bytes of checkpoint id and rely on random generation
writeNumber((int) checkpointId, gtridBuffer, Integer.BYTES);
Optional<JobID> jobId = runtimeContext.getJobId();
if (jobId.isPresent()) {
System.arraycopy(jobId.get().getBytes(), 0, gtridBuffer, 0, JobID.SIZE);
} else {
// fall back to RNG if jobId is unavailable for some reason
System.arraycopy(getRandomBytes(JobID.SIZE), 0, gtridBuffer, 0, JobID.SIZE);
}

writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, JobID.SIZE);
// deliberately write only 4 bytes of checkpoint id
writeNumber((int) checkpointId, gtridBuffer, JobID.SIZE + Integer.BYTES);
// relying on arrays copying inside XidImpl constructor
return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public interface XidGenerator extends Serializable, AutoCloseable {
* Generate new {@link Xid}. Requirements for generated Xids:
*
* <ul>
* <li>MUST be unique across Flink job, and probably across Xids generated by other jobs and
* applications - depends on the usage of this class
* <li>Global Transaction Id MUST be unique across Flink job, and probably across Xids
* generated by other jobs and applications - depends on the usage of this class
* <li>SHOULD be immutable
* <li>SHOULD override {@link Object#hashCode hashCode} and {@link Object#equals equals}
* </ul>
*
* @param runtimeContext can be used for example to derive branch qualifier from subtask index
* @param runtimeContext can be used for example to derive global transaction id
* @param checkpointId can be used for example to derive global transaction id
*/
Xid generateXid(RuntimeContext runtimeContext, long checkpointId);
Expand All @@ -53,10 +53,10 @@ default void close() {}
* Creates a {@link XidGenerator} that generates {@link Xid xids} from:
*
* <ol>
* <li>checkpoint id
* <li>job id
* <li>subtask index
* <li>four random bytes to provide uniqueness across other jobs and apps (generated at
* startup using {@link SecureRandom})
* <li>checkpoint id
* <li>four random bytes generated using {@link SecureRandom})
* </ol>
*
* <p>Each created {@link XidGenerator} instance MUST be used for only one Sink instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

/** Simple uniqueness tests for the {@link SemanticXidGenerator}. */
public class SemanticXidGeneratorTest {
private static final int COUNT = 10_000;
private static final int COUNT = 100_000;

@Test
public void testXidsUniqueAmongCheckpoints() {
Expand All @@ -40,11 +40,11 @@ public void testXidsUniqueAmongCheckpoints() {
}

@Test
public void testXidsUniqueAmongGenerators() {
public void testXidsUniqueAmongJobs() {
long checkpointId = 1L;
SemanticXidGenerator generator = new SemanticXidGenerator();
checkUniqueness(
unused -> {
SemanticXidGenerator generator = new SemanticXidGenerator();
generator.open();
return generator.generateXid(TEST_RUNTIME_CONTEXT, checkpointId);
});
Expand All @@ -53,7 +53,8 @@ public void testXidsUniqueAmongGenerators() {
private void checkUniqueness(Function<Integer, Xid> generate) {
Set<Xid> generated = new HashSet<>();
for (int i = 0; i < COUNT; i++) {
generated.add(generate.apply(i));
// We "drop" the branch id because uniqueness of gtrid is important
generated.add(new XidImpl(0, generate.apply(i).getGlobalTransactionId(), new byte[0]));
}
assertEquals(COUNT, generated.size());
}
Expand Down

0 comments on commit 46546c4

Please sign in to comment.