Channel Streaming Layer

This layer has the following:
  • Streaming Interface
  • Streaming implementation: Sockets
  • Channel Event Implementation
  • Channel Pipeline Implementation
  • Handler Implementation
  • Processing Layer

Streaming Interface

The Streaming interface has a set of interfaces that define the boundaries within which any channel implementation will work. The following are the interfaces defined here:

Channel: The channel through which communication occurs to the external world and back. This can be something likes a socket, pipe etc.

ChannelConfig: This defines the configuration for the channel

ChannelEvent: ChannelEvents are created for events that occur on the channel, for eg., when a client connects we can have a CONNECT event, when data arrives on the channel we can have a receive event and so on.

MessageEvent: This is a specialization of the Channel Event and has the message that has arrived or have been sent on the channel.

EventTracker: Events are tracked using EventTrackers. As the event progresses through the various handlers that event tracker changes state. Listeners can be registered with the EventTracker to be called when a particular state of the event occurs.

EventStateListener: This will be called when the state of the event changes in the event tracker. This will be registered into the EventTracker object.

ChannelPipeline: The events flow through a pipeline defined by ChannelPipeline. The ChannelPipeline has a pipe of handlers. Events from the channel such as received message will flow up the stream of handlers while events to the channel will flow down the stream of handlers and finally into the sink.

Handler: Events are handled by Handlers. This contains the processing logic to be done when an event occurs.

HandlerContext: The context in which a handler executes.

UpStreamHandler: The handler that is called when the message is flowing up from the channel.

DownStreamHandler: The handler that is called when the message is flowing down the channel.

Sink: The sink into which all downstream messages will be sunk before it is sent through the channel.

The below shows how the events flow through the pipeline:


Streaming Implementation: Sockets

This layer implements the Streaming layer to communicate via TCP channels. Pending still to implement communication via UDP.

SocketChannel: This is a server socket channel which is used for listening for connections and processing accepted connections. This generates the BOUND and ACCEPTED event to be streamed up the pipeline.

AcceptedChannel: This is the client socket channel which is used for communicating data through the socket channel.

SocketChannelConfig: This defines the configuration of the SocketChannel and the AcceptedChannel.

!!Channel Event Implementation

Defines the various event objects that will flow up and down the channel pipeline. It contains the eventtracker object which keeps track of the state of the event. Listeners can be registered which will be called when the event completes.

ChannelStateEvent: This event will be raised when the state of the channel changes.

UpstreamMsgEvent: The Message event generated when data is received in the channel.

DownStreamMsgEvent: The Message event generated when data has to be sent in the channel.

DownstreamStateEvent: The event raised by when the state of the channel is changed by the code

DefaultEventTracker: The default implementation for the event tracker functionality.

Channel Pipeline Implementation

This contains pipeline definition that can be used to handle events.

DefaultChannelPipeline: This class implements the pipeline functionality using the Pipeline class. This is a synchronous implementation that calls the handler functionality in synch with the thread that is firing the event.

AsynchronousPipeline: This class derives from the DefaultChannelPipeline and overrides the streaming functionality to run the handler calls in a separate thread from a threadpool. This calls the base class’s stream functionality in the separate thread.

SocketChannelSink: This acts as the sink for the Socket channels. It starts up a server worker thread that waits for connections on the server channel and a set of WorkerSelectors based on the number of client connections that come in.

SocketWorker: The worker class that does the actual work of receiving and sending the data across the socket channel. This is called by the WorkerSelector based on whether data is available for the channels monitored by these SocketWorker. The server worker distributes each of client connections that come in to be worked up by different socketworkers based on the load on that socketworker. Currently it does a equal distribution of work.

WorkerSelector: This is that class to which different SocketWorkers are distributed for periodic monitoring for availability of work to be done. If work is available in any of the sockets monitored by the SocketWorker the work is scheduled to be run on a thread pool.


The main thread starts up the serverworker thread and creates two distribution pools, one for the workerselector and one for the socketworker. It creates one ThreadPool in which the SocketWorkers are run.

When a new connection comes in, the socket’s work is distributed to the least work load SocketWorker, and if not already scheduled the SocketWorker is scheduled to a WorkerSelector with the least work to be done. The WorkerSelector schedules work from the SocketWorker to run in the SocketWorker Thread pool.

Handler Implementation

MessageFormatHandler: The message format handler handles reading of whole messages defined by the Reader interface. The format handler checks if the reader returns that the message is completely read, if not, it does not continue to the next handler and waits for the receipt of the rest of the message. When the whole message has been received, this will send the formatted message as given by the reader to the next in the handler stream. All messages sent in the channel is at the end passed through this handler before redirecting to the sink to be sent. This changes the object to a byte buffer using the reader.

MessagePersistence: All messages received by the format handler is passed through this to be persisted before it is passed on for processing.

MessageProcessorHandler: When the message has been completely read from the channel, the message is sent to the message processor to handle the message. Check the Processor design later on for more on this.

Processing Layer

All Message processors are registered in the MessageListenerRegistry. These are pooled using the ListenerPool object and accessible via the ListenerContainer. The Processor Handler will try to get an object from the listener pool once got, it calls the onMessage of the listener. The GBListener is a convenience implementation for the MessageListener. There are two lifecycle functions that can be overridden, activated, called when the object is activated for processing this message and passivated called when the object has completed the processing and the object is returned back to the pool.

MessageListener: The listener that has to be implemented for processing a message.

ListenerObject: A poolable wrapper for the MessageListener for pooling the object for use.

ListenerPool: A StrictObjectPool that calls the correct lifecycle functions

ListenerContainer: A container for the MessageListener Pools for different messages.

MessageListenerRegistry: A registry into which listeners are registered for various messages. More than one listener can be registered for the same message. The listeners will be called sequentially in the order registered.

Last edited Jan 20, 2011 at 9:15 AM by rsankarx, version 5


No comments yet.