WebSocket API Chat Room using JSR 356, poor man's actor model and WebSocketEndpoint annotation

From Resin 4.0 Wiki

(Difference between revisions)
Jump to: navigation, search
 
(One intermediate revision by one user not shown)
Line 1: Line 1:
 +
[[WebSocket API Chat Room using JSR 356, poor man's actor model and WebSocketEndpoint annotation take II]]
 +
 +
This code needs some refactoring, but it works well.
 +
 
We are down to four public classes. One of those classes is a utility class.
 
We are down to four public classes. One of those classes is a utility class.
 
It should be able to scale such that the limiting factor is how many FILE_DESCRIPTORS your OS supports.
 
It should be able to scale such that the limiting factor is how many FILE_DESCRIPTORS your OS supports.
Line 588: Line 592:
 
                                 queueProcessor.run();
 
                                 queueProcessor.run();
 
                             } finally {
 
                             } finally {
                               active.set(false);
+
                               active.set(false);
 +
                              launchRunnable();//make sure nothing got in the queue between end of run and the finally block.   
 
                             }
 
                             }
 
                         }
 
                         }

Latest revision as of 00:00, 17 November 2012

WebSocket API Chat Room using JSR 356, poor man's actor model and WebSocketEndpoint annotation take II

This code needs some refactoring, but it works well.

We are down to four public classes. One of those classes is a utility class. It should be able to scale such that the limiting factor is how many FILE_DESCRIPTORS your OS supports. I moved the message parsing to WebsocketMessageHandler. Since it is now using annotations, the ChatRoom does not have many ties left to WebSockets. The one remaining tie that ChatRoom has to websockets is the session (this could be wrapped if you wanted to separate sending of messages). ChatClientPeerHandler depends on websockets session, and ChatRoom manages instances of ChatClientPeerHandler.

ChatClientPeerHandler handles outgoing messages. WebsocketMessageHandler handles incoming messages. ChatRoom manages dispatching messages to ChatClientPeerHandlers.

Browser -> Message In -> WebsocketMessageHandler -> ChatRoom ->*ChatClientPeerHandlers -> Message Out -> Broswer

There are many WebsocketMessageHandlers in the system. There are many ChatClientPeerHandlers in the system.

WebsocketMessageHandlers is for in messages. ChatClientPeerHandlers is for out messages. We could perhaps combine these or not. They hold onto the same info (the websocket session).


WebsocketMessageHandler

package com.example.websocket;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Session;
import javax.net.websocket.annotations.WebSocketEndpoint;
import javax.net.websocket.annotations.WebSocketMessage;
import javax.net.websocket.annotations.WebSocketOpen;

import com.example.ChatRoom;

@WebSocketEndpoint("/chat")
public class WebsocketMessageHandler {

    @Inject @ApplicationScoped 
    ChatRoom chatRoom;    
    Session session;

    
    @WebSocketOpen
    public void onOpen(Session session) {
        System.out.println("@WebSocketOpen " + session);
        this.session = session;
    }

    @WebSocketMessage
    public void onMessage(String message) {
        
        String[] strings = message.split("::::");
        String clientId = strings[1];
 

        if (message.startsWith("remove client::::")) {
            chatRoom.removeClient(clientId);
        } else if (message.startsWith("send message::::")) {
            String m = strings[2];
            chatRoom.sendMessage(clientId, m);
        } else if (message.startsWith("add client::::")) {
            chatRoom.addClient(clientId, session);
        } else {
            System.err.println("ACK... Don't understand your message!!!!! " + message);
        }
    }

}

ChatRoom

package com.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Session;

import com.example.concurrent.ThreadedQueue;

@ApplicationScoped
@Startup
public class ChatRoom {

    Map<String, ChatClientPeerHandler> chatClients = new HashMap<String, ChatClientPeerHandler>();
    BlockingQueue<ChatMessage> readQueue ;
    @Inject Executor executor;

    
    public void addClient(String clientId, Session session) {
        offerMessage(new ChatMessage(session, clientId));        

    }

