WebSocket API Chat Room using JSR 356, poor man's actor model and WebSocketEndpoint annotation take II

From Resin 4.0 Wiki

Revision as of 17:00, 18 November 2012 by Rick (Talk | contribs)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

OK, we are now down to three classes: ChatClientPeerHandler, ChatRoom and ThreadedQueue. If no messages are coming in, we take up no threads. Under heavy load, we are limited to the number of threads the thread pool allows, and that pool is managed by the application server (Resin). In other words, having 50K to 100K clients no longer means having 50K to 100K threads.

Websocket JSR 356 API Chat ROOM using latest spec changes, client API, improved session handling, and more


ChatClientPeerHandler

package com.example;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Session;
import javax.net.websocket.annotations.WebSocketClose;
import javax.net.websocket.annotations.WebSocketEndpoint;
import javax.net.websocket.annotations.WebSocketMessage;
import javax.net.websocket.annotations.WebSocketOpen;

import com.example.concurrent.ThreadedQueue;

@WebSocketEndpoint("/chat")
public class ChatClientPeerHandler implements Closeable {
    
    public ChatClientPeerHandler() {
    }

    @Inject @ApplicationScoped 
    ChatRoom chatRoom;
    
    @Inject Executor executor;


    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    
    ChatRoom room;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close = true;

    
    @WebSocketOpen
    public void onOpen(Session session) {
        System.out.println("@WebSocketOpen " + session);
        this.session = session;
    }
    
    @WebSocketClose
    public void onClose() {
        System.out.println("@WebSocketClose " + session);
        this.session = null;
    }


    @WebSocketMessage
    public void onMessage(String message) {
        
        String[] strings = message.split("::::");
        String clientId = strings[1];
        
        if (this.clientId!=null) {
            if (!this.clientId.equals(clientId)) {
                //This should never happen, but it proves our threads/client handling is correct.
                throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId);
            }
        }
 

        if (message.startsWith("remove client::::")) {
            chatRoom.sendMessage(clientId, clientId
                    + " has become bored and left");
            chatRoom.removeClient(clientId);

        } else if (message.startsWith("send message::::")) {
            String m = strings[2];
            chatRoom.sendMessage(clientId, m);
        } else if (message.startsWith("add client::::")) {
            this.clientId = clientId;
            chatRoom.addClient(clientId, this);
            chatRoom.sendMessage(clientId, clientId
                    + " has join the chat room");

        } else {
            System.err.println("ACK... Don't understand your message!!!!! " + message);
        }
    }
    
    
    void init() {
        close = false;
        if (clientId == null || session == null) {
            throw new IllegalStateException("session and/or clientId is null");
        }
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), 
                new Runnable() {                
                    @Override
                    public void run() {
                        ChatClientPeerHandler.this.run();                   
                    }
                },  executor);

    }


    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        if (close) {
            return;
        }
        
        System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    private void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
        }
        room.removeClient(this.clientId);
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }

}

ChatRoom

package com.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import com.example.concurrent.ThreadedQueue;

@ApplicationScoped
@Startup
public class ChatRoom {

    Map<String, ChatClientPeerHandler> chatClients = new HashMap<String, ChatClientPeerHandler>();
    BlockingQueue<ChatMessage> readQueue ;
    @Inject Executor executor;

    
    public void addClient(String clientId, ChatClientPeerHandler session) {
        offerMessage(new ChatMessage(session, clientId));        

    }

    public void sendMessage(String clientId, String message) {
        offerMessage(new ChatMessage(clientId, message));        
    }

    
    public void removeClient(String clientId) {
        offerMessage(new ChatMessage(clientId));                
    }
    


    
    @PostConstruct
    void init() throws Exception {
        System.out.println("POST CONSTRUCT CALLED");
        
        readQueue = new ThreadedQueue<ChatMessage>(new ArrayBlockingQueue<ChatMessage>(2000), new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }},  executor);
        
    }


    private void run() {
        ChatMessage message;
        
        while ((message = readQueue.poll()) != null) {
             dispatchMessage(message);
        }

    }

    private void launchNewClient(ChatMessage message) {
        System.out.println("launchNewClient::::" + message.getClientId());
        
        message.getSession().init();
        chatClients.put(message.getSession().getName(), message.getSession());
        
    }

    private void dispatchMessage(ChatMessage message) {
        System.out.println("Dispatch Message::" + message);
        
        switch(message.getType()) {
        case ADD_CLIENT:
            launchNewClient(message);
            break;
        case REMOVE_CLIENT:
            doRemoveClient(message.getClientId());
            break;
        case SEND_MESSAGE:
            doSendMessage(message.getClientId(), message.getMessage());
            break;
        default:
             System.err.println("ACK... Don't understand your message!!!!! "
                     + message);
             
        }

    }

    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s\n", sendMessage);
        Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientPeerHandler chatClientHandler = iterator.next();
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                        sendMessage, chatClientHandler.getName());
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    private void doRemoveClient(String client) {

        System.out.println("removeClient::::[" + client + "]::::");

        ChatClientPeerHandler chatClientHandler = chatClients.get(client);

        if (chatClientHandler != null) {

            System.out.println("removeClient:::: found " + client
                    + " to remove.");

            chatClients.remove(client);
            try {
                chatClientHandler.close();
            } catch (IOException e) {
            }
        }

    }

    static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT};
    
    static class ChatMessage {

        String message;
        ChatClientPeerHandler session;
        MessageType type;
        String clientId;
        
     
        public MessageType getType() {
            return type;
        }

        public String getClientId() {
            return clientId;
        }

        public ChatMessage(String clientId) {
            this.clientId = clientId;
            this.type=MessageType.REMOVE_CLIENT;
        }
        
        public ChatMessage(ChatClientPeerHandler session, String clientId) {
            this.session = session;
            this.clientId = clientId;
            this.type=MessageType.ADD_CLIENT;
        }


        public ChatMessage(String clientId, String message) {
            this.clientId = clientId;
            this.message = message;
            this.type=MessageType.SEND_MESSAGE;
        }

        public final String getMessage() {
            return message;
        }
        public final ChatClientPeerHandler getSession() {
            return session;
        }

        @Override
        public String toString() {
            return "ChatMessage [message=" + message + ", session=" + session
                    + ", type=" + type + ", clientId=" + clientId + "]";
        }

    }
    
    private void offerMessage(ChatMessage message) {
        try {
            readQueue.offer(message, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }

}

ThreadedQueue

package com.example.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A Queue that wakes up a runnable every time something is added to it.
 * This is a poor man's Actor. The thread is tied to the queue.
 * If you put an item in and there is not a thread, one will be launched.
 */
/**
 * A {@link BlockingQueue} decorator that schedules a processor on an
 * {@link Executor} whenever an element is inserted and no processor run is
 * currently scheduled or executing.
 *
 * <p>At most one run of the processor is in flight at a time. When a run
 * finishes and the queue is not empty (for example because an element arrived
 * just as the processor saw the queue empty), another run is scheduled, so no
 * element is left waiting for an unrelated later insert.
 *
 * <p>The processor is expected to remove at least one element per run while
 * the queue is non-empty; a processor that returns without consuming anything
 * would be rescheduled again and again.
 *
 * @param <T> element type
 */
public class ThreadedQueue<T> implements BlockingQueue<T> {
    /** Queue that actually stores the elements. */
    BlockingQueue<T> queue;
    /** Executor the processor is submitted to. */
    Executor executor;
    /** Task that consumes the elements of this queue. */
    Runnable queueProcessor;
    /** True while a processor run is scheduled or executing. */
    AtomicBoolean active = new AtomicBoolean();

    /**
     * @param queue          backing queue
     * @param queueProcessor task that drains this queue
     * @param executor       executor used to run {@code queueProcessor}
     */
    public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor,
            Executor executor) {
        this.queue = queue;
        this.queueProcessor = queueProcessor;
        this.executor = executor;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    /** Inserts without blocking, then makes sure a processor run is scheduled. */
    @Override
    public boolean offer(T item) {
        boolean result = queue.offer(item);

        launchRunnable();

        return result;
    }

    /**
     * Tries a non-blocking insert first; if the queue is full, makes sure a
     * processor is running to free space and then waits up to the timeout.
     */
    @Override
    public boolean offer(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean result = queue.offer(item);

        launchRunnable();

        if (!result) {
            result = queue.offer(item, timeout, unit);
            launchRunnable();
        }

        return result;
    }

    @Override
    public boolean add(T item) {
        boolean result = queue.add(item);

        launchRunnable();

        return result;
    }

    /**
     * Inserts, blocking while the queue is full. Before blocking, a processor
     * run is requested so there is a consumer that can make room.
     */
    @Override
    public void put(T item) throws InterruptedException {
        System.out.println("PUT CALLED " + item);
        
        /* Offer it if you can, this is non-blocking. */
        if (queue.offer(item)) {
            launchRunnable();
            return;
        }

        /* Queue is full: make sure somebody is draining it before we block. */
        launchRunnable();

        /* If you were unable to offer it, then go ahead and block. */
        queue.put(item);

        launchRunnable();
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        try {
            return queue.addAll(c);
        } finally {
            // Elements added before a failure still need a processor.
            launchRunnable();
        }
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <X> X[] toArray(X[] array) {
        return queue.toArray(array);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    /**
     * Schedules the processor if the queue has content and no run is already
     * scheduled or executing.
     */
    public void launchRunnable() {
        System.out.println("LAUNCH RUNNABLE");
        scheduleIfNeeded();
    }

    /** Submits one processor run when there is work and none is in flight. */
    private void scheduleIfNeeded() {
        if (queue.isEmpty()) {
            return;
        }

        if (!active.compareAndSet(false, true)) {
            return;
        }

        try {
            executor.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                queueProcessor.run();
                            } finally {
                                active.set(false);
                                // A producer may have enqueued after the processor's
                                // last poll but before the flag was cleared; its own
                                // launch attempt lost the compare-and-set, so pick
                                // the element up here.
                                scheduleIfNeeded();
                            }
                        }
                    });
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The task never ran; release the flag so a later insert can retry.
            active.set(false);
            throw e;
        }
    }

 }

Personal tools
TOOLBOX
LANGUAGES