[FLINK-28198][connectors][cassandra] Raise driver timeouts per session request and raise them higher than cluster-side timeouts (apache#20184)

* [FLINK-28198][connectors][cassandra] Raise driver timeouts per session request and raise them higher than cluster-side timeouts. This closes apache#20184
echauchot authored and liujia10 committed Jul 22, 2022
1 parent 800eb3b commit 5aef453
Showing 1 changed file with 31 additions and 21 deletions.
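
The change below swaps raw query strings for statements that carry their own read timeout. As a minimal, self-contained sketch of that pattern against the DataStax Java driver 3.x (the class name, contact point, and query here are illustrative, not taken from the test):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class PerRequestTimeoutSketch {
    // The driver-side timeout must exceed the server-side request_timeout_in_ms
    // (patched to 30000 ms below), so the coordinator can answer or time out
    // before the driver gives up on the request.
    private static final int READ_TIMEOUT_MILLIS = 36000;

    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                Session session = cluster.connect()) {
            // setReadTimeoutMillis on a Statement overrides the per-cluster
            // SocketOptions read timeout for this request only.
            Statement statement =
                    new SimpleStatement("SELECT release_version FROM system.local")
                            .setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
            ResultSet rs = session.execute(statement);
            System.out.println(rs.one().getString("release_version"));
        }
    }
}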
@@ -55,7 +55,9 @@
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+ import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
+ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.annotations.Table;
@@ -115,6 +117,8 @@ public class CassandraConnectorITCase
@ClassRule
public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();

+ private static final int READ_TIMEOUT_MILLIS = 36000;

@Rule public final RetryRule retryRule = new RetryRule();

private static final int PORT = 9042;
@@ -142,10 +146,11 @@ protected Cluster buildCluster(Cluster.Builder builder) {
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.withSocketOptions(
new SocketOptions()
- // multiply default timeout by 3
+ // default timeout x 3
.setConnectTimeoutMillis(15000)
- // double default timeout
- .setReadTimeoutMillis(24000))
+ // default timeout x3 and higher than
+ // request_timeout_in_ms at the cluster level
+ .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
.withoutJMXReporting()
.withoutMetrics()
.build();
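
For reference, the "x 3" comments refer to the driver 3.x defaults in SocketOptions: 5000 ms for connect and 12000 ms for read. Assuming those public constants, an equivalent way to express the values above is (sketch only):

import com.datastax.driver.core.SocketOptions;

final class TimeoutMath {
    static SocketOptions tripledDefaults() {
        return new SocketOptions()
                // 3 * 5000 ms = 15000 ms
                .setConnectTimeoutMillis(3 * SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS)
                // 3 * 12000 ms = 36000 ms, deliberately above the patched
                // server-side request_timeout_in_ms of 30000 ms
                .setReadTimeoutMillis(3 * SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
    }
}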
@@ -280,13 +285,14 @@ private static void raiseCassandraRequestsTimeouts() {
String patchedConfiguration =
configuration
.replaceAll(
"request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
"request_timeout_in_ms: [0-9]+",
"request_timeout_in_ms: 30000") // x3 default timeout
.replaceAll(
"read_request_timeout_in_ms: [0-9]+",
"read_request_timeout_in_ms: 15000")
"read_request_timeout_in_ms: 15000") // x3 default timeout
.replaceAll(
"write_request_timeout_in_ms: [0-9]+",
"write_request_timeout_in_ms: 6000");
"write_request_timeout_in_ms: 6000"); // x3 default timeout
CASSANDRA_CONTAINER.copyFileToContainer(
Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
"/etc/cassandra/cassandra.yaml");
@@ -381,13 +387,13 @@ public static void startAndInitializeCassandra() {
}
}
}
- session.execute(CREATE_KEYSPACE_QUERY);
+ session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
}

@Before
public void createTable() {
tableID = random.nextInt(Integer.MAX_VALUE);
- session.execute(injectTableName(CREATE_TABLE_QUERY));
+ session.execute(requestWithTimeout(injectTableName(CREATE_TABLE_QUERY)));
}

@AfterClass
@@ -458,7 +464,7 @@ protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkp
protected void verifyResultsIdealCircumstances(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

- ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -476,7 +482,7 @@ protected void verifyResultsIdealCircumstances(
protected void verifyResultsDataPersistenceUponMissedNotify(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

- ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -494,7 +500,7 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
protected void verifyResultsDataDiscardingUponRestore(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

- ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 20; x++) {
list.add(x);
@@ -528,7 +534,7 @@ protected void verifyResultsWhenReScaling(
}

ArrayList<Integer> actual = new ArrayList<>();
- ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));

for (com.datastax.driver.core.Row s : result) {
actual.add(s.getInt(TUPLE_COUNTER_FIELD));
@@ -607,7 +613,7 @@ public void testCassandraTupleAtLeastOnceSink() throws Exception {
sink.close();
}

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
assertThat(rs.all()).hasSize(20);
}

@@ -625,7 +631,7 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
sink.close();
}

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
assertThat(rs.all()).hasSize(20);
}

@@ -635,7 +641,7 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
writePojos(annotatedPojoClass, null);

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
assertThat(rs.all()).hasSize(20);
}

@@ -644,7 +650,7 @@ public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Excepti
final Class<? extends Pojo> annotatedPojoClass =
annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
writePojos(annotatedPojoClass, KEYSPACE);
- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
assertThat(rs.all()).hasSize(20);
}

@@ -685,7 +691,7 @@ builderForWriting, injectTableName(INSERT_DATA_QUERY))

tEnv.sqlQuery("select * from testFlinkTable").executeInsert("cassandraTable").await();

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));

// validate that all input was correctly written to Cassandra
List<Row> input = new ArrayList<>(rowCollection);
@@ -720,7 +726,7 @@ public void testCassandraBatchPojoFormat() throws Exception {
annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);

final List<? extends Pojo> pojos = writePojosWithOutputFormat(annotatedPojoClass);
- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
assertThat(rs.all()).hasSize(20);

final List<? extends Pojo> result = readPojosWithInputFormat(annotatedPojoClass);
@@ -791,7 +797,7 @@ public void testCassandraBatchRowFormat() throws Exception {
sink.close();
}

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSize(rowCollection.size());
}
@@ -845,7 +851,7 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception {
sink.close();
}

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSize(scalaTupleCollection.size());

@@ -885,7 +891,7 @@ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
sink.close();
}

- ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSize(1);
// Since nulls are ignored, we should be reading one complete record
@@ -898,4 +904,8 @@ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
.isEqualTo(new scala.Tuple3<>(id, counter, batchId));
}
}

+ private static Statement requestWithTimeout(String query) {
+     return new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
+ }
}
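
Note that the per-statement timeout set by requestWithTimeout pins each wrapped query to the same 36000 ms as the cluster-wide SocketOptions value configured in buildCluster; in driver 3.x a per-statement value takes precedence over SocketOptions for that statement only, so requests issued without the helper should still fall back to the SocketOptions setting.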
