Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client
From Resin 4.0 Wiki
(Created page with "More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our we...") |
|||
Line 1: | Line 1: | ||
− | More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale. | + | More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale. I also simplified the ChatRoom a lot, and fixed a very hard to find timing issue which resulted in not registering all clients when under load (the bug was in the ChatRoom not the Websocket implementation). |
Line 1,007: | Line 1,007: | ||
− | + | ==Java Client that remotely controls N Number of performance tester so we can run 100 or so clients with n number of connections in Amazon EC2== | |
+ | ===Client side of new tester=== | ||
The Control allows you to control remote instances. It is the websocket handler for the controller. | The Control allows you to control remote instances. It is the websocket handler for the controller. |
Latest revision as of 00:00, 25 November 2012
More clean up. Added a performance tester. Added ability to remotely control n number of performance testers so we can run 100 or so clients in Amazon EC2 and perf test our websocket implementation. I have tested it with one master and five slaves and it worked well. This plus EC2 should allow us to see how far we can make it scale. I also simplified the ChatRoom a lot, and fixed a very hard to find timing issue which resulted in not registering all clients when under load (the bug was in the ChatRoom not the Websocket implementation).
Contents |
Server Side
ChatClientPeerHandler
package com.example.server; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketError; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; import com.example.concurrent.ThreadedQueue; @WebSocketEndpoint("/chat") public class ChatClientPeerHandler implements Closeable { public ChatClientPeerHandler() { } @Inject @ApplicationScoped ChatRoom chatRoom; @Inject Executor executor; /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; ChatRoom room; /** Flag to indicate whether we should close or not. */ volatile boolean close = true; @WebSocketOpen public void onOpen(Session session) { //System.out.println("@WebSocketOpen " + session); this.session = session; init(); } @WebSocketClose public void onClose() throws IOException { //System.out.println("@WebSocketClose " + session); this.close(); } @WebSocketMessage public void onMessage(String message) { String[] strings = message.split("::::"); String clientId = strings[1]; if (this.clientId!=null) { if (!this.clientId.equals(clientId)) { //This should never happen, but it proves our threads/client handling is correct. throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId); } } if (message.startsWith("remove client::::")) { chatRoom.sendMessage(clientId, clientId + " has become bored and left"); try { this.close(); } catch (IOException e) { e.printStackTrace(); } } else if (message.startsWith("send message::::")) { if (clientId == null) { return; } String m = strings[2]; chatRoom.sendMessage(clientId, m); } else if (message.startsWith("add client::::")) { this.clientId = clientId; chatRoom.sendMessage(clientId, clientId + " has join the chat room"); } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void init() { close = false; if (session == null) { throw new IllegalStateException("session is null"); } writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), new Runnable() { @Override public void run() { ChatClientPeerHandler.this.run(); } }, executor); chatRoom.addClient(this); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { if (close) { return; } //System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } private void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); try { this.close(); } catch (IOException e1) { //ignore so it does not mask original exception //once you install loggers, make this an info and the other an error. } break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); session=null; } room.removeClient(this); } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } @WebSocketError public void onError(Throwable thrown) { thrown.printStackTrace(); } }
ChatRoom
package com.example.server; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import com.example.concurrent.ThreadedQueue; @ApplicationScoped @Startup public class ChatRoom { Set<ChatClientPeerHandler> chatClients = new HashSet<ChatClientPeerHandler>(); BlockingQueue<Runnable> readQueue ; @Inject Executor executor; public void addClient(final ChatClientPeerHandler session) { offerCommand( new Runnable() { @Override public void run() { chatClients.add(session); } }); } public void sendMessage(final String clientId, final String message) { offerCommand( new Runnable() { @Override public void run() { doSendMessage(clientId, message); } }); } public void removeClient(final ChatClientPeerHandler client) { offerCommand( new Runnable() { @Override public void run() { chatClients.remove(client); } }); } @PostConstruct void init() throws Exception { readQueue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>(2000), new Runnable() { public void run() { commandLoop(); }}, executor); } private void commandLoop() { Runnable runnable; while ((runnable = readQueue.poll()) != null) { runnable.run(); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s to %s number of clients \n", sendMessage, chatClients.size()); for (ChatClientPeerHandler chatClientHandler : chatClients) { //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } chatClientHandler.sendMessage(sendMessage); } } private void offerCommand(Runnable runnable) { try { readQueue.offer(runnable, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
Fixed timing issue in ThreadedQueue.
ThreadedQueue
package com.example.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * A Queue that wakes up a runnable every time something is added to it. * This is a poor man's Actor. The thread is tied to the queue. * If you put an item in and there is not a thread, one will be launched. */ public class ThreadedQueue<T> implements BlockingQueue<T> { BlockingQueue<T> queue; Executor executor; Runnable queueProcessor; AtomicBoolean active = new AtomicBoolean(); public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor, Executor executor) { this.queue = queue; this.queueProcessor = queueProcessor; this.executor = executor; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public int remainingCapacity() { return queue.remainingCapacity(); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } @Override public boolean offer(T item) { boolean result = queue.offer(item); launchRunnable(); return result; } @Override public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException { boolean result = queue.offer(item); launchRunnable(); if (result) { return result; } else { result = queue.offer(item, timeout, unit); launchRunnable(); } return result; } @Override public boolean add(T item) { boolean result = queue.add(item); launchRunnable(); return result; } @Override public void put(T item) throws InterruptedException { /* Offer it if you can, this is non-blocking. */ if (queue.offer(item)) { launchRunnable(); return; } /* If you were unable to offer it, then go ahead and block. */ queue.put(item); launchRunnable(); } @Override public boolean addAll(Collection<? extends T> c) { boolean result = queue.addAll(c); launchRunnable(); return result; } @Override public T peek() { return queue.peek(); } @Override public T poll() { return queue.poll(); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } @Override public T element() { return queue.element(); } @Override public T take() throws InterruptedException { return queue.take(); } @Override public T remove() { return queue.remove(); } @Override public Iterator<T> iterator() { return queue.iterator(); } @Override public Object[] toArray() { return queue.toArray(); } @Override public <X> X[] toArray(X[] array) { return queue.toArray(array); } @Override public boolean removeAll(Collection<?> c) { return queue.removeAll(c); } @Override public boolean retainAll(Collection<?> c) { return queue.retainAll(c); } @Override public void clear() { queue.clear(); } @Override public boolean remove(Object o) { return queue.remove(o); } @Override public int drainTo(Collection<? super T> c) { return queue.drainTo(c); } @Override public int drainTo(Collection<? super T> c, int maxElements) { return queue.drainTo(c, maxElements); } public void launchRunnable() { if (queue.isEmpty()) { return; } if (active.compareAndSet(false, true)) { executor.execute( new Runnable() { @Override public void run() { try { queueProcessor.run(); } finally { active.set(false); launchRunnable();//just in case someone was trying to get in this block before we set the flag. } } }); } } }
Simple Client Side Java
Java Websocket client
package com.example.client; import java.io.IOException; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; try { session.getRemote().sendString("add client::::" + this.hashCode()); session.getRemote().sendString("send message::::" + this.hashCode() + "::::hello from HELLO HELLO HELLO " + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { System.out.println(this.hashCode() + " got message " + message); } public static void main (String [] args) throws InterruptedException { ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < 500; index++) { container.connectToServer(new TestClient(), "ws://localhost:8080/chat/chat"); } Thread.sleep(100000); } }
Client Side JavaScript
JavaScript Websocket client
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client 8</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>
Simple Java Client that measures time it takes to send a message to N clients
Command line arguments: address ws://localhost:8080/chat/chat numClients 900 message hello
package com.example.client; import java.io.IOException; import java.util.HashSet; import java.util.Set; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { static int numClients; volatile static int actualCount; volatile static int messageCount; volatile static long timeTestStarted; volatile static long timeTestEnded; static String address; static String message; static String[] COMMANDS = {"numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h"}; static Set<String> commands = new HashSet<String>(COMMANDS.length); static { for (String command : COMMANDS) { commands.add(command); } } /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; actualCount++; try { session.getRemote().sendString("add client::::" + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { synchronized (TestClient.class) { if (message.contains("has join the chat room")){ return; } messageCount++; if (messageCount==numClients) { timeTestEnded = System.currentTimeMillis(); } } } public static void main (String [] args) throws InterruptedException, IOException { boolean wasCommand=false; String command = null; actualCount = 0; messageCount = 0; timeTestStarted = 0; timeTestEnded = 0; if (args.length==0) { printHelp(); } for (int index=0; index < args.length; index++) { String value = args[index]; if (commands.contains(value)) { command = value; wasCommand = true; continue; } if (wasCommand) { wasCommand=false; if ("numClients".equals(command) || "nc".equals(command)) { numClients = Integer.parseInt(value); } else if ("address".equals(command) || "a".equals(command)) { address=value; } else if ("message".equals(command) || "m".equals(command)) { message=value; } else if ("help".equals(command) ||"--help".equals(command) || "h".equals(command) || "-h".equals(command)) { printHelp(); } } } ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < numClients; index++) { container.connectToServer(new TestClient(), address); } Thread.sleep(10000); //wait for all of the clients to connect while (actualCount < numClients) { Thread.sleep(10); System.out.print("."); } System.out.println("\nAll clients LAUNCHED!"); TestClient client = new TestClient(); container.connectToServer(client, address); Thread.sleep(1000); if (message!=null) { messageCount = 0; Thread.sleep(1000); timeTestStarted = System.currentTimeMillis(); client.session.getRemote().sendString("send message::::" + client.hashCode() + ":::: " + message); System.out.println("Message just sent "); Thread.sleep(1000); int timeout=10; int retry=0; while(messageCount < numClients) { retry++; Thread.sleep(1000); System.out.printf("messageCount = %s, numClients=%s \n", messageCount, numClients); if (retry>timeout) { break; } } System.out.println("Messages recieved " + messageCount); System.out.println("Time it took in miliseconds " + (timeTestEnded - timeTestStarted)); } } private static void printHelp() { System.out.println("available commands"); for (String command : COMMANDS) { System.out.println(command); } System.out.println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello"); System.out.println("The above would connect five clients and send the message hello once to the chat server"); } }
Java Client that remotely controls N Number of performance tester so we can run 100 or so clients with n number of connections in Amazon EC2
Client side of new tester
The Control allows you to control remote instances. It is the websocket handler for the controller.
Control'
package com.example.client; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketError; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class Control { /** Session to send messages to the server. */ Session session; volatile boolean gotAcks; private int ackCount; List<Integer> testTimes = Collections.synchronizedList(new ArrayList<Integer>(100)); @WebSocketOpen public void onOpen(Session session) { this.session = session; } @WebSocketClose public void onClose() throws IOException { if (session != null) { session.close(); } session = null; } @WebSocketError public void onError(Throwable thrown) { thrown.printStackTrace(); } @WebSocketMessage public void onMessage(String message) { if (message.startsWith("GOT ALL ACKS::::")) { String[] split = message.split("::::"); ackCount = Integer.parseInt(split[1]); gotAcks = true; } else if (message.startsWith("total time::::")) { String[] split = message.split("::::"); int testTime = Integer.parseInt(split[1]); testTimes.add(testTime); if (ackCount==testTimes.size()) { gotAcks = true; } } else { System.err.println(message); } } boolean acknowledged () { if (gotAcks == true) { gotAcks = false; return true; } else { return false; } } List<Integer> testTimes () { return new ArrayList<Integer>(this.testTimes); } }
The Slave lets you receive remote control messages. It is the websocket handler for the slave.
You can also ack commands through the slave.
Slave
package com.example.client; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketError; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class Slave { BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(100); /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { System.out.println(); this.session = session; } @WebSocketMessage public void onCommand(String command) throws IOException { System.out.println("Add command to queue " + command); commandQueue.add(command); } @WebSocketClose public void onClose() throws IOException { if (session != null) { session.close(); } session = null; } public String nextCommand() { try { return commandQueue.take(); } catch (InterruptedException e) { /* We don't expect to be interrupted ever, but if we do, we have to cleanup. */ if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); return nextCommand(); } } @WebSocketError public void onError(Throwable thrown) { thrown.printStackTrace(); } }
The test client listens for chat messages and works with the Tester to count and measure timings.
TestClient
package com.example.client; import java.io.IOException; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { private Tester tester; /** Session to send messages to the server. */ Session session; boolean sendOnly; public TestClient(Tester tester2) { this.tester = tester2; } @WebSocketOpen public void onOpen(Session session) { this.session = session; try { session.getRemote().sendString("add client::::" + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } if (sendOnly) { return; } tester.incrementConnectionCount(); } @WebSocketClose public void onClose() throws IOException { System.out.println("ON CLOSED CALLED FOR TEST CLIENT"); if (session != null) { session.close(); } session = null; if (sendOnly) { return; } tester.deccrementConnectionCount(); } @WebSocketMessage public void onMessage(String message) { if (message.contains("has join the chat room")) { return; } tester.collectMessage(message); } }
The tester calculates how long things took. It runs in standalone mode, slave mode or controller mode.
It is written as a test script not a work of art. Any art was unintentional.
Tester
package com.example.client; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; public class Tester { // config private volatile int numClients; private String address; private String message; // private BlockingQueue<Runnable> queue; private List<String> commands = new ArrayList<String>(); // measure private AtomicInteger actualClientCount = new AtomicInteger(0); private AtomicInteger messageCount = new AtomicInteger(0); private long timeTestStarted; private long timeTestEnded; private boolean control; private boolean slave; private boolean standalone; //private Executor executor; private TestClient client; private Control controlClient; Slave backchannel; public Tester() { // executor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.MILLISECONDS, // new ArrayBlockingQueue<Runnable>(100)); // queue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>( // 2000), new Runnable() { // public void run() { // commandLoop(); // } // }, executor); } // private void commandLoop() { // Runnable runnable; // while ((runnable = queue.poll()) != null) { // runnable.run(); // // } // // } public void init() throws InterruptedException, IOException { if (control || standalone) { createSender(); } if (control) { createMaster().session.getRemote().sendString("control"); } } public void numClients(int i) { this.numClients = i; } public void test() throws InterruptedException, IOException { connectAll(); sendMessage(); } private TestClient createSender() { if (client != null) { return client; } if (message != null) { ClientContainer container = ContainerProvider.getClientContainer(); client = new TestClient(null); client.sendOnly = true; container.connectToServer(client, address + "/chat"); } return client; } private Control createMaster() { if (controlClient != null && controlClient.session != null) { return controlClient; } ClientContainer container = ContainerProvider.getClientContainer(); controlClient = new Control(); container.connectToServer(controlClient, address + "/load"); return controlClient; } private void sendRemoteCommand(String commands) { System.out.println("sendRemoteCommand( " + commands); Control master; try { master = createMaster(); master.session.getRemote().sendString(commands); } catch (IOException e) { e.printStackTrace(); } } public void connectAll() { ClientContainer container = ContainerProvider.getClientContainer(); for (int index = actualClientCount.get(); index < numClients; index++) { container.connectToServer(new TestClient(Tester.this), address + "/chat"); } // wait for all of the clients to connect while (actualClientCount.get() < numClients) { System.out.print("."); } System.out.println("\nAll clients LAUNCHED!" + numClients + " " + actualClientCount); messageCount.set(0); timeTestStarted = System.currentTimeMillis(); if (slave) { sendAck(); } } private void sendAck() { try { backchannel.session.getRemote().sendString("&&&ACK&&&"); } catch (IOException e) { e.printStackTrace(); } } private void sendTestTime(long l, int count) { try { backchannel.session.getRemote().sendString("total time::::" + l + "::::" + count); } catch (IOException e) { e.printStackTrace(); } } public void incrementConnectionCount() { actualClientCount.incrementAndGet(); } public synchronized void collectMessage(final String testMessage) { messageCount.incrementAndGet(); if (testMessage.contains(message)) { int count = messageCount.get(); if (count % 3 == 0) { System.out.print(" " + count); } if (count == actualClientCount.get()) { timeTestEnded = System.currentTimeMillis(); System.out.println("\n\nTotal time:" + (timeTestEnded - timeTestStarted)); if (slave) { sendTestTime(timeTestEnded - timeTestStarted, count); } }else { long currentTime = System.currentTimeMillis(); if (currentTime-timeTestStarted > 1000) { if (slave) { sendTestTime(timeTestEnded - timeTestStarted, count); } } } } } public void addCommand(String command) { this.commands.add(command); } public void sendMessage() { TestClient client; try { client = createSender(); client.session.getRemote().sendString( "send message::::" + client.hashCode() + ":::: " + message); } catch (IOException e) { e.printStackTrace(); } System.out.println("Message just sent "); } public void sendRemoteConnectAll() { System.out.println("sendRemoteConnectAll"); sendRemoteCommand("command;connectAll"); } public void sendRemotePause() { System.out.println("sendRemotePause"); sendRemoteCommand("command;pause"); } public void slave(boolean b) { this.slave = b; } public void address(String value) { address = value; } public void clearCommands() { this.commands.clear(); } public List<String> commands() { return new ArrayList<String>(this.commands); } public void control(boolean b) { this.control = b; } public void standalone(boolean b) { this.standalone = b; } public void message(String value) { message = value; } public String address() { return address; } public void deccrementConnectionCount() { actualClientCount.decrementAndGet(); } public void setSlave(Slave s) { backchannel = s; } public void waitForAck() { while (!this.controlClient.acknowledged()) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public List<Integer> testTimes() { return controlClient.testTimes(); } }
Parses command line arguments and invokes the Tester.
TestMain
package com.example.client; import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; public class TestMain { static String[] COMMANDS = { "numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h", "sleep", "s", "command" }; static Set<String> allowedCommandLineArgs = new HashSet<String>(COMMANDS.length); static { for (String command : COMMANDS) { allowedCommandLineArgs.add(command); } } public static void main(String[] args) throws InterruptedException, IOException { if (args[0].equals("slave")) { System.out.println("Running slave"); Tester tester = extractCommands(args, null); ClientContainer container = ContainerProvider.getClientContainer(); Slave slave = new Slave(); container.connectToServer(slave, tester.address() + "/load"); tester.slave(true); slave.session.getRemote().sendString("slave"); tester.setSlave(slave); tester.connectAll(); while (true) { System.out.println("Waiting for next command"); String controlCommand = slave.nextCommand(); System.out.println("COMMAND: " + controlCommand); tester.clearCommands(); extractCommands(controlCommand.split(";"), tester); List<String> commands = tester.commands(); for (String command : commands) { if (command.equals("connectAll")) { tester.connectAll(); } else { System.out.println("This command not allowed in slave mode " + command); } } } } else if (args[0].equals("control")) { System.out.println("Running master"); Tester tester = extractCommands(args, null); tester.control(true); tester.init(); tester.sendRemoteConnectAll(); tester.waitForAck(); tester.sendMessage(); tester.waitForAck(); List<Integer> testTimes = tester.testTimes(); System.out.println("There were this many clients " + testTimes.size()); int sumTimes=0; int maxTime = Integer.MIN_VALUE; for (int i : testTimes){ sumTimes+=i; if (i>maxTime) { maxTime = i; } } System.out.println("The average times were " + (sumTimes / testTimes.size())); System.out.println("It took this long to recieve all messages " + maxTime); }else { System.out.println("Running standalone"); Tester tester = extractCommands(args, null); tester.standalone(true); tester.init(); tester.test(); Thread.sleep(3000); } } private static Tester extractCommands(String[] args, Tester tester) { if (tester == null) { tester = new Tester(); } boolean wasCommand = false; String command = null; if (args.length == 0) { printHelp(); } for (int index = 0; index < args.length; index++) { String value = args[index]; if (allowedCommandLineArgs.contains(value)) { command = value; wasCommand = true; continue; } if (wasCommand) { wasCommand = false; if ("numClients".equals(command) || "nc".equals(command)) { tester.numClients (Integer.parseInt(value)); } else if ("address".equals(command) || "a".equals(command)) { tester.address(value); } else if ("message".equals(command) || "m".equals(command)) { tester.message(value); } else if ("help".equals(command) || "--help".equals(command) || "h".equals(command) || "-h".equals(command)) { printHelp(); } else if ("command".equals(command) || "c".equals(command)) { tester.addCommand(value); } } } return tester; } private static void printHelp() { System.out.println("available commands"); for (String command : COMMANDS) { System.out.println(command); } System.out .println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello"); System.out .println("The above would connect five clients and send the message hello once to the chat server"); } }
Server side for Testing and Controlling Test Clients
LoadTestController
package com.example.loadtestcontrol; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import com.example.concurrent.ThreadedQueue; @ApplicationScoped @Startup public class LoadTestController { Set<LoadTestPeerHandler> slaves = new HashSet<LoadTestPeerHandler>(); BlockingQueue<Runnable> readQueue; @Inject Executor executor; public void remove(final LoadTestPeerHandler handler) { System.out.println("REMOVE SLAVE " + handler.hashCode()); offerCommand(new Runnable() { @Override public void run() { System.out.println("DO REMOVE SLAVE " + handler.hashCode()); slaves.remove(handler); }}); } public void add(final LoadTestPeerHandler handler) { System.out.println("ADD SLAVE " + handler.hashCode()); offerCommand(new Runnable() { @Override public void run() { System.out.println("DO ADD SLAVE " + handler.hashCode()); slaves.add(handler); } }); } public void sendCommand(final String command) { System.out.println("SEND COMMAND " + command); offerCommand(new Runnable() { @Override public void run() { doSendCommand(command); } }); } @PostConstruct void init() throws Exception { readQueue = new ThreadedQueue<Runnable>(new ArrayBlockingQueue<Runnable>(2000), new Runnable() { public void run() { LoadTestController.this.commandLoop(); }}, executor); } private void commandLoop() { Runnable command; while ((command = readQueue.poll()) != null) { command.run(); } } private void doSendCommand(String command) { System.out.println("DO SEND COMMAND...." + command); for (LoadTestPeerHandler slave : slaves) { slave.sendMessage(command); } } private void offerCommand(Runnable runnable) { try { readQueue.offer(runnable, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
LoadTestPeerHandler
package com.example.loadtestcontrol; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketError; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; import com.example.concurrent.ThreadedQueue; @WebSocketEndpoint("/load") public class LoadTestPeerHandler implements Closeable { boolean remoteController; boolean firstMessage; static LoadTestPeerHandler remoteControl; static AtomicInteger remoteControllersCount = new AtomicInteger(0); static AtomicInteger slaves = new AtomicInteger(0); static AtomicInteger ackCount = new AtomicInteger(0); public LoadTestPeerHandler() { } @Inject @ApplicationScoped LoadTestController controller; @Inject Executor executor; /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; /** Flag to indicate whether we should close or not. */ volatile boolean close = true; @WebSocketOpen public void onOpen(Session session) { firstMessage = true; this.session = session; init(); } @WebSocketClose public void onClose() throws IOException { this.close(); } @WebSocketMessage public void onMessage(String message) throws IOException { System.out.println("2 CONTROL MESSAGE ##" + message + "##" + message.equals("control") + "##" + firstMessage); if (firstMessage) { System.out.println("FIRST MESSAGE"); firstMessage=false; if (message.equals("control")) { System.out.println("FOUND CONTROL"); if (remoteControllersCount.compareAndSet(0, 1)){ System.out.println("CONTROLLER FOUND"); this.remoteController = true; remoteControl = this; } else { System.out.println("More than one CONTROLLER FOUND"); session.getRemote().sendString("There is already a remote controller"); session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "we only allow one remote controller")); } } else if (message.equals("slave")) { //Only slaves listen. Controllers control. System.out.println("FOUND SLAVE"); controller.add(this); slaves.incrementAndGet(); this.remoteController = false; } else { session.getRemote().sendString("You must be a control or a slave"); session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "client must be control or slave")); } return; } if (this.remoteController) { System.out.println("MESSAGE: " + message); controller.sendCommand(message); ackCount.set(0); } else { if (message.startsWith("&&&ACK&&&")) { ackCount.incrementAndGet(); if (ackCount.get()==slaves.get()) { if (remoteControl!=null) { System.out.println("GOT ALL ACKS::::" + ackCount.get()); remoteControl.sendMessage("GOT ALL ACKS::::" + ackCount.get()); ackCount.set(0); } } else { System.out.printf("Got %s acks out of %s slaves\n", ackCount, slaves); } } else if (message.startsWith("total time::::")) { System.out.printf("test time message is " + message); if (remoteControl!=null) { remoteControl.sendMessage(message); } } else { session.getRemote().sendString("1 You must be a controller to send control messages......" + message); session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "only controllers can send messages")); } } } private void init() { close = false; if (session == null) { throw new IllegalStateException("session is null"); } writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), new Runnable() { @Override public void run() { LoadTestPeerHandler.this.run(); } }, executor); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { if (close) { return; } //System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } private void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); try { this.close(); } catch (IOException e1) { //ignore so it does not mask original exception //once you install loggers, make this an info and the other an error. } break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (this.remoteController) { remoteControllersCount.decrementAndGet(); } else { controller.remove(this); slaves.decrementAndGet(); } if (session != null) { session.close(); session=null; } } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } @WebSocketError public void onError(Throwable thrown) { thrown.printStackTrace(); } }