Skip to content

[Z Design] Existing design of method dispatch (circa 2016) May 3rd

Richard Hightower edited this page May 3, 2016 · 3 revisions

This is not documentation but merely notes on the design (current code really) of the EndpointServer as it stands April 2016. It is very useful for understanding how QBit works.

An EndpointServer is built by the EndpointServerBuilder. The EndpointServerBuilder produces a ServiceEndpointServer.

EndpointServerBuilder.build 4/2016

    public ServiceEndpointServer build() {


        final ServiceBundle serviceBundle;


        serviceBundle = getFactory().createServiceBundle(uri,
                getRequestQueueBuilder(),
                getResponseQueueBuilder(),
                getWebResponseQueueBuilder(),
                getFactory(),
                eachServiceInItsOwnThread, this.getBeforeMethodCall(),
                this.getBeforeMethodCallAfterTransform(),
                this.getArgTransformer(), true,
                getSystemManager(),
                getHealthService(),
                getStatsCollector(), getTimer(),
                getStatsFlushRateSeconds(),
                getCheckTimingEveryXCalls(),
                getCallbackManager(),
                getEventManager(),
                getBeforeMethodSent(),
                getBeforeMethodCallOnServiceQueue(), getAfterMethodCallOnServiceQueue());


        final ServiceEndpointServer serviceEndpointServer = new ServiceEndpointServerImpl(getHttpServer(),
                getEncoder(), getParser(), serviceBundle, getJsonMapper(), this.getTimeoutSeconds(),
                this.getNumberOfOutstandingRequests(), getProtocolBatchSize(),
                this.getFlushInterval(), this.getSystemManager(), getEndpointName(),
                getServiceDiscovery(), getPort(), getTtlSeconds(), getHealthService(), getErrorHandler(),
                getFlushResponseInterval(), getParserWorkerCount(), getEncoderWorkerCount());


        if (serviceEndpointServer != null && qBitSystemManager != null) {
            qBitSystemManager.registerServer(serviceEndpointServer);
        }

        if (services != null) {
            serviceEndpointServer.initServices(services);
        }

        if (servicesWithAlias != null) {
            servicesWithAlias.entrySet().forEach(entry -> serviceEndpointServer.addServiceObject(entry.getKey(), entry.getValue()));
        }
        return serviceEndpointServer;
    }

The serviceBundle has three queues: method request queue, response queue and a web response queue. The web response queue exists so that we can stream responses to the ServiceEndpointServer as normal callback handling does not make sense in terms of WebSocket and REST.

Currently there is one implementation of the ServiceBundle interface.

ServiceBundle

package io.advantageous.qbit.service.impl;

/**
 * Manages a collection of services.
 */
public class ServiceBundleImpl implements ServiceBundle {

We will refer to it as ServiceBundle as at the moment there is only one.

The ServiceBundle was originally used as the primary way to dispatch REST calls, but that activity has been improved upon and the responsibility has moved to other classes, but there still might be some leftover REST/WebSocket dispatch code in the ServiceBundle.

Starting up ServiceBundle

There are many ways to startup a ServiceBundle.

Start up modes in ServiceBundle

    public void startReturnHandlerProcessor(ReceiveQueueListener<Response<Object>> listener) {
        responseQueue.startListener(listener);
    }

    public void startWebResponseReturnHandler(ReceiveQueueListener<Response<Object>> listener) {
        webResponseQueue.startListener(listener);
    }

    public void startReturnHandlerProcessor() {

The third noArg startReturnHandlerProcessor has this logic inside of it.

startReturnHandlerProcessor if the originating request is a web request, but request on the queue.

            
final Request<Object> originatingRequest = response.request().originatingRequest();

if (originatingRequest == null) {
      callbackManager.handleResponse(response);
} else if (originatingRequest instanceof HttpRequest 
                      || originatingRequest instanceof WebSocketMessage) {
     webResponseSendQueue.send(response);
} else {
     callbackManager.handleResponse(response);
}
            

Looks like that can be simplified. :) One problem with this is we have a web queue in a ServiceBundle that is an area that can be refactored. This logic should probably exist only in the ServiceEndpointServer.

ServiceEndpointServerImpl uses the ServiceBundle startup methods

    /**
     * Sets up the response queue listener so we can sendText responses
     * to HTTP / WebSocket end points.
     */
    private void startResponseQueueListener() {
        serviceBundle.startReturnHandlerProcessor();
        serviceBundle.startWebResponseReturnHandler(createResponseQueueListener());
    }
...

    private void doStart() {
     ...
        serviceBundle.startUpCallQueue();
        startResponseQueueListener();
    }

The ServiceEndpointServerImpl also calls serviceBundle.startUpCallQueue().

ServiceBundle handling calls

