Skip to content

[Z Notebook] More benchmarking internal

Richard Hightower edited this page Feb 8, 2015 · 30 revisions

#QBit Queue speed 200M messages

Graph

Doing some perf benchmarks so I know when refactors make performance better or worse.

This first test is a no op. Just sending a message and counting the messages sent.

QBit with ArrayBlockingQueue various batches and writer thread counts

        final LongList times = new LongList();

        for (int writers = 0; writers < 20; writers++) {
            final QueueBuilder qb = queueBuilder()
                                    .setBatchSize(10)
                                    .setSize(10_000_000)
                                    .setArrayBlockingQueue();

            puts("NUM WRITER THREADS", writers+1);
            perfTest(qb, 1, writers+1, 100_000_000, 5_000, times, 10_000);
            System.gc();
            Sys.sleep(1000);

        }

Graph

Not that the above is log for ms. 500 ms is doable for 100M messages with a batch size of 1,000.

A batch size of 10,000 comes out to a rate of 280M messages a second.

Looking at in non-log for time:

Graph

Here is what 200M messages look like.

Graph

At a batch size of 100 it does ok. 1,000 it does quite well. The larger the batch size, the better job it does no giving up the thread so the 1 reader thread can work.

We can see the rate for the larger batch sizes (1,000, 10,000 and 100,000) achieve a rate of 400 M messages a second.

Now pushing it to 400 M messaging, we get

Graph

At this point we are running into garbage collector overhead (I think). You can see the effective rate has slipped. It went down to around 200 M messages a second.

After running the profiler, there really does not appear to be that much garbage collection. It could be an issue of buffer creation time. One could create a feedback system to return spent buffers. The thread handoff cost savings, seems to be canceled out the buffer creation time at a certain level.

Two ways to get around this, is adopt a full ring buffer approach and/or adopt a buffer recycling approach to return spent buffers during idle periods.

Graph

Adding a 10 ms pause in the writer threads every 10,000,000 sends seems to smooth out the chart. Not sure why. But it makes the difference between 1,000, 10,000, and 100,000 mostly go away.

Also tried this test with the LinkedTransferQueue as an option.

Graph

One writer does very poorly as does a batch size of 100. Let's increase the minimum writer to 2, and drop the 100 batch size so we can see the numbers a little better.

Graph

229 M messages a second using QBit LinkedTransferQueue.

Graph

        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue().setCheckEvery(checkEvery);

You have the option for CPU intensive readers, to check to see if the reader (or readers) are busy. If they reader is not busy, then you can send him what you have before you reach the full batch size.

In this perf test, this feature is a wash since the reader is more or less a no-op.

Next, we do 1,000,000 operations for each of the 400 M messages.

Graph

        final int batchSize = 10_000;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = batchSize/10;
        final boolean cpuIntensive = true;
        final int times = 1_000_000;

//         LTQ check every & try transfer
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue()
//                .setCheckEvery(checkEvery).setTryTransfer(true);


        //Check every
        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue()
                .setCheckEvery(checkEvery);

          //LTQ
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();

           //LBQ
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setSize(10_000_000).setArrayBlockingQueue();

       //warmup
        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(10_000);
        }

Add more intensive CPU operation (added a loop around last one).

Graph

        final int batchSize = 10_000;
        final int totalSends = 100_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = 1000;
        final boolean cpuIntensive = true;
        final int times = 2_000_000_000;


    //        final QueueBuilder queueBuilder = queueBuilder()
    //                .setBatchSize(batchSize)
    //                .setLinkTransferQueue()
    //                .setCheckEvery(checkEvery);//.setTryTransfer(true);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(3_000);
        }

Loop for CPU intensive

  public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive && total % 13 == 0) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }



        private void doSomething(Integer value) {

            long lv = 0;
            for (int j = 0; j < 10; j++) {
                for (int index = 0; index < times; index++) {
                    lv = value * index % 13 + index;
                    lv = lv * 47;
                    lv = lv * 1000;
                    lv = lv * 13 + lv % 31;
                }
                this.answer.set(this.answer.get() + lv);
            }
        }

##Code for test before cleanup and LinkedTransferQueue work

