WebSocket API Chat Room using JSR 356

From Resin 4.0 Wiki

Jump to: navigation, search


This version will scale to however many threads your box can run (64 bit OS running a multi core machine can run thousands of threads).

There is another version that will run to however many FILE_DESCRIPTORS your OS supports using a poor man's actor model.

WebSocket API Chat Room using JSR 356 and poor man's actor model

ChatServerWebSocketEndpoint

package com.example.websocket;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Endpoint;
import javax.net.websocket.Session;

import com.example.ChatRoom;


public class ChatServerWebSocketEndpoint extends Endpoint{

    @Inject @ApplicationScoped 
    ChatRoom room;
    

    @Override
    public void onOpen(Session session) {
        session.addMessageHandler(new ChatWebsocketMessageHandler(session, room));
    }
    

}


The ChatServerWebSocketEndpoint binds a ChatWebsocketMessageHandler to the WebSocket session.

The ChatWebsocketMessageHandler uses a chatRoom to send a message to other clients, when it gets a message.

package com.example.websocket;


import javax.net.websocket.MessageHandler;
import javax.net.websocket.Session;

import com.example.ChatMessage;
import com.example.ChatRoom;

public class ChatWebsocketMessageHandler implements MessageHandler.Text {

    ChatRoom chatRoom;
    Session session;
    
    public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) {
        this.session = session;
        this.chatRoom = chatRoom;
    }

    @Override
    public void onMessage(String message) {
        chatRoom.sendMessage(new ChatMessage(message, session));
    }

}

The ChatWebsocketMessageHandler wraps the message in a ChatMessage. The ChatMessage parses what type of chat message this is.


ChatMessage

package com.example;

import javax.net.websocket.Session;

public class ChatMessage {
    static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT};

    String message;
    Session session;
    MessageType type;
    String clientId;
    
 
    public MessageType getType() {
        return type;
    }

    public String getClientId() {
        return clientId;
    }

    public ChatMessage(MessageType type, String clientId) {
        
    }
    public ChatMessage(String strMessage, Session session) {
        
        String[] strings = strMessage.split("::::");
        clientId = strings[1];
        this.session = session;


        if (strMessage.startsWith("remove client::::")) {
            type = MessageType.REMOVE_CLIENT;
            this.session = session;
        } else if (strMessage.startsWith("send message::::")) {
            type = MessageType.SEND_MESSAGE;
            message = strings[2];
        } else if (strMessage.startsWith("add client::::")) {
            type = MessageType.ADD_CLIENT;
        } else {
            System.err.println("ACK... Don't understand your message!!!!! "
                    + message);
        }
    }

    public final String getMessage() {
        return message;
    }
    public final Session getSession() {
        return session;
    }

    @Override
    public String toString() {
        return "ChatMessage [message=" + message + ", session=" + session
                + ", type=" + type + ", clientId=" + clientId + "]";
    }

}

ChatRoom

package com.example;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.ContainerProvider;
import javax.net.websocket.DefaultServerConfiguration;
import javax.net.websocket.ServerContainer;
import javax.net.websocket.ServerEndpointConfiguration;

import com.example.ChatMessage.MessageType;
import com.example.websocket.ChatServerWebSocketEndpoint;

@ApplicationScoped
@Startup
public class ChatRoom {

    Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>();

    BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>(
            2000);

    @Inject Executor executor;
    @Inject ChatServerWebSocketEndpoint endpoint;
    
    @PostConstruct
    void init() throws Exception {
        System.out.println("POST CONSTRUCT CALLED");
        ServerEndpointConfiguration serverConfiguration;
        serverConfiguration = new DefaultServerConfiguration(new URI("/chat"));
        ServerContainer serverContainer = ContainerProvider.getServerContainer();
        serverContainer.publishServer(endpoint,serverConfiguration);
        
        executor.execute(new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }});
    }


    private void run() {
        while (true) {
            try {
                ChatMessage message = readQueue
                        .poll(500, TimeUnit.MILLISECONDS);
                if (message != null) {
                    dispatchMessage(message);
                }
            } catch (InterruptedException e) {
                if (Thread.currentThread().isInterrupted()) {
                    Thread.interrupted();
                }
            }
        }
    }

    private void launchNewClient(ChatMessage message) {
        System.out.println("launchNewClient::::" + message.getClientId());
        
        ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler(
                message.getSession(), message.getClientId());
        chatClients.put(chatClientHandler.getName(), chatClientHandler);
        executor.execute(chatClientHandler);
        doSendMessage(chatClientHandler.getName(), chatClientHandler.getName()
                + " has join the chat room");
    }

    private void dispatchMessage(ChatMessage message) {
        System.out.println("Dispatch Message::" + message);
        
        switch(message.getType()) {
        case ADD_CLIENT:
            launchNewClient(message);
            break;
        case REMOVE_CLIENT:
            doRemoveClient(message.getClientId());
            break;
        case SEND_MESSAGE:
            doSendMessage(message.getClientId(), message.getMessage());
            break;
        default:
             System.err.println("ACK... Don't understand your message!!!!! "
                     + message);
             
        }

    }

    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s\n", sendMessage);
        Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientPeerHandler chatClientHandler = iterator.next();
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                        sendMessage, chatClientHandler.getName());
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    private void doRemoveClient(String client) {

        System.out.println("removeClient::::[" + client + "]::::");

        ChatClientPeerHandler chatClientHandler = chatClients.get(client);

        if (chatClientHandler != null) {

            System.out.println("removeClient:::: found " + client
                    + " to remove.");

            doSendMessage(chatClientHandler.getName(),
                    chatClientHandler.getName()
                            + " has become bored with this chat room");

            chatClients.remove(client);
            try {
                chatClientHandler.close();
            } catch (IOException e) {
            }
        }

    }

    public void sendMessage(ChatMessage message) {
        try {
            readQueue.offer(message, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }
    
    public void removeClient(String clientId) {
        ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId);
        
        try {
            readQueue.offer(message, 100, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }

}

The ChatClientPeerHandler manages sending messages back to the clients.

ChatClientPeerHandler

package com.example;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Session;

public class ChatClientPeerHandler implements Runnable, Closeable {

    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000);
    
    @Inject @ApplicationScoped 
    ChatRoom room;


    /** Keeps track if the thread for this ChatClientHandler is alive. */
    volatile boolean alive;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close;

    public ChatClientPeerHandler(Session session, String name) {
        this.session = session;
        this.clientId = name;
    }

    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        System.out.println("ChatClientHandler::::" + sendMessage);
        this.writeQueue.offer(sendMessage);
    }

    @Override
    public void run() {
        alive = true;
        while (true) {
            try {
                
                if (close) {
                    alive = false;
                    break;
                }

                /* Get a message, send a message to client if not null. */
                String message = writeQueue.poll(60, TimeUnit.SECONDS);

                if (message != null) {
                    System.out
                            .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n",
                                    message, clientId);
                    session.getRemote().sendString(message);
                }
            } catch (InterruptedException e) {
                if (Thread.currentThread().isInterrupted()) {
                    Thread.interrupted();
                }
            } catch (IOException e) {
                alive = false;
                e.printStackTrace();
                break;
            }

        }
    }

    public String getName() {
        return clientId;
    }

    public boolean isAlive() {
        return alive;
    }


    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
        }
        room.removeClient(this.clientId);
    }

}

The HTML 5 / JavaScript client that uses this chat room.....

<html>

<head>

<title>Chat Client</title>

<link rel="stylesheet" type="text/css" href="css/style.css"
    media="screen" />


<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>


<script type="text/javascript">
    var ENTER_KEY = '13';
    var TOKEN_DELIM = "::::";
    
    

    
    function buildWebSocketURL() {
        var url = document.URL;
        var parts = url.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];
        var wssScheme = null;
        
        
        if (scheme=="http:") {
            wssScheme="ws:";
        } else if (scheme=="https:") {
            wssScheme="wss:";
        }
        
        wssUrl = wssScheme + "//" + hostPort  + "/chat/chat";
        
        return wssUrl;
        
    }
    

    var chatUserSession = {
                version : "1.0",
                webSocketProtocol : "caucho-example-chat-protocol",
                webSocketURL : buildWebSocketURL(),
                webSocket : null,//WebSocket
                userName : null
    };

    function chat_sendMessage(message) {
        $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>");

        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    function chat_joinChat() {
        chatUserSession.webSocket.send("add client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_leaveChat() {
        chatUserSession.status(chatUserSession.userName + " is leaving chat");
        chatUserSession.webSocket.send("remove client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var socket = chatUserSession.webSocket;

        socket.onmessage = function(msg) {
            chatUserSession.onMessage(msg);
        }

        socket.onerror = function(errorEvent) {
            chatUserSession.onError(errorEvent);
        }

        socket.onopen = function() {
            chatUserSession.onOpen();
        }
        socket.onclose = function(closeEvent) {
            chatUserSession.onClose(closeEvent);
        }

    }

    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);

        $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>");
    }

    function chat_Login() {

        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text(
                "Chat Client (logging in...) : " + chatUserSession.userName);

        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();

    }

    function chat_onOpen() {
        chatUserSession.joinChat();
        chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName);
          $('#header').text(
                    "Chat Client (logged in...) : " + chatUserSession.userName);

        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }
    
    function chat_Status(message) {
        $('#statusBarPara1').text(message);
        $("#statusBar").show(500);
        
    }

    function chat_onClose(closeEvent) {
        $("#loginDiv").show(500);
        $('#header').text(
                "Chat Client (not connected) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + 
                ":: Reason: " + closeEvent.reason + 
                " Code: " + closeEvent.code);

        $("#inputArea").hide(500);
        $("#statusBar").show(500);

        $("#userName").val(chatUserSession.userName);
        $("#userName").focus();
    }

    function chat_onError(msg) {
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg));
        $("#statusBar").show(500);
    }

    chatUserSession.open = chat_openWebSocket;
    chatUserSession.onMessage = chat_onMessage;
    chatUserSession.onOpen = chat_onOpen;
    chatUserSession.login = chat_Login;
    chatUserSession.onClose = chat_onClose;
    chatUserSession.onError = chat_onError;
    chatUserSession.joinChat = chat_joinChat;
    chatUserSession.sendMessage = chat_sendMessage;
    chatUserSession.leaveChat = chat_leaveChat;
    chatUserSession.status = chat_Status;

    $(document).ready(function() {

        $("#inputArea").hide();
        $("#userName").focus();
        
        

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(function(event) {

            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();

                if (textMessage=="bye!") {
                    chatUserSession.leaveChat();
                } else {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(textMessage);
                }
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chatUserSession.login()
                event.stopPropagation();
            }
        });
    });
</script>

</head>

<body>

    <h1 id="header">Chat Client</h1>


    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>

    <div id="loginDiv">
        User name   <input id="userName" type="text" /> <input
            id="login" type="submit" value="Login" />
    </div>


    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>

    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>


</body>

</html>
Personal tools
TOOLBOX
LANGUAGES