    public void sendMessage(String clientId, String message) {
        offerMessage(new ChatMessage(clientId, message));        
    }

    
    public void removeClient(String clientId) {
        offerMessage(new ChatMessage(clientId));                
    }
    


    
    @PostConstruct
    void init() throws Exception {
        System.out.println("POST CONSTRUCT CALLED");
        
        readQueue = new ThreadedQueue<ChatMessage>(new ArrayBlockingQueue<ChatMessage>(2000), new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }},  executor);
        
    }


    private void run() {
        ChatMessage message;
        
        while ((message = readQueue.poll()) != null) {
             dispatchMessage(message);
        }

    }

    private void launchNewClient(ChatMessage message) {
        System.out.println("launchNewClient::::" + message.getClientId());
        
        ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler(this,
                message.getSession(), message.getClientId(), executor);
        chatClients.put(chatClientHandler.getName(), chatClientHandler);
        doSendMessage(chatClientHandler.getName(), chatClientHandler.getName()
                + " has join the chat room");
    }

    private void dispatchMessage(ChatMessage message) {
        System.out.println("Dispatch Message::" + message);
        
        switch(message.getType()) {
        case ADD_CLIENT:
            launchNewClient(message);
            break;
        case REMOVE_CLIENT:
            doRemoveClient(message.getClientId());
            break;
        case SEND_MESSAGE:
            doSendMessage(message.getClientId(), message.getMessage());
            break;
        default:
             System.err.println("ACK... Don't understand your message!!!!! "
                     + message);
             
        }

    }

    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s\n", sendMessage);
        Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientPeerHandler chatClientHandler = iterator.next();
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                        sendMessage, chatClientHandler.getName());
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    private void doRemoveClient(String client) {

        System.out.println("removeClient::::[" + client + "]::::");

        ChatClientPeerHandler chatClientHandler = chatClients.get(client);

        if (chatClientHandler != null) {

            System.out.println("removeClient:::: found " + client
                    + " to remove.");

            doSendMessage(chatClientHandler.getName(),
                    chatClientHandler.getName()
                            + " has become bored with this chat room");

            chatClients.remove(client);
            try {
                chatClientHandler.close();
            } catch (IOException e) {
            }
        }

    }

    static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT};
    
    static class ChatMessage {

        String message;
        Session session;
        MessageType type;
        String clientId;
        
     
        public MessageType getType() {
            return type;
        }

        public String getClientId() {
            return clientId;
        }

        public ChatMessage(String clientId) {
            this.clientId = clientId;
            this.type=MessageType.REMOVE_CLIENT;
        }
        
        public ChatMessage(Session session, String clientId) {
            this.session = session;
            this.clientId = clientId;
            this.type=MessageType.ADD_CLIENT;
        }


        public ChatMessage(String clientId, String message) {
            this.clientId = clientId;
            this.message = message;
            this.type=MessageType.SEND_MESSAGE;
        }

        public final String getMessage() {
            return message;
        }
        public final Session getSession() {
            return session;
        }

        @Override
        public String toString() {
            return "ChatMessage [message=" + message + ", session=" + session
                    + ", type=" + type + ", clientId=" + clientId + "]";
        }

    }
    
    private void offerMessage(ChatMessage message) {
        try {
            readQueue.offer(message, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }
}

ChatClientPeerHandler

package com.example;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

import javax.net.websocket.Session;

import com.example.concurrent.ThreadedQueue;

public class ChatClientPeerHandler implements Closeable {

    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    
    ChatRoom room;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close;


    public ChatClientPeerHandler(ChatRoom room, Session session, String name, Executor executor) {
        this.session = session;
        this.clientId = name;
        this.room = room;
        
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), 
                new Runnable() {                
                    @Override
                    public void run() {
                        ChatClientPeerHandler.this.run();                   
                    }
                },  executor);
        
    }

    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    private void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
        }
        room.removeClient(this.clientId);
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }
    
    

}

ThreadedQueue

package com.example.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A Queue that wakes up a runnable every time something is added to it.
 * This is a poor man's Actor. The thread is tied to the queue.
 * If you put an item in and there is not a thread, one will be launched.
 */
public class ThreadedQueue<T> implements BlockingQueue<T> {
    BlockingQueue<T> queue;
    Executor executor;
    Runnable queueProcessor;
    AtomicBoolean active = new AtomicBoolean();