I hand edited the different params in the main method of this class.

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        int total;

        volatile int totalOut;

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery ) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {

        final QueueBuilder queueBuilder = queueBuilder().setBatchSize(100_000).setSize(10_000_000).setArrayBlockingQueue();
        /*

    public static void perfTest(
                         final QueueBuilder queueBuilder,
                         final int readers,
                         final int writers,
                         final int itemsEachThread,
                         final int timeOut) {

         */

        perfTest(queueBuilder, 1, 10, 500_000_000, 50_000, new LongList(), 100, 10, 1_000_000);

        System.gc();
        Sys.sleep(10_000);


        final LongList times = new LongList();

        for (int writers = 0; writers < 20; writers+=3) {
            final QueueBuilder qb = queueBuilder().setBatchSize(100).setSize(10_000_000).setArrayBlockingQueue();

            puts("NUM WRITER THREADS", writers+1);
            perfTest(qb, 1, writers+1, 400_000_000, 5_000, times, 100, 10, 10_000_000);
            System.gc();
            Sys.sleep(5_000);

        }

//        final LongList times = new LongList();
//
//        for (int writers = 0; writers < 20; writers++) {
//            final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
//            puts("NUM WRITER THREADS", writers+1);
//            perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
//            System.gc();
//            Sys.sleep(1000);
//
//        }


        for (Long value : times) {
            puts(value);
        }

        puts(times);
        puts(
                "\nmin    \t", times.min(),
                "\nmax    \t", times.max(),
                "\nmean   \t", times.mean(),
                "\nmedian \t", times.median(),
                "\nstddev \t", times.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

##Raw output of first run

--------------------------------------------------------- 
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 10 
Message count      	 100,000,000 
Msg cnt per thrd   	 10,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 100,100,000 
Read time total	 1,041 
Write time     	 983 
NUM WRITER THREADS 1 
--------------------------------------------------------- 
..........182,866,530
..........356,086,570
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 1 
Message count      	 500,000,000 
Msg cnt per thrd   	 500,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,010,000 
Read time total	 2,954 
Write time     	 1,022 
NUM WRITER THREADS 2 
--------------------------------------------------------- 
..........233,081,330
.........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 2 
Message count      	 500,000,000 
Msg cnt per thrd   	 250,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,020,000 
Read time total	 2,057 
Write time     	 2,015 
NUM WRITER THREADS 3 
--------------------------------------------------------- 
..........132,838,200
..........267,134,950
..........401,589,720
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 3 
Message count      	 500,000,000 
Msg cnt per thrd   	 166,666,667 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,028,000 
Read time total	 3,936 
Write time     	 3,022 
NUM WRITER THREADS 4 
--------------------------------------------------------- 
..........130,954,760
..........264,850,920
..........399,794,450
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 4 
Message count      	 500,000,000 
Msg cnt per thrd   	 125,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,040,000 
Read time total	 3,885 
Write time     	 3,801 
NUM WRITER THREADS 5 
--------------------------------------------------------- 
..........133,938,020
..........270,768,490
..........406,734,990
......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 5 
Message count      	 500,000,000 
Msg cnt per thrd   	 100,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,050,000 
Read time total	 3,870 
Write time     	 3,798 
NUM WRITER THREADS 6 
--------------------------------------------------------- 
..........132,912,550
..........268,018,930
..........402,073,920
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 6 
Message count      	 500,000,000 
Msg cnt per thrd   	 83,333,334 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,058,000 
Read time total	 3,872 
Write time     	 3,814 
NUM WRITER THREADS 7 
--------------------------------------------------------- 
..........131,462,020
..........265,798,160
..........399,964,000
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 7 
Message count      	 500,000,000 
Msg cnt per thrd   	 71,428,572 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,066,000 
Read time total	 3,911 
Write time     	 3,843 
NUM WRITER THREADS 8 
--------------------------------------------------------- 
..........133,118,780
..........267,053,030
..........400,814,320
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 8 
Message count      	 500,000,000 
Msg cnt per thrd   	 62,500,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,080,000 
Read time total	 3,885 
Write time     	 3,819 
NUM WRITER THREADS 9 
--------------------------------------------------------- 
..........134,822,520
..........270,485,380
..........403,792,710
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 9 
Message count      	 500,000,000 
Msg cnt per thrd   	 55,555,556 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,085,000 
Read time total	 3,878 
Write time     	 3,807 
NUM WRITER THREADS 10 
--------------------------------------------------------- 
..........133,540,490
..........267,256,550
..........404,794,160
......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 10 
Message count      	 500,000,000 
Msg cnt per thrd   	 50,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,100,000 
Read time total	 3,845 
Write time     	 3,778 
NUM WRITER THREADS 11 
--------------------------------------------------------- 
..........131,918,610
..........268,519,590
..........403,707,110
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 11 
Message count      	 500,000,000 
Msg cnt per thrd   	 45,454,546 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,104,000 
Read time total	 3,857 
Write time     	 3,790 
NUM WRITER THREADS 12 
--------------------------------------------------------- 
..........134,007,270
..........267,971,530
..........401,474,590
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 12 
Message count      	 500,000,000 
Msg cnt per thrd   	 41,666,667 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,112,000 
Read time total	 3,867 
Write time     	 3,794 
NUM WRITER THREADS 13 
--------------------------------------------------------- 
..........133,613,020
..........266,512,930
..........400,441,870
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 13 
Message count      	 500,000,000 
Msg cnt per thrd   	 38,461,539 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,123,000 
Read time total	 3,960 
Write time     	 3,895 
NUM WRITER THREADS 14 
--------------------------------------------------------- 
..........131,527,420
..........250,755,900
..........382,014,070
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 14 
Message count      	 500,000,000 
Msg cnt per thrd   	 35,714,286 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,136,000 
Read time total	 4,017 
Write time     	 3,951 
NUM WRITER THREADS 15 
--------------------------------------------------------- 
..........129,543,750
..........262,070,730
..........392,889,410
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 15 
Message count      	 500,000,000 
Msg cnt per thrd   	 33,333,334 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,145,000 
Read time total	 3,916 
Write time     	 3,846 
NUM WRITER THREADS 16 
--------------------------------------------------------- 
..........132,140,000
..........266,103,410
..........398,092,510
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 16 
Message count      	 500,000,000 
Msg cnt per thrd   	 31,250,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,160,000 
Read time total	 3,969 
Write time     	 3,904 
NUM WRITER THREADS 17 
--------------------------------------------------------- 
..........132,230,880
..........263,316,360
..........397,953,520
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 17 
Message count      	 500,000,000 
Msg cnt per thrd   	 29,411,765 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,157,000 
Read time total	 3,976 
Write time     	 3,907 
NUM WRITER THREADS 18 
--------------------------------------------------------- 
..........132,937,000
..........267,508,690
..........402,516,870
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 18 
Message count      	 500,000,000 
Msg cnt per thrd   	 27,777,778 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,166,000 
Read time total	 3,949 
Write time     	 3,861 
NUM WRITER THREADS 19 
--------------------------------------------------------- 
..........130,062,110
..........265,614,200
..........386,620,890
........
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 19 
Message count      	 500,000,000 
Msg cnt per thrd   	 26,315,790 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,175,000 
Read time total	 4,009 
Write time     	 3,944 
NUM WRITER THREADS 20 
--------------------------------------------------------- 
..........134,731,950
..........269,057,010
..........392,053,940
.......
--------------------------------------------------------- 

Threads readers    	 1 
        writers    	 20 
Message count      	 500,000,000 
Msg cnt per thrd   	 25,000,001 
Batch size         	 1,000 
Num batches        	 10,000 
--------------------------------------------------- 

Count          	 500,200,000 
Read time total	 3,957 
Write time     	 3,895 
[2954, 2057, 3936, 3885, 3870, 3872, 3911, 3885, 3878, 3845, 3857, 3867, 3960, 4017, 3916, 3969, 3976, 3949, 4009, 3957] 

min    	 2057 
max    	 4017 
mean   	 3779 
median 	 3898 
stddev 	 450 

LinkedTransfer Queue

https://docs.google.com/spreadsheets/d/17otvI_ztUkMybv-dzKQldl1S16he0ted7KZBgINW7yI/pubchart?oid=1907390554&format=image

For the linked transfer queue. I client up the code a bit.

##Cleaned up code using linked transfer queue.

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        int total;

        volatile int totalOut;

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery ) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 100;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 100;
        final int sleepEvery = 10_000_000;

        final QueueBuilder warmUpBuilder = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();
        perfTest(warmUpBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();
        final QueueBuilder qb = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(qb, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(5000);
        }

//        final LongList times = new LongList();
//
//        for (int writers = 0; writers < 20; writers++) {
//            final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
//            puts("NUM WRITER THREADS", writers+1);
//            perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
//            System.gc();
//            Sys.sleep(1000);
//
//        }


        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

##Test after adding CPU intensive gak

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        private final boolean cpuIntensive;
        private final int times;
        int total;

        volatile int totalOut;

        public AtomicLong answer = new AtomicLong();

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
            this.cpuIntensive = cpuIntensive;
            this.times = times;
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }

        private void doSomething(Integer value) {

            long lv = 0;
            for (int index = 0; index< times; index++) {
                lv = value * index % 13 + index;
            }
            this.answer.set(lv);
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery,
            boolean cpuIntensive, int times) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue, cpuIntensive, times);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
            puts(testReader.answer.get());
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 10_000;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = batchSize/10;
        final boolean cpuIntensive = true;
        final int times = 1_000_000;


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue()
                .setCheckEvery(checkEvery);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(10_000);
        }



        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

