Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Renamed CausationId to ConversationId to be consistent with Otel term…
Browse files Browse the repository at this point in the history
…inology
  • Loading branch information
Jeremy D. Miller authored and Jeremy D. Miller committed Sep 12, 2022
1 parent ee694fd commit cabcad9
Show file tree
Hide file tree
Showing 25 changed files with 58 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public async Task StoreIncomingAsync(Envelope[] envelopes)

envelope.Attempts = await reader.GetFieldValueAsync<int>(2, _cancellation);
envelope.Data = await reader.GetFieldValueAsync<byte[]>(3, _cancellation);
envelope.CausationId = await reader.MaybeReadAsync<Guid>(4, _cancellation);
envelope.ConversationId = await reader.MaybeReadAsync<Guid>(4, _cancellation);
envelope.CorrelationId = await reader.MaybeReadAsync<string>(5, _cancellation);
envelope.SagaId = await reader.MaybeReadAsync<string>(6, _cancellation);
envelope.MessageType = await reader.GetFieldValueAsync<string>(7, _cancellation);
Expand Down
8 changes: 4 additions & 4 deletions src/Jasper.Persistence.Database/DatabaseConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static class DatabaseConstants
public const string DeadLetterTable = "jasper_dead_letters";

public const string CorrelationId = "correlation_id"; // add to all
public const string CausationId = "causation_id"; // add to all
public const string ConversationId = "conversation_id"; // add to all
public const string SagaId = "saga_id"; // add to all
public const string ParentId = "parent_id"; // add to all
public const string ContentType = "content_type"; // add to all
Expand All @@ -34,11 +34,11 @@ public static class DatabaseConstants
public const string ReceivedAt = "received_at"; // add to all

public static readonly string IncomingFields =
$"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {CausationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}, {ReceivedAt}";
$"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {ConversationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}, {ReceivedAt}";

public static readonly string OutgoingFields =
$"{Body}, {Id}, {OwnerId}, {Destination}, {DeliverBy}, {Attempts}, {CausationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}";
$"{Body}, {Id}, {OwnerId}, {Destination}, {DeliverBy}, {Attempts}, {ConversationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}";

