Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

ThreadedSocketInitiator.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 
00031 namespace FIX
00032 {
00033 ThreadedSocketInitiator::ThreadedSocketInitiator(
00034   Application& application,
00035   MessageStoreFactory& factory,
00036   const SessionSettings& settings ) throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 
00039   m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 
00040 { 
00041   socket_init(); 
00042 }
00043 
00044 ThreadedSocketInitiator::ThreadedSocketInitiator(
00045   Application& application,
00046   MessageStoreFactory& factory,
00047   const SessionSettings& settings,
00048   LogFactory& logFactory ) throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 
00051   m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 
00052 { 
00053   socket_init(); 
00054 }
00055 
00056 ThreadedSocketInitiator::~ThreadedSocketInitiator()
00057 { 
00058   socket_term(); 
00059 }
00060 
00061 void ThreadedSocketInitiator::onConfigure( const SessionSettings& s )
00062 throw ( ConfigError )
00063 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00064 
00065   try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00066   catch ( std::exception& ) {}
00067   if( s.get().has( SOCKET_NODELAY ) )
00068     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00069   if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00070     m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00071   if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00072     m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00073 
00074   QF_STACK_POP
00075 }
00076 
00077 void ThreadedSocketInitiator::onInitialize( const SessionSettings& s )
00078 throw ( RuntimeError )
00079 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize)
00080   QF_STACK_POP
00081 }
00082 
00083 void ThreadedSocketInitiator::onStart()
00084 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00085 
00086   while ( !isStopped() )
00087   {
00088     time_t now;
00089     ::time( &now );
00090 
00091     if ( (now - m_lastConnect) >= m_reconnectInterval )
00092     {
00093       Locker l( m_mutex );
00094       connect();
00095       m_lastConnect = now;
00096     }
00097 
00098     process_sleep( 1 );
00099   }
00100 
00101   QF_STACK_POP
00102 }
00103 
00104 bool ThreadedSocketInitiator::onPoll( double timeout )
00105 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00106 
00107   return false;
00108 
00109   QF_STACK_POP
00110 }
00111 
00112 void ThreadedSocketInitiator::onStop()
00113 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00114 
00115   SocketToThread threads;
00116   SocketToThread::iterator i;
00117   
00118   {
00119     Locker l(m_mutex);
00120 
00121     time_t start = 0;
00122     time_t now = 0;
00123 
00124     ::time( &start );
00125     while ( isLoggedOn() )
00126     {
00127       if( ::time(&now) -5 >= start )
00128         break;
00129     }
00130 
00131     threads = m_threads;
00132     m_threads.clear();
00133   }   
00134 
00135   for ( i = threads.begin(); i != threads.end(); ++i )
00136     socket_close( i->first );
00137   
00138   for ( i = threads.begin(); i != threads.end(); ++i )
00139     thread_join( i->second );
00140   threads.clear();
00141 
00142   QF_STACK_POP
00143 }
00144 
00145 void ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00146 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00147 
00148   try
00149   {
00150     Session* session = Session::lookupSession( s );
00151     if( !session->isSessionTime(UtcTimeStamp()) ) return;
00152 
00153     Log* log = session->getLog();
00154 
00155     std::string address;
00156     short port = 0;
00157     getHost( s, d, address, port );
00158 
00159     int socket = socket_createConnector();
00160     if( m_noDelay )
00161       socket_setsockopt( socket, TCP_NODELAY );
00162     if( m_sendBufSize )
00163       socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
00164     if( m_rcvBufSize )
00165       socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
00166 
00167     setPending( s );
00168     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00169 
00170     ThreadedSocketConnection* pConnection =
00171       new ThreadedSocketConnection( s, socket, address, port, getApplication(), getLog() );
00172 
00173     ThreadPair* pair = new ThreadPair( this, pConnection );
00174 
00175     {
00176       Locker l( m_mutex );
00177       thread_id thread;
00178       if ( thread_spawn( &socketThread, pair, thread ) )
00179       {
00180         addThread( socket, thread );
00181       }
00182       else
00183       {
00184         delete pair;
00185         pConnection->disconnect();
00186         delete pConnection;
00187         setDisconnected( s );
00188       }
00189     }
00190   }
00191   catch ( std::exception& ) {}
00192 
00193   QF_STACK_POP
00194 }
00195 
00196 void ThreadedSocketInitiator::addThread( int s, thread_id t )
00197 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00198 
00199   Locker l(m_mutex);
00200 
00201   m_threads[ s ] = t;
00202   QF_STACK_POP
00203 }
00204 
00205 void ThreadedSocketInitiator::removeThread( int s )
00206 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00207 
00208   Locker l(m_mutex);
00209   SocketToThread::iterator i = m_threads.find( s );
00210 
00211   if ( i != m_threads.end() )
00212   {
00213     thread_detach( i->second );
00214     m_threads.erase( i );
00215   }
00216 
00217   QF_STACK_POP
00218 }
00219 
00220 THREAD_PROC ThreadedSocketInitiator::socketThread( void* p )
00221 { QF_STACK_TRY
00222   QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00223 
00224   ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00225 
00226   ThreadedSocketInitiator* pInitiator = pair->first;
00227   ThreadedSocketConnection* pConnection = pair->second;
00228   FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00229   FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
00230   int socket = pConnection->getSocket();
00231   delete pair;
00232 
00233   pInitiator->lock();
00234 
00235   if( !pConnection->connect() )
00236   {
00237     pInitiator->getLog()->onEvent( "Connection failed" );
00238     pConnection->disconnect();
00239     delete pConnection;
00240     pInitiator->removeThread( socket );
00241     pInitiator->setDisconnected( sessionID );
00242     return 0;
00243   }
00244 
00245   pInitiator->setConnected( sessionID );
00246   pInitiator->getLog()->onEvent( "Connection succeeded" );
00247 
00248   pSession->next();
00249 
00250   while ( pConnection->read() ) {}
00251 
00252   delete pConnection;
00253   if( !pInitiator->isStopped() )
00254     pInitiator->removeThread( socket );
00255   
00256   pInitiator->setDisconnected( sessionID );
00257   return 0;
00258 
00259   QF_STACK_POP
00260   QF_STACK_CATCH
00261 }
00262 
00263 void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00264                                        std::string& address, short& port )
00265 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00266 
00267   int num = 0;
00268   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00269   if ( i != m_sessionToHostNum.end() ) num = i->second;
00270 
00271   std::stringstream hostStream;
00272   hostStream << SOCKET_CONNECT_HOST << num;
00273   std::string hostString = hostStream.str();
00274 
00275   std::stringstream portStream;
00276   std::string portString = portStream.str();
00277   portStream << SOCKET_CONNECT_PORT << num;
00278 
00279   if( d.has(hostString) && d.has(portString) )
00280   {
00281     address = d.getString( hostString );
00282     port = ( short ) d.getLong( portString );
00283   }
00284   else
00285   {
00286     num = 0;
00287     address = d.getString( SOCKET_CONNECT_HOST );
00288     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00289   }
00290 
00291   m_sessionToHostNum[ s ] = ++num;
00292 
00293   QF_STACK_POP
00294 }
00295 
00296 }

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001