Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port middleware from 0.x to master #160

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
port middleware
  • Loading branch information
adenhertog committed Nov 10, 2021
commit e9a9b017106de1bda4057a61ff31fed618c565a7
265 changes: 265 additions & 0 deletions packages/bus-core/src/service-bus/bus-configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import { Message } from '@node-ts/bus-messages'
import { DefaultHandlerRegistry, Handler } from '../handler'
import { HandlerDefinition, isClassHandler } from '../handler/handler'
import { JsonSerializer, Serializer } from '../serialization'
import { MemoryQueue, Transport, TransportMessage } from '../transport'
import { ClassConstructor, CoreDependencies, Middleware, MiddlewareDispatcher } from '../util'
import { BusInstance } from './bus-instance'
import { Persistence, Workflow, WorkflowState } from '../workflow'
import { WorkflowRegistry } from '../workflow/registry/workflow-registry'
import { BusAlreadyInitialized } from './error'
import { ContainerAdapter } from '../container'
import { defaultLoggerFactory, LoggerFactory } from '../logger'
import { ContainerNotRegistered } from '../error'
import { MessageSerializer } from '../serialization/message-serializer'
import { InMemoryPersistence } from '../workflow/persistence'

export interface BusInitializeOptions {
/**
* If true, will initialize the bus in send only mode.
* This will provide a bus instance that is capable of sending/publishing
* messages only and won't handle incoming messages or workflows
* @default false
*/
sendOnly: boolean
}

const defaultBusInitializeOptions: BusInitializeOptions = {
sendOnly: false
}

