Periodic commit log #3343

Merged

belliottsmith
Contributor

Thanks for sending a pull request! Here are some tips if you're new here:

  • Ensure you have added or run the appropriate tests for your PR.
  • Be sure to keep the PR description updated to reflect all changes.
  • Write your PR title to summarize what this PR proposes.
  • If possible, provide a concise example to reproduce the issue for a faster review.
  • Read our contributor guidelines
  • If you're making a documentation change, see our guide to documentation contribution

Commit messages should follow the following format:

<One sentence description, usually Jira title or CHANGES.txt summary>

<Optional lengthier description (context on patch)>

patch by <Authors>; reviewed by <Reviewers> for CASSANDRA-#####

Co-authored-by: Name1 <email1>
Co-authored-by: Name2 <email2>

The Cassandra Jira

lastFlushedOffset = allocatePosition;
lastFsyncOffsetUpdater.accumulateAndGet(this, allocatePosition, Math::max);
Contributor

is this correct? if fsync == false then we didn't fsync... so why update the pointer?

Contributor Author

good catch, so much for late night updates (when will I learn)

@@ -99,6 +100,13 @@ public void serialize(T ts, DataOutputPlus out, int version) throws IOException
TopologySerializers.nodeId.serialize(ts.node, out, version);
}

public void serialize(T ts, DataOutputPlus out) throws IOException
Contributor

why include ones without a version? what caused this change?

Contributor Author

because we have some other callers that don't currently have a version, so easier to invoke ones without a version

@@ -81,6 +84,7 @@ public void afterEach() {
@BeforeClass
public static void beforeClass() throws Throwable
{
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
Contributor

did you mean to add this?

Contributor Author

Yes, it's for log.xml compatibility

@@ -43,20 +55,35 @@ public class AccordLoadTest extends AccordTestBase
@BeforeClass
public static void setUp() throws IOException
{
AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2);
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
Contributor

why are you setting this? we don't setup simulator for jvm-dtest

Contributor Author

the simulator log file is the most suitable for running this, and this exists only to setup logging for the instances for that file

}, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount));
if (random.nextFloat() < readChance)
{
coordinator.executeWithResult((success, fail) -> {
Contributor

TIL; Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... boundValues);

heh...

@@ -149,7 +149,7 @@ public PairOfSequencesAccordSimulation(SimulatedSystems simulated,
@Override
protected String createTableStmt()
{
- return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk))";
+ return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk)) WITH transactional_mode = 'full'";
Contributor

"should" this use transactionMode since you are trying to pass that around now?

Contributor Author

fair, I should pass this through yes

@@ -198,7 +198,7 @@ public static abstract class Builder<S extends Simulation>
protected HeapPool.Logged.Listener memoryListener;
protected SimulatedTime.Listener timeListener = (i1, i2) -> {};
protected LongConsumer onThreadLocalRandomCheck;
- protected String lwtStrategy = "migration";
+ protected String transactionMode = "migration";
Contributor

did you only keep this so paxos running accord doesn't change topologies? I deleted it in my branch, but I am working on trying to get topology changes working.

Contributor Author

I think lwtStrategy doesn't exist anymore? I was probably doing the bare minimum to get it working

iamaleksey self-requested a review June 3, 2024 10:58
Contributor

ifesdjeen left a comment

I could not spot any problems with this, but I feel like we are severely lacking tests here. I will make it my priority to add at least some test coverage some time soon, as the complexity of these classes grows rather quickly.

- long startedRunAt = clock.now();
- boolean flushToDisk = lastFlushedAt + flushPeriodNanos() <= startedRunAt || state != NORMAL || flushRequested;
+ long startedAt = clock.now();
+ boolean flushToDisk = flushStartedAt + flushPeriodNanos() <= startedAt || state != NORMAL || flushRequested;
Contributor

nit: can move flushPeriodNanos that dereferences the configurable above this line, and reuse it


// a signal that writers can wait on to be notified of a completed flush in PERIODIC FlushMode
- private final WaitQueue flushComplete = newWaitQueue();
+ private final WaitQueue flushComplete = newWaitQueue(); // TODO (expected): this is only used for testing, can we remove this?
Contributor

Since there is now a distinction between flush and fsync, should we rename this to fsyncComplete?

private class FSyncRunnable implements Interruptible.Task
{
// this is written only by the Flusher thread, and read only by the Fsync thread
ActiveSegment<K, V> fsyncTo;
Contributor

nit: how about fsyncUpTo? I'm not a native speaker, but when I saw the variable, my first association was that it is the target of the fsync.

{
long requireFsyncTo = startedAt - periodicFlushLagBlockNanos();

fsyncTo = segment;
Contributor

nit: I understand that this is an additional allocation; but maybe we could make these two fields a single variable?

An alternative could be a sentinel boolean that would make another thread spin.
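
A minimal sketch of the single-variable idea, under loud assumptions: the second field is not visible in the quoted hunk, so the sketch assumes it is a timestamp and calls it `requestedAtNanos`. `FsyncTargetSketch`, `Target`, `request`, and `poll` are hypothetical names, and a `String` stands in for the segment. This is not the code from the patch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: publish two related values with one reference write.
final class FsyncTargetSketch
{
    // Immutable pair; both fields are assumptions about what the two original fields hold.
    static final class Target
    {
        final String segmentName;
        final long requestedAtNanos;

        Target(String segmentName, long requestedAtNanos)
        {
            this.segmentName = segmentName;
            this.requestedAtNanos = requestedAtNanos;
        }
    }

    private final AtomicReference<Target> target = new AtomicReference<>();

    // Flusher thread: one extra allocation, one atomic publish of both values.
    void request(String segmentName, long nowNanos)
    {
        target.set(new Target(segmentName, nowNanos));
    }

    // Fsync thread: takes the latest request, or null if nothing is pending.
    Target poll()
    {
        return target.getAndSet(null);
    }
}
```

The trade-off is the one the comment names: the reader can never see a segment paired with a stale timestamp, at the cost of one small allocation per request.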

// invoke the onFlush() callback once, covering entire flushed range across all flushed segments
if (syncedSegment != -1 && syncedOffset != -1)
callbacks.onFlush(syncedSegment, syncedOffset);
if (synchronousFsync) postFlushSync(startedAt, current.descriptor.timestamp, syncedOffset);
Contributor

nit: maybe rename this to afterFsync or afterFlushSync?

Member

iamaleksey left a comment

Surface-level notes here.

Still taking a deeper pass at changes to ActiveSegment, Flusher, and Journal, otherwise looked thoroughly at everything else, with the exception of simulator-related changes which I'm less familiar with.

@@ -99,6 +99,7 @@
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.Slices;
Member

Nit: unused import breaking checkstyle/build

@@ -99,6 +99,7 @@
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
Member

Nit: unused import breaking checkstyle/build

@@ -98,6 +98,21 @@ void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into)
into.add(segment.asActive());
}

ActiveSegment<K, V> oldest()
Member

Nit: rename to oldestActive()?

@@ -50,6 +51,8 @@ final class ActiveSegment<K, V> extends Segment<K, V>
* Everything before this offset has been written and flushed.
*/
private volatile int lastFlushedOffset = 0;
private volatile int lastFsyncOffset = 0;
private static final AtomicIntegerFieldUpdater lastFsyncOffsetUpdater = AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "lastFsyncOffset");
Member

Nit: specify at least <ActiveSegment> with @SuppressWarnings("rawtypes")?
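
One way to read this suggestion, as a minimal sketch: type the updater on the raw class and suppress only the `rawtypes` warning. `TypedUpdaterSketch` is a hypothetical stand-in for `ActiveSegment<K, V>`, and `recordFsync` is an invented helper rather than a method from the patch.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for ActiveSegment<K, V>; only the updater declaration matters here.
final class TypedUpdaterSketch<K, V>
{
    private volatile int lastFsyncOffset = 0;

    // The class literal of a generic class is raw, so the updater is typed on the raw class.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<TypedUpdaterSketch> lastFsyncOffsetUpdater =
        AtomicIntegerFieldUpdater.newUpdater(TypedUpdaterSketch.class, "lastFsyncOffset");

    // Invented helper: moves the offset forward only, even if callers race.
    int recordFsync(int offset)
    {
        return lastFsyncOffsetUpdater.accumulateAndGet(this, offset, Math::max);
    }

    int lastFsyncOffset()
    {
        return lastFsyncOffset;
    }
}
```

With the type argument in place, passing an object of the wrong class to the updater becomes a compile-time error instead of a runtime `ClassCastException`.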

if (lastFsyncOffset >= lastFlushed)
return;

flushInternal();
Member

Rename to fsyncInternal()? flush vs fsync terminology becomes a little confusing here overall, I wonder if there is a better way to deconfuse these.

{
int allocatePosition = this.allocatePosition.get();
if (lastFlushedOffset >= allocatePosition)
return lastFlushedOffset;

waitForModifications();
- flushInternal();
+ if (fsync) flushInternal();
lastFlushedOffset = allocatePosition;
Member

Nit: move this one line up and put the flushInternal() call (fsyncInternal()?) and updating the last fsync offset in one if (fsync) block?
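
Together with the earlier question about advancing the pointer when `fsync == false`, the suggested shape might look roughly like the sketch below. It is a simplified model, not the merged `ActiveSegment` code: `FlushSketch` is a hypothetical class, `fsyncInternal()` is an empty placeholder using the name proposed in review, an `AtomicInteger` replaces the field updater for brevity, and `waitForModifications()` is omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the flush path; not the merged ActiveSegment code.
final class FlushSketch
{
    final AtomicInteger allocatePosition = new AtomicInteger();
    private volatile int lastFlushedOffset = 0;
    private final AtomicInteger lastFsyncOffset = new AtomicInteger();

    // Returns the offset up to which data counts as flushed after this call.
    int flush(boolean fsync)
    {
        int position = allocatePosition.get();
        if (lastFlushedOffset >= position)
            return lastFlushedOffset;

        lastFlushedOffset = position;
        if (fsync)
        {
            fsyncInternal();
            // Advance the fsync pointer only when an fsync really happened.
            lastFsyncOffset.accumulateAndGet(position, Math::max);
        }
        return position;
    }

    // Placeholder for forcing the underlying buffer or channel to disk.
    private void fsyncInternal() {}

    int lastFlushedOffset() { return lastFlushedOffset; }

    int lastFsyncOffset() { return lastFsyncOffset.get(); }
}
```

Keeping the fsync call and the pointer update in the same block makes it hard to reintroduce the bug where the fsync offset moves without a matching fsync.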

@belliottsmith
Contributor Author

I think I have addressed feedback - could those waiting for changes confirm? Thx

Member

iamaleksey left a comment

+1

patch by Benedict; reviewed by Aleksey Yeschenko, Alex Petrov and David Capwell for CASSANDRA-19720
belliottsmith merged commit a0f7da9 into apache:cep-15-accord Jun 20, 2024
4 participants