    /**
     * Start the client bundle.
     */
    public ServiceBundle startUpCallQueue() {
        methodQueue.startListener(new ReceiveQueueListener<MethodCall<Object>>() {

            /**
             * When we receive a method call, we call doCall.
             * @param item item
             */
            @Override
            public void receive(MethodCall<Object> item) {
                doCall(item); 
            }

The method serviceBundle.doCall is one of the best places to debug why things are not working.

Now let's cover adding services to a ServiceBundle

class ServiceBundleImpl
...
    public ServiceBundle addService(Object object) {
...
    public ServiceBundle addServiceWithQueueCallBackHandlers(final Object serviceObject, 
          final QueueCallBackHandler... queueCallBackHandlers) {
...
    public ServiceBundle addServiceObject(final String serviceAddress, final Object serviceObject) {
...
    public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
                                                                   final Object serviceObject,
                                                                   final QueueCallBackHandler... queueCallBackHandlers) {
...
    public ServiceBundle addServiceConsumer(final String serviceAddress,
                                            final Consumer<MethodCall<Object>> methodCallConsumer) {

...
   public void addServiceService(final String serviceAddress, final ServiceQueue serviceQueue) {

...
    public void addServiceService(final String objectName, final String serviceAddress, final ServiceQueue serviceQueue) {

Let's break this down.

  • addService(...)
  • addServiceWithQueueCallBackHandlers(...)
  • addServiceObject(...)
  • addServiceConsumer(...)
  • addServiceService(...)
  • addRoundRobinService

Now that seems like a lot less.

What do they do.

  • addService(...) add a Java object to the service bundle
  • addServiceWithQueueCallBackHandlers(...) add a POJO to the service bundle
  • addServiceObject(...) add a POJO to the service bundle with an alias
  • addServiceConsumer(...) add a MethodConsumer to the service bundle (used for RoundRobin and sharded Service pools)
  • addServiceService(...) add a ServiceQueue to the service bundle

All of the add POJO methods end up calling addServiceObjectWithQueueCallBackHandlers.

Let's cover that one.

ServiceBundle.addServiceObjectWithQueueCallBackHandlers short circuit

    public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
                                                                   final Object serviceObject,
                                                                   final QueueCallBackHandler... queueCallBackHandlers) {

        logger.info("Adding service {} @ {} with object {}", ServiceBundleImpl.class.getName(),
                serviceAddress, serviceObject);

        if (serviceObject instanceof Consumer) {

            //noinspection unchecked
            addServiceConsumer(serviceAddress, (Consumer<MethodCall<Object>>) serviceObject);
            return this;
        }


        if (serviceObject instanceof ServiceQueue) {
            addServiceService(serviceAddress, (ServiceQueue) serviceObject);
            return this;
        }

If somehow you called this addServiceObjectWithQueueCallBackHandlers directly or indirectly with a MethodCall Consumer or a ServiceQueue, it will short circuit to the right add method.

If it does not short circuit, then you gave it a POJO, just a Java object which is your service. It will then create a ServiceQueue for your POJO so your POJO handles calls async.

ServiceBundle.addServiceObjectWithQueueCallBackHandlers building a service queue for a POJO

    public ServiceBundle addServiceObjectWithQueueCallBackHandlers(final String serviceAddress,
                                                                   final Object serviceObject,
                                                                   final QueueCallBackHandler... queueCallBackHandlers) {
...

        /** Turn this client object into a client with queues. */
        final ServiceBuilder serviceBuilder = ServiceBuilder.serviceBuilder()
                .setRootAddress(rootAddress)
                .setServiceObject(serviceObject)
                .setServiceAddress(serviceAddress)
                .setTimer(timer)
                .setResponseQueue(responseQueue)
                .setAsyncResponse(asyncCalls)
                .setInvokeDynamic(invokeDynamic)
                .setSystemManager(systemManager)
                .setRequestQueueBuilder(BeanUtils.copy(this.requestQueueBuilder))
                .setRequestQueueBuilder(requestQueueBuilder)
                .setHandleCallbacks(false)
                .setCreateCallbackHandler(false)
                .setEventManager(eventManager)
                .setBeforeMethodCall(this.beforeMethodCallOnServiceQueue)
                .setAfterMethodCall(this.afterMethodCallOnServiceQueue);

        if (queueCallBackHandlers != null) {
            stream(queueCallBackHandlers).forEach(serviceBuilder::addQueueCallbackHandler);
        }


        final String bindStatHealthName = serviceAddress == null
                ? AnnotationUtils.readServiceName(serviceObject)
                : serviceAddress;

        if (healthService != null) {
            serviceBuilder.registerHealthChecks(healthService, bindStatHealthName);
        }


        if (statsCollector != null) {
            /*
              The default is to flush stats every five seconds, and sample
              every 10_000 queue calls.
             */
            serviceBuilder.registerStatsCollections(bindStatHealthName,
                    statsCollector, sampleStatFlushRate, checkTimingEveryXCalls);
        }


        final ServiceQueue serviceQueue = serviceBuilder.buildAndStart();

        addServiceService(serviceAddress, serviceQueue);
        return this;

Notice that this calls serviceBuilder.buildAndStart() which in turns creates the serviceQueue and calls startServiceQueue build().startServiceQueue().

The serviceQueue has four start methods.

  • start
  • startServiceQueue (same as start, but fluent)
  • startAll (which calls startServiceQueue and startCallBackHandler)
  • startCallBackHandler used to run the serviceQueue standalone

A serviceQueue that is getting used in a serviceBundle should never call startCallBackHandler or startAll before it registers.

Tutorials

__

Docs

Getting Started

Basics

Concepts

REST

Callbacks and Reactor

Event Bus

Advanced

Integration

QBit case studies

QBit 2 Roadmap

-- Related Projects

Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting

Clone this wiki locally