Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem intercepting messages sent with StreamBridge #2885

Closed
jgsmarques opened this issue Jan 17, 2024 · 8 comments
Closed

Problem intercepting messages sent with StreamBridge #2885

jgsmarques opened this issue Jan 17, 2024 · 8 comments
Assignees
Labels
Milestone

Comments

@jgsmarques
Copy link

I have a global interceptor configured for all my output channels to log messages:

@Slf4j
@RequiredArgsConstructor
public class OutputChannelInterceptor implements ChannelInterceptor {
  private final BindingServiceProperties bindingServiceProperties;

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    InterceptorProperties properties =
        new InterceptorProperties(message, channel, bindingServiceProperties);
    log.debug(
        "Outbound channel {} received message in topic {}",
        properties.getChannelName(),
        properties.getTopicName());
    log.trace("Full message is {}", message);
    return message
  }

  @Override
  public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
    InterceptorProperties properties =
        new InterceptorProperties(message, channel, bindingServiceProperties);
    log.debug(
        "Outbound channel {} processed message in topic {} and message {} sent",
        properties.getChannelName(),
        properties.getTopicName(),
        sent ? "was" : "was not");
    log.trace("Full message is {}", message);
  }

  @Data
  static class InterceptorProperties {
    private final String channelName;
    private final String topicName;

    public InterceptorProperties(
        Message<?> message,
        MessageChannel channel,
        BindingServiceProperties bindingServiceProperties) {
      if (channel instanceof DirectWithAttributesChannel directChannel) {
        channelName = directChannel.getFullChannelName();
        topicName =
            directChannel.getComponentName() != null
                ? Optional.ofNullable(
                        bindingServiceProperties.getBindingProperties(
                            directChannel.getComponentName()))
                    .map(BindingProperties::getDestination)
                    .orElse("")
                : "";
      } else {
        channelName = "";
        topicName = "";
      }
    }
  }
}

I use bindingServiceProperties mostly to determine the kafka topic that the message is being sent to.
This works fine when i use message processing like

@Bean
public Function<String, String> interceptor() {
  return message -> "interceptorMessage";
}

However, when i need to send a message on demand (e.g., via a REST API call), I'm using the StreamBridge approach

@Slf4j
@Component
@RequiredArgsConstructor
static class StreamProcessor {
  @Autowired public StreamBridge streamBridge;

  public void testProduce(String key, String message) {
    streamBridge.send(key, "message");
  }
}

For this case, I can't seem to extract the channel name from the channel in the interceptor. I navigated in stream bridge code up to the point where the channel is created, and it does not seem to have a name at all when it is created this way. Is there any way that I can have at least the channel name in the interceptor, or is there an alternative way to produce the message without stream bridge?

Version of the framework 2023.0.0

@sobychacko
Copy link
Contributor

@tigermarques, Could you put the code you provided above as a runnable sample application? This way, the triaging becomes easier.

@jgsmarques
Copy link
Author

Do you mean a repo with the issue in a reproducible format? Or just the classes?

@sobychacko
Copy link
Contributor

I meant a standalone app in a repo.

@jgsmarques
Copy link
Author

Hi,
I managed to get the code in a minimal repo so that you can see the issue.
https://github.com/tigermarques/test-stream-bridge

There is one test which is failing because the interceptor cannot get the channel name when the message is sent via the StreamBridge.

@sobychacko sobychacko added this to the 4.1.1 milestone Jan 19, 2024
@sobychacko sobychacko self-assigned this Jan 19, 2024
@sobychacko
Copy link
Contributor

sobychacko commented Jan 19, 2024

@tigermarques Thanks for the sample; that helped. It turned out to be a bug. I pushed a change to address the issue. Please take a look at the commit above. However, you need a change in your test. I believe that the interceptor-test-application in lines from 28 - 35 needs to be updated to use streambridge-test-application from here.

We are having some issues with our builds right now. Because of this, a snapshot of the new changes is currently unavailable. In the meantime, you can check out the main branch locally, build the spring-cloud-stream module individually, and then use 4.1.1-SNAPSHOT in your pom.xml for spring-cloud-stream. to verify the changes. The fix will be available in the next point release (4.1.1). Thanks!

@jgsmarques
Copy link
Author

Thanks a lot! You're right, the test needs to be updated, I had copied it from a different app and forgot to change the application name in the test.
Do you know in which version of spring cloud dependencies this may be available?

sobychacko added a commit to sobychacko/spring-cloud-stream that referenced this issue Jan 19, 2024
Fixes spring-cloud#2885

The `DirectWithAttributesChannel` used by `StreamBridge` is missing naming
information. Adding the proper application context and component name data
to the channel so that it is able to construct a name when queried.
@sobychacko
Copy link
Contributor

I just backported to 4.0.x of Spring Cloud Stream, so this will be available via spring-cloud 2023.0.1 and 2022.0.5 when they are released.

@ging205
Copy link

ging205 commented Jul 2, 2024

@sobychacko Can i get some insights on this thread:
https://stackoverflow.com/questions/78694499/spring-cloud-stream-kafka-binder-consumer-interceptor

I am not sure if i can post this here, but just thought of checking if i can
get some thoughts on intercepting the messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants