![]() |
Threaded Socket implementation of Initiator. More...
#include <ThreadedSocketInitiator.h>
Public Member Functions | |
ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError ) | |
ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError ) | |
virtual | ~ThreadedSocketInitiator () |
Private Types | |
typedef std::map< int, thread_id > | SocketToThread |
typedef std::map< SessionID, int > | SessionToHostNum |
typedef std::pair < ThreadedSocketInitiator *, ThreadedSocketConnection * > | ThreadPair |
Private Member Functions | |
void | onConfigure (const SessionSettings &) throw ( ConfigError ) |
Implemented to configure acceptor. | |
void | onInitialize (const SessionSettings &) throw ( RuntimeError ) |
Implemented to initialize initiator. | |
void | onStart () |
Implemented to start connecting to targets. | |
bool | onPoll (double timeout) |
Implemented to connect and poll for events. | |
void | onStop () |
Implemented to stop a running initiator. | |
void | doConnect (const SessionID &s, const Dictionary &d) |
Implemented to connect a session to its target. | |
void | addThread (int s, thread_id t) |
void | removeThread (int s) |
void | lock () |
void | getHost (const SessionID &, const Dictionary &, std::string &, short &) |
Static Private Member Functions | |
static THREAD_PROC | socketThread (void *p) |
Private Attributes | |
SessionSettings | m_settings |
SessionToHostNum | m_sessionToHostNum |
time_t | m_lastConnect |
int | m_reconnectInterval |
bool | m_noDelay |
int | m_sendBufSize |
int | m_rcvBufSize |
bool | m_stop |
SocketToThread | m_threads |
Mutex | m_mutex |
Threaded Socket implementation of Initiator.
Definition at line 39 of file ThreadedSocketInitiator.h.
typedef std::map< SessionID, int > FIX::ThreadedSocketInitiator::SessionToHostNum [private] |
Definition at line 52 of file ThreadedSocketInitiator.h.
typedef std::map< int, thread_id > FIX::ThreadedSocketInitiator::SocketToThread [private] |
Definition at line 51 of file ThreadedSocketInitiator.h.
typedef std::pair< ThreadedSocketInitiator*, ThreadedSocketConnection* > FIX::ThreadedSocketInitiator::ThreadPair [private] |
Definition at line 53 of file ThreadedSocketInitiator.h.
FIX::ThreadedSocketInitiator::ThreadedSocketInitiator | ( | Application & | application, | |
MessageStoreFactory & | factory, | |||
const SessionSettings & | settings | |||
) | throw ( ConfigError ) |
Definition at line 33 of file ThreadedSocketInitiator.cpp.
References FIX::socket_init().
00037 : Initiator( application, factory, settings ), 00038 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 00039 m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 00040 { 00041 socket_init(); 00042 }
FIX::ThreadedSocketInitiator::ThreadedSocketInitiator | ( | Application & | application, | |
MessageStoreFactory & | factory, | |||
const SessionSettings & | settings, | |||
LogFactory & | logFactory | |||
) | throw ( ConfigError ) |
Definition at line 44 of file ThreadedSocketInitiator.cpp.
References FIX::socket_init().
00049 : Initiator( application, factory, settings, logFactory ), 00050 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 00051 m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 00052 { 00053 socket_init(); 00054 }
FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator | ( | ) | [virtual] |
Definition at line 56 of file ThreadedSocketInitiator.cpp.
References FIX::socket_term().
00057 { 00058 socket_term(); 00059 }
void FIX::ThreadedSocketInitiator::addThread | ( | int | s, | |
thread_id | t | |||
) | [private] |
Definition at line 196 of file ThreadedSocketInitiator.cpp.
References m_mutex, m_threads, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by doConnect().
00197 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread) 00198 00199 Locker l(m_mutex); 00200 00201 m_threads[ s ] = t; 00202 QF_STACK_POP 00203 }
void FIX::ThreadedSocketInitiator::doConnect | ( | const SessionID & | , | |
const Dictionary & | ||||
) | [private, virtual] |
Implemented to connect a session to its target.
Implements FIX::Initiator.
Definition at line 145 of file ThreadedSocketInitiator.cpp.
References addThread(), FIX::IntConvertor::convert(), FIX::ThreadedSocketConnection::disconnect(), FIX::Initiator::getApplication(), getHost(), FIX::Initiator::getLog(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Session::lookupSession(), m_mutex, m_noDelay, m_rcvBufSize, m_sendBufSize, FIX::Log::onEvent(), QF_STACK_POP, QF_STACK_PUSH, FIX::Initiator::setDisconnected(), FIX::Initiator::setPending(), FIX::socket_createConnector(), FIX::socket_setsockopt(), socketThread(), and FIX::thread_spawn().
00146 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect) 00147 00148 try 00149 { 00150 Session* session = Session::lookupSession( s ); 00151 if( !session->isSessionTime(UtcTimeStamp()) ) return; 00152 00153 Log* log = session->getLog(); 00154 00155 std::string address; 00156 short port = 0; 00157 getHost( s, d, address, port ); 00158 00159 int socket = socket_createConnector(); 00160 if( m_noDelay ) 00161 socket_setsockopt( socket, TCP_NODELAY ); 00162 if( m_sendBufSize ) 00163 socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize ); 00164 if( m_rcvBufSize ) 00165 socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize ); 00166 00167 setPending( s ); 00168 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) ); 00169 00170 ThreadedSocketConnection* pConnection = 00171 new ThreadedSocketConnection( s, socket, address, port, getApplication(), getLog() ); 00172 00173 ThreadPair* pair = new ThreadPair( this, pConnection ); 00174 00175 { 00176 Locker l( m_mutex ); 00177 thread_id thread; 00178 if ( thread_spawn( &socketThread, pair, thread ) ) 00179 { 00180 addThread( socket, thread ); 00181 } 00182 else 00183 { 00184 delete pair; 00185 pConnection->disconnect(); 00186 delete pConnection; 00187 setDisconnected( s ); 00188 } 00189 } 00190 } 00191 catch ( std::exception& ) {} 00192 00193 QF_STACK_POP 00194 }
void FIX::ThreadedSocketInitiator::getHost | ( | const SessionID & | s, | |
const Dictionary & | d, | |||
std::string & | address, | |||
short & | port | |||
) | [private] |
Definition at line 263 of file ThreadedSocketInitiator.cpp.
References FIX::Dictionary::getLong(), FIX::Dictionary::getString(), FIX::Dictionary::has(), m_sessionToHostNum, QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_CONNECT_HOST, and FIX::SOCKET_CONNECT_PORT.
Referenced by doConnect().
00265 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost) 00266 00267 int num = 0; 00268 SessionToHostNum::iterator i = m_sessionToHostNum.find( s ); 00269 if ( i != m_sessionToHostNum.end() ) num = i->second; 00270 00271 std::stringstream hostStream; 00272 hostStream << SOCKET_CONNECT_HOST << num; 00273 std::string hostString = hostStream.str(); 00274 00275 std::stringstream portStream; 00276 std::string portString = portStream.str(); 00277 portStream << SOCKET_CONNECT_PORT << num; 00278 00279 if( d.has(hostString) && d.has(portString) ) 00280 { 00281 address = d.getString( hostString ); 00282 port = ( short ) d.getLong( portString ); 00283 } 00284 else 00285 { 00286 num = 0; 00287 address = d.getString( SOCKET_CONNECT_HOST ); 00288 port = ( short ) d.getLong( SOCKET_CONNECT_PORT ); 00289 } 00290 00291 m_sessionToHostNum[ s ] = ++num; 00292 00293 QF_STACK_POP 00294 }
void FIX::ThreadedSocketInitiator::lock | ( | ) | [inline, private] |
Definition at line 66 of file ThreadedSocketInitiator.h.
References m_mutex.
00066 { Locker l(m_mutex); }
void FIX::ThreadedSocketInitiator::onConfigure | ( | const SessionSettings & | ) | throw ( ConfigError ) [private, virtual] |
Implemented to configure acceptor.
Reimplemented from FIX::Initiator.
Definition at line 61 of file ThreadedSocketInitiator.cpp.
References QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, and FIX::SOCKET_SEND_BUFFER_SIZE.
00063 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure) 00064 00065 try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); } 00066 catch ( std::exception& ) {} 00067 if( s.get().has( SOCKET_NODELAY ) ) 00068 m_noDelay = s.get().getBool( SOCKET_NODELAY ); 00069 if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) ) 00070 m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE ); 00071 if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) ) 00072 m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ); 00073 00074 QF_STACK_POP 00075 }
void FIX::ThreadedSocketInitiator::onInitialize | ( | const SessionSettings & | ) | throw ( RuntimeError ) [private, virtual] |
Implemented to initialize initiator.
Reimplemented from FIX::Initiator.
Definition at line 77 of file ThreadedSocketInitiator.cpp.
References QF_STACK_POP, and QF_STACK_PUSH.
00079 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize) 00080 QF_STACK_POP 00081 }
bool FIX::ThreadedSocketInitiator::onPoll | ( | double | timeout | ) | [private, virtual] |
Implemented to connect and poll for events.
Implements FIX::Initiator.
Definition at line 104 of file ThreadedSocketInitiator.cpp.
References QF_STACK_POP, and QF_STACK_PUSH.
00105 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll) 00106 00107 return false; 00108 00109 QF_STACK_POP 00110 }
void FIX::ThreadedSocketInitiator::onStart | ( | ) | [private, virtual] |
Implemented to start connecting to targets.
Implements FIX::Initiator.
Definition at line 83 of file ThreadedSocketInitiator.cpp.
References FIX::Initiator::connect(), FIX::Initiator::isStopped(), m_lastConnect, m_mutex, m_reconnectInterval, FIX::process_sleep(), QF_STACK_POP, and QF_STACK_PUSH.
00084 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart) 00085 00086 while ( !isStopped() ) 00087 { 00088 time_t now; 00089 ::time( &now ); 00090 00091 if ( (now - m_lastConnect) >= m_reconnectInterval ) 00092 { 00093 Locker l( m_mutex ); 00094 connect(); 00095 m_lastConnect = now; 00096 } 00097 00098 process_sleep( 1 ); 00099 } 00100 00101 QF_STACK_POP 00102 }
void FIX::ThreadedSocketInitiator::onStop | ( | ) | [private, virtual] |
Implemented to stop a running initiator.
Implements FIX::Initiator.
Definition at line 112 of file ThreadedSocketInitiator.cpp.
References FIX::Initiator::isLoggedOn(), m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), FIX::Initiator::start(), and FIX::thread_join().
00113 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop) 00114 00115 SocketToThread threads; 00116 SocketToThread::iterator i; 00117 00118 { 00119 Locker l(m_mutex); 00120 00121 time_t start = 0; 00122 time_t now = 0; 00123 00124 ::time( &start ); 00125 while ( isLoggedOn() ) 00126 { 00127 if( ::time(&now) -5 >= start ) 00128 break; 00129 } 00130 00131 threads = m_threads; 00132 m_threads.clear(); 00133 } 00134 00135 for ( i = threads.begin(); i != threads.end(); ++i ) 00136 socket_close( i->first ); 00137 00138 for ( i = threads.begin(); i != threads.end(); ++i ) 00139 thread_join( i->second ); 00140 threads.clear(); 00141 00142 QF_STACK_POP 00143 }
void FIX::ThreadedSocketInitiator::removeThread | ( | int | s | ) | [private] |
Definition at line 205 of file ThreadedSocketInitiator.cpp.
References m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().
00206 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread) 00207 00208 Locker l(m_mutex); 00209 SocketToThread::iterator i = m_threads.find( s ); 00210 00211 if ( i != m_threads.end() ) 00212 { 00213 thread_detach( i->second ); 00214 m_threads.erase( i ); 00215 } 00216 00217 QF_STACK_POP 00218 }
THREAD_PROC FIX::ThreadedSocketInitiator::socketThread | ( | void * | p | ) | [static, private] |
Definition at line 220 of file ThreadedSocketInitiator.cpp.
References FIX::ThreadedSocketConnection::connect(), FIX::ThreadedSocketConnection::disconnect(), FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Session::lookupSession(), FIX::Session::next(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, and FIX::ThreadedSocketConnection::read().
Referenced by doConnect().
00221 { QF_STACK_TRY 00222 QF_STACK_PUSH(ThreadedSocketInitiator::socketThread) 00223 00224 ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p ); 00225 00226 ThreadedSocketInitiator* pInitiator = pair->first; 00227 ThreadedSocketConnection* pConnection = pair->second; 00228 FIX::SessionID sessionID = pConnection->getSession()->getSessionID(); 00229 FIX::Session* pSession = FIX::Session::lookupSession( sessionID ); 00230 int socket = pConnection->getSocket(); 00231 delete pair; 00232 00233 pInitiator->lock(); 00234 00235 if( !pConnection->connect() ) 00236 { 00237 pInitiator->getLog()->onEvent( "Connection failed" ); 00238 pConnection->disconnect(); 00239 delete pConnection; 00240 pInitiator->removeThread( socket ); 00241 pInitiator->setDisconnected( sessionID ); 00242 return 0; 00243 } 00244 00245 pInitiator->setConnected( sessionID ); 00246 pInitiator->getLog()->onEvent( "Connection succeeded" ); 00247 00248 pSession->next(); 00249 00250 while ( pConnection->read() ) {} 00251 00252 delete pConnection; 00253 if( !pInitiator->isStopped() ) 00254 pInitiator->removeThread( socket ); 00255 00256 pInitiator->setDisconnected( sessionID ); 00257 return 0; 00258 00259 QF_STACK_POP 00260 QF_STACK_CATCH 00261 }
time_t FIX::ThreadedSocketInitiator::m_lastConnect [private] |
Definition at line 73 of file ThreadedSocketInitiator.h.
Referenced by onStart().
Mutex FIX::ThreadedSocketInitiator::m_mutex [private] |
Reimplemented from FIX::Initiator.
Definition at line 80 of file ThreadedSocketInitiator.h.
Referenced by addThread(), doConnect(), lock(), onStart(), onStop(), and removeThread().
bool FIX::ThreadedSocketInitiator::m_noDelay [private] |
Definition at line 75 of file ThreadedSocketInitiator.h.
Referenced by doConnect().
int FIX::ThreadedSocketInitiator::m_rcvBufSize [private] |
Definition at line 77 of file ThreadedSocketInitiator.h.
Referenced by doConnect().
int FIX::ThreadedSocketInitiator::m_reconnectInterval [private] |
Definition at line 74 of file ThreadedSocketInitiator.h.
Referenced by onStart().
int FIX::ThreadedSocketInitiator::m_sendBufSize [private] |
Definition at line 76 of file ThreadedSocketInitiator.h.
Referenced by doConnect().
Definition at line 72 of file ThreadedSocketInitiator.h.
Referenced by getHost().
Reimplemented from FIX::Initiator.
Definition at line 71 of file ThreadedSocketInitiator.h.
bool FIX::ThreadedSocketInitiator::m_stop [private] |
Reimplemented from FIX::Initiator.
Definition at line 78 of file ThreadedSocketInitiator.h.
Definition at line 79 of file ThreadedSocketInitiator.h.
Referenced by addThread(), onStop(), and removeThread().