Making it EXTRA Cpu intensive and spikey

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        private final boolean cpuIntensive;
        private final int times;
        int total;

        volatile int totalOut;

        public AtomicLong answer = new AtomicLong();

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
            this.cpuIntensive = cpuIntensive;
            this.times = times;
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive && total % 13 == 0) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }

        private void doSomething(Integer value) {

            long lv = 0;
            for (int j = 0; j < 10; j++) {
                for (int index = 0; index < times; index++) {
                    lv = value * index % 13 + index;
                    lv = lv * 47;
                    lv = lv * 1000;
                    lv = lv * 13 + lv % 31;
                }
                this.answer.set(this.answer.get() + lv);
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery,
            boolean cpuIntensive, int times) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue, cpuIntensive, times);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
            puts(testReader.answer.get());
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 10_000;
        final int totalSends = 100_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = 1000;
        final boolean cpuIntensive = true;
        final int times = 2_000_000_000;


    //        final QueueBuilder queueBuilder = queueBuilder()
    //                .setBatchSize(batchSize)
    //                .setLinkTransferQueue()
    //                .setCheckEvery(checkEvery);//.setTryTransfer(true);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(3_000);
        }



        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

Tutorials

__

Docs

Getting Started

Basics

Concepts

REST

Callbacks and Reactor

Event Bus

Advanced

Integration

QBit case studies

QBit 2 Roadmap

-- Related Projects

Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting

Clone this wiki locally