Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client

From Resin 4.0 Wiki

(Difference between revisions)
Jump to: navigation, search
 
Line 1: Line 1:
More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale.
+
More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale. I also simplified the ChatRoom a lot, and fixed a very hard to find timing issue which resulted in not registering all clients when under load (the bug was in the ChatRoom not the Websocket implementation).
  
  
Line 1,008: Line 1,008:
  
 
==Java Client that remotely controls N Number of performance tester so we can run 100 or so clients with n number of connections in Amazon EC2==
 
==Java Client that remotely controls N Number of performance tester so we can run 100 or so clients with n number of connections in Amazon EC2==
==Client side of new tester==
+
===Client side of new tester===
  
 
The Control allows you to control remote instances. It is the websocket handler for the controller.
 
The Control allows you to control remote instances. It is the websocket handler for the controller.

Latest revision as of 00:00, 25 November 2012

More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale. I also simplified the ChatRoom a lot, and fixed a very hard to find timing issue which resulted in not registering all clients when under load (the bug was in the ChatRoom not the Websocket implementation).


Contents


[edit] Server Side

ChatClientPeerHandler

package com.example.server;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketError;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

import com.example.concurrent.ThreadedQueue;

@WebSocketEndpoint("/chat")
public class ChatClientPeerHandler implements Closeable {
    
    public ChatClientPeerHandler() {
    }

    @Inject @ApplicationScoped 
    ChatRoom chatRoom;
    
    @Inject Executor executor;


    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    
    ChatRoom room;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close = true;

    
    @WebSocketOpen
    public void onOpen(Session session) {
        //System.out.println("@WebSocketOpen " + session);
        this.session = session;
        init();
    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        //System.out.println("@WebSocketClose " + session);
        this.close();
    }


    @WebSocketMessage
    public void onMessage(String message) {
        
        String[] strings = message.split("::::");
        String clientId = strings[1];
        
        if (this.clientId!=null) {
            if (!this.clientId.equals(clientId)) {
                //This should never happen, but it proves our threads/client handling is correct.
                throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId);
            }
        }
 

        if (message.startsWith("remove client::::")) {
            chatRoom.sendMessage(clientId, clientId
                    + " has become bored and left");
            try {
                this.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        } else if (message.startsWith("send message::::")) {
            if (clientId == null) {
                return;
            }
            String m = strings[2];
            chatRoom.sendMessage(clientId, m);
        } else if (message.startsWith("add client::::")) {
            this.clientId = clientId;
            chatRoom.sendMessage(clientId, clientId
                    + " has join the chat room");
        } else {
            System.err.println("ACK... Don't understand your message!!!!! " + message);
        }
    }
    
    
    private void init() {
        close = false;
        if (session == null) {
            throw new IllegalStateException("session is null");
        }
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), 
                new Runnable() {                
                    @Override
                    public void run() {
                        ChatClientPeerHandler.this.run();                   
                    }
                },  executor);
        chatRoom.addClient(this);

    }


    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        if (close) {
            return;
        }
        
        //System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    private void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                try {
                    this.close();
                } catch (IOException e1) {
                    //ignore so it does not mask original exception
                    //once you install loggers, make this an info and the other an error.
                }
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
            session=null;
        }
        room.removeClient(this);
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }

    @WebSocketError
    public void onError(Throwable thrown) {
        thrown.printStackTrace();
    }

}

ChatRoom

package com.example.server;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import com.example.concurrent.ThreadedQueue;

@ApplicationScoped
@Startup
public class ChatRoom {

    Set<ChatClientPeerHandler> chatClients = new HashSet<ChatClientPeerHandler>();
    BlockingQueue<Runnable> readQueue ;
    @Inject Executor executor;

    
    public void addClient(final ChatClientPeerHandler session) {
        offerCommand( new Runnable() {
            @Override
            public void run() {
                chatClients.add(session);        
            }
        });
     }