    public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor,
            Executor executor) {
        this.queue = queue;
        this.queueProcessor = queueProcessor;
        this.executor = executor;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    @Override
    public boolean offer(T item) {
        boolean result = queue.offer(item);

        launchRunnable();

        return result;
    }

    @Override
    public boolean offer(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean result = queue.offer(item);

        launchRunnable();

        if (result) {
            return result;
        } else {
            result = queue.offer(item, timeout, unit);
            launchRunnable();
        }
        
        return result;
    }

    @Override
    public boolean add(T item) {
        boolean result = queue.add(item);

        launchRunnable();

        return result;
    }

    @Override
    public void put(T item) throws InterruptedException {
        System.out.println("PUT CALLED " + item);
        
        /* Offer it if you can, this is non-blocking. */
        if (queue.offer(item)) {
            launchRunnable();
            return;
        }


        /* If you were unable to offer it, then go ahead and block. */
        queue.put(item);

        launchRunnable();
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        boolean result = queue.addAll(c);

        launchRunnable();

        return result;
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <X> X[] toArray(X[] array) {
        return queue.toArray(array);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    public void launchRunnable() {
        System.out.println("LAUNCH RUNNABLE");
        if (queue.isEmpty()) {
            return;
        }

        if (active.compareAndSet(false, true)) {
            executor.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                queueProcessor.run();
                            } finally {
                              active.set(false);
                              launchRunnable();//make sure nothing got in the queue between end of run and the finally block.    
                            }
                        }
                    });
        }
    }

 }

index.html

<html>

<head>

<title>Chat Client</title>

<link rel="stylesheet" type="text/css" href="css/style.css"
    media="screen" />


<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>


<script type="text/javascript">
    var ENTER_KEY = '13';
    var TOKEN_DELIM = "::::";
    
    

    
    function buildWebSocketURL() {
        var url = document.URL;
        var parts = url.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];
        var wssScheme = null;
        
        
        if (scheme=="http:") {
            wssScheme="ws:";
        } else if (scheme=="https:") {
            wssScheme="wss:";
        }
        
        wssUrl = wssScheme + "//" + hostPort  + "/chat/chat";
        
        return wssUrl;
        
    }
    

    var chatUserSession = {
                version : "1.0",
                webSocketProtocol : "caucho-example-chat-protocol",
                webSocketURL : buildWebSocketURL(),
                webSocket : null,//WebSocket
                userName : null
    };

    function chat_sendMessage(message) {
        $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>");

        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    function chat_joinChat() {
        chatUserSession.webSocket.send("add client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_leaveChat() {
        chatUserSession.status(chatUserSession.userName + " is leaving chat");
        chatUserSession.webSocket.send("remove client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var socket = chatUserSession.webSocket;

        socket.onmessage = function(msg) {
            chatUserSession.onMessage(msg);
        }

        socket.onerror = function(errorEvent) {
            chatUserSession.onError(errorEvent);
        }

        socket.onopen = function() {
            chatUserSession.onOpen();
        }
        socket.onclose = function(closeEvent) {
            chatUserSession.onClose(closeEvent);
        }

    }

    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);

        $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>");
    }

    function chat_Login() {

        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text(
                "Chat Client (logging in...) : " + chatUserSession.userName);

        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();

    }

    function chat_onOpen() {
        chatUserSession.joinChat();
        chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName);
          $('#header').text(
                    "Chat Client (logged in...) : " + chatUserSession.userName);

        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }
    
    function chat_Status(message) {
        $('#statusBarPara1').text(message);
        $("#statusBar").show(500);
        
    }

    function chat_onClose(closeEvent) {
        $("#loginDiv").show(500);
        $('#header').text(
                "Chat Client (not connected) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + 
                ":: Reason: " + closeEvent.reason + 
                " Code: " + closeEvent.code);

        $("#inputArea").hide(500);
        $("#statusBar").show(500);

        $("#userName").val(chatUserSession.userName);
        $("#userName").focus();
    }

    function chat_onError(msg) {
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg));
        $("#statusBar").show(500);
    }

    chatUserSession.open = chat_openWebSocket;
    chatUserSession.onMessage = chat_onMessage;
    chatUserSession.onOpen = chat_onOpen;
    chatUserSession.login = chat_Login;
    chatUserSession.onClose = chat_onClose;
    chatUserSession.onError = chat_onError;
    chatUserSession.joinChat = chat_joinChat;
    chatUserSession.sendMessage = chat_sendMessage;
    chatUserSession.leaveChat = chat_leaveChat;
    chatUserSession.status = chat_Status;

    $(document).ready(function() {

        $("#inputArea").hide();
        $("#userName").focus();
        
        

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(function(event) {

            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();

                if (textMessage=="bye!") {
                    chatUserSession.leaveChat();
                } else {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(textMessage);
                }
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chatUserSession.login()
                event.stopPropagation();
            }
        });
    });
</script>

</head>

<body>

    <h1 id="header">Chat Client 7</h1>


    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>

    <div id="loginDiv">
        User name   <input id="userName" type="text" /> <input
            id="login" type="submit" value="Login" />
    </div>


    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>

    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>


</body>

</html>
Personal tools
TOOLBOX
LANGUAGES