Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

ThreadedSocketAcceptor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketAcceptor.h"
00028 #include "Settings.h"
00029 #include "Utility.h"
00030 
00031 namespace FIX
00032 {
00033 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00034   Application& application,
00035   MessageStoreFactory& factory,
00036   const SessionSettings& settings ) throw( ConfigError )
00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }
00039 
00040 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00041   Application& application,
00042   MessageStoreFactory& factory,
00043   const SessionSettings& settings,
00044   LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory )
00046 { 
00047   socket_init(); 
00048 }
00049 
00050 ThreadedSocketAcceptor::~ThreadedSocketAcceptor()
00051 { 
00052   socket_term(); 
00053 }
00054 
00055 void ThreadedSocketAcceptor::onConfigure( const SessionSettings& s )
00056 throw ( ConfigError )
00057 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00058 
00059   std::set<SessionID> sessions = s.getSessions();
00060   std::set<SessionID>::iterator i;
00061   for( i = sessions.begin(); i != sessions.end(); ++i )
00062   {
00063     const Dictionary& settings = s.get( *i );
00064     settings.getLong( SOCKET_ACCEPT_PORT );
00065     if( settings.has(SOCKET_REUSE_ADDRESS) )
00066       settings.getBool( SOCKET_REUSE_ADDRESS );
00067     if( settings.has(SOCKET_NODELAY) )
00068       settings.getBool( SOCKET_NODELAY );
00069   }
00070 
00071   QF_STACK_POP
00072 }
00073 
00074 void ThreadedSocketAcceptor::onInitialize( const SessionSettings& s )
00075 throw ( RuntimeError )
00076 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00077 
00078   short port = 0;
00079   std::set<int> ports;
00080 
00081   std::set<SessionID> sessions = s.getSessions();
00082   std::set<SessionID>::iterator i = sessions.begin();
00083   for( ; i != sessions.end(); ++i )
00084   {
00085     Dictionary settings = s.get( *i );
00086     port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00087 
00088     m_portToSessions[port].insert( *i );
00089 
00090     if( ports.find(port) != ports.end() )
00091       continue;
00092     ports.insert( port );
00093 
00094     const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ? 
00095       s.get().getBool( SOCKET_REUSE_ADDRESS ) : true;
00096 
00097     const bool noDelay = settings.has( SOCKET_NODELAY ) ? 
00098       s.get().getBool( SOCKET_NODELAY ) : false;
00099 
00100     const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
00101       s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0;
00102 
00103     const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
00104       s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
00105 
00106     int socket = socket_createAcceptor( port, reuseAddress );
00107     if( socket < 0 )
00108     {
00109       SocketException e;
00110       socket_close( socket );
00111       throw RuntimeError( "Unable to create, bind, or listen to port " 
00112                          + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
00113     }
00114     if( noDelay )
00115       socket_setsockopt( socket, TCP_NODELAY );
00116     if( sendBufSize )
00117       socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00118     if( rcvBufSize )
00119       socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00120 
00121     m_socketToPort[socket] = port;
00122     m_sockets.insert( socket );
00123   }    
00124 
00125   QF_STACK_POP
00126 }
00127 
00128 void ThreadedSocketAcceptor::onStart()
00129 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00130 
00131   Sockets::iterator i;
00132   for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00133   {
00134     Locker l( m_mutex );
00135     int port = m_socketToPort[*i];
00136     AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00137     thread_id thread;
00138     thread_spawn( &socketAcceptorThread, info, thread );
00139     addThread( *i, thread );
00140   }
00141 
00142   QF_STACK_POP
00143 }
00144 
00145 bool ThreadedSocketAcceptor::onPoll( double timeout )
00146 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00147 
00148   return false;
00149 
00150   QF_STACK_POP
00151 }
00152 
00153 void ThreadedSocketAcceptor::onStop()
00154 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00155   
00156   SocketToThread threads;
00157   SocketToThread::iterator i;
00158 
00159   {
00160     Locker l(m_mutex);
00161 
00162     time_t start = 0;
00163     time_t now = 0;
00164 
00165     ::time( &start );
00166     while ( isLoggedOn() )
00167     {
00168       if( ::time(&now) -5 >= start )
00169         break;
00170     }
00171 
00172     threads = m_threads;
00173     m_threads.clear();
00174   }
00175 
00176   for ( i = threads.begin(); i != threads.end(); ++i )
00177     socket_close( i->first );
00178   for ( i = threads.begin(); i != threads.end(); ++i )
00179     thread_join( i->second );
00180 
00181   QF_STACK_POP
00182 }
00183 
00184 void ThreadedSocketAcceptor::addThread( int s, thread_id t )
00185 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00186 
00187   Locker l(m_mutex);
00188 
00189   m_threads[ s ] = t;
00190 
00191   QF_STACK_POP
00192 }
00193 
00194 void ThreadedSocketAcceptor::removeThread( int s )
00195 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00196 
00197   Locker l(m_mutex);
00198   SocketToThread::iterator i = m_threads.find( s );
00199   if ( i != m_threads.end() )
00200   {
00201     thread_detach( i->second );
00202     m_threads.erase( i );
00203   }
00204 
00205   QF_STACK_POP
00206 }
00207 
00208 THREAD_PROC ThreadedSocketAcceptor::socketAcceptorThread( void* p )
00209 { QF_STACK_TRY
00210   QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00211 
00212   AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00213 
00214   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00215   int s = info->m_socket;
00216   int port = info->m_port;
00217   delete info;
00218 
00219   int noDelay = 0;
00220   int sendBufSize = 0;
00221   int rcvBufSize = 0;
00222   socket_getsockopt( s, TCP_NODELAY, noDelay );
00223   socket_getsockopt( s, SO_SNDBUF, sendBufSize );
00224   socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
00225 
00226   int socket = 0;
00227   while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00228   {
00229     if( noDelay )
00230       socket_setsockopt( socket, TCP_NODELAY );
00231     if( sendBufSize )
00232       socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00233     if( rcvBufSize )
00234       socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00235 
00236     Sessions sessions = pAcceptor->m_portToSessions[port];
00237 
00238     ThreadedSocketConnection * pConnection =
00239       new ThreadedSocketConnection
00240         ( socket, sessions, pAcceptor->getApplication(), pAcceptor->getLog() );
00241 
00242     ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00243 
00244     {
00245       Locker l( pAcceptor->m_mutex );
00246 
00247       std::stringstream stream;
00248       stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00249 
00250       if( pAcceptor->getLog() )
00251         pAcceptor->getLog()->onEvent( stream.str() );
00252 
00253       thread_id thread;
00254       if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00255         delete info;
00256       pAcceptor->addThread( socket, thread );
00257     }
00258   }
00259 
00260   if( !pAcceptor->isStopped() )
00261     pAcceptor->removeThread( s );
00262 
00263   return 0;
00264 
00265   QF_STACK_POP
00266   QF_STACK_CATCH
00267 }
00268 
00269 THREAD_PROC ThreadedSocketAcceptor::socketConnectionThread( void* p )
00270 { QF_STACK_TRY
00271   QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00272 
00273   ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00274 
00275   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00276   ThreadedSocketConnection* pConnection = info->m_pConnection;
00277   delete info;
00278 
00279   int socket = pConnection->getSocket();
00280 
00281   while ( pConnection->read() ) {}
00282   delete pConnection;
00283   if( !pAcceptor->isStopped() )
00284     pAcceptor->removeThread( socket );
00285   return 0;
00286 
00287   QF_STACK_POP
00288   QF_STACK_CATCH
00289 }
00290 }

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001