    public void sendMessage(final String clientId, final String message) {
        offerCommand( new Runnable() {
            @Override
            public void run() {
                doSendMessage(clientId, message);
            }
        });
    }

    
    public void removeClient(final ChatClientPeerHandler client) {
        offerCommand( new Runnable() {
            @Override
            public void run() {
                    chatClients.remove(client);
            }
        });
    }
    
    
    @PostConstruct
    void init() throws Exception {
        readQueue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>(2000), new Runnable() {
            public void run() {
                commandLoop();
            }},  executor);        
    }


    private void commandLoop() {
        Runnable runnable;
        while ((runnable = readQueue.poll()) != null) {
            runnable.run();
        }
    }


    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s to %s number of clients \n", sendMessage, chatClients.size());
        for (ChatClientPeerHandler chatClientHandler : chatClients) {
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            chatClientHandler.sendMessage(sendMessage);
        }
    }

    
    private void offerCommand(Runnable runnable) {
        try {
            readQueue.offer(runnable, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }

}


Fixed timing issue in ThreadedQueue.

ThreadedQueue

package com.example.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A Queue that wakes up a runnable every time something is added to it.
 * This is a poor man's Actor. The thread is tied to the queue.
 * If you put an item in and there is not a thread, one will be launched.
 */
public class ThreadedQueue<T> implements BlockingQueue<T> {
    BlockingQueue<T> queue;
    Executor executor;
    Runnable queueProcessor;
    AtomicBoolean active = new AtomicBoolean();

    public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor,
            Executor executor) {
        this.queue = queue;
        this.queueProcessor = queueProcessor;
        this.executor = executor;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    @Override
    public boolean offer(T item) {
        boolean result = queue.offer(item);

        launchRunnable();

        return result;
    }

    @Override
    public boolean offer(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean result = queue.offer(item);

        launchRunnable();

        if (result) {
            return result;
        } else {
            result = queue.offer(item, timeout, unit);
            launchRunnable();
        }
        
        return result;
    }

    @Override
    public boolean add(T item) {
        boolean result = queue.add(item);

        launchRunnable();

        return result;
    }

    @Override
    public void put(T item) throws InterruptedException {
        
        /* Offer it if you can, this is non-blocking. */
        if (queue.offer(item)) {
            launchRunnable();
            return;
        }



        /* If you were unable to offer it, then go ahead and block. */
        queue.put(item);

        launchRunnable();
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        boolean result = queue.addAll(c);

        launchRunnable();

        return result;
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <X> X[] toArray(X[] array) {
        return queue.toArray(array);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    public void launchRunnable() {
        if (queue.isEmpty()) {
            return;
        }

        if (active.compareAndSet(false, true)) {
            executor.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                queueProcessor.run();
                            } finally {
                              active.set(false);
                              launchRunnable();//just in case someone was trying to get in this block before we set the flag.
                            }
                        }
                    });
        }
    }

 }


[edit] Simple Client Side Java

Java Websocket client

package com.example.client;


import java.io.IOException;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;


@WebSocketEndpoint("")
public class TestClient {
    /** Session to send messages to the server. */
    Session session;

    @WebSocketOpen
    public void onOpen(Session session) {
        this.session = session;
        try {
            session.getRemote().sendString("add client::::" + this.hashCode());
            session.getRemote().sendString("send message::::" + this.hashCode() + "::::hello from HELLO HELLO HELLO " + this.hashCode());
        } catch (IOException e) {
            e.printStackTrace();
        }
        

    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        if (session!=null) {
            session.close();
        }
        session=null;
    }

    @WebSocketMessage
    public void onMessage(String message) {
        System.out.println(this.hashCode() + " got message " + message);
    }

    
    public static void main (String [] args) throws InterruptedException {
        ClientContainer container = ContainerProvider.getClientContainer();
        for (int index = 0; index < 500; index++) {
            container.connectToServer(new TestClient(),
                "ws://localhost:8080/chat/chat");
        }
        Thread.sleep(100000);
    }

}


[edit] Client Side JavaScript

JavaScript Websocket client

<html>

<head>

<title>Chat Client</title>

<link rel="stylesheet" type="text/css" href="css/style.css"
    media="screen" />


<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>


