-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Periodic commit log #3343
Periodic commit log #3343
Conversation
lastFlushedOffset = allocatePosition; | ||
lastFsyncOffsetUpdater.accumulateAndGet(this, allocatePosition, Math::max); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this correct? if fsync == false
then we didn't fsync... so why update the pointer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, so much for late night updates (when will I learn)
@@ -99,6 +100,13 @@ public void serialize(T ts, DataOutputPlus out, int version) throws IOException | |||
TopologySerializers.nodeId.serialize(ts.node, out, version); | |||
} | |||
|
|||
public void serialize(T ts, DataOutputPlus out) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why include ones without a version? what caused this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because we have some other callers that don't currently have a version, so easier to invoke ones without a version
@@ -81,6 +84,7 @@ public void afterEach() { | |||
@BeforeClass | |||
public static void beforeClass() throws Throwable | |||
{ | |||
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you mean to add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's for log.xml compatibility
@@ -43,20 +55,35 @@ public class AccordLoadTest extends AccordTestBase | |||
@BeforeClass | |||
public static void setUp() throws IOException | |||
{ | |||
AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2); | |||
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you setting this? we don't setup simulator for jvm-dtest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the simulator log file is the most suitable for running this, and this exists only to setup logging for the instances for that file
}, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount)); | ||
if (random.nextFloat() < readChance) | ||
{ | ||
coordinator.executeWithResult((success, fail) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL; Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... boundValues);
heh...
test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
Show resolved
Hide resolved
@@ -149,7 +149,7 @@ public PairOfSequencesAccordSimulation(SimulatedSystems simulated, | |||
@Override | |||
protected String createTableStmt() | |||
{ | |||
return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk))"; | |||
return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk)) WITH transactional_mode = 'full'"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"should" this use transactionMode
since you are trying to pass that around now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair, I should pass this through yes
@@ -198,7 +198,7 @@ public static abstract class Builder<S extends Simulation> | |||
protected HeapPool.Logged.Listener memoryListener; | |||
protected SimulatedTime.Listener timeListener = (i1, i2) -> {}; | |||
protected LongConsumer onThreadLocalRandomCheck; | |||
protected String lwtStrategy = "migration"; | |||
protected String transactionMode = "migration"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you only keep this so paxos running accord doesn't change topologies? I deleted in my branch but I am working on trying to get topology changes working,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think lwtStrategy doesn't exist anymore? I was probably doing the bare minimum to get it working
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could not spot any problems with this, but I feel like we are severely lacking tests here. I will make it my priority to add at least some test coverage some time soon, as the complexity of these classes grows rather quickly.
long startedRunAt = clock.now(); | ||
boolean flushToDisk = lastFlushedAt + flushPeriodNanos() <= startedRunAt || state != NORMAL || flushRequested; | ||
long startedAt = clock.now(); | ||
boolean flushToDisk = flushStartedAt + flushPeriodNanos() <= startedAt || state != NORMAL || flushRequested; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can move flushPeriodNanos
that dereferences the configurable above this line, and reuse it
|
||
// a signal that writers can wait on to be notified of a completed flush in PERIODIC FlushMode | ||
private final WaitQueue flushComplete = newWaitQueue(); | ||
private final WaitQueue flushComplete = newWaitQueue(); // TODO (expected): this is only used for testing, can we remove this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there is now distinction between flush and fsync, should we rename this to fsyncComplete
?
private class FSyncRunnable implements Interruptible.Task | ||
{ | ||
// this is written only by the Flusher thread, and read only by the Fsync thread | ||
ActiveSegment<K, V> fsyncTo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about fsyncUpTo
? Not a native speaker, but when I see the variable first association was that this is a target of fsync.
{ | ||
long requireFsyncTo = startedAt - periodicFlushLagBlockNanos(); | ||
|
||
fsyncTo = segment; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I understand that this is an additional allocation; but maybe we could make these two fields a single variable?
An alternative could be a sentinel boolean that would make another thread spin.
// invoke the onFlush() callback once, covering entire flushed range across all flushed segments | ||
if (syncedSegment != -1 && syncedOffset != -1) | ||
callbacks.onFlush(syncedSegment, syncedOffset); | ||
if (synchronousFsync) postFlushSync(startedAt, current.descriptor.timestamp, syncedOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename this to afterFsync
or afterFlushSync
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surface-level notes here.
Still taking a deeper pass at changes to ActiveSegment
, Flusher
, and Journal
, otherwise looked thoroughly at everything else, with the exception of simulator-related changes which I'm less familiar with.
@@ -99,6 +99,7 @@ | |||
import org.apache.cassandra.db.SinglePartitionReadCommand; | |||
import org.apache.cassandra.db.Slices; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unused import breaking checkstyle/build
@@ -99,6 +99,7 @@ | |||
import org.apache.cassandra.db.SinglePartitionReadCommand; | |||
import org.apache.cassandra.db.Slices; | |||
import org.apache.cassandra.db.filter.ClusteringIndexFilter; | |||
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; | |||
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unused import breaking checkstyle/build
@@ -98,6 +98,21 @@ void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into) | |||
into.add(segment.asActive()); | |||
} | |||
|
|||
ActiveSegment<K, V> oldest() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rename to oldestActive()
?
@@ -50,6 +51,8 @@ final class ActiveSegment<K, V> extends Segment<K, V> | |||
* Everything before this offset has been written and flushed. | |||
*/ | |||
private volatile int lastFlushedOffset = 0; | |||
private volatile int lastFsyncOffset = 0; | |||
private static final AtomicIntegerFieldUpdater lastFsyncOffsetUpdater = AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "lastFsyncOffset"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: specify at least <ActiveSegment>
with @SuppressWarnings("rawtypes")
?
if (lastFsyncOffset >= lastFlushed) | ||
return; | ||
|
||
flushInternal(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to fsyncInternal()
? flush vs fsync terminology becomes a little confusing here overall, I wonder if there is a better way to deconfuse these.
{ | ||
int allocatePosition = this.allocatePosition.get(); | ||
if (lastFlushedOffset >= allocatePosition) | ||
return lastFlushedOffset; | ||
|
||
waitForModifications(); | ||
flushInternal(); | ||
if (fsync) flushInternal(); | ||
lastFlushedOffset = allocatePosition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: move this one line up and put the flushIntearnal()
call (fsyncInternal()
?) and updating the last fsync offset in one if (fsync)
block?
I think I have addressed feedback - could those waiting for changes confirm? Thx |
f90b0f3
to
441180f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
patch by Benedict; reviewed by Aleksey Yeschenko, Alex Petrov and David Capwell for CASSANDRA-19720
441180f
to
9ba4f27
Compare
Thanks for sending a pull request! Here are some tips if you're new here:
Commit messages should follow the following format:
The Cassandra Jira