public static readonly string DeadLetterFields =
$"{Id}, {ExecutionTime}, {Attempts}, {Body}, {CausationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}, {Source}, {Explanation}, {ExceptionText}, {ExceptionType}, {ExceptionMessage}";
$"{Id}, {ExecutionTime}, {Attempts}, {Body}, {ConversationId}, {CorrelationId}, {SagaId}, {MessageType}, {ContentType}, {ReplyRequested}, {AckRequested}, {ReplyUri}, {Source}, {Explanation}, {ExceptionText}, {ExceptionType}, {ExceptionMessage}";
}
10 changes: 5 additions & 5 deletions src/Jasper.Persistence.Database/DatabasePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static void ConfigureOutgoingCommand(DatabaseSettings settings, DbComman
list.Add(builder.AddParameter(envelope.DeliverBy));

list.Add(builder.AddParameter(envelope.Attempts));
list.Add(builder.AddParameter(envelope.CausationId));
list.Add(builder.AddParameter(envelope.ConversationId));
list.Add(builder.AddParameter(envelope.CorrelationId));
list.Add(builder.AddParameter(envelope.SagaId));
list.Add(builder.AddParameter(envelope.MessageType));
Expand Down Expand Up @@ -82,7 +82,7 @@ public static void BuildIncomingStorageCommand(DatabaseSettings settings, DbComm
builder.AddParameter(envelope.OwnerId),
builder.AddParameter(envelope.ScheduledTime),
builder.AddParameter(envelope.Attempts),
builder.AddParameter(envelope.CausationId),
builder.AddParameter(envelope.ConversationId),
builder.AddParameter(envelope.CorrelationId),
builder.AddParameter(envelope.SagaId),
builder.AddParameter(envelope.MessageType),
Expand Down Expand Up @@ -119,7 +119,7 @@ public static async Task<Envelope> ReadIncoming(DbDataReader reader, Cancellatio

envelope.Attempts = await reader.GetFieldValueAsync<int>(5, cancellation);

envelope.CausationId = await reader.MaybeReadAsync<Guid>(6, cancellation);
envelope.ConversationId = await reader.MaybeReadAsync<Guid>(6, cancellation);
envelope.CorrelationId = await reader.MaybeReadAsync<string>(7, cancellation);
envelope.SagaId = await reader.MaybeReadAsync<string>(8, cancellation);

Expand All @@ -144,7 +144,7 @@ public static void ConfigureDeadLetterCommands(ErrorReport[] errors, DbCommandBu
list.Add(builder.AddParameter(error.Envelope.ScheduledTime));
list.Add(builder.AddParameter(error.Envelope.Attempts));
list.Add(builder.AddParameter(error.Envelope.Data));
list.Add(builder.AddParameter(error.Envelope.CausationId));
list.Add(builder.AddParameter(error.Envelope.ConversationId));
list.Add(builder.AddParameter(error.Envelope.CorrelationId));
list.Add(builder.AddParameter(error.Envelope.SagaId));
list.Add(builder.AddParameter(error.Envelope.MessageType));
Expand Down Expand Up @@ -183,7 +183,7 @@ public static async Task<Envelope> ReadOutgoingAsync(DbDataReader reader, Cancel
}

envelope.Attempts = await reader.GetFieldValueAsync<int>(5, cancellation);
envelope.CausationId = await reader.MaybeReadAsync<Guid>(6, cancellation);
envelope.ConversationId = await reader.MaybeReadAsync<Guid>(6, cancellation);
envelope.CorrelationId = await reader.MaybeReadAsync<string>(7, cancellation);
envelope.SagaId = await reader.MaybeReadAsync<string>(8, cancellation);
envelope.MessageType = await reader.GetFieldValueAsync<string>(9, cancellation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void ConfigureCommand(CommandBuilder builder, IMartenSession session)
builder.AddParameter(Envelope.OwnerId),
builder.AddParameter(Envelope.ScheduledTime),
builder.AddParameter(Envelope.Attempts),
builder.AddParameter(Envelope.CausationId),
builder.AddParameter(Envelope.ConversationId),
builder.AddParameter(Envelope.CorrelationId),
builder.AddParameter(Envelope.SagaId),
builder.AddParameter(Envelope.MessageType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void ConfigureCommand(CommandBuilder builder, IMartenSession session)
builder.AddParameter(Envelope.Destination!.ToString()),
builder.AddParameter(Envelope.DeliverBy),
builder.AddParameter(Envelope.Attempts),
builder.AddParameter(Envelope.CausationId),
builder.AddParameter(Envelope.ConversationId),
builder.AddParameter(Envelope.CorrelationId),
builder.AddParameter(Envelope.SagaId),
builder.AddParameter(Envelope.MessageType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public DeadLettersTable(string schemaName) : base(new DbObjectName(schemaName, D
AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);
AddColumn(DatabaseConstants.Body, "bytea").NotNull();

AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.MessageType).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public IncomingEnvelopeTable(string schemaName) : base(
AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);
AddColumn(DatabaseConstants.Body, "bytea").NotNull();

AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.MessageType).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public OutgoingEnvelopeTable(string schemaName) : base(

AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);

AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.MessageType).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public DeadLettersTable(string schemaName) : base(new DbObjectName(schemaName, D
AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);
AddColumn(DatabaseConstants.Body, "varbinary(max)").NotNull();

AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.MessageType).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public IncomingEnvelopeTable(string schemaName) : base(
AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);
AddColumn(DatabaseConstants.Body, "varbinary(max)").NotNull();

AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.ParentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public OutgoingEnvelopeTable(string schemaName) : base(
AddColumn(DatabaseConstants.Body, "varbinary(max)").NotNull();

AddColumn<int>(DatabaseConstants.Attempts).DefaultValue(0);
AddColumn<Guid>(DatabaseConstants.CausationId);
AddColumn<Guid>(DatabaseConstants.ConversationId);
AddColumn<string>(DatabaseConstants.CorrelationId);
AddColumn<string>(DatabaseConstants.SagaId);
AddColumn<string>(DatabaseConstants.ParentId);
Expand Down
4 changes: 2 additions & 2 deletions src/Jasper.Pulsar.Tests/DefaultPulsarProtocolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public void other_random_headers()
[Fact]
public void parent_id()
{
theOriginal.CausationId = Guid.NewGuid();
theEnvelope.CausationId.ShouldBe(theOriginal.CausationId);
theOriginal.ConversationId = Guid.NewGuid();
theEnvelope.ConversationId.ShouldBe(theOriginal.ConversationId);
}


Expand Down
8 changes: 4 additions & 4 deletions src/Jasper.Testing/EnvelopeTester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void default_values_for_original_and_parent_id_are_null()
var parent = new Envelope();

parent.CorrelationId.ShouldBeNull();
parent.CausationId.ShouldBe(Guid.Empty);
parent.ConversationId.ShouldBe(Guid.Empty);
}

[Fact]
Expand Down Expand Up @@ -92,7 +92,7 @@ public void if_reply_requested_header_exists_in_parent_and_matches_the_message_t

var child = parent.CreateForResponse(childMessage);

child.CausationId.ShouldBe(parent.Id);
child.ConversationId.ShouldBe(parent.Id);
child.Destination.ShouldBe(parent.ReplyUri);
}

Expand Down Expand Up @@ -126,7 +126,7 @@ public void original_message_creating_child_envelope()
child.Message.ShouldBeSameAs(childMessage);

child.CorrelationId.ShouldBe(parent.CorrelationId);
child.CausationId.ShouldBe(parent.Id);
child.ConversationId.ShouldBe(parent.Id);
}

[Fact]
Expand All @@ -144,7 +144,7 @@ public void parent_that_is_not_original_creating_child_envelope()
child.Message.ShouldBeSameAs(childMessage);

child.CorrelationId.ShouldBe(parent.CorrelationId);
child.CausationId.ShouldBe(parent.Id);
child.ConversationId.ShouldBe(parent.Id);
}


Expand Down
10 changes: 5 additions & 5 deletions src/Jasper.Testing/Runtime/JasperTracingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class when_creating_an_execution_activity
public when_creating_an_execution_activity()
{
theEnvelope = ObjectMother.Envelope();
theEnvelope.CausationId = Guid.NewGuid();
theEnvelope.ConversationId = Guid.NewGuid();

theEnvelope.MessageType = "FooMessage";
theEnvelope.CorrelationId = Guid.NewGuid().ToString();
Expand All @@ -29,7 +29,7 @@ public when_creating_an_execution_activity()
public void should_set_the_otel_conversation_id_to_correlation_id()
{
theActivity.GetTagItem(JasperTracing.MessagingConversationId)
.ShouldBe(theEnvelope.CorrelationId);
.ShouldBe(theEnvelope.ConversationId);
}

[Fact]
Expand Down Expand Up @@ -68,9 +68,9 @@ public void should_set_the_payload_size_bytes_when_it_exists()
}

[Fact]
public void trace_the_causation_id()
public void trace_the_conversation_id()
{
theActivity.GetTagItem(JasperTracing.MessagingCausationId)
.ShouldBe(theEnvelope.CausationId);
theActivity.GetTagItem(JasperTracing.MessagingConversationId)
.ShouldBe(theEnvelope.ConversationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ public void original_id()
}

[Fact]
public void causation()
public void conversation()
{
outgoing.CausationId = Guid.NewGuid();
incoming.CausationId.ShouldBe(outgoing.CausationId);
outgoing.ConversationId = Guid.NewGuid();
incoming.ConversationId.ShouldBe(outgoing.ConversationId);
}

[Fact]
Expand Down
6 changes: 3 additions & 3 deletions src/Jasper/Envelope.Internals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ internal Envelope CreateForResponse(object message)
{
var child = ForSend(message);
child.CorrelationId = CorrelationId;
child.CausationId = Id;
child.ConversationId = Id;

if (message.GetType().ToMessageTypeName() == ReplyRequested)
{
Expand All @@ -111,7 +111,7 @@ internal Envelope ForSend(object message)
{
Message = message,
CorrelationId = Id.ToString(),
CausationId = Id,
ConversationId = Id,
SagaId = SagaId
};
}
Expand Down Expand Up @@ -180,6 +180,6 @@ internal void WriteTags(Activity activity)
activity.SetTag(JasperTracing.MessageType, MessageType); // Jasper specific
activity.MaybeSetTag(JasperTracing.PayloadSizeBytes, MessagePayloadSize);

activity.MaybeSetTag(JasperTracing.MessagingCausationId, CausationId);
activity.MaybeSetTag(JasperTracing.MessagingConversationId, ConversationId);
}
}
2 changes: 1 addition & 1 deletion src/Jasper/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public object? Message
/// <summary>
/// Id of the immediate message or workflow that caused this envelope to be sent
/// </summary>
public Guid CausationId { get; internal set; }
public Guid ConversationId { get; internal set; }

/// <summary>
/// Location that this message should be sent
Expand Down
2 changes: 1 addition & 1 deletion src/Jasper/EnvelopeConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public static class EnvelopeConstants
public const string CorrelationIdKey = "correlation-id";
public const string SagaIdKey = "saga-id";
public const string IdKey = "id";
public const string CausationIdKey = "parent-id";
public const string ConversationIdKey = "parent-id";
public const string ContentTypeKey = "content-type";
public const string SourceKey = "source";
public const string ReplyRequestedKey = "reply-requested";
Expand Down
12 changes: 6 additions & 6 deletions src/Jasper/Runtime/CommandBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ internal CommandBus(IJasperRuntime runtime, string? correlationId)
public IEnumerable<Envelope> Outstanding => _outstanding;

public IEnvelopeOutbox? Outbox { get; protected set; }
public Guid CausationId { get; protected set; }
public Guid ConversationId { get; protected set; }


public Task InvokeAsync(object message, CancellationToken cancellation = default)
Expand All @@ -48,7 +48,7 @@ public Task InvokeAsync(object message, CancellationToken cancellation = default
{
ReplyUri = TransportConstants.RepliesUri,
CorrelationId = CorrelationId,
CausationId = CausationId,
ConversationId = ConversationId,
Source = Runtime.Advanced.ServiceName
}, cancellation);
}
Expand All @@ -66,7 +66,7 @@ public Task InvokeAsync(object message, CancellationToken cancellation = default
ReplyRequested = typeof(T).ToMessageTypeName(),
ResponseType = typeof(T),
CorrelationId = CorrelationId,
CausationId = CausationId,
ConversationId = ConversationId,
Source = Runtime.Advanced.ServiceName
};

Expand All @@ -89,7 +89,7 @@ public ValueTask EnqueueAsync<T>(T message)

var envelope = Runtime.RoutingFor(message.GetType()).RouteLocal(message, null); // TODO -- propagate DeliveryOptions
envelope.CorrelationId = CorrelationId;
envelope.CausationId = CausationId;
envelope.ConversationId = ConversationId;
envelope.Source = Runtime.Advanced.ServiceName;

return persistOrSendAsync(envelope);
Expand All @@ -104,7 +104,7 @@ public ValueTask EnqueueAsync<T>(T message, string workerQueueName)

var envelope = Runtime.RoutingFor(message.GetType()).RouteLocal(message, workerQueueName, null); // TODO -- propagate DeliveryOptions
envelope.CorrelationId = CorrelationId;
envelope.CausationId = CausationId;
envelope.ConversationId = ConversationId;
envelope.Source = Runtime.Advanced.ServiceName;

return persistOrSendAsync(envelope);
Expand All @@ -123,7 +123,7 @@ public async Task<Guid> ScheduleAsync<T>(T message, DateTimeOffset executionTime
ScheduledTime = executionTime,
Destination = TransportConstants.DurableLocalUri,
CorrelationId = CorrelationId,
CausationId = CausationId,
ConversationId = ConversationId,
Source = Runtime.Advanced.ServiceName
};

Expand Down
2 changes: 1 addition & 1 deletion src/Jasper/Runtime/JasperTracing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal static class JasperTracing
public const string MessagingDestinationKind = "messaging.destination_kind"; // Not sure this is going to be helpful. queue or topic. Maybe port if TCP basically.
public const string MessagingTempDestination = "messaging.temp_destination"; // boolean if this is temporary
public const string PayloadSizeBytes = "messaging.message_payload_size_bytes";
public const string MessagingCausationId = "messaging.causation_id";


// Transport specific things
// messaging.consumer_id
Expand Down
4 changes: 2 additions & 2 deletions src/Jasper/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ internal void ReadEnvelope(Envelope? originalEnvelope, IChannelCallback channel)
{
Envelope = originalEnvelope ?? throw new ArgumentNullException(nameof(originalEnvelope));
CorrelationId = originalEnvelope.CorrelationId;
CausationId = originalEnvelope.Id;
ConversationId = originalEnvelope.Id;
_channel = channel;
_sagaId = originalEnvelope.SagaId;

Expand Down Expand Up @@ -295,7 +295,7 @@ protected override void trackEnvelopeCorrelation(Envelope outbound)

if (Envelope != null)
{
outbound.CausationId = Envelope.Id;
outbound.ConversationId = Envelope.Id;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Jasper/Runtime/WorkerQueues/InlineReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)
catch (Exception? ex)
{
_logger.LogError(ex,
"Error when trying to Nack a Rabbit MQ message that failed in the HandlerPipeline ({CausationId})",
"Error when trying to Nack a Rabbit MQ message that failed in the HandlerPipeline ({ConversationId})",
envelope.CorrelationId);
}
}
Expand Down
Loading

0 comments on commit cabcad9

Please sign in to comment.