<script type="text/javascript">
    var ENTER_KEY = '13';
    var TOKEN_DELIM = "::::";
    
    

    
    function buildWebSocketURL() {
        var url = document.URL;
        var parts = url.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];
        var wssScheme = null;
        
        
        if (scheme=="http:") {
            wssScheme="ws:";
        } else if (scheme=="https:") {
            wssScheme="wss:";
        }
        
        wssUrl = wssScheme + "//" + hostPort  + "/chat/chat";
        
        return wssUrl;
        
    }
    

    var chatUserSession = {
                version : "1.0",
                webSocketProtocol : "caucho-example-chat-protocol",
                webSocketURL : buildWebSocketURL(),
                webSocket : null,//WebSocket
                userName : null
    };

    function chat_sendMessage(message) {
        $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>");

        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    function chat_joinChat() {
        chatUserSession.webSocket.send("add client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_leaveChat() {
        chatUserSession.status(chatUserSession.userName + " is leaving chat");
        chatUserSession.webSocket.send("remove client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var socket = chatUserSession.webSocket;

        socket.onmessage = function(msg) {
            chatUserSession.onMessage(msg);
        }

        socket.onerror = function(errorEvent) {
            chatUserSession.onError(errorEvent);
        }

        socket.onopen = function() {
            chatUserSession.onOpen();
        }
        socket.onclose = function(closeEvent) {
            chatUserSession.onClose(closeEvent);
        }

    }

    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);

        $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>");
    }

    function chat_Login() {

        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text(
                "Chat Client (logging in...) : " + chatUserSession.userName);

        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();

    }

    function chat_onOpen() {
        chatUserSession.joinChat();
        chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName);
          $('#header').text(
                    "Chat Client (logged in...) : " + chatUserSession.userName);

        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }
    
    function chat_Status(message) {
        $('#statusBarPara1').text(message);
        $("#statusBar").show(500);
        
    }

    function chat_onClose(closeEvent) {
        $("#loginDiv").show(500);
        $('#header').text(
                "Chat Client (not connected) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + 
                ":: Reason: " + closeEvent.reason + 
                " Code: " + closeEvent.code);

        $("#inputArea").hide(500);
        $("#statusBar").show(500);

        $("#userName").val(chatUserSession.userName);
        $("#userName").focus();
    }

    function chat_onError(msg) {
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg));
        $("#statusBar").show(500);
    }

    chatUserSession.open = chat_openWebSocket;
    chatUserSession.onMessage = chat_onMessage;
    chatUserSession.onOpen = chat_onOpen;
    chatUserSession.login = chat_Login;
    chatUserSession.onClose = chat_onClose;
    chatUserSession.onError = chat_onError;
    chatUserSession.joinChat = chat_joinChat;
    chatUserSession.sendMessage = chat_sendMessage;
    chatUserSession.leaveChat = chat_leaveChat;
    chatUserSession.status = chat_Status;

    $(document).ready(function() {

        $("#inputArea").hide();
        $("#userName").focus();
        
        

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(function(event) {

            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();

                if (textMessage=="bye!") {
                    chatUserSession.leaveChat();
                } else {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(textMessage);
                }
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chatUserSession.login()
                event.stopPropagation();
            }
        });
    });
</script>

</head>

<body>

    <h1 id="header">Chat Client 8</h1>


    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>

    <div id="loginDiv">
        User name   <input id="userName" type="text" /> <input
            id="login" type="submit" value="Login" />
    </div>


    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>

    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>


</body>

</html>


[edit] Simple Java Client that measures time it takes to send a message to N clients

Command line arguments: address ws://localhost:8080/chat/chat numClients 900 message hello
package com.example.client;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;


@WebSocketEndpoint("")
public class TestClient {
    
    static int numClients;
    volatile static int actualCount;
    volatile static int messageCount;
    volatile static long timeTestStarted;
    volatile static long timeTestEnded;
    
    static String address;
    static String message;
    static String[] COMMANDS = {"numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h"};
    static Set<String> commands = new HashSet<String>(COMMANDS.length);
    static
    {
        for (String command : COMMANDS) {
            commands.add(command);
        }
    }
    
    /** Session to send messages to the server. */
    Session session;

    @WebSocketOpen
    public void onOpen(Session session) {
        this.session = session;
        actualCount++;
        try {
            session.getRemote().sendString("add client::::" + this.hashCode());
        } catch (IOException e) {
            e.printStackTrace();
        }
        

    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        if (session!=null) {
            session.close();
        }
        session=null;
    }

