Communications.cc

Go to the documentation of this file.
00001 // ----------------------------------------------------------------------------
00002 // CERTI - HLA RunTime Infrastructure
00003 // Copyright (C) 2002-2005  ONERA
00004 //
00005 // This file is part of CERTI
00006 //
00007 // CERTI is free software ; you can redistribute it and/or modify
00008 // it under the terms of the GNU General Public License as published by
00009 // the Free Software Foundation ; either version 2 of the License, or
00010 // (at your option) any later version.
00011 //
00012 // CERTI is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY ; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program ; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00020 //
00021 // $Id: Communications.cc,v 3.33 2008/10/12 11:46:39 gotthardp Exp $
00022 // ----------------------------------------------------------------------------
00023 
00024 #include <config.h>
00025 #include "Communications.hh"
00026 #include <assert.h>
00027 #include "PrettyDebug.hh"
00028 #include "NM_Classes.hh"
00029 
00030 #include "SocketHTTPProxy.hh"
00031 #include "SecureTCPSocket.hh"
00032 
00033 #ifdef _WIN32
00034     #include <windows.h>
00035     #ifdef max
00036         #undef max
00037     #endif
00038 #else
00039     #include <fstream>
00040     #include <iostream>
00041     #include <unistd.h>
00042     #include <errno.h>
00043 #endif
00044 
00045 #include <cstdlib>
00046 
00047 
00048 
00049 using std::ifstream ;
00050 using std::ios ;
00051 using std::cout ;
00052 using std::endl ;
00053 using std::list ;
00054 
00055 namespace certi {
00056 namespace rtia {
00057 
// File-local debug object used by this file for protocol/request traces
// (see the D.Out(pdProtocol, ...) and D.Out(pdRequest, ...) calls below).
// NOTE(review): the two constructor arguments look like a channel name and
// a message prefix -- confirm against PrettyDebug.hh.
00058 static pdCDebug D("RTIA_COMM", "(RTIA Comm) ");
// File-local debug object used with pdGendoc for the enter/exit traces of
// the destructor below.
00059 static PrettyDebug G("GENDOC",__FILE__);
00060 
00061 // ----------------------------------------------------------------------------
00062 
00063 NetworkMessage* Communications::waitMessage(
00064                                  NetworkMessage::Type type_msg,
00065                                  FederateHandle numeroFedere)
00066 {    
00067     assert(type_msg > 0 && type_msg < NetworkMessage::LAST);
00068     NetworkMessage *msg = NULL;
00069     
00070     D.Out(pdProtocol, "Waiting for Message of Type %d.", type_msg);
00071 
00072     // Does a new message of the expected type has arrived ?
00073     if (searchMessage(type_msg, numeroFedere, &msg))
00074         return msg;
00075 
00076     // Otherwise, wait for a message with same type than expected and with
00077     // same federate number.
00078     msg = NM_Factory::receive(socketTCP);
00079 
00080     D.Out(pdProtocol, "TCP Message of Type %d has arrived.", type_msg);
00081 
00082     while ((msg->getType() != type_msg) ||
00083            ((numeroFedere != 0) && (msg->federate != numeroFedere))) {
00084         waitingList.push_back(msg);
00085         msg = NM_Factory::receive(socketTCP);
00086         D.Out(pdProtocol, "Message of Type %d has arrived.", type_msg);
00087     }
00088     
00089     assert(msg != NULL);
00090     assert(msg->getType() == type_msg);
00091     return msg;
00092 } /* end of waitMessage */
00093 
00094 // ----------------------------------------------------------------------------
00096 Communications::Communications(int RTIA_port)
00097 {
00098     char nom_serveur_RTIG[200] ;
00099     const char *default_host = "localhost" ;
00100 
00101     socketUN = new SocketUN();
00102 #ifdef FEDERATION_USES_MULTICAST
00103     socketMC = new SocketMC();
00104 #endif
00105     if(getenv("CERTI_HTTP_PROXY") != NULL || getenv("http_proxy") != NULL)
00106         socketTCP = new SocketHTTPProxy();
00107     else
00108 #ifdef WITH_GSSAPI
00109         socketTCP = new SecureTCPSocket();
00110 #else
00111         socketTCP = new SocketTCP();
00112 #endif
00113     socketUDP = new SocketUDP();
00114 
00115     // Federate/RTIA link creation.
00116     socketUN->acceptUN(RTIA_port);
00117 
00118     // RTIG TCP link creation.
00119     const char *certihost = NULL ;
00120     ifstream file("RTIA.dat", ios::in);
00121 
00122     if (!file.is_open()) {
00123     certihost = getenv("CERTI_HOST");
00124     if (NULL == certihost) {
00125         certihost = default_host ;
00126     }
00127     }
00128     else {
00129     file.get(nom_serveur_RTIG, 200);
00130     file.close();   
00131     certihost = nom_serveur_RTIG ;
00132     }
00133 
00134     const char *tcp_port = getenv("CERTI_TCP_PORT");
00135     const char *udp_port = getenv("CERTI_UDP_PORT");
00136     if (tcp_port==NULL) tcp_port = PORT_TCP_RTIG ;
00137     if (udp_port==NULL) udp_port = PORT_UDP_RTIG ;
00138 
00139     socketTCP->createConnection(certihost, atoi(tcp_port));
00140     socketUDP->createConnection(certihost, atoi(udp_port));
00141 }
00142 
00143 // ----------------------------------------------------------------------------
00145 Communications::~Communications()
00146 {
00147     // Advertise RTIG that TCP link is being closed.
00148     G.Out(pdGendoc,"enter Communications::~Communications");
00149 
00150     NM_Close_Connexion closeMsg ;    
00151     closeMsg.send(socketTCP, NM_msgBufSend);
00152     socketTCP->close();
00153 
00154     delete socketUN;
00155 #ifdef FEDERATION_USES_MULTICAST
00156     delete socketMC;
00157 #endif
00158     delete socketTCP;
00159     delete socketUDP;
00160 
00161     G.Out(pdGendoc,"exit  Communications::~Communications");
00162 }
00163 
00164 // ----------------------------------------------------------------------------
00166 void
00167 Communications::requestFederateService(Message *req)
00168 {
00169     // G.Out(pdGendoc,"enter Communications::requestFederateService for message "
00170     //               "type %d",req->type);
00171     assert(req != NULL);
00172     D.Out(pdRequest, "Sending Request to Federate, Type %d.", req->type);
00173     req->send(socketUN, msgBufSend);
00174     // G.Out(pdGendoc,"exit  Communications::requestFederateService");
00175 }
00176 
00177 // ----------------------------------------------------------------------------
00178 unsigned long
00179 Communications::getAddress()
00180 {
00181     return socketUDP->getAddr();
00182 }
00183 
00184 // ----------------------------------------------------------------------------
00185 unsigned int
00186 Communications::getPort()
00187 {
00188     return socketUDP->getPort();
00189 }
00190 
00191 // ----------------------------------------------------------------------------
00193 
00196 void
00197 Communications::readMessage(int &n, NetworkMessage **msg_reseau, Message **msg,
00198                             struct timeval *timeout)
00199 {
00200     const int tcp_fd(socketTCP->returnSocket());
00201     const int udp_fd(socketUDP->returnSocket());
00202 
00203     int max_fd = 0; // not used for _WIN32
00204     fd_set fdset ;
00205     FD_ZERO(&fdset);
00206 
00207     if (msg_reseau) {
00208         FD_SET(tcp_fd, &fdset);
00209         FD_SET(udp_fd, &fdset);
00210 #ifndef _WIN32
00211     max_fd = std::max(max_fd, std::max(tcp_fd, udp_fd));
00212 #endif
00213     }
00214     if (msg) {
00215         FD_SET(socketUN->returnSocket(), &fdset);
00216 #ifndef _WIN32
00217     max_fd = std::max(max_fd, socketUN->returnSocket());
00218 #endif
00219     }
00220 
00221 #ifdef FEDERATION_USES_MULTICAST
00222     // if multicast link is initialized (during join federation).
00223     if (_est_init_mc) {
00224         FD_SET(_socket_mc, &fdset);
00225 #ifndef _WIN32
00226         max_fd = std::max(max_fd, _socket_mc);
00227 #endif
00228    }
00229 #endif
00230 
00231     if (msg_reseau && !waitingList.empty()) {
00232         // One message is in waiting buffer.        
00233         *msg_reseau = waitingList.front();
00234         waitingList.pop_front();                
00235         n = 1 ;
00236     }
00237     else if (msg_reseau && socketTCP->isDataReady()) {
00238         // Datas are in TCP waiting buffer.
00239         // Read a message from RTIG TCP link.
00240         *msg_reseau = NM_Factory::receive(socketTCP);
00241         n = 1 ;
00242     }
00243     else if (msg_reseau && socketUDP->isDataReady()) {
00244         // Datas are in UDP waiting buffer.
00245         // Read a message from RTIG UDP link.
00246         *msg_reseau = NM_Factory::receive(socketUDP);
00247         n = 1 ;
00248     }
00249     else if (msg && socketUN->isDataReady()) {
00250         // Datas are in UNIX waiting buffer.
00251         // Read a message from federate UNIX link.
00252         (*msg) = new Message();
00253         (*msg)->receive(socketUN, msgBufReceive);
00254         n = 2 ;
00255     }
00256     else {
00257         // waitingList is empty and no data in TCP buffer.
00258         // Wait a message (coming from federate or network).
00259 #ifdef _WIN32
00260         if (select(max_fd, &fdset, NULL, NULL, timeout) < 0) {
00261             if (WSAGetLastError() == WSAEINTR)
00262 #else
00263         if (select(max_fd+1, &fdset, NULL, NULL, timeout) < 0) {
00264             if (errno == EINTR)
00265 #endif 
00266             {
00267                 throw NetworkSignal("EINTR on select");
00268                 }
00269                 else {
00270                  throw NetworkError("Unexpected errno on select");
00271                 }
00272         }
00273 
00274         // At least one message has been received, read this message.
00275 
00276 #ifdef FEDERATION_USES_MULTICAST
00277         // Priorite aux messages venant du multicast(pour essayer d'eviter
00278         // un depassement de la file et donc la perte de messages)
00279 
00280         if (_est_init_mc && FD_ISSET(_socket_mc, &fdset)) {
00281             // Read a message coming from the multicast link.
00282             receiveMC(*msg_reseau);
00283             n = 1 ;
00284         }
00285 #endif
00286 
00287         if (FD_ISSET(socketTCP->returnSocket(), &fdset)) {
00288             // Read a message coming from the TCP link with RTIG.
00289             (*msg_reseau) = NM_Factory::receive(socketTCP);
00290             n = 1 ;
00291         }
00292         else if (FD_ISSET(socketUDP->returnSocket(), &fdset)) {
00293             // Read a message coming from the UDP link with RTIG.
00294             (*msg_reseau) = NM_Factory::receive(socketUDP);
00295             n = 1 ;
00296         }
00297         else if (FD_ISSET(socketUN->returnSocket(), &fdset)) {
00298             // Read a message coming from the federate.            
00299             (*msg) = new Message();
00300             (*msg)->receive(socketUN, msgBufReceive);
00301             n = 2 ;
00302         }
00303         else
00304         {
00305             // select() timeout occured
00306             n = 3;
00307         }
00308     }
00309 } /* end of readMessage */
00310 
00311 // ----------------------------------------------------------------------------
00317 bool
00318 Communications::searchMessage(NetworkMessage::Type type_msg,
00319                               FederateHandle numeroFedere,
00320                               NetworkMessage **msg)
00321 {
00322     list<NetworkMessage *>::iterator i ;
00323     for (i = waitingList.begin(); i != waitingList.end(); i++) {
00324 
00325         D.Out(pdProtocol, "Rechercher message de type %d .", type_msg);
00326 
00327         if ((*i)->getType() == type_msg) {
00328             // if numeroFedere != 0, verify that federateNumbers are similar
00329             if (((*i)->federate == numeroFedere) || (numeroFedere == 0)) {
00330                 *msg = *i;                                
00331                 waitingList.erase(i);
00332                 D.Out(pdProtocol,
00333                       "Message of Type %d was already here.",
00334                       type_msg);
00335                 return true ;
00336             }
00337         }
00338     }
00339     return false ;
00340 }
00341 
00342 // ----------------------------------------------------------------------------
00343 void
00344 Communications::sendMessage(NetworkMessage *Msg)
00345 {
00346     Msg->send(socketTCP, NM_msgBufSend);
00347 }
00348 
00349 // ----------------------------------------------------------------------------
00350 void
00351 Communications::sendUN(Message *Msg)
00352 {
00353     Msg->send(socketUN, msgBufSend);
00354 }
00355 
00356 // ----------------------------------------------------------------------------
00357 void
00358 Communications::receiveUN(Message *Msg)
00359 {
00360     Msg->receive(socketUN, msgBufReceive);
00361 }
00362 
00363 }} // namespace certi/rtia
00364 
00365 // $Id: Communications.cc,v 3.33 2008/10/12 11:46:39 gotthardp Exp $

Generated on Thu Apr 30 15:53:48 2009 for CERTI Developer Documentation by doxygen 1.5.5