Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support expressions in redis output stream field #1283

Prev Previous commit
Next Next commit
Match in out stream name in test
  • Loading branch information
bmarinov committed Jun 7, 2022
commit db7bbf112b05fb4c70c6c6ae685f4ad20e75e48d
7 changes: 3 additions & 4 deletions internal/impl/redis/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ input:
output:
redis_streams:
url: tcp:https://localhost:$PORT
stream: ${! meta("foo")}
stream: ${! meta("routing_meta_stream_name")}
body_key: body
max_length: 0
max_in_flight: $MAX_IN_FLIGHT
Expand All @@ -119,14 +119,13 @@ input:
redis_streams:
url: tcp:https://localhost:$PORT
body_key: body
streams: [ stream-$ID ]
streams: [ expected-stream-name-foo ]
limit: 10
client_id: client-input-$ID
consumer_group: group-$ID
`
suite := integration.StreamTests(
// integration.StreamTestOpenClose(),
integration.StreamTestMetadataRouting(),
integration.StreamTestWithMetadata("routing_meta_stream_name", "expected-stream-name-foo"),
)

suite.Run(
Expand Down
12 changes: 6 additions & 6 deletions internal/integration/stream_test_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,9 +874,8 @@ func StreamTestOutputOnlyOverride(getFn GetMessageFunc) StreamTestDefinition {
)
}

// StreamTestMetadataRouting checks that a message can be routed
// to a stream specified in the metadata
func StreamTestMetadataRouting() StreamTestDefinition {
// StreamTestWithMetadata checks that a message with the given metadata will be received
func StreamTestWithMetadata(metaFieldName string, metaFieldValue string) StreamTestDefinition {
return namedStreamTest(
"can send to redis with stream name from metadata",
func(t *testing.T, env *streamTestEnvironment) {
Expand All @@ -888,12 +887,13 @@ func StreamTestMetadataRouting() StreamTestDefinition {
closeConnectors(t, input, output)
})

metaFieldVal := []string{"foo", "bar"}
messageContent := `{"foo": "bar"}`
metaFieldVal := []string{metaFieldName, metaFieldValue}

require.NoError(t, sendMessage(env.ctx, t, tranChan, "hello world", metaFieldVal...))
require.NoError(t, sendMessage(env.ctx, t, tranChan, messageContent, metaFieldVal...))
received := receiveMessage(env.ctx, t, input.TransactionChan(), nil)

messageMatch(t, received, "hello world")
messageMatch(t, received, messageContent, metaFieldVal...)

// todo: add proper assert?
bmarinov marked this conversation as resolved.
Show resolved Hide resolved
},
Expand Down