    @WebSocketMessage
    public void onMessage(String message) {
        synchronized (TestClient.class) {            
            if (message.contains("has join the chat room")){
                return;
            }
            messageCount++;
            if (messageCount==numClients) {
                timeTestEnded = System.currentTimeMillis();
            }
        }

    }

    
    public static void main (String [] args) throws InterruptedException, IOException {
        boolean wasCommand=false;
        String command = null;
        
        actualCount = 0;
        messageCount = 0;
        timeTestStarted = 0;
        timeTestEnded = 0;

        
        if (args.length==0) {
            printHelp();            
        }
        
        for (int index=0; index < args.length; index++) {
            String value = args[index];
            if (commands.contains(value)) {
                command = value;
                wasCommand = true;
                continue;
            }
            if (wasCommand) {
                wasCommand=false;
                if ("numClients".equals(command) || "nc".equals(command)) {
                    numClients = Integer.parseInt(value);
                } else if ("address".equals(command) || "a".equals(command)) {
                    address=value;
                } else if ("message".equals(command) || "m".equals(command)) {
                    message=value;
                } else if ("help".equals(command) ||"--help".equals(command) || "h".equals(command) || "-h".equals(command)) {
                    printHelp();
                }
            }
        }
        
        ClientContainer container = ContainerProvider.getClientContainer();
        for (int index = 0; index < numClients; index++) {
            container.connectToServer(new TestClient(), address);
        }
        Thread.sleep(10000);


        //wait for all of the clients to connect
        while (actualCount < numClients) {
            Thread.sleep(10);
            System.out.print(".");
        }
        System.out.println("\nAll clients LAUNCHED!");
               
        TestClient client = new TestClient();
        container.connectToServer(client, address);
        Thread.sleep(1000);

        
        if (message!=null) { 
            messageCount = 0;
            Thread.sleep(1000);

            
            timeTestStarted = System.currentTimeMillis();
            client.session.getRemote().sendString("send message::::" + client.hashCode() + ":::: " + message);
            System.out.println("Message just sent ");
            Thread.sleep(1000);
            
            int timeout=10;
            int retry=0;
            while(messageCount < numClients) {
                retry++;
                Thread.sleep(1000);
                
                System.out.printf("messageCount = %s, numClients=%s \n", messageCount, numClients);
                if (retry>timeout) {
                    break;
                }
            }
            
            System.out.println("Messages recieved " + messageCount);
            System.out.println("Time it took in miliseconds " + (timeTestEnded - timeTestStarted));
        }
        
        
    }

    private static void printHelp() {
        System.out.println("available commands");
        for (String command : COMMANDS) {
            System.out.println(command);
        }
        
        System.out.println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello");
        System.out.println("The above would connect five clients and send the message hello once to the chat server");

    }

}


[edit] Java Client that remotely controls N Number of performance tester so we can run 100 or so clients with n number of connections in Amazon EC2

[edit] Client side of new tester

The Control allows you to control remote instances. It is the websocket handler for the controller.

Control'

package com.example.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketError;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

@WebSocketEndpoint("")
public class Control {



    /** Session to send messages to the server. */
    Session session;
    
    volatile boolean gotAcks;

    private int ackCount;
    
    List<Integer> testTimes = Collections.synchronizedList(new ArrayList<Integer>(100));


    @WebSocketOpen
    public void onOpen(Session session) {
        this.session = session;

    }

    @WebSocketClose
    public void onClose() throws IOException {
        
        if (session != null) {
            session.close();
        }
        session = null;
    }
    
    @WebSocketError
    public void onError(Throwable thrown) {
        thrown.printStackTrace();
    }

    @WebSocketMessage
    public void onMessage(String message) {
        
        if (message.startsWith("GOT ALL ACKS::::")) {
            String[] split = message.split("::::");
            ackCount = Integer.parseInt(split[1]);
            gotAcks = true;
        } else if (message.startsWith("total time::::")) {
            String[] split = message.split("::::");
            int testTime = Integer.parseInt(split[1]);
            testTimes.add(testTime);
            if (ackCount==testTimes.size()) {
                gotAcks = true;
            }
        } else {
            System.err.println(message);
        }

    }
    
    boolean acknowledged () {
        if (gotAcks == true) {
            gotAcks = false;
            return true;
        } else {
            return false;
        }
    }
    
