RabbitMQ transport doesn't allow custom message headers. #202

Open
vsabirov opened this issue Apr 28, 2023 · 4 comments

Comments

@vsabirov

Hello, this is a great library! However, there is one small issue with the RabbitMQ transport.

Take the rabbitmq-delayed-message-exchange plugin for the RabbitMQ server as an example.

Given a running RabbitMQ server with the rabbitmq-delayed-message-exchange plugin installed, the task is simply to send a delayed message. For example: notify an external web server with a request after 1 hour. If it succeeds, send an event down the bus; otherwise (say, the external server is down) send another notification command down the bus, delayed by another hour.

This is currently impossible with node-ts/bus, because:

  1. publishMessage builds its own headers from MessageAttributes and doesn't allow user-specified custom headers, so we can't set the x-delay header that a delayed message requires.

  2. There is no way to create an exchange with a custom type; in our case the plugin's x-delayed-message exchange type (declared with an x-delayed-type argument) is needed. All the methods and fields that could achieve this are private, so there is no supported way to do it.

It would be much more helpful to be able to specify custom message headers and assert custom exchanges.
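
For reference, a minimal sketch of the two pieces the plugin needs, written as plain helpers so it runs without a broker. The helper names and the 'notify-server' exchange name are hypothetical and not part of node-ts/bus; the shapes follow the plugin's x-delayed-message exchange type, x-delayed-type argument and x-delay header, and are meant to be passed to amqplib's assertExchange and publish.

```typescript
// Hypothetical helpers for illustration only; not part of @node-ts/bus.
type ExchangeKind = 'direct' | 'fanout' | 'topic' | 'headers'

interface DelayedExchangeDeclaration {
  type: 'x-delayed-message'
  options: { durable: boolean; arguments: { 'x-delayed-type': ExchangeKind } }
}

// The plugin expects the exchange type 'x-delayed-message', with the
// real routing behaviour given in the 'x-delayed-type' argument.
function delayedExchangeDeclaration(routingKind: ExchangeKind): DelayedExchangeDeclaration {
  return {
    type: 'x-delayed-message',
    options: { durable: true, arguments: { 'x-delayed-type': routingKind } }
  }
}

// The delay itself travels as the 'x-delay' header, in milliseconds.
function delayedPublishOptions(delayMs: number): { headers: { 'x-delay': number } } {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new RangeError('delayMs must be a non-negative integer')
  }
  return { headers: { 'x-delay': delayMs } }
}

// With a raw amqplib channel this would be used roughly as:
//   const { type, options } = delayedExchangeDeclaration('fanout')
//   await channel.assertExchange('notify-server', type, options)
//   channel.publish('notify-server', '', Buffer.from(payload),
//     delayedPublishOptions(60 * 60 * 1000))
```

Neither step can currently be expressed through the transport, which is what the two points above describe.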

@adenhertog
Contributor

Hi @vsabirov, I had a read through the rabbit implementation again. Generally, one of the goals of this library is to abstract away transport-specific features so that transports are largely interchangeable. With that said, wanting to specify custom headers is understandable, and I'd like to confirm some behaviour with you.

If you were to do Bus.publish(yourEvent, { attributes: { 'x-delay': 100 } }) then you'd expect this to be added directly against the header.

Currently, per rabbitmq-transport.ts:367, this is serialized under a different key, i.e. attributes: JSON.stringify({ 'x-delay': 100 }).

Would this work for you if all attributes were spread onto the headers? ie:

headers: {
  ...messageOptions.attributes
}
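
To make the difference concrete, here is a small sketch comparing the two header shapes. The function names and the AttributeMap alias are hypothetical stand-ins, not the transport's actual code.

```typescript
// Stand-in for the library's attribute map type (assumption).
type AttributeMap = Record<string, string | number | boolean>

// Behaviour described above: attributes are JSON-serialized under a
// single `attributes` header, so the broker never sees 'x-delay' itself.
function serializedHeaders(attributes: AttributeMap): Record<string, unknown> {
  return { attributes: JSON.stringify(attributes) }
}

// Proposed alternative: each attribute becomes its own top-level header.
function spreadHeaders(attributes: AttributeMap): Record<string, unknown> {
  return { ...attributes }
}

const attrs: AttributeMap = { 'x-delay': 100 }
const nested = serializedHeaders(attrs) // nested['attributes'] is the string '{"x-delay":100}'
const flat = spreadHeaders(attrs)       // flat['x-delay'] is the number 100
```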

@vsabirov
Author

vsabirov commented May 1, 2023

@adenhertog Perhaps the ability to specify transport-agnostic attributes in a special header should be kept as is. Spreading would work fine, but I'm afraid a user could overwrite some special RMQ headers by accident and would then be forced to rename their attributes to avoid collisions.

Perhaps something like

const rawHeaders = messageOptions.attributes.rawHeaders

and

headers: {
  attributes: messageOptions.attributes ? JSON.stringify(messageOptions.attributes) : undefined,
  stickyAttributes: messageOptions.stickyAttributes ? JSON.stringify(messageOptions.stickyAttributes) : undefined,

  ...rawHeaders
}

And to use that you'll just have to specify a rawHeaders object in the message options that will be additionally spread into the headers.
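
A self-contained sketch of that idea, under a few assumptions: rawHeaders is a hypothetical new field on the publish options, buildHeaders is a hypothetical helper, and here the raw headers are spread first rather than last, a variation so that the library's own attributes and stickyAttributes keys always win on collision.

```typescript
// Stand-in for the library's attribute map type (assumption).
type AttributeMap = Record<string, string | number | boolean>

interface PublishOptions {
  attributes?: AttributeMap
  stickyAttributes?: AttributeMap
  // Hypothetical field: transport-specific headers passed through untouched.
  rawHeaders?: Record<string, unknown>
}

function buildHeaders(options: PublishOptions): Record<string, unknown> {
  return {
    // Spread first so a raw header cannot replace the reserved keys below.
    ...options.rawHeaders,
    attributes: options.attributes
      ? JSON.stringify(options.attributes)
      : undefined,
    stickyAttributes: options.stickyAttributes
      ? JSON.stringify(options.stickyAttributes)
      : undefined
  }
}

const exampleHeaders = buildHeaders({
  attributes: { tenant: 'acme' },
  rawHeaders: { 'x-delay': 5000, attributes: 'accidental collision' }
})
// exampleHeaders['x-delay'] is 5000 and exampleHeaders['attributes']
// is still the serialized attribute map, not the colliding raw value.
```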

@adenhertog
Contributor

@vsabirov I'm happy with what you've proposed - the reasoning makes a lot of sense. I can't get around to this immediately, but I'm open to accepting PRs if you need this soon

@talentumtuum

talentumtuum commented May 4, 2023

@adenhertog I propose hiding internal attributes from regular use behind symbols, so the user understands that this is an internal API.

I see a solution along these lines:

@node-ts/bus-messages

export const kInternalAttributes = Symbol('@node-ts/bus-messages/internal-attributes')

export type InternalAttributes<T> = T

export interface MessageAttributes<
  AttributesType extends MessageAttributeMap = MessageAttributeMap,
  StickyAttributesType extends MessageAttributeMap = MessageAttributeMap,
  InternalAttributesType = InternalAttributes<unknown>
> {
  correlationId?: Uuid;
  attributes: AttributesType;
  stickyAttributes: StickyAttributesType;
  // Symbol key keeps transport-specific options out of the regular API surface
  [kInternalAttributes]: InternalAttributesType
}

@node-ts/bus-rabbitmq

export const kRabbitMQHeaders = Symbol('@node-ts/bus-rabbitmq/rabbitmq-headers')

export interface RabbitMQInternalAttributes {
  headers: MessageAttributeMap
}

private async publishMessage(
  message: Message,
  messageOptions: MessageAttributes<..., ..., RabbitMQInternalAttributes> = {
    attributes: {},
    stickyAttributes: {},
    [kInternalAttributes]: { headers: {} }
  }
): Promise<void> {
  await this.assertExchange(message.$name)
  const payload = this.coreDependencies.messageSerializer.serialize(message)
  const internalAttributes = messageOptions[kInternalAttributes]
  this.channel.publish(message.$name, '', Buffer.from(payload), {
    correlationId: messageOptions.correlationId,
    messageId: uuid.v4(),
    headers: {
      attributes: messageOptions.attributes
        ? JSON.stringify(messageOptions.attributes)
        : undefined,
      stickyAttributes: messageOptions.stickyAttributes
        ? JSON.stringify(messageOptions.stickyAttributes)
        : undefined,
      // Raw RabbitMQ headers supplied through the symbol-keyed options
      ...internalAttributes.headers
    }
  })
}

Usage

import { kInternalAttributes } from '@node-ts/bus-messages'

bus.publish(new Message(), {
  [kInternalAttributes]: { // typed
    headers: {
      'x-delay': 5000
    }
  }
})

This is a draft solution; I'm just suggesting a concept.
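
A runnable, standalone sketch of the same concept, with the library types replaced by local stand-ins. AttributeMap, toAmqpHeaders and the simplified MessageAttributes below are assumptions for illustration, not the packages' real definitions. It also shows one side effect of symbol keys: JSON.stringify skips them, so the internal headers do not leak into a serialized options object.

```typescript
// Local stand-ins for the library types (assumptions, simplified).
type AttributeMap = Record<string, string | number | boolean>

const kInternalAttributes = Symbol('@node-ts/bus-messages/internal-attributes')

interface RabbitMQInternalAttributes {
  headers: AttributeMap
}

interface MessageAttributes<Internal = unknown> {
  correlationId?: string
  attributes: AttributeMap
  stickyAttributes: AttributeMap
  // Optional here so callers who don't need transport internals can omit it.
  [kInternalAttributes]?: Internal
}

// Hypothetical helper: builds the AMQP headers the transport would publish.
function toAmqpHeaders(
  options: MessageAttributes<RabbitMQInternalAttributes>
): Record<string, unknown> {
  return {
    attributes: JSON.stringify(options.attributes),
    stickyAttributes: JSON.stringify(options.stickyAttributes),
    // Spreading undefined is a no-op, so a missing symbol key is safe.
    ...options[kInternalAttributes]?.headers
  }
}

const options: MessageAttributes<RabbitMQInternalAttributes> = {
  attributes: { tenant: 'acme' },
  stickyAttributes: {},
  [kInternalAttributes]: { headers: { 'x-delay': 5000 } }
}

const headers = toAmqpHeaders(options)
// headers['x-delay'] is 5000, while JSON.stringify(options) contains no
// 'x-delay' because symbol-keyed properties are not serialized.
const serializedOptions = JSON.stringify(options)
```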

3 participants