Archive for the ‘Grizzly’ Category

Grizzly : Create a server and client with only a few changes

May 24th, 2010

With Grizzly, a server and a client can share most of their code, so turning one into the other takes only a few line changes. I want to show you how to create a client from an existing server without too much trouble.

I will use a ProtocolChain in the server. For the client, there are two ways to reuse the logic of the server: a ProtocolChain or a CallbackHandler.

I’ll show you how to create the client with both alternatives.

Let’s start with the lines that are common to both; later I’ll show the differences between the ProtocolChain and CallbackHandler implementations of the client.

The complete source code is available at the end of this post.

Let’s take a look at the init method of the server and client implementations.

Server

f_tg = new ThreadGroup("ThreadGroup");
final CountDownLatch started = new CountDownLatch(1);
controller = new Controller();
TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
tcpSelectorHandler.setPort(port);
controller.addSelectorHandler(tcpSelectorHandler);

Client

f_tg = new ThreadGroup("ThreadGroup");
final CountDownLatch started = new CountDownLatch(1);
controller = new Controller();
TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
controller.addSelectorHandler(tcpSelectorHandler);

The only difference is that the server must specify its listening port.

Let’s continue with the next step: the close notification and the state listener.

The server and the clients are identical here.

BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler();

// to be notified when a client/server closes the connection
selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {
    public void locallyClosed(SelectionKey key) {
        if (s_logger.isDebugEnabled()) {
            s_logger.debug(key + " is being locally cancelled");
        }
    }

    public void remotlyClosed(SelectionKey key) {
        if (s_logger.isDebugEnabled()) {
            s_logger.debug(key + " is being remotly cancelled (connection closed)");
        }
    }
});
tcpSelectorHandler.setSelectionKeyHandler(selectionKeyHandler);

// STATE Listener
controller.addStateListener(new ControllerStateListenerAdapter() {
    public void onException(Throwable e) {
        s_logger.error("Grizzly controller exception:" + e.getMessage());
    }

    public void onReady() {
        if (s_logger.isDebugEnabled()) {
            s_logger.debug("Ready!");
        }
        started.countDown();
    }
});

Now it’s time to see the ProtocolChain in the server and client implementations.

Server and client (the ProtocolChain implementation)

// the protocol chain
EchoQueryProtocolFilter protocolParser = new EchoQueryProtocolFilter();
EchoQueryManagerFilter echoManagerFilter = new EchoQueryManagerFilter(this);

final ProtocolChain protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(protocolParser);
protocolChain.addFilter(echoManagerFilter);
((DefaultProtocolChain) protocolChain).setContinuousExecution(true);

ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() {
    public boolean offer(ProtocolChain protocolChain) {
        return false;
    }

    public ProtocolChain poll() {
        return protocolChain;
    }
};
controller.setProtocolChainInstanceHandler(pciHandler);

Client (the CallbackHandler) : nothing to configure here.

We finish by starting the server and the clients.

Server

// start the server
new Thread(f_tg, controller).start();
try {
    started.await();
} catch (Exception e) {
    s_logger.error("Timeout in wait" + e.getMessage());
}

// extra stuff : Queue
blockingQueue = new LinkedBlockingQueue();
queueConsumer = new QueueConsumer(this, blockingQueue);
new Thread(f_tg, queueConsumer, "QueueConsumer").start();

Client

// start the client
new Thread(f_tg, controller).start();
try {
    started.await();
} catch (Exception e) {
    s_logger.error("Timeout in wait" + e.getMessage());
}

// extra stuff : Queue
blockingQueue = new LinkedBlockingQueue();
queueConsumer = new QueueConsumer(this, blockingQueue);
new Thread(f_tg, queueConsumer, "QueueConsumer").start();

connector_handler = (TCPConnectorHandler) controller.acquireConnectorHandler(Controller.Protocol.TCP);

The difference this time is on the client side: we need to acquire a ConnectorHandler that will be used later.

Note that the ProtocolChain in this example is identical for the client and the server. Normally you could have different implementations, but the logic is the same and the init methods will look alike.
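One detail about the start-up code: CountDownLatch.await() without arguments waits until the latch reaches zero or the thread is interrupted, so it never times out on its own. If you want a real timeout, a minimal JDK-only sketch could look like this. The class and method names are hypothetical and only stand in for the controller start-up; they are not part of Grizzly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Grizzly: wait for a background
// worker to report that it is ready, but give up after a timeout.
final class StartupLatchSketch {

    /** Starts the worker thread and returns true if it signalled readiness in time. */
    static boolean startAndWait(Runnable startupWork, long timeoutMillis) {
        final CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            startupWork.run();    // stands in for the controller start-up
            started.countDown();  // same role as onReady() in the state listener
        }, "controller-sketch");
        worker.setDaemon(true);
        worker.start();
        try {
            // await(timeout, unit) returns false when the timeout elapses first
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ready = startAndWait(() -> { }, 1000);
        System.out.println("ready=" + ready);
    }
}
```

With this shape, the caller can log a real timeout when the method returns false instead of waiting forever.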

Now let’s take a deeper look into the Client implementations.

On the client side we need to add a method to connect to the server.

There is a small difference between the ProtocolChain and the CallbackHandler versions: in connect you need to specify a CallbackHandler.
For the ProtocolChain implementation, I expected to pass a null handler and have the API fall back to the ProtocolChain, but it didn’t work. I sent an email to the developers to find out whether it was a bug. I’ll describe the way that currently works.

UPDATED : It was a bug. It is fixed in Grizzly 1.9.3+.

Client (ProtocolChain)

public boolean connect() {
    try {
        //connector_handler.connect(new InetSocketAddress(host, port), new ProtocolChainCallbackHandler(connector_handler));
        connector_handler.connect(new InetSocketAddress(host, port), (CallbackHandler) null);  // don't need the callback anymore with 1.9.3+
    } catch (Exception e) {
        s_logger.error("Exception in execute..." + e);
        return false;
    }
    return true;
}

Client (CallbackHandler)

public boolean connect() {
    try {
        connector_handler.connect(new InetSocketAddress(host, port), new ClientCallbackHandler(connector_handler));
    } catch (Exception e) {
        s_logger.error("Exception in execute..." + e);
        return false;
    }
    return true;
}

The last part is the CallbackHandler. I’ll start with the ProtocolChain implementation, followed by the CallbackHandler implementation. I’ll describe the difference between the implementations; I suggest that you take a look at the source code to get the full picture.

ProtocolChainCallbackHandler

// read event
public void onRead(IOEvent<Context> ioEvent) {
    SelectionKey key = ioEvent.attachment().getSelectionKey();
    SelectorHandler selectorHandler = ioEvent.attachment().getSelectorHandler();

    // CALL THE PROTOCOL CHAIN
    try {
        if (key.isValid() && key.isReadable()) {
            Context ctx = ioEvent.attachment();
            ctx.getProtocolChain().execute(ioEvent.attachment());
        }
    } catch (Exception ex) {
        if (s_logger.isDebugEnabled()) {
            s_logger.debug("IOException", ex);
        }
        selectorHandler.getSelectionKeyHandler().cancel(key);
    }
}

ClientCallbackHandler

// read event
public void onRead(IOEvent<Context> ioEvent) {
    SelectionKey key = ioEvent.attachment().getSelectionKey();
    SelectorHandler selectorHandler = ioEvent.attachment().getSelectorHandler();
    SocketChannel socketChannel = (SocketChannel) key.channel();

    try {
        if (key.isValid() && key.isReadable()) {
            // parse the responses here
            int count = socketChannel.read(response);
            if (count > 0) {
                response.flip();

                byte[] b = new byte[(int) count];
                response.get(b);

                // ...... do your own logic.  You can use EchoQueryProtocolParser as example

                response.clear();
                if (s_logger.isDebugEnabled()) {
                    s_logger.debug(new String(b)); // for debug purpose
                }
            }
            selectorHandler.register(key, SelectionKey.OP_READ);
        }
    } catch (IOException ex) {
        if (s_logger.isDebugEnabled()) {
            s_logger.debug("IOException", ex);
        }
        selectorHandler.getSelectionKeyHandler().cancel(key);
    }
}

I hope this shows that with Grizzly it can be really simple to create a server and a client at the same time with just a few changes. The source code can be downloaded here.

Categories: Grizzly Tags:

Grizzly : Speed up the ProtocolFilter response time

May 24th, 2010

Suppose that you are sending SQL queries to a database. Some queries can take a few seconds to run, and each one blocks a thread while it runs.

Even if you have several threads in your pool, they could all end up stuck there too.

To avoid that, you can use the Producer/Consumer pattern.

Take a look at this snippet.

….

// default Pipeline settings
Pipeline pipeline = new DefaultPipeline();
pipeline.setMaxThreads(5);
controller.setPipeline(pipeline);

// the ParserProtocolFilter that will parse the query
QuoteQueryProtocolFilter protocolParser = new QuoteQueryProtocolFilter();

// the ProtocolFilter that will process the query
QuoteQueryManagerFilter quoteManagerFilter = new QuoteQueryManagerFilter();

final ProtocolChain protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(protocolParser);
protocolChain.addFilter(quoteManagerFilter);

….

Suppose we want to simulate slow I/O: you can sleep for 30 seconds.
The effect is that the manager filter waits and holds its thread for the whole time.

public class QuoteQueryManagerFilter implements ProtocolFilter {

    public boolean execute(Context context) throws IOException {
        String query = (String) context.removeAttribute(ProtocolParser.MESSAGE);

        ......

        // this simulates a database that takes 30 sec to complete the query
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // call the database
        //databaseManager.execute(query);

        return true;
    }
}

If you send 10 queries to your application, it will block after 5 queries (one per pipeline thread).
The 6th query will only be executed after 30 seconds.
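To see the effect without running Grizzly at all, here is a small JDK-only sketch. It assumes a fixed thread pool from java.util.concurrent as a stand-in for the Pipeline and a short sleep as a stand-in for the slow query; the class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-in for the Pipeline: NOT Grizzly code, just a fixed JDK thread pool.
final class BlockedPoolSketch {

    /** Runs `tasks` blocking jobs on `threads` workers and returns the elapsed milliseconds. */
    static long runBlockingTasks(int threads, int tasks, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    try {
                        Thread.sleep(sleepMillis); // simulated slow database call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task to finish
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // 10 tasks on 5 threads: the second batch has to wait for the first one
        System.out.println("elapsed ms: " + runBlockingTasks(5, 10, 200));
    }
}
```

With 10 tasks and 5 threads, at least one thread has to run two tasks back to back, so the total time is at least twice the sleep time. That is the same reason the 6th query waits for the first 30 seconds to pass.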

We can change that behaviour easily.

We will need a Producer and a Consumer with a queue between them. You can use a LinkedBlockingQueue as the queue.

Here is the snippet for the Consumer (the Thread.sleep is only for testing):

public class Consumer implements IConsumer, Runnable {

    protected BlockingQueue<String> blockingQueue;
    ....

    public void run() {
        .....
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String query = blockingQueue.take();

                System.out.println("     took item=" + query);

                // now call your database
                // this simulates a database that takes 30 sec to complete
                try {
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                databaseManager.process(query);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    .....
}

For the Producer, I use my databaseManager.

public class DatabaseManager {

    protected BlockingQueue<String> blockingQueue;
    ....

    public void init() {
        ThreadGroup tg = new ThreadGroup("Producer/Consumer");
        quoteConsumer = new QuoteConsumer(this, blockingQueue);
        new Thread(tg, quoteConsumer, "QuoteConsumer").start();
    }

    public void addToQueue(String query) {
        System.out.println("add item=" + query);
        blockingQueue.add(query);
    }

    ....
}

And don't forget to change the QuoteQueryManagerFilter:

public boolean execute(Context context) throws IOException {
    String query = (String) context.removeAttribute(ProtocolParser.MESSAGE);

    ......

    // call the database
    databaseManager.addToQueue(query);

    return true;
}

If you try this, each query will still take 30 seconds in the database, but the filter returns right away, so your application keeps handling incoming requests.
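The whole hand-off can be tried outside Grizzly with a small JDK-only sketch. Everything here is an assumption made for the illustration: the class name, the single consumer thread, and the short sleep that stands in for the database call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer/consumer pair; names do not come from the post's source code.
final class QueryQueueSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private final Thread consumer;

    QueryQueueSketch(long workMillis) {
        consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String query = queue.take(); // blocks until a query arrives
                    Thread.sleep(workMillis);    // simulated slow database call
                    processed.add(query);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on shutdown
            }
        }, "QueryConsumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    /** Producer side: returns immediately, the caller never waits for the database. */
    void addToQueue(String query) {
        queue.add(query);
    }

    /** Polls until `count` queries were processed or the timeout elapses. */
    boolean awaitProcessed(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (processed.size() < count) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    List<String> processedSnapshot() {
        synchronized (processed) {
            return new ArrayList<>(processed);
        }
    }

    void shutdown() {
        consumer.interrupt();
    }

    public static void main(String[] args) {
        QueryQueueSketch sketch = new QueryQueueSketch(50);
        for (int i = 0; i < 3; i++) {
            sketch.addToQueue("query-" + i);
        }
        System.out.println("all queries queued, waiting for the consumer...");
        sketch.awaitProcessed(3, 2000);
        System.out.println(sketch.processedSnapshot());
        sketch.shutdown();
    }
}
```

A single consumer keeps the queries in order. If the database can handle several queries at once, you could start more consumer threads on the same queue, at the cost of losing that ordering.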

Grizzly : How to be notified when a client disconnects

May 24th, 2010

It’s now possible to be notified when a client disconnects from a server built on Grizzly 1.9+.

Here is a little snippet that does that, thanks to the new ConnectionCloseHandler.

public void init() {

    int port = 5000;

    try {
        Controller controller = new Controller();
        TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
        tcpSelectorHandler.setPort(port);

        Pipeline pipeline = new DefaultPipeline();
        pipeline.setMaxThreads(5);

        controller.setPipeline(pipeline);

        BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler();

        // to be notified when a client closes the connection
        selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {

            public void locallyClosed(SelectionKey key) {
                System.out.println(key + " is being locally cancelled");
            }

            public void remotlyClosed(SelectionKey key) {
                System.out.println(key + " is being remotly cancelled (connection closed)");
            }
        });

        tcpSelectorHandler.setSelectionKeyHandler(selectionKeyHandler);

        controller.addSelectorHandler(tcpSelectorHandler);

        // ..... add your protocolChain here

        try {
            controller.start();
        } catch (IOException e) {
            e.printStackTrace();
        }

    } catch (Exception e) {
        System.out.println("exit");
    }
}
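For comparison, this is roughly what detecting a remote close looks like with plain java.nio, without any framework. It is only a sketch under simple assumptions (blocking channels on the loopback interface, a made-up class name), and it does not show how Grizzly implements the notification internally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain NIO illustration; RemoteCloseSketch is a hypothetical name.
final class RemoteCloseSketch {

    /** Returns true if the server side observes end-of-stream after the client closes. */
    static boolean serverSeesRemoteClose() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // port 0 asks the OS for any free port on the loopback interface
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                client.close();                     // the "remote" side goes away
                ByteBuffer buffer = ByteBuffer.allocate(16);
                return accepted.read(buffer) == -1; // -1 means the peer closed
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("remote close detected: " + serverSeesRemoteClose());
    }
}
```

With your own NIO code you have to check for that -1 on every read and clean up the key yourself; the ConnectionCloseHandler gives you one callback for it instead.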

New logger Formatter for Grizzly

May 24th, 2010

I created a new logging formatter for Grizzly. While debugging a test, we couldn’t trace events in the log because the default formatter in java.util.logging doesn’t print the thread that logged the event.

The new formatter is : com.sun.grizzly.util.LoggingFormatter

It’s based on java.util.logging.SimpleFormatter.

It’s really simple to change the formatter used by the java.util.logging API.

You will have to create a logging.properties file (any name will do) and add this parameter to the command line:

-Djava.util.logging.config.file=logging.properties

There is already a file named logging.properties that comes with the JRE (jre/lib/logging.properties). You can use this file as a sample for your application.

With this file, you only have one line to change to use the new logging formatter.

# SimpleFormatter is the default formatter; you can replace it with LoggingFormatter
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.ConsoleHandler.formatter = com.sun.grizzly.util.LoggingFormatter

The output will look like this:
[WorkerThreadImpl-1, Grizzly] 2008-10-08 18:49:59 [INFO] com.sun.grizzly.Controller:doSelect message
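If you are curious how a formatter can print the thread, here is a minimal sketch of a java.util.logging.Formatter subclass. To be clear, this is not the source of com.sun.grizzly.util.LoggingFormatter and its output format is different; the class name and layout are made up for the illustration.

```java
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;

// Illustration only: NOT the implementation of com.sun.grizzly.util.LoggingFormatter.
final class ThreadNameFormatterSketch extends Formatter {

    @Override
    public String format(LogRecord record) {
        // format() is normally invoked on the thread that logged the record,
        // so the current thread name identifies the caller. This is an assumption
        // that does not hold for handlers that publish asynchronously.
        String thread = Thread.currentThread().getName();
        return "[" + thread + "] [" + record.getLevel() + "] "
                + record.getLoggerName() + ": " + formatMessage(record)
                + System.lineSeparator();
    }

    public static void main(String[] args) {
        LogRecord record = new LogRecord(Level.INFO, "doSelect message");
        record.setLoggerName("demo");
        System.out.print(new ThreadNameFormatterSketch().format(record));
    }
}
```

A class like this can be plugged in through the same java.util.logging.ConsoleHandler.formatter property, as long as it is on the classpath.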

Grizzly Migration Guide Part 3

May 24th, 2010

In this article I’ll explain how to migrate a non-blocking NIO application to Grizzly. Grizzly goes a step further than our previous implementation: it simplifies all the NIO management, and the best part is that the framework covers almost all your needs with its default settings. The hardest part will be learning a new API (Grizzly), but as I said, Grizzly does the job with the default settings, so you will often be fine copying and pasting from a previous Grizzly project. Let’s write the code once and reuse it in any application.

Let’s take an overview of the changes that we will have to make.


I’ll start with the GrizzlyConnectionListener. Because we were doing NIO ourselves, everything we wrote for communication handling has to be updated for Grizzly. We no longer need the ConnectionAcceptor class, and all the logic for parsing incoming messages that lived in ClientConnectionHandler has moved into a ParserProtocolFilter. I’ll explain all the new concepts soon, but start by taking a look at GrizzlyConnectionListener.

/**
* Init
*/
public void init() {

    System.out.println("listening for incoming TCP Connections on port : " + f_port);
    try {

        f_controller = new Controller();
        TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
        tcpSelectorHandler.setPort(f_port);

        Pipeline pipeline = new DefaultPipeline();
        pipeline.setMaxThreads(5);

        f_controller.setPipeline(pipeline);

        tcpSelectorHandler.setSelectionKeyHandler(new BaseSelectionKeyHandler());

        f_controller.addSelectorHandler(tcpSelectorHandler);

        QuoteQueryProtocolFilter protocolParser = new QuoteQueryProtocolFilter();
        QuoteQueryManagerFilter quoteManagerFilter = new QuoteQueryManagerFilter(f_quoteManager);

        final ProtocolChain protocolChain = new DefaultProtocolChain();
        protocolChain.addFilter(protocolParser);
        protocolChain.addFilter(quoteManagerFilter);
        ((DefaultProtocolChain) protocolChain).setContinuousExecution(true);

        ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() {

            public ProtocolChain poll() {

                return protocolChain;
            }

            public boolean offer(ProtocolChain protocolChain) {
                return false;

            }
        };

        f_controller.setProtocolChainInstanceHandler(pciHandler);
        try {
            f_controller.start();
        } catch (IOException e) {
            e.printStackTrace();
        }

    } catch (Exception e) {
        System.exit(-10);
    }
}

That contains a lot of new classes that we haven’t seen before, but it’s not that hard to understand. I will start by recalling how we listened for incoming TCP connections with plain NIO.

// Create a non-blocking server socket channel and bind to the port
f_serverSocketChannel = ServerSocketChannel.open();
f_serverSocketChannel.configureBlocking(false);
f_serverSocketChannel.socket().bind(new InetSocketAddress(f_port));

// Create the selector and bind the server socket to it
f_selector = Selector.open();
f_serverSocketChannel.register(f_selector, SelectionKey.OP_ACCEPT, new ConnectionAcceptor(f_selector, f_serverSocketChannel, f_quoteManager));

With Grizzly it’s a lot simpler than that:

Controller controller = new Controller();
TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
tcpSelectorHandler.setPort(f_port);
controller.addSelectorHandler(tcpSelectorHandler);

If we wanted to listen for UDP or even SSL, it’s no more complicated than adding a new SelectorHandler. Grizzly offers a few protocol handlers: TCPSelectorHandler, UDPSelectorHandler, SSLSelectorHandler. You can create your own handler if you want; you just need to implement SelectorHandler.

I said that Grizzly comes with a lot of default settings that will do the job for most of us. One of these is the built-in thread pool, which Grizzly calls a Pipeline. You can change the Pipeline in the controller with a few lines.

Pipeline pipeline = new DefaultPipeline();
pipeline.setMaxThreads(5);
controller.setPipeline(pipeline);

I described how to start a listening controller, but I haven’t yet covered how the Selector and SelectionKey are managed. In Grizzly, all of that is simplified into a SelectionKeyHandler. Once again, Grizzly comes with a default: DefaultSelectionKeyHandler. Be warned that this handler has a built-in timeout. You can change the timeout if you want, but in this guide I need a handler that won’t time out, because the clients must keep a permanent connection to the server. With a web-style approach, it would make sense to have a timeout on connections.

This snippet changes the timeout value:

DefaultSelectionKeyHandler keyHandler = new DefaultSelectionKeyHandler();
//keep connection for 30 minutes
keyHandler.setTimeout(30 * 1000 * 60);

tcpSelectorHandler.setSelectionKeyHandler(keyHandler);
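The value 30 * 1000 * 60 is 1,800,000 milliseconds, that is 30 minutes. If you find the multiplication hard to read, java.util.concurrent.TimeUnit can do the conversion. This is a tiny sketch with a hypothetical helper name; it assumes the timeout is expressed in milliseconds, as the snippet implies.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: converts minutes to the millisecond value used for a timeout.
final class TimeoutMillisSketch {

    static long minutesToMillis(long minutes) {
        return TimeUnit.MINUTES.toMillis(minutes);
    }

    public static void main(String[] args) {
        // same value as 30 * 1000 * 60 in the snippet
        System.out.println(minutesToMillis(30));
    }
}
```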

In this migration guide, I’ll use a handler that doesn’t time out: BaseSelectionKeyHandler.

tcpSelectorHandler.setSelectionKeyHandler(new BaseSelectionKeyHandler());

The next step is how we read messages from the clients and how we send responses back to them. In the previous article, “Grizzly Migration Guide Part 2”, we had complete control over the SocketChannels; in this article Grizzly takes over that control.

There are 3 simple steps to do that in Grizzly. (These steps are for the purpose of this guide; you don’t have to follow that order.)

First : create a ParserProtocolFilter. This filter parses the incoming message from the client and tries to extract a valid query.

Second : create a ProtocolFilter. This filter receives the valid query returned by the previous filter and processes it.

Third : create a ProtocolChain that holds the previous filters and sets their order (it implements the Chain of Responsibility pattern).

    QuoteQueryProtocolFilter protocolParser = new QuoteQueryProtocolFilter();
    QuoteQueryManagerFilter quoteManagerFilter = new QuoteQueryManagerFilter(f_quoteManager);

    final ProtocolChain protocolChain = new DefaultProtocolChain();
    protocolChain.addFilter(protocolParser);
    protocolChain.addFilter(quoteManagerFilter);
    ((DefaultProtocolChain) protocolChain).setContinuousExecution(true);

    ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() {
        public ProtocolChain poll() {
            return protocolChain;
        }

        public boolean offer(ProtocolChain protocolChain) {
            return false;
        }
    };

    controller.setProtocolChainInstanceHandler(pciHandler);
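If the Chain of Responsibility pattern is new to you, here is the idea reduced to a few lines of plain Java. The Filter interface, the map used as a context and the attribute names are all assumptions made for this sketch; they are not Grizzly’s ProtocolFilter or Context API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a filter chain; not Grizzly's ProtocolChain.
final class FilterChainSketch {

    /** Hypothetical filter contract: return false to stop the chain. */
    interface Filter {
        boolean execute(Map<String, Object> context);
    }

    private final List<Filter> filters = new ArrayList<>();

    FilterChainSketch addFilter(Filter filter) {
        filters.add(filter);
        return this;
    }

    /** Runs the filters in insertion order; returns true if all of them ran. */
    boolean execute(Map<String, Object> context) {
        for (Filter filter : filters) {
            if (!filter.execute(context)) {
                return false;
            }
        }
        return true;
    }

    /** Builds a two-step chain: a parser that extracts a query, then a processor. */
    static String process(String raw) {
        Map<String, Object> context = new HashMap<>();
        context.put("raw", raw);
        FilterChainSketch chain = new FilterChainSketch()
            .addFilter(ctx -> {
                String text = (String) ctx.get("raw");
                int end = text.indexOf("[eoq]");
                if (end < 0) {
                    return false; // incomplete query: stop here
                }
                ctx.put("message", text.substring(0, end));
                return true;
            })
            .addFilter(ctx -> {
                ctx.put("result", "processed:" + ctx.remove("message"));
                return true;
            });
        chain.execute(context);
        return (String) context.get("result");
    }

    public static void main(String[] args) {
        System.out.println(process("select quote[eoq]"));
        System.out.println(process("select quo"));
    }
}
```

The first filter only parses and the second only processes, which is the same split as QuoteQueryProtocolFilter and QuoteQueryManagerFilter. The order in which the filters are added is the order in which they run.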

Finally, we can start the application with this simple line :

controller.start();

Putting meat on these Filters

I explained how to start the application and how to listen for incoming connections; now it’s time to go into the details. The ProtocolFilter must contain the logic for parsing the incoming data. In the previous implementation all of that logic lived in ClientConnectionHandler. With the Chain of Responsibility pattern, you must divide your code into smaller commands.

I’ll start with the filter that parses the incoming data from a client : QuoteQueryProtocolFilter

public class QuoteQueryProtocolFilter extends ParserProtocolFilter {

    @Override
    public ProtocolParser newProtocolParser() {
        return new QuoteQueryProtocolParser();
    }

}

public class QuoteQueryProtocolParser implements ProtocolParser<String> {
    // the limit; if the limit is reached, the connection will be closed
    protected static final int LIMITBB = 5;
    protected CharsetDecoder f_asciiDecoder = Charset.forName("ISO-8859-1").newDecoder();
    protected ByteBuffer processingBuffer;
    protected String query = null;
    private boolean eoqFound = false;

    private boolean maxBufferReached = false;

    // first method to be called
    /**
    * Is this ProtocolParser expecting more data ?
    *
    * This method is typically called after a call to <code>parseBytes()</code>
    * to determine if the {@link ByteBuffer} which has been parsed
    * contains a partial message
    *
    * @return - <tt>true</tt> if more bytes are needed to construct a
    *           message;  <tt>false</tt>, if no
    *           additional bytes remain to be parsed into a <code>T</code>.
    *          Note that if no partial message exists, this method should
    *         return false.
    */
    public boolean isExpectingMoreData() {

        System.out.println("isExpectingMoreData");

        // we need to loop until we get a query
        return !eoqFound;
    }

    // next method after isExpectingMoreData or releaseBuffer
    /**
    * Are there more bytes to be parsed in the {@link ByteBuffer} given
    * to this ProtocolParser's <code>setBuffer</code> ?
    *
    * This method is typically called after a call to <code>parseBytes()</code>
    * to determine if the {@link ByteBuffer} has more bytes which need to
    * parsed into a message.
    *
    * @return <tt>true</tt> if there are more bytes to be parsed.
    *         Otherwise <tt>false</tt>.
    */
    public boolean hasMoreBytesToParse() {
        System.out.println("hasMoreBytesToParse");
        /*
        if (debug) System.out.println("hasMoreBytesToParse()");
        if (savedBuffer == null) {
        if (debug) System.out.println("hasMoreBytesToParse() savedBuffer == null, return false");
        return false;
        }
        */
        return eoqFound && processingBuffer != null && processingBuffer.position() > 0;
    }

    // if isExpectingMoreData : 2nd method
    /**
    * No more parsing will be done on the buffer passed to
    * <code>startBuffer.</code>
    * Set up the buffer so that its position is the first byte that was
    * not part of a full message, and its limit is the original limit of
    * the buffer.
    *
    * @return -- true if the parser has saved some state (e.g. information
    * data in the buffer that hasn't been returned in a full message);
    * otherwise false. If this method returns true, the framework will
    * make sure that the same parser is used to process the buffer after
    * more data has been read.
    */
    public boolean releaseBuffer() {
        System.out.println("releaseBuffer");

        if (processingBuffer != null) {
            processingBuffer.compact();
        }

        processingBuffer = null;

        eoqFound = false;

        return false;
    }

    // method after hasMoreBytesToParse if it's true
    /**
    * Set the buffer to be parsed. This method should store the buffer and
    * its state so that subsequent calls to <code>getNextMessage</code>
    * will return distinct messages, and the buffer can be restored after
    * parsing when the <code>releaseBuffer</code> method is called.
    */
    public void startBuffer(ByteBuffer bb) {
        System.out.println("startBuffer");

        bb.flip();
        processingBuffer = bb;

        //System.out.println("capacity=" + processingBuffer.capacity());

    }

    /**
    * Get the next complete message from the buffer, which can then be
    * processed by the next filter in the protocol chain. Because not all
    * filters will understand protocol messages, this method should also
    * set the position and limit of the buffer at the start and end
    * boundaries of the message. Filters in the protocol chain can
    * retrieve this message via context.getAttribute(MESSAGE)
    *
    * @return The next message in the buffer. If there isn't such a message,
    *    return <code>null.</code>
    *
    */
    public String getNextMessage() {
        System.out.println("getNextMessage");
        if (maxBufferReached) {
            return "MAX";
        }
        return query;
    }

    /**
    * Indicates whether the buffer has a complete message that can be
    * returned from <code>getNextMessage</code>. Smart implementations of
    * this will set up all the information so that an actual call to
    * <code>getNextMessage</code> doesn't need to re-parse the data.
    */
    public boolean hasNextMessage() {
        System.out.println("hasNextMessage");

        if (processingBuffer == null) {
            return false;
        }
        if (processingBuffer.hasRemaining()) {
            System.out.println("hasNextMessage: " + new String(processingBuffer.array(), processingBuffer.arrayOffset(), processingBuffer.remaining()));
            // decode the buffer
            String msg = null;
            try {

                ByteBuffer tmp = processingBuffer.duplicate();
                //tmp.flip();   // not needed because processingBuffer was previously flip
                msg = f_asciiDecoder.decode(tmp).toString();
            } catch (CharacterCodingException e) {
                // the buffer cannot be decoded, so there is no message to return
                e.printStackTrace();
                return false;
            }

            //System.out.println("msg=" + msg);

            int index = msg.indexOf("[eoq]");
            if (index > -1) {

                query = msg.substring(0, index);
                //System.out.println("Query = " + query);

                // We need to keep what is after the EOQ
                processingBuffer.clear();
                String substr = msg.substring(index + "[eoq]".length());
                System.out.println("index: " + index + " substr: " + substr + " substr.len: " + substr.length());
                processingBuffer.put(msg.substring(index + "[eoq]".length()).getBytes());
                processingBuffer.flip();
                eoqFound = true;
            } else {
                // Check if buffer is full
                if (processingBuffer.remaining() == processingBuffer.capacity()) {
                    // If full - reallocate

                    // but check if the max length is reached
                    if (processingBuffer.capacity() + processingBuffer.remaining() < LIMITBB) {
                        ByteBuffer newBB = ByteBufferFactory.allocateView(processingBuffer.capacity() * 2, processingBuffer.isDirect());
                        newBB.put(processingBuffer);
                        processingBuffer = newBB;
                        WorkerThread workerThread = (WorkerThread) Thread.currentThread();
                        workerThread.setByteBuffer(processingBuffer);
                    } else {
                        System.out.println("BUFFER MAX REACH!");

                        processingBuffer.clear();

                        maxBufferReached = true;

                        return maxBufferReached;
                    }
                }

                eoqFound = false;
            }

        }

        //System.out.println("hasNextMessage() result = " + eoqFound);

        return eoqFound;
    }
}

Let’s explain the flow of this parser; that will make it simpler to understand. Each function does a specific job. Here is the order in which they are called:

isExpectingMoreData
startBuffer
hasNextMessage
getNextMessage

The main function is hasNextMessage. It parses the ByteBuffer and checks whether it contains a valid query. The logic is the same as in the previous implementation, except for a few details. Before, we were reading from the SocketChannel and filling a ByteBuffer with the data ourselves; now Grizzly does that for us. We receive a ByteBuffer containing the data read from the client in startBuffer(ByteBuffer).

Besides the ByteBuffer handed over by Grizzly, another change requires our attention: the cases where no query was found in the buffer, or where the buffer contains more than one query. We need to keep the data in the buffer for the next read. We can do that by putting a new buffer into Grizzly’s context. Here is a snippet for the case where the ByteBuffer is too small and needs to be resized.
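The parsing step can be tested on its own, outside the parser class. Here is a sketch of it as a static method that works on a plain ByteBuffer. The class and method names are hypothetical, and it assumes the buffer has already been flipped for reading, as startBuffer does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-alone version of the "[eoq]" parsing step; names are made up for the sketch.
final class EoqParserSketch {
    static final String EOQ = "[eoq]";

    /**
     * Expects a buffer in read mode. Returns the first complete query, or null
     * when the terminator has not arrived yet. On success the buffer is left
     * holding only the bytes after the terminator, again in read mode.
     */
    static String nextQuery(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position and limit,
        // so decoding does not consume the original buffer
        String text = StandardCharsets.ISO_8859_1.decode(buffer.duplicate()).toString();
        int index = text.indexOf(EOQ);
        if (index < 0) {
            return null;
        }
        // ISO-8859-1 maps one char to one byte, so the leftover fits in the buffer
        byte[] rest = text.substring(index + EOQ.length()).getBytes(StandardCharsets.ISO_8859_1);
        buffer.clear();
        buffer.put(rest);
        buffer.flip();
        return text.substring(0, index);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put("quote A[eoq]quote B[eoq]par".getBytes(StandardCharsets.ISO_8859_1));
        buffer.flip();
        String query;
        while ((query = nextQuery(buffer)) != null) {
            System.out.println("query = " + query);
        }
        System.out.println("bytes kept for the next read: " + buffer.remaining());
    }
}
```

Calling the method in a loop returns one query at a time, and whatever is left at the end is the start of a query that has not fully arrived yet.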

ByteBuffer newBB = ByteBufferFactory.allocateView(
processingBuffer.capacity() * 2,
processingBuffer.isDirect());
newBB.put(processingBuffer);
processingBuffer = newBB;
WorkerThread workerThread = (WorkerThread) Thread.currentThread();
workerThread.setByteBuffer(processingBuffer);  // put the buffer into the context for the next time
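The resize itself needs nothing from Grizzly. As a sketch, here is the same doubling done with the plain ByteBuffer factory methods instead of ByteBufferFactory.allocateView; the class and method names are made up, and the step that hands the new buffer back to the WorkerThread is left out.

```java
import java.nio.ByteBuffer;

// JDK-only illustration of growing a buffer; not the Grizzly ByteBufferFactory.
final class GrowBufferSketch {

    /**
     * Returns a buffer with twice the capacity that contains the readable bytes
     * of `full`. The result is left in write mode, ready for the next read
     * from the channel.
     */
    static ByteBuffer grow(ByteBuffer full) {
        ByteBuffer bigger = full.isDirect()
                ? ByteBuffer.allocateDirect(full.capacity() * 2)
                : ByteBuffer.allocate(full.capacity() * 2);
        bigger.put(full); // copies from full.position() up to full.limit()
        return bigger;
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(4);
        small.put(new byte[] {1, 2, 3, 4});
        small.flip(); // read mode, as the parser sees it
        ByteBuffer bigger = grow(small);
        System.out.println("capacity=" + bigger.capacity() + " position=" + bigger.position());
    }
}
```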

When we find a query in hasNextMessage we return true, so getNextMessage is called and its return value is used by the next filter.

Now that we have a valid query, we need to process it. Processing is still done as in the previous implementation, using the ICommand; the difference is how it starts. Grizzly puts the value returned by the previous filter (getNextMessage) into the context.

public class QuoteQueryManagerFilter implements ProtocolFilter {

    private static final String CLIENT_CONNECTION_HANDLER_ATTR = "connection-handler";

    private QuoteManager manager;

    public QuoteQueryManagerFilter(QuoteManager manager) {
        this.manager = manager;
    }

    public boolean execute(Context context) throws IOException {
        String query = (String) context.removeAttribute(ProtocolParser.MESSAGE);

        if (query == null || query.trim().length() == 0) {
            return false;
        }

        System.out.println("query = " + query);

        /*
        * For now instead of keeping whole Context, you can just keep SelectionKey as reference to the client,
        * and send response back using: selectorHandler.getAsyncQueueWriter().write(SelectionKey, ...);
        */

        // retrieve the ClientConnectionHandler if it exists
        ClientConnectionHandler clientConnectionHandler = retrieveConnectionHandler(context);

        if (query.equals("MAX")) {
            clientConnectionHandler.close();
            return false;
        }

        manager.processQuery(clientConnectionHandler, query);

        return true;
    }

    public boolean postExecute(Context context) throws IOException {
        return true;
    }

    private ClientConnectionHandler retrieveConnectionHandler(Context context) {
        ClientConnectionHandler clientConnectionHandler = null;

        AttributeHolder connectionAttrs = context.getAttributeHolderByScope(AttributeScope.CONNECTION);
        if (connectionAttrs != null) {
            clientConnectionHandler = (ClientConnectionHandler) connectionAttrs.getAttribute(CLIENT_CONNECTION_HANDLER_ATTR);
        } else {
            WorkerThread workerThread = (WorkerThread) Thread.currentThread(); // Detach the current thread's data.
            connectionAttrs = workerThread.getAttachment();
            // Attach it to the SelectionKey so that it can be resumed later.
            context.getSelectionKey().attach(connectionAttrs);
        }

        if (clientConnectionHandler == null) {
            clientConnectionHandler = new ClientConnectionHandler(manager, context.getSelectionKey(), context.getSelectorHandler());
            connectionAttrs.setAttribute(CLIENT_CONNECTION_HANDLER_ATTR, clientConnectionHandler);
        }

        return clientConnectionHandler;
    }

}

The first method called is execute.

We retrieve the query from the context with this line:

String query = (String) context.removeAttribute(ProtocolParser.MESSAGE);
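
To picture this hand-off without Grizzly, here is a minimal sketch of a two-step chain in plain Java. MiniFilter, MiniChain and the "message" key are hypothetical names invented for the illustration (they are not Grizzly classes); the only idea taken from the text is that one step stores the parsed message in a shared context and the next step removes it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a ProtocolFilter: returns true to continue the chain. */
interface MiniFilter {
    boolean execute(Map<String, Object> context);
}

class MiniChain {
    // Hypothetical key, playing the role of ProtocolParser.MESSAGE.
    static final String MESSAGE = "message";

    /** Runs the filters in order until one returns false; returns how many ran. */
    static int run(Map<String, Object> context, MiniFilter... filters) {
        int executed = 0;
        for (MiniFilter filter : filters) {
            executed++;
            if (!filter.execute(context)) {
                break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        MiniFilter parser = new MiniFilter() {
            public boolean execute(Map<String, Object> context) {
                context.put(MESSAGE, "feed|aaa"); // what getMessage() would have returned
                return true;
            }
        };
        MiniFilter manager = new MiniFilter() {
            public boolean execute(Map<String, Object> context) {
                // Remove the message so it cannot be processed twice.
                String query = (String) context.remove(MESSAGE);
                System.out.println("query = " + query);
                return query != null;
            }
        };
        run(new HashMap<String, Object>(), parser, manager);
    }
}
```

Removing the attribute, rather than just reading it, means a later pass over the same context never sees a stale query.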

The next part is more complex: how do we retrieve the SelectionKey and the SelectorHandler related to the client that sent the query we just parsed? All the logic is in retrieveConnectionHandler. As with everything else, it is all in the context: we get the SelectionKey with context.getSelectionKey() and the SelectorHandler with context.getSelectorHandler().

Now that the ClientConnectionHandler is created, I can explain how to send the response to the client. Don’t be surprised: after all the previous steps, sending a message to the client is not that hard. It took a long time to get to this point. What we have to change is in the IResponseHandler.

We have to replace these lines:

// used if a retry is needed
SelectionKey key = null;
Selector writeSelector = null;
int attempts = 0;

try {

    while (writeBuffer.hasRemaining() && clientConnectionHandler.getSocketChannel().isOpen()) {
        int len = clientConnectionHandler.getSocketChannel().write(writeBuffer);
        if (len <= 0) {
            attempts++;
            if (writeSelector == null) {
                writeSelector = Selector.open();
                if (writeSelector == null) {
                    // Continue using the main one.
                    continue;
                }
                key = clientConnectionHandler.getSocketChannel().register(writeSelector, SelectionKey.OP_WRITE);
            }

            if (writeSelector.select() == 0) {
                if (attempts > 2)
                    throw new IOException("Client disconnected");
            }
        }

    }

} catch (IOException e) {
    e.printStackTrace();
    // the client is not connected
    clientConnectionHandler.close();

} finally {
    if (key != null) {
        key.cancel();
        key = null;
    }

    if (writeSelector != null) {
        // Close the temporary selector.
        try {
            writeSelector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

writeBuffer.rewind();

with these:

try {
    if (clientConnectionHandler.getKey().isValid()) {
        clientConnectionHandler.getSelectorHandler().getAsyncQueueWriter().write(clientConnectionHandler.getKey(), writeBuffer);
    }
} catch (IOException e) {
    e.printStackTrace();
    // sometimes we get this error:

    // java.nio.channels.ClosedChannelException

    // the client is not connected
    clientConnectionHandler.close();
}

writeBuffer.rewind();

The magic is done by this writer: getSelectorHandler().getAsyncQueueWriter(). It’s a non-blocking writer.
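
How Grizzly implements this writer is not covered here, so take the following only as a sketch of the general idea behind a non-blocking write queue, written in plain java.nio. PendingWriteQueue is a hypothetical class: it writes what the channel accepts right now, copies the unsent remainder into a queue, and finishes the job later when the selector reports OP_WRITE.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of a per-connection write queue (not Grizzly's implementation). */
class PendingWriteQueue {
    private final Queue<ByteBuffer> pending = new ArrayDeque<ByteBuffer>();

    /** Never waits: writes what the channel accepts now and queues a copy of the rest. */
    synchronized void write(WritableByteChannel channel, ByteBuffer data) throws IOException {
        if (pending.isEmpty()) {
            channel.write(data); // may write everything, a part, or nothing
        }
        if (data.hasRemaining()) {
            // Copy the remainder so the caller can reuse its buffer immediately.
            ByteBuffer copy = ByteBuffer.allocate(data.remaining());
            copy.put(data);
            copy.flip();
            pending.add(copy);
        }
    }

    /** Call when the channel reports OP_WRITE; returns true once the queue is empty. */
    synchronized boolean flush(WritableByteChannel channel) throws IOException {
        while (!pending.isEmpty()) {
            ByteBuffer head = pending.peek();
            channel.write(head);
            if (head.hasRemaining()) {
                return false; // channel is full again, wait for the next OP_WRITE
            }
            pending.remove();
        }
        return true;
    }
}
```

Queuing behind earlier data (the pending.isEmpty() check) keeps the bytes in order, and copying the remainder lets the caller rewind and reuse its buffer, as the loop over clients does with writeBuffer.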

That completes the Grizzly Migration Guide. I didn’t go into the API in detail, but I gave you a baseline of Grizzly. Grizzly is far more advanced than this, and I could have used other APIs offered by Grizzly and its sub-projects, but I wanted to show you how to migrate a blocking NIO application to a NIO framework using only the core of that framework. I hope you enjoyed this guide and will come back to read the next one: How to create a real-time stock quotes client using Grizzly.

Download the source code here. To run the application: java -jar nio_quotestock_demo_v3.jar

Categories: Grizzly

Grizzly Migration Guide Part 2

May 24th, 2010 No comments

This article focuses on how to migrate the demo from “Grizzly Migration Guide Part 1” to non-blocking NIO connections, using the NIO Selector and Channels available since JDK 1.4. If you are not familiar with these terms, as I wasn’t before writing this guide, I recommend that you start by reading about the concepts. A good starting point is this article. I really want to focus on how to migrate an existing application that does not use a Selector, so I’ll skip the principles behind it.

Migrating the application is not as complicated as it sounds. All the business logic stays the same; only the way we send data to and read data from the client changes. How do you read and write data? How do you listen for incoming connections? Is it possible to not create a Thread per client connection?


Let’s look at the last two questions to see what we are actually doing. The class SocketConnectionListener listens for incoming connections as follows:

public void init() {

    System.out.println("listening for incoming TCP Connections on port : " + f_port);
    try {
        f_serverSocketChannel = ServerSocketChannel.open();
        f_serverSocketChannel.socket().bind(new InetSocketAddress(f_port));

        f_threadGroup = new ThreadGroup("ClientConnectionHandlerGroup");

    } catch (Exception e) {
        System.exit(-10);
    }
}

If we want to use non-blocking mode, we need a few small changes (take a look at the first image):

public void init() {

    System.out.println("listening for incoming TCP Connections on port : " + f_port);
    try {
        // Create a non-blocking server socket channel and bind to the port
        f_serverSocketChannel = ServerSocketChannel.open();
        f_serverSocketChannel.configureBlocking(false);
        f_serverSocketChannel.socket().bind(new InetSocketAddress(f_port));

        // Create the selector and bind the server socket to it
        f_selector = Selector.open();
        f_serverSocketChannel.register(f_selector, SelectionKey.OP_ACCEPT, new ConnectionAcceptor(f_selector, f_serverSocketChannel, f_quoteManager));

        f_threadGroup = new ThreadGroup("ClientConnectionHandlerGroup");

    } catch (Exception e) {
        System.exit(-10);
    }
}

The core of this class is in run():

public void run() {

    init();

    while (!isShutdown()) {
        try {
            SocketChannel socketChannel = f_serverSocketChannel.accept();

            // create a new clientConnectionhandler
            ClientConnectionHandler client = new ClientConnectionHandler(getQuoteManager());
            client.setSocketChannel(socketChannel);

            System.out.println("new client connection established");

            Thread t = new Thread(f_threadGroup, client, "Client");
            t.start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

In the new implementation, run() changes more: instead of accepting the new connection in run(), we delegate that to a new class, ConnectionAcceptor, and we no longer need a Thread for the class ClientConnectionHandler.

Let’s take a look at the new code.

public void run() {

    init();

    while (!isShutdown()) {
        try {

            f_selector.select();

            // Get list of selection keys with pending events
            Iterator<SelectionKey> it = f_selector.selectedKeys().iterator();

            // Process each key
            while (it.hasNext()) {
                // Get the selection key
                SelectionKey selKey = it.next();

                // Remove it from the list to indicate that it is being processed
                it.remove();

                // Check if it's a connection request
                if (selKey.isValid() && selKey.isAcceptable()) {
                    ConnectionAcceptor connectionAcceptor = (ConnectionAcceptor) selKey.attachment();
                    connectionAcceptor.accept();
                }

                // Check if a message has been sent
                if (selKey.isValid() && selKey.isReadable()) {
                    ClientConnectionHandler connectionReader = (ClientConnectionHandler) selKey.attachment();
                    connectionReader.read();
                }
            }

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

It’s a little more complicated, but still easily readable. Instead of trying to read from a socket, the class receives events. The events arrive as SelectionKeys, and we process them the same way we would process Swing events: we check the kind of event and do the appropriate work. As I said previously, a new class was added: ConnectionAcceptor. This class handles new connections and registers each one for the event we want. In this case that event is read, SelectionKey.OP_READ, which we use to read data from the client.
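
The same event loop can be tried without any network, using a java.nio.channels.Pipe as the event source. This is a minimal sketch: ReadHandler and MiniEventLoop are hypothetical names that stand in for ClientConnectionHandler and the run() loop above, and a select timeout is added so that the example cannot hang.

```java
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

/** Hypothetical callback attached to a key; plays the role of ClientConnectionHandler.read(). */
interface ReadHandler {
    void onReadable(ReadableByteChannel channel) throws IOException;
}

class MiniEventLoop {
    /**
     * Runs one select() pass and hands every readable key to its attached handler.
     * timeoutMillis must be positive here (0 would block until an event arrives).
     * Returns the number of handlers that were called.
     */
    static int dispatchOnce(Selector selector, long timeoutMillis) throws IOException {
        int dispatched = 0;
        if (selector.select(timeoutMillis) == 0) {
            return dispatched; // timeout: no event
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // otherwise the key stays in the selected set
            if (key.isValid() && key.isReadable()) {
                ReadHandler handler = (ReadHandler) key.attachment();
                handler.onReadable((ReadableByteChannel) key.channel());
                dispatched++;
            }
        }
        return dispatched;
    }
}
```

To use it, put the source side of a Pipe (or a SocketChannel) in non-blocking mode, register it with OP_READ and a ReadHandler as attachment, then call dispatchOnce in a loop. The handler runs only when data is actually waiting.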

Because we receive events, we don’t need a Thread for the ClientConnectionHandler anymore, so we can remove the “implements Runnable” and replace run() with a read() function.

public void read() {
    try {
        String query = getQuery(f_buf);

        if (query != null) {
            System.out.println("query found [" + query + "]");
            f_manager.processQuery(this, query);
        }

    } catch (Exception e) {
        // Connection may have been closed
        e.printStackTrace();
        // CLOSE
        close();
    }
}

This is a better approach than the previous one, because we no longer poll (note that the while(…) has been removed) or block the thread on the read; instead, the read function is called when an OP_READ event is received. That covers the messages sent by the client, but what changes when sending a response to that client?

Sending response back to the client

The next step is how we send messages to the client. If you remember the first article, only one interface is used to send messages to the client: IResponseHandler. So all the changes are in the classes that implement it: FeedResponseHandler and QuoteResponseHandler. As in the previous article, I will show the modifications made to FeedResponseHandler, because it’s a little more complex than the other.

Let’s compare the previous implementation and the new one.

@Override
public void sendToClient(StringBuffer sb) {

    ByteBuffer writeBuffer = ByteBuffer.allocate(sb.toString().getBytes().length);

    String quoteSubcription = getCommand().getSubscription();

    List<ClientConnectionHandler> list = getCommand().getQuoteManager().getClientHandlerByQuoteSubcription(quoteSubcription);

    // HOW to skip the clientConnectionHandler if it was closed ?

    if (list != null) {
        for (Iterator<ClientConnectionHandler> iterator = list.iterator(); iterator.hasNext();) {
            ClientConnectionHandler clientConnectionHandler = iterator.next();

            writeBuffer.put(sb.toString().getBytes());
            writeBuffer.flip();

            System.out.println("SENDING FEED TO CLIENT = [" + sb.toString() + "]");

            try {
                if (clientConnectionHandler.getSocketChannel().isConnected()) {
                    clientConnectionHandler.getSocketChannel().write(writeBuffer);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // sometimes we get this error:

                // java.nio.channels.ClosedChannelException

                // the client is not connected
                clientConnectionHandler.close();
            }

            writeBuffer.rewind();

        }
    }
}

The previous implementation is really straightforward: write the ByteBuffer to the socket. The new one is a bit more bulletproof. Because I added code to check that the bytes are really written to the SocketChannel, it does more work. The important part is when the write operation returns 0: a non-blocking write returns the number of bytes written, so 0 means that nothing was sent to the client, and if we want to try again we need to use a Selector. Take a quick look at this snippet, and I’ll explain it step by step.

@Override
public void sendToClient(StringBuffer sb) {

    ByteBuffer writeBuffer = ByteBuffer.allocate(sb.toString().getBytes().length);

    String quoteSubcription = getCommand().getSubscription();

    ConcurrentSkipListSet<ClientConnectionHandler> list = getCommand().getQuoteManager().getClientHandlerByQuoteSubcription(quoteSubcription);

    // HOW to skip the clientConnectionHandler if it was closed ?

    if (list != null) {
        for (Iterator<ClientConnectionHandler> iterator = list.iterator(); iterator.hasNext();) {
            ClientConnectionHandler clientConnectionHandler = iterator.next();

            writeBuffer.put(sb.toString().getBytes());
            writeBuffer.flip();

            System.out.println("SENDING FEED TO CLIENT = [" + sb.toString() + "]");

            // used if a retry is needed
            SelectionKey key = null;
            Selector writeSelector = null;
            int attempts = 0;

            try {

                while (writeBuffer.hasRemaining() && clientConnectionHandler.getSocketChannel().isOpen()) {
                    int len = clientConnectionHandler.getSocketChannel().write(writeBuffer);
                    if (len <= 0) {
                        attempts++;
                        if (writeSelector == null) {
                            writeSelector = Selector.open();
                            if (writeSelector == null) {
                                // Continue using the main one.
                                continue;
                            }
                            key = clientConnectionHandler.getSocketChannel().register(writeSelector, SelectionKey.OP_WRITE);
                        }

                        if (writeSelector.select() == 0) {
                            if (attempts > 2)
                                throw new IOException("Client disconnected");
                        }
                    }

                }

            } catch (IOException e) {
                e.printStackTrace();
                // the client is not connected
                clientConnectionHandler.close();

            } finally {
                if (key != null) {
                    key.cancel();
                    key = null;
                }

                if (writeSelector != null) {
                    // Cancel the key.
                    try {
                        writeSelector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            writeBuffer.rewind();

        }
    }
}

First you need to get a Selector. Only one line of code is needed: writeSelector = Selector.open(); Now that you have the Selector, you need to register for the event you want to use, in this case OP_WRITE. Registering for an event gives you a SelectionKey (which is needed later, in the cleanup step at the end). Before trying to write again, you need to make sure the channel is ready for writing, which writeSelector.select() signals by returning a value > 0. If select() returns 0, count that as a failed attempt and try again.

In this demo, I try to write to the SocketChannel up to 3 times and clean up properly in the finally block.
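
Here is the same retry idea as a reusable helper, a sketch under a few assumptions rather than the code of the demo. RetryingWriter is a hypothetical class, the channel must already be in non-blocking mode (register() throws IllegalBlockingModeException otherwise), and select() is given a timeout so that an unresponsive client cannot block the writer forever. The selected-key set is cleared after each select(), because a key that is still in that set may not be counted again in the next return value.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper: write a whole buffer to a non-blocking channel with bounded retries. */
class RetryingWriter {
    /**
     * Writes until the buffer is empty. When write() makes no progress, waits for OP_WRITE
     * on a temporary selector and gives up after maxAttempts waits in a row that time out.
     */
    static <T extends SelectableChannel & WritableByteChannel> void writeFully(
            T channel, ByteBuffer buffer, int maxAttempts, long timeoutMillis) throws IOException {
        Selector writeSelector = null;
        SelectionKey key = null;
        int attempts = 0;
        try {
            while (buffer.hasRemaining()) {
                if (channel.write(buffer) > 0) {
                    attempts = 0; // progress was made, start counting again
                    continue;
                }
                if (writeSelector == null) {
                    // Open the temporary selector only when it is really needed.
                    writeSelector = Selector.open();
                    key = channel.register(writeSelector, SelectionKey.OP_WRITE);
                }
                if (writeSelector.select(timeoutMillis) == 0 && ++attempts >= maxAttempts) {
                    throw new IOException("Channel not writable after " + attempts + " attempts");
                }
                writeSelector.selectedKeys().clear();
            }
        } finally {
            if (key != null) {
                key.cancel();
            }
            if (writeSelector != null) {
                writeSelector.close();
            }
        }
    }
}
```

A call such as RetryingWriter.writeFully(socketChannel, writeBuffer, 3, 1000L) would match the three attempts used in the demo; the one-second timeout is an arbitrary choice.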

With multiple threads and multiple clients, the risk of concurrency problems is high. That’s why in this implementation I replaced List<ClientConnectionHandler> with ConcurrentSkipListSet<ClientConnectionHandler>.
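
One detail to keep in mind: ConcurrentSkipListSet is a sorted set, so its elements must implement Comparable or the set must be created with a Comparator. How ClientConnectionHandler is ordered is not shown here, so the sketch below uses a hypothetical DemoHandler ordered by an id field. It also shows why the concurrent set helps: removing an element while iterating does not throw ConcurrentModificationException.

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

/** Hypothetical handler type; only the id matters for this sketch. */
class DemoHandler {
    final int id;

    DemoHandler(int id) {
        this.id = id;
    }
}

class HandlerSets {
    /** A sorted concurrent set needs a total order; here handlers are ordered by id. */
    static ConcurrentSkipListSet<DemoHandler> newHandlerSet() {
        return new ConcurrentSkipListSet<DemoHandler>(new Comparator<DemoHandler>() {
            public int compare(DemoHandler a, DemoHandler b) {
                return a.id < b.id ? -1 : (a.id == b.id ? 0 : 1);
            }
        });
    }

    /**
     * Removes a handler while iterating over the set. The iterator is weakly consistent,
     * so this is safe; with a plain ArrayList it would typically fail with
     * ConcurrentModificationException. Returns how many handlers were visited.
     */
    static int countWhileRemoving(ConcurrentSkipListSet<DemoHandler> set, DemoHandler toRemove) {
        int visited = 0;
        for (DemoHandler handler : set) {
            visited++;
            set.remove(toRemove);
        }
        return visited;
    }
}
```

Because the set uses the comparator to detect duplicates, two handlers with the same id count as one element, so the ordering key has to be unique per connection.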

We have nothing else to migrate; everything was done in a few classes: SocketConnectionListener and the IResponseHandler implementations. These small changes make a difference, and when starting a new NIO application it’s better to take this approach from the beginning. Even if it’s not hard to migrate the first implementation to this one, it’s still not easy to understand it all. If you prefer to let others do the NIO handling and concentrate on the business logic of your application, you could use a NIO framework.

Download the source code here. To run the application: java -jar nio_quotestock_demo_v2.jar

In the next article, “Grizzly Migration Guide Part 3”, I will explain how to migrate to a NIO framework: Grizzly.

Categories: Grizzly

Grizzly Migration Guide Part 1

May 24th, 2010 No comments

This article focuses on creating a demo application built on the Grizzly framework. I wanted to go into more detail than the usual hello-world applications. Using a new framework is not easy at first, and it is even harder when the framework is under active development or lacks samples and documentation.

This guide follows the steps to migrate a real-world application to the Grizzly framework. I chose to build a “real-time stock quote” application. I’ll focus only on the server part; the client is left for a later article.

The principles:

To provide stock quotes, the server keeps a connection to a third party that provides this service. The server listens for requests from clients and responds in real time. A client asks for a quote or for a quote feed (it then receives all the updates on the stock), and can register for more than one quote feed. The server handles around 5000 stock feeds in real time. When the server loses the connection to the third party, it reconnects and resubscribes to the quote feeds that were previously registered. A client can unsubscribe from a quote feed at any time.

I think that covers most of the cases that can happen in the real world.

This part focuses on creating the first implementation of the demo using plain NIO. To keep the demo simple, I won’t use external libraries. I use JDK 6 for this demo; if you need to compile it on an earlier JDK you will have to adapt the code, and in that case you could use “backport” libraries to replace the concurrent API.

The main class is QuoteManager. This class contains the logic behind the feed subscriptions and the client requests. Here are the most important methods in QuoteManager.

protected void init() {

    quoteSubscriptionCache = new ConcurrentHashMap<String, IResponseHandler>();
    clientHandlerByQuoteSubcriptionCache = new ConcurrentHashMap<String, List<ClientConnectionHandler>>();
    clientSubcriptionListCache = new ConcurrentHashMap<ClientConnectionHandler, ConcurrentSkipListSet<String>>();

    f_tg = new ThreadGroup("Connection IN/OUT");

    // third-party connection
    thirdConnection = new ThirdPartyConnectionHandler();
    thirdConnection.setQuoteManager(this);

    new Thread(f_tg, thirdConnection, "ThirdPartyConnectionHandler").start();

    // incoming connection
    incomingConnectionListener = new SocketConnectionListener();
    incomingConnectionListener.setQuoteManager(this);
    incomingConnectionListener.setPort(INCOMING_PORT);

    // start the listening service
    new Thread(f_tg, incomingConnectionListener, "Incoming Connection").start();

}

In this implementation we create one Thread per client connection and use a blocking SocketChannel to communicate with the client. It’s not the best way to do it, but I will address that in “Grizzly Migration Guide Part 2”.

/**
 * processing
 *
 * - Open socket
 * - a new Thread (ClientConnectionHandler)
 * - start the new thread
 */
public void run() {

    init();

    while (!isShutdown()) {
        try {
            SocketChannel socketChannel = f_serverSocket.accept();

            // create a new clientConnectionhandler
            ClientConnectionHandler client = new ClientConnectionHandler(getQuoteManager());
            client.setSocketChannel(socketChannel);

            System.out.println("new client connection established");

            Thread t = new Thread(f_threadGroup, client, "Client");
            t.start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

When the function getQuery() returns a query, the QuoteManager processes it. Now let’s look at how we parse the requests sent by the client.

/**
* find the query formatted : xxx|symbol[eoq]
*
* the buffer could contain more than one query.
*
* ex : msg1 = quote|aaa[eoq]
*    : msg2 = feed|bbb[eoq]_zzzzz
*
* The first query is returned and the rest is kept in the buffer
* for the next pass.
*
* @param buf buf
* @return the query found
* @throws IOException exception
*/
public String getQuery(ByteBuffer buf) throws IOException {

    while (f_socketChannel.isConnected()) {

        // check if there is already a request in the buffer
        String query = parseQuery();
        if (query != null) {
            return query;
        }

        // Clear the buffer and read bytes from socket
        buf.clear();

        int numBytesRead = f_socketChannel.read(buf);

        if (numBytesRead == -1) {
            // No more bytes can be read from the channel
            close();
        } else {
            // To read the bytes, flip the buffer
            buf.flip();

            if (buf.hasRemaining()) {

                //System.out.println("Remaining=" + buf.remaining());

                // append the contents of buf to the cumulative buffer f_cumulatifBB
                f_cumulatifBB = getBB(f_cumulatifBB, buf);

                // parseQuery
                return parseQuery();

            }
        }
    }

    return null;
}

In this function we do two critical things:

read from the socketChannel and check whether there is a valid query in the buffer

copy the remaining data in the buffer to a cumulative buffer for the next time.

The line int numBytesRead = f_socketChannel.read(buf); is the blocking part. The function blocks until data is received on the socketChannel.
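
The helper getBB is not listed in this article. A minimal sketch of what such a cumulative-buffer helper could look like is given below; Buffers.append is a hypothetical name, and the growth policy (at least doubling the capacity) is an assumption.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for a cumulative buffer that grows when it is full. */
class Buffers {
    /**
     * Appends src to acc and returns the buffer to keep using.
     * acc must be in "write mode" (position = number of bytes stored so far).
     * When acc has not enough room, a bigger buffer is allocated and the old
     * content is copied into it.
     */
    static ByteBuffer append(ByteBuffer acc, ByteBuffer src) {
        if (acc.remaining() < src.remaining()) {
            int needed = acc.position() + src.remaining();
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(needed, acc.capacity() * 2));
            acc.flip();      // switch to "read mode" to copy what is stored
            bigger.put(acc);
            acc = bigger;    // bigger is already in "write mode"
        }
        acc.put(src);
        return acc;
    }
}
```

The caller has to keep the returned reference (for example f_cumulatifBB = Buffers.append(f_cumulatifBB, buf);), because the helper may hand back a new, larger buffer.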

The next step is to parse the buffer and find a valid query. In this application I didn’t use fixed-length messages; the length is dynamic. A query ends when the pattern “[eoq]” is found.

/**
* Parse the buffer and look for a valid query
* @return a Query
* @throws IOException exception
*/
protected String parseQuery() throws IOException {

    String query = null;

    if (f_cumulatifBB.hasRemaining()) {
        // read the cumulative buffer to see whether it contains the EOQ
        ByteBuffer tmp = f_cumulatifBB.duplicate();
        tmp.flip();

        // decode the buffer
        String msg = f_asciiDecoder.decode(tmp).toString();

        //System.out.println("msg=" + msg);

        int index = msg.indexOf("[eoq]");
        if (index > -1) {

            query = msg.substring(0, index);
            //System.out.println("Query = " + query);

            // We need to keep what comes after the EOQ
            f_cumulatifBB.clear();
            f_cumulatifBB.put(msg.substring(index + "[eoq]".length()).getBytes());
        } else {
            //System.out.println("no EOQ in this iteration");
        }
    }

    return query;
}

Because the query is not fixed-length, we have to check each time whether “[eoq]” is found. If it is, we extract the query from the buffer and keep the remaining data in another buffer, because it’s possible to receive several requests in the same buffer. Like this:

feed|aaa[eoq]feed|b

In this pass we extract the query “feed|aaa” (the “[eoq]” terminator is dropped) and keep the rest, “feed|b”, in the buffer. The next time we receive data, the buffer will hold “feed|bbb[eoq]”.
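
The framing rule can be isolated from the socket code. The sketch below is a hypothetical QueryFramer that works on Strings instead of ByteBuffers and, unlike parseQuery, returns every complete query found in one pass; it only illustrates the “[eoq]” rule described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: accumulates text and splits it into "[eoq]"-terminated queries. */
class QueryFramer {
    private static final String EOQ = "[eoq]";
    private final StringBuilder pending = new StringBuilder();

    /** Appends a chunk and returns every query completed by it, in arrival order. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> queries = new ArrayList<String>();
        int end;
        while ((end = pending.indexOf(EOQ)) >= 0) {
            queries.add(pending.substring(0, end)); // the terminator is not part of the query
            pending.delete(0, end + EOQ.length());  // keep only what follows the terminator
        }
        return queries;
    }

    /** Text received so far that is not yet terminated by "[eoq]". */
    String remainder() {
        return pending.toString();
    }
}
```

Feeding it “feed|aaa[eoq]feed|b” yields the single query “feed|aaa” and leaves “feed|b” pending; feeding “bb[eoq]” afterwards yields “feed|bbb”.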

Processing the command

The query parsed by the ClientConnectionHandler is sent to the QuoteManager.

There are only 3 commands available (quit, quote and feed), and each command has its own ICommand implementation, which does the processing of the query. The QuoteManager obtains the ICommand through getCommand().

/**
* Process the request from the client.
*
* @param clientHandler the connection from the client
* @param query the query
*/
public void processQuery(ClientConnectionHandler clientHandler, String query) {

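    // extract the command
    int index = query.indexOf(ARG_SEP);
    if (index < 0) {
        // no separator: the query is malformed and substring(0, -1) would throw
        System.out.println("Malformed query : [" + query + "]");
        return;
    }
    String commandName = query.substring(0, index);

    ICommand command = getCommand(commandName);

    if (command == null) {
        System.out.println("Unsupported command : commandName = [" + commandName + "]");
        return;
    }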

    // send the query
    command.process(query, clientHandler);

}

/**
* Return a class that will handle the command
* requested by the client
*
* The supported commands are : quit, quote, feed
*
* @param commandName name of the command
* @return the class that will handle the command
*/
public ICommand getCommand(String commandName) {

    // we could use Spring to load the config, but we are in a demo :) 

    if (commandName.equalsIgnoreCase("quit")) {
        return new QuitCommand(this);
    } else if (commandName.equalsIgnoreCase("quote")) {
        return new QuoteCommand(this);
    } else if (commandName.equalsIgnoreCase("feed")) {
        return new FeedCommand(this);
    }

    return null;
}
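
The if/else chain in getCommand could also be written as a lookup table. This is only a sketch with hypothetical names (DemoCommand, CommandRegistry), and it assumes that a command object can be shared between requests, which may not hold for the real ICommand implementations since getCommand creates a new one each time. The separator '|' is taken from the query examples.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for ICommand: receives the part of the query after the separator. */
interface DemoCommand {
    String process(String argument);
}

class CommandRegistry {
    private static final char ARG_SEP = '|';
    private final Map<String, DemoCommand> commands = new HashMap<String, DemoCommand>();

    /** Registers a command under a case-insensitive name. */
    void register(String name, DemoCommand command) {
        commands.put(name.toLowerCase(Locale.ROOT), command);
    }

    /** Splits "name|argument", looks the name up and runs it; returns null if unsupported. */
    String dispatch(String query) {
        int index = query.indexOf(ARG_SEP);
        if (index < 0) {
            return null; // malformed query: no separator
        }
        String name = query.substring(0, index).toLowerCase(Locale.ROOT);
        DemoCommand command = commands.get(name);
        if (command == null) {
            return null; // unknown command
        }
        return command.process(query.substring(index + 1));
    }
}
```

Adding a fourth command then means one more register(...) call instead of another else-if branch. If the commands keep per-request state, the map could hold small factories instead of shared instances.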

At this point, the client request has been received by the server and is being processed. All the remaining work is done by the ThirdPartyConnectionHandler. In this demo I don’t provide an actual stock quote service; instead I created a quote simulator (bid/ask). I could have written a simulator server and connected to it, but I preferred to keep it simple, so the random quotes are generated in the ICommandRequest. Take a look at the next image.

Because we asked for a feed, a FeedCommandRequest is used to handle the quotes (bid/ask). The ThirdPartyConnection retrieves the quotes (here from a simulator, but they could come from a cache) and sends them back through the IResponseHandler received from the ICommand.

/**
 * Send the request to the third party
 *
 * @param quoteSubscription the quote requested
 * @param responseHandler handler that will receive the response
 * @return CommandRequest request
 * @throws Exception exception
 */
public ICommandRequest sendFeedRequest(String quoteSubscription, IResponseHandler responseHandler) throws Exception {

    FeedCommandRequest request = new FeedCommandRequest();
    request.setQuoteSubscription(quoteSubscription);
    request.setResponseHandler(responseHandler);

    responseHandler.setCommandRequest(request);

    // we send the query to the third party and we receive notification by the
    // responseHandler .. but here the update are generated in the CommandRequest class in a thread
    // not efficient.. but it's a DEMO :)   and because it's a QUOTE, we send
    // the update only to this client
    request.startGenerateQuote();

    return request;
}
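
The quote generator itself is not listed. As an illustration only, a simulator could be as small as the hypothetical QuoteSimulator below; the value ranges and the rule that the ask is never below the bid are assumptions, and the text format is copied from the console output of the demo.

```java
import java.util.Random;

/** Hypothetical sketch of a bid/ask simulator (not the demo's FeedCommandRequest). */
class QuoteSimulator {
    private final Random random;

    /** A fixed seed makes the sequence reproducible, which helps when testing. */
    QuoteSimulator(long seed) {
        this.random = new Random(seed);
    }

    /** Builds one simulated update in the textual shape used by the demo. */
    String nextQuote(String symbol) {
        double bid = random.nextDouble() * 50.0;      // assumption: prices between 0 and 50
        double ask = bid + random.nextDouble() * 5.0; // assumption: ask is never below bid
        return "SYMBOL=[" + symbol + "] BID = " + bid + "| ASK = " + ask;
    }
}
```

In the demo the updates are produced in a thread started by startGenerateQuote(); a generator like this one would be called from that thread and its result handed to the IResponseHandler.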

As I said, the feed request is the most complex one: when sendToClient is called, the response is sent not to one client but to all the clients subscribed to the stock. In the FeedResponseHandler, the list of clients for the quote subscription is retrieved and iterated, and the quote (bid/ask) is sent to every client in that list.

public void sendToClient(StringBuffer sb) {

    ByteBuffer writeBuffer = ByteBuffer.allocate(sb.toString().getBytes().length);

    String quoteSubcription = getCommand().getSubscription();

    List<ClientConnectionHandler> list = getCommand().getQuoteManager().getClientHandlerByQuoteSubcription(quoteSubcription);

    // HOW to skip the clientConnectionHandler if it was closed ?

    if (list != null) {
        for (Iterator<ClientConnectionHandler> iterator = list.iterator(); iterator.hasNext();) {
            ClientConnectionHandler clientConnectionHandler = iterator.next();

            writeBuffer.put(sb.toString().getBytes());
            writeBuffer.flip();

            System.out.println("SENDING FEED TO CLIENT = [" + sb.toString() + "]");

            try {
                if (clientConnectionHandler.getSocketChannel().isConnected()) {
                    clientConnectionHandler.getSocketChannel().write(writeBuffer);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // sometimes we get this error:

                // java.nio.channels.ClosedChannelException

                // the client is not connected
                clientConnectionHandler.close();
            }

            writeBuffer.rewind();

        }
    }
}

Let’s talk about the quit command. When a client sends a quit request to the server, or when the client closes its connection, the QuoteManager removes the client from its cache. If this client was the only one subscribed to a stock feed, the QuoteManager closes the IResponseHandler with the third party, because no client is interested in the next updates for that stock.

/**
* Unsubscribe this client from all its quote subscriptions. If no client is left on a
* quote subscription, the subscription is closed.
* @param clientConnectionHandler the client connection handler
*/
public void unsubcribeClient(ClientConnectionHandler clientConnectionHandler) {

    ConcurrentSkipListSet<String> quoteSubcriptionList = getQuoteSubscription(clientConnectionHandler);

    // for each quoteSubcription we need to check if there are other client that are subcribe to this quoteSubcription
    // and we remove this client from the List.
    for (Iterator<String> iterator = quoteSubcriptionList.iterator(); iterator.hasNext();) {
        String quoteSubcription = iterator.next();

        List<ClientConnectionHandler> listClient = getClientHandlerByQuoteSubcription(quoteSubcription);
        listClient.remove(clientConnectionHandler);

        // if empty, the quoteSubcription is not needed anymore, so we close it
        if (listClient.size() == 0) {
            IResponseHandler responseHandler = getResponseHandlerFromQuoteSubcription(quoteSubcription);

            responseHandler.getCommandRequest().close();

            quoteSubscriptionCache.remove(quoteSubcription);

        }

    }
}
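
The bookkeeping behind subscribe and unsubscribe can be condensed into one small class. SubscriptionRegistry below is a hypothetical sketch, not the QuoteManager code: it tells the caller when the first client subscribes to a symbol (open the upstream feed) and which symbols are left without clients after an unsubscribe (close the upstream feed).

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical sketch of the subscription cache; C is the client handler type. */
class SubscriptionRegistry<C> {
    private final Map<String, Set<C>> clientsBySymbol = new ConcurrentHashMap<String, Set<C>>();

    /** Returns true when this is the first subscriber, i.e. the upstream feed must be opened. */
    synchronized boolean subscribe(String symbol, C client) {
        Set<C> clients = clientsBySymbol.get(symbol);
        boolean first = (clients == null);
        if (first) {
            clients = new CopyOnWriteArraySet<C>();
            clientsBySymbol.put(symbol, clients);
        }
        clients.add(client);
        return first;
    }

    /** Removes the client everywhere; returns the symbols whose upstream feed can be closed. */
    synchronized Set<String> unsubscribeAll(C client) {
        Set<String> orphaned = new HashSet<String>();
        Iterator<Map.Entry<String, Set<C>>> it = clientsBySymbol.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<C>> entry = it.next();
            if (entry.getValue().remove(client) && entry.getValue().isEmpty()) {
                orphaned.add(entry.getKey());
                it.remove(); // nobody listens to this symbol anymore
            }
        }
        return orphaned;
    }
}
```

Returning the orphaned symbols keeps the registry free of I/O: the caller decides how to close the request with the third party for each of them.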

I think we are ready to see it live (live for me :) ). I’ll use telnet to connect to the server and send requests.

Here is the output in the server console when we start the server.

NIOGateway started
listening for incoming TCP Connections on port : 5000

Now it’s time to connect to the server and send a command : feed|aaa[eoq]

telnet localhost 5000

feed|aaa[eoq] (paste it in the console)

As soon as the request is received by the server, you will see this in the server console:

new client connection established
query found [feed|aaa]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 27.014468688065794| ASK = 31.911143507306804]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 46.96027985720251| ASK = 40.82132916110618]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 1.7131267924565088| ASK = 9.244953384119619]

and in the telnet console you will receive the bid/ask updates that were sent.

SYMBOL=[aaa] BID = 27.014468688065794| ASK = 31.911143507306804

SYMBOL=[aaa] BID = 46.96027985720251| ASK = 40.82132916110618

SYMBOL=[aaa] BID = 1.7131267924565088| ASK = 9.244953384119619

I can send another request to the server: feed|bbb[eoq]

That adds bbb’s bid/ask to the client and server consoles.

When the client sends the quit command, all the updates stop on the server and the client, and the connection with the server is closed.

In the server console:

query found [quit|aaa]
ClientConnection close

As I said at the beginning of this article, the server handles disconnection from the third party. I simulate such a disconnection in the run() of the ThirdPartyConnectionHandler, and you will see it in the server console:

Simulate a disconnection from the 3th party
Reconnecting…

In this article, we saw the basics of creating a real-world application using NIO. SocketChannels are used instead of plain socket reads and writes. In this first implementation, one Thread is created per client and the SocketChannel is blocking. To handle the asynchronous requests we use a callback object (IResponseHandler). To avoid requesting the same stocks from the third party more than once, we use a cache in the QuoteManager. Each client that sends a feed request is subscribed to the stock and kept in the cache; when the connection is closed for this client, the server unsubscribes it and closes the request with the third party if required.

Download the source code here. To run the application: java -jar nio_quotestock_demo_v1.jar

In the next article, “Grizzly Migration Guide Part 2”, I’ll show you how to use non-blocking connections with a Selector.

Categories: Grizzly