    List<Integer> testTimes () {
        return new ArrayList<Integer>(this.testTimes);
    }


}


The Slave lets you receive remote control messages. It is the websocket handler for the slave. You can also ack commands through the slave.

Slave

package com.example.client;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketError;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

@WebSocketEndpoint("")
public class Slave {

    BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(100);


    /** Session to send messages to the server. */
    Session session;


    @WebSocketOpen
    public void onOpen(Session session) {
        System.out.println();
        this.session = session;
        

    }

    @WebSocketMessage
    public void onCommand(String command) throws IOException {
        System.out.println("Add command to queue " + command);
        commandQueue.add(command);
    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        
        if (session != null) {
            session.close();
        }
        session = null;
    }
    
    public String nextCommand() {
        try {
            return commandQueue.take();
        } catch (InterruptedException e) {
            /* We don't expect to be interrupted ever, but if we do, we have to cleanup. */
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
            return nextCommand();
        }
    }
    
    @WebSocketError
    public void onError(Throwable thrown) {
        thrown.printStackTrace();
    }


}

The test client listens for chat messages and works with the Tester to count and measure timings.

TestClient

package com.example.client;

import java.io.IOException;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

@WebSocketEndpoint("")
public class TestClient {

    private Tester tester;

    /** Session to send messages to the server. */
    Session session;

    boolean sendOnly;

    public TestClient(Tester tester2) {
        this.tester = tester2;
    }

    @WebSocketOpen
    public void onOpen(Session session) {
        
        this.session = session;

        try {
            session.getRemote().sendString("add client::::" + this.hashCode());
        } catch (IOException e) {
            e.printStackTrace();
        }

        if (sendOnly) {
            return;
        }
        tester.incrementConnectionCount();

    }

    @WebSocketClose
    public void onClose() throws IOException {
        System.out.println("ON CLOSED CALLED FOR TEST CLIENT");
        if (session != null) {
            session.close();
        }
        session = null;
        

        if (sendOnly) {
            return;
        }
        tester.deccrementConnectionCount();

    }

    @WebSocketMessage
    public void onMessage(String message) {
        if (message.contains("has join the chat room")) {
            return;
        }

        tester.collectMessage(message);

    }

}


The tester calculates how long things took. It runs in standalone mode, slave mode or controller mode. It is written as a test script not a work of art. Any art was unintentional.

Tester

package com.example.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;


public class Tester {

    // config
    private volatile int numClients;
    private String address;
    private String message;
    // private BlockingQueue<Runnable> queue;

    private List<String> commands = new ArrayList<String>();

    // measure
    private AtomicInteger actualClientCount = new AtomicInteger(0);
    private AtomicInteger messageCount = new AtomicInteger(0);
    private long timeTestStarted;
    private long timeTestEnded;

    private boolean control;
    private boolean slave;

    private boolean standalone;

    //private Executor executor;
    private TestClient client;
    private Control controlClient;
    Slave backchannel;

    public Tester() {
//        executor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.MILLISECONDS,
//                new ArrayBlockingQueue<Runnable>(100));

        // queue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>(
        // 2000), new Runnable() {
        // public void run() {
        // commandLoop();
        // }
        // }, executor);
    }

    // private void commandLoop() {
    // Runnable runnable;
    // while ((runnable = queue.poll()) != null) {
    // runnable.run();
    //
    // }
    //
    // }

    public void init() throws InterruptedException, IOException {
        if (control || standalone) {
                createSender();
        }

        if (control) {
                createMaster().session.getRemote().sendString("control");
        }
    }

    public void numClients(int i) {
        this.numClients = i;
    }

    public void test() throws InterruptedException, IOException {
        connectAll();

        sendMessage();
    }

    private TestClient createSender()  {
        if (client != null) {
            return client;
        }
        if (message != null) {
            ClientContainer container = ContainerProvider.getClientContainer();
            client = new TestClient(null);
            client.sendOnly = true;
            container.connectToServer(client, address + "/chat");
        }
        return client;
    }

    private Control createMaster()  {
        if (controlClient != null && controlClient.session != null) {
            return controlClient;
        }
        ClientContainer container = ContainerProvider.getClientContainer();
        controlClient = new Control();
        container.connectToServer(controlClient, address + "/load");
        return controlClient;
    }