export class BusConfiguration {

private configuredTransport: Transport | undefined
private concurrency = 1
private busInstance: BusInstance | undefined
private container: ContainerAdapter | undefined
private workflowRegistry = new WorkflowRegistry()
private handlerRegistry = new DefaultHandlerRegistry()
private loggerFactory: LoggerFactory = defaultLoggerFactory
private serializer = new JsonSerializer()
private persistence: Persistence = new InMemoryPersistence()
private messageReadMiddlewares = new MiddlewareDispatcher<TransportMessage<unknown>>()

/**
* Initializes the bus with the provided configuration
* @param options Changes the default startup mode of the bus
*/
async initialize (options = defaultBusInitializeOptions): Promise<BusInstance> {
const { sendOnly } = options
const logger = this.loggerFactory('@node-ts/bus-core:bus')
logger.debug('Initializing bus', { sendOnly })

if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

const coreDependencies: CoreDependencies = {
container: this.container,
handlerRegistry: this.handlerRegistry,
loggerFactory: this.loggerFactory,
serializer: this.serializer,
messageSerializer: new MessageSerializer(this.serializer, this.handlerRegistry)
}

if (!sendOnly) {
this.persistence?.prepare(coreDependencies)
this.workflowRegistry.prepare(coreDependencies, this.persistence)
await this.workflowRegistry.initialize(this.handlerRegistry, this.container)

const classHandlers = this.handlerRegistry.getClassHandlers()
if (!this.container && classHandlers.length) {
throw new ContainerNotRegistered(classHandlers[0].constructor.name)
}
}

const transport: Transport = this.configuredTransport || new MemoryQueue()
transport.prepare(coreDependencies)
if (transport.connect) {
await transport.connect()
}
if (!sendOnly && transport.initialize) {
await transport.initialize(this.handlerRegistry)
}
this.busInstance = new BusInstance(
transport,
this.concurrency,
this.workflowRegistry,
coreDependencies,
this.messageReadMiddlewares
)

logger.debug('Bus initialized', { sendOnly, registeredMessages: this.handlerRegistry.getMessageNames() })

return this.busInstance
}


/**
* Register a handler for a specific message type. When Bus is initialized it will configure
* the transport to subscribe to this type of message and upon receipt will forward the message
* through to the provided message handler
* @param messageType Which message will be subscribed to and routed to the handler
* @param messageHandler A callback that will be invoked when the message is received
* @param customResolver Subscribe to a topic that's created and maintained outside of the application
*/
withHandler (classHandler: ClassConstructor<Handler<Message>>): this
withHandler <MessageType extends (Message | object)>(
functionHandler: {
messageType: ClassConstructor<MessageType>,
messageHandler: HandlerDefinition<MessageType>
}
): this
withHandler <MessageType extends (Message | object)>(
handler: ClassConstructor<Handler<Message>> | {
messageType: ClassConstructor<MessageType>,
messageHandler: HandlerDefinition<MessageType>
}): this
{
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

if ('messageHandler' in handler) {
this.handlerRegistry.register(
handler.messageType,
handler.messageHandler
)
} else if (isClassHandler(handler)) {
const handlerInstance = new handler()
this.handlerRegistry.register(
handlerInstance.messageType,
handler
)
}


return this
}

/**
* Registers a custom handler that receives messages from external systems, or messages that don't implement the
* Message interface from @node-ts/bus-messages
* @param messageHandler A handler that receives the custom message
* @param customResolver A discriminator that determines if an incoming message should be mapped to this handler.
*/
withCustomHandler<MessageType extends (Message | object)> (
messageHandler: HandlerDefinition<MessageType>,
customResolver: {
resolveWith: ((message: MessageType) => boolean),
topicIdentifier?: string
}
): this
{
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.handlerRegistry.registerCustom(
messageHandler,
customResolver
)
return this
}

/**
* Register a workflow definition so that all of the messages it depends on will be subscribed to
* and forwarded to the handlers inside the workflow
*/
withWorkflow<TWorkflowState extends WorkflowState> (workflow: ClassConstructor<Workflow<TWorkflowState>>): this {
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.workflowRegistry.register(
workflow
)
return this
}

/**
* Configures Bus to use a different transport than the default MemoryQueue
*/
withTransport (transportConfiguration: Transport): this {
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.configuredTransport = transportConfiguration
return this
}

/**
* Configures Bus to use a different logging provider than the default console logger
*/
withLogger (loggerFactory: LoggerFactory): this {
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.loggerFactory = loggerFactory
return this
}

/**
* Configures Bus to use a different serialization provider. The provider is responsible for
* transforming messages to/from a serialized representation, as well as ensuring all object
* properties are a strong type
*/
withSerializer (serializer: Serializer): this {
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.serializer = serializer
return this
}

/**
* Configures Bus to use a different persistence provider than the default InMemoryPersistence provider.
* This is used to persist workflow data and is unused if not using workflows.
*/
withPersistence (persistence: Persistence): this {
if (!!this.busInstance) {
throw new BusAlreadyInitialized()
}

this.persistence = persistence
return this
}

/**
* Sets the message handling concurrency beyond the default value of 1, which will increase the number of messages
* handled in parallel.
*/
withConcurrency (concurrency: number): this {
if (concurrency < 1) {
throw new Error('Invalid concurrency setting. Must be set to 1 or greater')
}

this.concurrency = concurrency
return this
}

/**
* Use a local dependency injection/IoC container to resolve handlers
* and workflows.
* @param container An adapter to an existing DI container to fetch class instances from
*/
withContainer (container: { get <T>(type: ClassConstructor<T>): T }): this {
this.container = container
return this
}

/**
* Register optional middlewares that will run for each message that is polled from the transport
* Note these middlewares only run when polling successfully pulls a message off the Transports queue
* After all the user defined middlewares have registered.
*/
withMessageReadMiddleware<TransportMessageType = unknown> (
messageReadMiddleware: Middleware<TransportMessage<TransportMessageType>>
): this {
this.messageReadMiddlewares.use(messageReadMiddleware)
return this
}
}
63 changes: 43 additions & 20 deletions packages/bus-core/src/service-bus/bus-instance.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { InMemoryMessage, MemoryQueue, TransportMessage } from '../transport'
import { Bus } from './bus'
import { BusState } from './bus-state'
import { TestEvent } from '../test/test-event'
import { sleep } from '../util'
import { Middleware, sleep } from '../util'
import { Mock, IMock, Times, It } from 'typemoq'
import { handlerFor, SystemMessageMissingResolver } from '../handler'
import { TestCommand } from '../test/test-command'
Expand All @@ -26,14 +26,17 @@ describe('BusInstance', () => {
let queue: MemoryQueue
let callback: IMock<Callback>
const handler = handlerFor(TestEvent, async (_: TestEvent) => callback.object())
let messageReadMiddleware: IMock<Middleware<TransportMessage<unknown>>>

beforeAll(async () => {
queue = new MemoryQueue()
callback = Mock.ofType<Callback>()
messageReadMiddleware = Mock.ofType<Middleware<TransportMessage<unknown>>>()

bus = await Bus.configure()
.withTransport(queue)
.withHandler(handler)
.withMessageReadMiddleware(messageReadMiddleware.object)
.initialize()
})

Expand Down Expand Up @@ -70,21 +73,37 @@ describe('BusInstance', () => {
})

describe('when a message is successfully handled from the queue', () => {
beforeEach(async () => bus.start())
afterEach(async () => bus.stop())
beforeAll(async () => {
messageReadMiddleware.reset()

it('should delete the message from the queue', async () => {
callback.reset()
callback
.setup(c => c())
.callback(() => undefined)
messageReadMiddleware
.setup(x => x(It.isAny(), It.isAny()))
.returns((_, next) => next())
.verifiable(Times.once())
await bus.publish(event)
await sleep(10)

await bus.start()

await new Promise(async resolve => {
callback.reset()
callback
.setup(c => c())
.callback(resolve)
.verifiable(Times.once())

await bus.publish(event)
})
})

afterAll(async () => bus.stop())

it('should delete the message from the queue', async () => {
expect(queue.depth).toEqual(0)
callback.verifyAll()
})

it('should invoke the message read middlewares', async () => {
messageReadMiddleware.verifyAll()
})
})

describe('when a handled message throws an Error', () => {
Expand All @@ -94,17 +113,21 @@ describe('BusInstance', () => {
it('should return the message for retry', async () => {
callback.reset()
let callCount = 0
callback
.setup(c => c())
.callback(() => {
if (callCount++ === 0) {
throw new Error()
}
})
.verifiable(Times.exactly(2))

await bus.publish(event)
await sleep(2000)
await new Promise<void>(async resolve => {
callback
.setup(c => c())
.callback(() => {
if (callCount++ === 0) {
throw new Error()
} else {
resolve()
}
})
.verifiable(Times.exactly(2))

await bus.publish(event)
})

callback.verifyAll()
})
Expand Down
Loading