    private void sendRemoteCommand(String commands) {
        System.out.println("sendRemoteCommand( " + commands);
        Control master;
        try {
            master = createMaster();
            master.session.getRemote().sendString(commands);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void connectAll() {
        ClientContainer container = ContainerProvider.getClientContainer();
        for (int index = actualClientCount.get(); index < numClients; index++) {
            container.connectToServer(new TestClient(Tester.this), address
                    + "/chat");
        }

        // wait for all of the clients to connect
        while (actualClientCount.get() < numClients) {
            System.out.print(".");
        }
        System.out.println("\nAll clients LAUNCHED!" + numClients + " "
                + actualClientCount);
        messageCount.set(0);
        timeTestStarted = System.currentTimeMillis();
        if (slave) {
            sendAck();
        }

    }

    private void sendAck() {
        try {
            backchannel.session.getRemote().sendString("&&&ACK&&&");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void sendTestTime(long l, int count) {
        try {
            backchannel.session.getRemote().sendString("total time::::" + l + "::::" + count);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void incrementConnectionCount() {
        actualClientCount.incrementAndGet();
    }

    public synchronized void collectMessage(final String testMessage) {

        messageCount.incrementAndGet();
        if (testMessage.contains(message)) {
            int count = messageCount.get();
            if (count % 3 == 0) {
                System.out.print(" " + count);
            }
            if (count == actualClientCount.get()) {
                timeTestEnded = System.currentTimeMillis();
                System.out.println("\n\nTotal time:"
                        + (timeTestEnded - timeTestStarted));
                if (slave) {
                    sendTestTime(timeTestEnded - timeTestStarted, count);
                }
            }else {
                long currentTime = System.currentTimeMillis();
                if (currentTime-timeTestStarted > 1000) {
                    if (slave) {
                        sendTestTime(timeTestEnded - timeTestStarted, count);
                    }                    
                }        
            }
        }
    }

    public void addCommand(String command) {
        this.commands.add(command);
    }

    public void sendMessage() {

        TestClient client;
        try {
            client = createSender();

            client.session.getRemote().sendString(
                    "send message::::" + client.hashCode() + ":::: " + message);
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println("Message just sent ");

    }

 
 
    public void sendRemoteConnectAll() {

        System.out.println("sendRemoteConnectAll");
        sendRemoteCommand("command;connectAll");

    }

    public void sendRemotePause() {

        System.out.println("sendRemotePause");
        sendRemoteCommand("command;pause");

    }

    public void slave(boolean b) {
        this.slave = b;
    }

    public void address(String value) {
        address = value;
    }

    public void clearCommands() {
        this.commands.clear();
    }

    public List<String> commands() {
        return new ArrayList<String>(this.commands);
    }

    public void control(boolean b) {
        this.control = b;
    }

    public void standalone(boolean b) {
        this.standalone = b;
    }

    public void message(String value) {
        message = value;
    }


    public String address() {
        return address;
    }

    public void deccrementConnectionCount() {
        actualClientCount.decrementAndGet();
    }

    public void setSlave(Slave s) {
        backchannel = s;
    }

    public void waitForAck() {
        while (!this.controlClient.acknowledged()) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public List<Integer> testTimes() {
        return controlClient.testTimes();
    }
}

Parses command line arguments and invokes the Tester.

TestMain

package com.example.client;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;

public class TestMain {

    static String[] COMMANDS = { "numClients", "nc", "address", "a", "message",
            "m", "help", "--help", "h", "-h", "sleep", "s", "command" };
    static Set<String> allowedCommandLineArgs = new HashSet<String>(COMMANDS.length);
    
    static {
        for (String command : COMMANDS) {
            allowedCommandLineArgs.add(command);
        }
    }

    public static void main(String[] args) throws InterruptedException,
            IOException {
        
        if (args[0].equals("slave")) {
            System.out.println("Running slave");
            Tester tester = extractCommands(args, null);
            ClientContainer container = ContainerProvider.getClientContainer();
            Slave slave = new Slave();
            container.connectToServer(slave, tester.address() + "/load");
            tester.slave(true);
            
            slave.session.getRemote().sendString("slave");
            tester.setSlave(slave);
            tester.connectAll();
            while (true) {
                System.out.println("Waiting for next command");
                String controlCommand = slave.nextCommand();
                System.out.println("COMMAND: " + controlCommand);
                
                tester.clearCommands();
                extractCommands(controlCommand.split(";"), tester);
                List<String> commands = tester.commands();
                for (String command : commands) {
                    if (command.equals("connectAll")) {
                        tester.connectAll();
                    } else {
                        System.out.println("This command not allowed in slave mode " + command);
                    }
                }
            }

        } else if (args[0].equals("control")) {
            System.out.println("Running master");

            Tester tester = extractCommands(args, null);
            tester.control(true);
            tester.init();
            tester.sendRemoteConnectAll();
            
            tester.waitForAck();

            tester.sendMessage();

            tester.waitForAck();
            List<Integer> testTimes = tester.testTimes();
            
            System.out.println("There were this many clients " + testTimes.size());
            int sumTimes=0;
            int maxTime = Integer.MIN_VALUE;
            for (int i : testTimes){
                sumTimes+=i;
                if (i>maxTime) {
                    maxTime = i;
                }
            }
            System.out.println("The average times were " + (sumTimes / testTimes.size()));
            System.out.println("It took this long to recieve all messages " + maxTime);

            

        }else {
            System.out.println("Running standalone");

            Tester tester = extractCommands(args, null);
            tester.standalone(true);
            tester.init();
            tester.test();
            
            Thread.sleep(3000);
        }
    }

    private static Tester extractCommands(String[] args, Tester tester) {
        if (tester == null) {
            tester = new Tester();
        }
        boolean wasCommand = false;
        String command = null;

        if (args.length == 0) {
            printHelp();
        }

        for (int index = 0; index < args.length; index++) {
            String value = args[index];
            if (allowedCommandLineArgs.contains(value)) {
                command = value;
                wasCommand = true;
                continue;
            }
            if (wasCommand) {
                wasCommand = false;
                if ("numClients".equals(command) || "nc".equals(command)) {
                    tester.numClients (Integer.parseInt(value));
                } else if ("address".equals(command) || "a".equals(command)) {
                    tester.address(value);
                } else if ("message".equals(command) || "m".equals(command)) {
                    tester.message(value);
                } else if ("help".equals(command) || "--help".equals(command)
                        || "h".equals(command) || "-h".equals(command)) {
                    printHelp();
                } else if ("command".equals(command) || "c".equals(command)) {
                    tester.addCommand(value);
                }
            }
        }
        return tester;
    }

    private static void printHelp() {
        System.out.println("available commands");
        for (String command : COMMANDS) {
            System.out.println(command);
        }

        System.out
                .println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello");
        System.out
                .println("The above would connect five clients and send the message hello once to the chat server");

    }

}

[edit] Server side for Testing and Controlling Test Clients

LoadTestController

package com.example.loadtestcontrol;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import com.example.concurrent.ThreadedQueue;

@ApplicationScoped
@Startup
public class LoadTestController {

    Set<LoadTestPeerHandler> slaves = new HashSet<LoadTestPeerHandler>();
    BlockingQueue<Runnable> readQueue;

    @Inject
    Executor executor;

    

    public void remove(final LoadTestPeerHandler handler) {
        System.out.println("REMOVE SLAVE " + handler.hashCode());
        offerCommand(new Runnable() {
            @Override
            public void run() {
                System.out.println("DO REMOVE SLAVE " + handler.hashCode());

                slaves.remove(handler);               
            }});
    }
    public void add(final LoadTestPeerHandler handler) {
        System.out.println("ADD SLAVE " + handler.hashCode());

        offerCommand(new Runnable() {           
            @Override
            public void run() {
                System.out.println("DO ADD SLAVE " + handler.hashCode());

                slaves.add(handler);
            }
        });
    }

    public void sendCommand(final String command) {
        System.out.println("SEND COMMAND " + command);

        offerCommand(new Runnable() {           
            @Override
            public void run() {
                doSendCommand(command);
            }
        });
    }

    @PostConstruct
    void init() throws Exception {
        readQueue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>(2000), new Runnable() {
            public void run() {
                LoadTestController.this.commandLoop();
            }},  executor);
        
    }



    private void commandLoop() {
        Runnable command;
        
        while ((command = readQueue.poll()) != null) {
            command.run();
        }

    }

    private void doSendCommand(String command) {
        System.out.println("DO SEND COMMAND...." + command);

        for (LoadTestPeerHandler slave : slaves) {
            slave.sendMessage(command);
        }
    }


     
    private void offerCommand(Runnable runnable) {
        try {
            readQueue.offer(runnable, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }



}

LoadTestPeerHandler

package com.example.loadtestcontrol;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketError;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

import com.example.concurrent.ThreadedQueue;

@WebSocketEndpoint("/load")
public class LoadTestPeerHandler implements Closeable {
    
    boolean remoteController;
    boolean firstMessage;
    static LoadTestPeerHandler remoteControl;
    static AtomicInteger remoteControllersCount = new AtomicInteger(0);
    static AtomicInteger slaves = new AtomicInteger(0);
    static AtomicInteger ackCount = new AtomicInteger(0);

    
    public LoadTestPeerHandler() {
    }

    @Inject @ApplicationScoped 
    LoadTestController controller;
    
    @Inject Executor executor;


    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    

    /** Flag to indicate whether we should close or not. */
    volatile boolean close = true;

    
    @WebSocketOpen
    public void onOpen(Session session) {
        firstMessage = true;
        this.session = session;
        init();
    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        this.close();
    }


    @WebSocketMessage
    public void onMessage(String message) throws IOException {
        System.out.println("2 CONTROL MESSAGE ##" + message + "##" + message.equals("control") + "##" + firstMessage);
        if (firstMessage) {
            System.out.println("FIRST MESSAGE");
            firstMessage=false;
            if (message.equals("control")) {
                System.out.println("FOUND CONTROL");

                if (remoteControllersCount.compareAndSet(0, 1)){
                    System.out.println("CONTROLLER FOUND");
                    this.remoteController = true;
                    remoteControl = this;
                } else {
                    System.out.println("More than one CONTROLLER FOUND"); 
                    session.getRemote().sendString("There is already a remote controller");
                    session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "we only allow one remote controller"));        
                }
            } else if (message.equals("slave")) {
                //Only slaves listen. Controllers control. 
                System.out.println("FOUND SLAVE");
                controller.add(this);
                slaves.incrementAndGet();
                this.remoteController = false;                
            } else {
                session.getRemote().sendString("You must be a control or a slave");
                session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "client must be control or slave"));
            }
            return;
        }
        
        if (this.remoteController) {
            System.out.println("MESSAGE: " + message);
            controller.sendCommand(message);
            ackCount.set(0);
        } else {
            
            if (message.startsWith("&&&ACK&&&")) {
                ackCount.incrementAndGet();
                if (ackCount.get()==slaves.get()) {
                    if (remoteControl!=null) {
                        System.out.println("GOT ALL ACKS::::" + ackCount.get());
                        remoteControl.sendMessage("GOT ALL ACKS::::" + ackCount.get());
                        ackCount.set(0);
                    }
                } else {
                    System.out.printf("Got %s acks out of %s slaves\n", ackCount, slaves);
                }
            } else if (message.startsWith("total time::::")) {
                System.out.printf("test time message is " + message);    
                if (remoteControl!=null) {
                    remoteControl.sendMessage(message);
                }
            } else {
                session.getRemote().sendString("1 You must be a controller to send control messages......" + message);
                session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "only controllers can send messages"));
            }
        }
        
    }
    
    
    private void init() {
        close = false;
        if (session == null) {
            throw new IllegalStateException("session is null");
        }
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), 
                new Runnable() {                
                    @Override
                    public void run() {
                        LoadTestPeerHandler.this.run();                   
                    }
                },  executor);

    }


    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        if (close) {
            return;
        }
        
        //System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    private void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                try {
                    this.close();
                } catch (IOException e1) {
                    //ignore so it does not mask original exception
                    //once you install loggers, make this an info and the other an error.
                }
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (this.remoteController) {
            remoteControllersCount.decrementAndGet();
        } else  {
            controller.remove(this);
            slaves.decrementAndGet();
        }
        if (session != null) {
            session.close();
            session=null;
        }
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }

    @WebSocketError
    public void onError(Throwable thrown) {
        thrown.printStackTrace();
    }

}
Personal tools
TOOLBOX
LANGUAGES