Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

phNodeComm.cpp

Go to the documentation of this file.
00001 /* ---------------------------------------------------------------------------
00002     Phission : 
00003         Realtime Vision Processing System
00004 
00005     Copyright (C) 2003 Philip D.S. Thoren (pthoren@cs.uml.edu)
00006     University of Massachusetts at Lowell,
00007     Laboratory for Artificial Intelligence and Robotics
00008 
00009     This file is part of Phission.
00010 
00011  ---------------------------------------------------------------------------*/
00012 /* -------------------------------------------------------------------------- *
00013  * phNodeComm.cpp: This class is used to establish the TCP network
00014  * over which the Node Controllers communicate.
00015  *
00016  * There are two types of Node Controller Threads:
00017  *  Waiter: This is a server type thread that waits for a connection
00018  *  Connector: This is a client type thread that tries to connect
00019  *      to a Waiter/server thread on another host/port
00020  *
00021  * Once the connection is established, the Node Comms Handler is
00022  * used to handle incoming/outgoing messages between the Node
00023  * Controllers.
00024  * -------------------------------------------------------------------------- */
00025 #include <phission.h>
00026 #include <phNodeComm.h>
00027 
00028 /* -------------------------------------------------------------------------- * 
00029  * phNodeComm:
00030  * -------------------------------------------------------------------------- */
00031 phNodeComm::phNodeComm(int id, int other_id, 
00032                        int port, char *host,
00033                        int type,
00034                        phLamportSystem *system )
00035 {
00036     phFUNCTION("phNodeComm::phNodeComm")
00037     int locked = 0;
00038     
00039     phTHIS_LOOSE_LOCK(locked);
00040     
00041     this->m_port            = 0;
00042     this->m_host            = NULL;
00043     this->m_host_size       = 0;
00044     this->m_id              = -1;
00045     this->m_other_side_id   = -1;
00046     this->m_type            = -1;
00047     this->m_node_sock       = NULL;
00048     this->m_server          = NULL;
00049 
00050     this->m_buffer          = NULL;
00051     this->m_buffer_size     = 0;
00052 
00053     this->m_connected       = 0;
00054 
00055     this->m_system          = NULL;
00056     
00057     rc = this->setId(id);
00058     rc = this->setOtherSideId(other_id);
00059     rc = this->setPort(port);
00060     rc = this->setHost(host);
00061     rc = this->setConnectionType(type);
00062     rc = this->setSystem(system); 
00063 error:
00064     
00065     phTHIS_LOOSE_UNLOCK(locked);
00066     
00067     rc = 0;  
00068 }
00069 
00070 /* -------------------------------------------------------------------------- *
00071  * ~phNodeComm:
00072  * -------------------------------------------------------------------------- */
00073 phNodeComm::~phNodeComm()
00074 {
00075     phFUNCTION("phNodeComm::~phNodeComm")
00076     int locked = 0;
00077 
00078     phTHIS_LOOSE_LOCK(locked);
00079  
00080     this->m_connected = 0;
00081     
00082     phDelete(this->m_server);
00083     phDelete(this->m_node_sock);
00084     phFree(this->m_buffer);
00085     phFree(this->m_host);
00086 }
00087 
00088 /* -------------------------------------------------------------------------- *
00089  * setPort:
00090  * -------------------------------------------------------------------------- */
00091 int phNodeComm::setPort(int port)
00092 {
00093     phFUNCTION("phNodeComm::setPort")
00094     int locked = 0;
00095     
00096     phTHIS_LOCK(locked);
00097 
00098     this->m_port = port;
00099     
00100     phTHIS_UNLOCK(locked);
00101 
00102     return phSUCCESS;
00103 error:
00104     phTHIS_ERROR_UNLOCK(locked);
00105 
00106     return phFAIL;
00107 }
00108 
00109 /* -------------------------------------------------------------------------- *
00110  * getPort:
00111  * -------------------------------------------------------------------------- */
00112 int phNodeComm::getPort()
00113 {
00114     phFUNCTION("phNodeComm::getPort")
00115     int ret_port = 0;
00116     int locked = 0;
00117     
00118     phTHIS_LOCK(locked);
00119 
00120     ret_port = this->m_port;
00121     
00122 error:
00123     phTHIS_ERROR_UNLOCK(locked);
00124 
00125     return ret_port;
00126 }
00127 
00128 /* -------------------------------------------------------------------------- *
00129  * setConnectionType:
00130  * -------------------------------------------------------------------------- */
00131 int phNodeComm::setConnectionType(int type)
00132 {
00133     phFUNCTION("phNodeComm::setConnectionType")
00134     int locked = 0;
00135     
00136     phTHIS_LOCK(locked);
00137 
00138     this->m_type = type;
00139     
00140     phTHIS_UNLOCK(locked);
00141 
00142     return phSUCCESS;
00143 error:
00144     phTHIS_ERROR_UNLOCK(locked);
00145 
00146     return phFAIL;
00147 }
00148 
00149 /* -------------------------------------------------------------------------- *
00150  * getConnectionType:
00151  * -------------------------------------------------------------------------- */
00152 int phNodeComm::getConnectionType()
00153 {
00154     phFUNCTION("phNodeComm::getConnectionType")
00155     int ret_type = 0;
00156     int locked = 0;
00157     
00158     phTHIS_LOCK(locked);
00159 
00160     ret_type = this->m_type;
00161     
00162 error:
00163     phTHIS_ERROR_UNLOCK(locked);
00164 
00165     return ret_type;
00166 }
00167 
00168 /* -------------------------------------------------------------------------- *
00169  * setHost:
00170  * -------------------------------------------------------------------------- */
00171 int phNodeComm::setHost(char *host)
00172 {
00173     phFUNCTION("phNodeComm::setHost")
00174     int locked = 0;
00175     uint32_t length = (host != NULL) ? strlen(host) : 0;
00176    
00177     if (length == 0)
00178     {
00179         return phFAIL;
00180     }
00181     
00182     phTHIS_LOCK(locked);
00183 
00184     if (this->m_host == NULL)
00185     {
00186         this->m_host = (char *)phCalloc(length + 25, sizeof(char));
00187         phCHECK_NULLPTR(this->m_host,"phCalloc","phCalloc failed.");
00188         
00189         this->m_host_size = (length + 25) * sizeof(char);
00190     }
00191     else if (((length+1) * sizeof(char)) > (uint32_t)this->m_host_size)
00192     {
00193         this->m_host_size = (length + 25) * sizeof(char);
00194         this->m_host = (char *)phRealloc(this->m_host,
00195                                          this->m_host_size);
00196     }
00197     
00198     sprintf(this->m_host,"%s",host);
00199     
00200     phTHIS_UNLOCK(locked);
00201 
00202     return phSUCCESS;
00203 error:
00204     phTHIS_ERROR_UNLOCK(locked);
00205 
00206     return phFAIL;
00207 }
00208 
00209 /* -------------------------------------------------------------------------- *
00210  * getHost:
00211  * -------------------------------------------------------------------------- */
00212 char *phNodeComm::getHost()
00213 {
00214     phFUNCTION("phNodeComm::getHost")
00215     char *ret_type = 0;
00216     int locked = 0;
00217     
00218     phTHIS_LOCK(locked);
00219 
00220     ret_type = this->m_host;
00221     
00222 error:
00223     phTHIS_ERROR_UNLOCK(locked);
00224 
00225     return ret_type;
00226 }
00227 
00228 /* -------------------------------------------------------------------------- *
00229  * setId:
00230  * -------------------------------------------------------------------------- */
00231 int phNodeComm::setId(int id)
00232 {
00233     phFUNCTION("phNodeComm::setId")
00234     int locked = 0;
00235     
00236     phTHIS_LOCK(locked);
00237 
00238     this->m_id = id;
00239     
00240     phTHIS_UNLOCK(locked);
00241 
00242     return phSUCCESS;
00243 error:
00244     phTHIS_ERROR_UNLOCK(locked);
00245 
00246     return phFAIL;
00247 }
00248 
00249 /* -------------------------------------------------------------------------- *
00250  * getId:
00251  * -------------------------------------------------------------------------- */
00252 int phNodeComm::getId()
00253 {
00254     return this->m_id;
00255 }
00256 
00257 /* -------------------------------------------------------------------------- *
00258  * setOtherSideId:
00259  * -------------------------------------------------------------------------- */
00260 int phNodeComm::setOtherSideId(int other_side_id)
00261 {
00262     phFUNCTION("phNodeComm::setOtherSideId")
00263     int locked = 0;
00264     
00265     phTHIS_LOCK(locked);
00266 
00267     this->m_other_side_id = other_side_id;
00268     
00269     phTHIS_UNLOCK(locked);
00270 
00271     return phSUCCESS;
00272 error:
00273     phTHIS_ERROR_UNLOCK(locked);
00274 
00275     return phFAIL;
00276 }
00277 
00278 /* -------------------------------------------------------------------------- *
00279  * getOtherSideId:
00280  * -------------------------------------------------------------------------- */
00281 int phNodeComm::getOtherSideId()
00282 {
00283     phFUNCTION("phNodeComm::getOtherSideId")
00284     int ret_id = 0;
00285     int locked = 0;
00286     
00287     phTHIS_LOCK(locked);
00288 
00289     ret_id = this->m_other_side_id;
00290     
00291 error:
00292     phTHIS_ERROR_UNLOCK(locked);
00293 
00294     return ret_id;
00295 }
00296 
00297 /* -------------------------------------------------------------------------- *
00298  * setConnected:
00299  * -------------------------------------------------------------------------- */
00300 int phNodeComm::setConnected(int isConnected)
00301 {
00302     phFUNCTION("phNodeComm::setConnected")
00303     int locked = 0;
00304     
00305     phTHIS_LOCK(locked);
00306 
00307     this->m_connected = isConnected;
00308     
00309     phTHIS_UNLOCK(locked);
00310 
00311     return phSUCCESS;
00312 error:
00313     phTHIS_ERROR_UNLOCK(locked);
00314 
00315     return phFAIL;
00316 }
00317 
00318 /* -------------------------------------------------------------------------- */
00319 phSocket *phNodeComm::getSocket()
00320 {
00321     return this->m_node_sock;
00322 }
00323 
00324 /* -------------------------------------------------------------------------- *
00325  * isConnected:
00326  * -------------------------------------------------------------------------- */
00327 int phNodeComm::isConnected()
00328 {
00329     phFUNCTION("phNodeComm::isConnected")
00330     int ret_port = 0;
00331     int locked = 0;
00332     
00333     phTHIS_LOCK(locked);
00334 
00335     ret_port = this->m_connected;
00336     
00337 error:
00338     phTHIS_ERROR_UNLOCK(locked);
00339 
00340     return ret_port;
00341 }
00342 
00343 /* -------------------------------------------------------------------------- *
00344  * setSystem:
00345  * -------------------------------------------------------------------------- */
00346 int phNodeComm::setSystem(phLamportSystem *system)
00347 {
00348     phFUNCTION("phNodeComm::setSystem")
00349     int locked = 0;
00350     
00351     phTHIS_LOCK(locked);
00352 
00353     this->m_system = system;
00354     
00355     if (this->m_system != NULL)
00356     {
00357         rc = this->m_system->registerResource(this);
00358         phPRINT_RC(rc,NULL,"m_system.registerResource(this:%p) failed.",
00359              this);
00360     }
00361     
00362     phTHIS_UNLOCK(locked);
00363 
00364     return phSUCCESS;
00365 error:
00366     phTHIS_ERROR_UNLOCK(locked);
00367 
00368     return phFAIL;
00369 }
00370 
00371 /* -------------------------------------------------------------------------- *
00372  * getSystem:
00373  * -------------------------------------------------------------------------- */
00374 phLamportSystem *phNodeComm::getSystem()
00375 {
00376     phFUNCTION("phNodeComm::")
00377     phLamportSystem *system = 0;
00378     int locked = 0;
00379     
00380     phTHIS_LOCK(locked);
00381 
00382     system = this->m_system;
00383     
00384 error:
00385     phTHIS_ERROR_UNLOCK(locked);
00386 
00387     return system;
00388 }
00389 
00390 /* -------------------------------------------------------------------------- *
00391  * Connector:
00392  * -------------------------------------------------------------------------- */
00393 int phNodeComm::Connector()
00394 {
00395     phFUNCTION("phNodeComm::Connector")
00396 
00397     int port = this->m_port;
00398     char *host = ((this->m_host != NULL) ? strdup(this->m_host) : NULL);
00399 
00400     /* Print out the progress of the client */
00401     DEBUG_PRINT("\n\t\t[id:%d]Attempting connect to [oid:%d] %s on port %d\n\n",
00402               this->m_id, this->m_other_side_id, 
00403               this->m_host, this->m_port );
00404    
00405     this->m_node_sock = new phSocket();
00406 
00407     do 
00408     {
00409         rc = this->m_node_sock->connect( this->m_host, 
00410                                          this->m_port);
00411         phPRINT_RC(rc,NULL,"sock->connect failed to connect");
00412     }
00413     while (this->isRunning() && (!this->m_node_sock->isConnected()));
00414     
00415     if (!this->m_node_sock->isConnected())
00416     {
00417         phDelete(this->m_node_sock);
00418     }
00419 
00420     /* run has the lock until here */
00421     rc = this->signal_running();
00422     phCHECK_RC(rc, NULL, "this->signal_running()");
00423         
00424     if (this->m_node_sock == NULL)
00425     {
00426         return 1;
00427     }
00428     else
00429     {
00430         DEBUG_PRINT("Connected...\n");
00431 
00432         /* 
00433          * rc = this->m_node_sock->print(stderr);
00434          * phPRINT_RC(rc,NULL,"m_node_sock->print");
00435          */
00436         return phSUCCESS;
00437     }
00438 error:
00439    
00440     DEBUG_PRINT("Connect failed...\n");
00441     
00442     phDelete(this->m_node_sock);
00443     
00444     return phFAIL;
00445 }
00446 
00447 /* -------------------------------------------------------------------------- *
00448  * Waiter:
00449  * -------------------------------------------------------------------------- */
00450 int phNodeComm::Waiter()
00451 {
00452     phFUNCTION("phNodeComm::Waiter")
00453 
00454     this->m_server = new phServerSocket();
00455     phCHECK_NULLPTR(this->m_server,"new","new failed to allocate");
00456     
00457     /* Do any setup here */
00458     rc = this->m_server->setPort(this->getPort());
00459     phPRINT_RC(rc,NULL,"server->setPort");
00460     
00461     rc = this->m_server->setBackLog(1);
00462     phPRINT_RC(rc,NULL,"server->setBackLog()");
00463   
00464     /* Start the server socket listening on the port number
00465      * passed to it in the constructor*/
00466     rc =this->m_server->listen();
00467     phCHECK_RC(rc,NULL,"server->listen()");
00468    
00469     /* Set the server thread to be non-blocking */
00470     rc = this->m_server->setBlocking(phNONBLOCK);
00471     phCHECK_RC(rc,NULL,"server->setBlocking(phNONBLOCK)");
00472     
00473     DEBUG_PRINT("\n\t\t[id:%d] Listening on port:%d for [oid:%d]\n\n",
00474              this->m_id, this->m_port, this->m_other_side_id );
00475  
00476     /* 
00477      * rc = this->m_server->print(stderr);
00478      * phPRINT_RC(rc,NULL,"m_server->print");
00479      */
00480     DEBUG_PRINT("Entering accept loop...\n");
00481     while (this->m_server->isSockValid() && (this->m_node_sock == NULL))
00482     {
00483         DEBUG_PRINT("accepting...\n");
00484         this->m_node_sock = this->m_server->accept();
00485         if (this->m_server->isSockValid() == 0)
00486         {
00487             phDelete(this->m_node_sock);
00488             continue;
00489         }
00490         phCONT_NULLPTR(this->m_node_sock, "accept",
00491                      "failed to accept client connection\n\n");
00492     
00493         /* 
00494          * rc = this->m_node_sock->print(stderr);
00495          * phPRINT_RC(rc,NULL,"m_node_sock->print");
00496          */
00497     }
00498     DEBUG_PRINT("Finished accept loop (running:%d)...\n",
00499              this->isRunning());
00500     
00501     phDelete(this->m_server);
00502    
00503 
00504     /* run has the lock until here */
00505     rc = this->signal_running();
00506     phCHECK_RC(rc, NULL, "this->signal_running()");
00507         
00508     if (this->m_node_sock == NULL)
00509     {
00510         return 1;
00511     }
00512     else
00513     {
00514         DEBUG_PRINT("Connected...\n");
00515         DEBUG_PRINT("\t\t[id:%d] Accepted client on port:%d for [oid:%d]\n\n",
00516                  this->m_id,this->m_port,this->m_other_side_id );
00517  
00518         return phSUCCESS;
00519     }
00520 error:
00521    
00522     return phFAIL;
00523 }
00524  
00525 
00526 /* -------------------------------------------------------------------------- *
00527  * run:
00528  * -------------------------------------------------------------------------- */
00529 int phNodeComm::run()
00530 {
00531     phFUNCTION("phNodeComm::run")
00532     int locked = 0;
00533 
00534     /* make sure there is a mutual exclusion system to negotiate with */
00535     phCHECK_NULLPTR(this->m_system,NULL,
00536         "Bad parameter: this->m_system == NULL" );
00537    
00538     /* Depending on NODE ID and THREAD ID, this could be either a 
00539      * Waiter thread or a Connector thread... pick one.
00540      * The parent thread will set this variable prior to spawning 
00541      * this thread */
00542     if (this->m_type == phNodeController_Waiter)
00543     {
00544         rc = this->Waiter();
00545         phCHECK_RC(rc,NULL,"this->Waiter failed");
00546     }
00547     else if (this->m_type == phNodeController_Connector)
00548     {
00549         rc = this->Connector();
00550         phCHECK_RC(rc,NULL,"this->Connector failed");
00551     }
00552     else
00553     {
00554         rc = this->signal_error();
00555         phCHECK_RC(rc, NULL, "this->signal_error()");
00556    
00557         return phFAIL;
00558     }
00559     
00560     if (rc == 1)
00561     {
00562         goto success;
00563     }
00564     
00565     /* The connection has been established, handle the comms */
00566     if (this->m_node_sock != NULL)
00567     {
00568         /* Set the client thread's socket to blocking */
00569         rc = this->m_node_sock->setBlocking(phBLOCK);
00570         phPRINT_RC(rc,NULL,"this->m_node_sock->setBlocking(phBLOCK)");
00571     }
00572 
00573     /* Set the Connected state */
00574     rc = this->setConnected(1);
00575     phCHECK_RC(rc,NULL,"setConnected(1) failed.");
00576    
00577     if (this->isRunning())
00578     {
00579         /* This handles all the necessary Node Communication
00580          * required */
00581         rc = this->comm_loop();
00582         phPRINT_RC(rc,NULL,"this->comm_loop()");
00583         
00584         phTHIS_LOCK(locked);
00585         
00586         rc = this->setConnected(0);
00587         phPRINT_RC(rc,NULL,"setConnected(0) failed");
00588         
00589         /* this disconnects and frees up the phSocket obj */
00590         phDelete(this->m_node_sock);
00591         
00592         phTHIS_UNLOCK(locked);
00593     }
00594     
00595 success:
00596     return phSUCCESS;
00597 
00598 error:
00599     phPROGRESS("Thread returning with error\n");
00600     
00601     rc = this->signal_error();
00602     phCHECK_RC(rc, NULL, "this->signal_error()");
00603    
00604     return phFAIL;
00605 }
00606  
00607 /* -------------------------------------------------------------------------- *
00608  * comm_loop:
00609  * -------------------------------------------------------------------------- */
00610 int phNodeComm::comm_loop()
00611 {
00612     phFUNCTION("phNodeComm::comm_loop")
00613     phMessage message;
00614    
00615     /* Print a message that we're connected to the user */
00616     phPROGRESS("[%d<->%d] Running...\n\n", 
00617              this->m_id,
00618              this->m_other_side_id);
00619         
00620     while (this->isRunning() && this->isConnected())
00621     {
00622         rc = message.receive(this->m_node_sock);
00623         phPRINT_RC(rc,NULL,"message.receive");
00624         
00625         if ((rc == phSUCCESS) && (this->m_node_sock->isConnected()))
00626         {
00627             rc = this->m_system->handle_message(message);
00628             phPRINT_RC(rc,NULL,"m_system->handle_message");
00629 
00630             if (rc != phSUCCESS)
00631             {
00632                 phPROGRESS("System error handling message:\n");
00633                 message.print(stderr);
00634             }
00635         }
00636         
00637         phMSleep(10);
00638         
00639         /* yield to prevent tight looping */
00640         phYield();
00641         if (this->m_node_sock == NULL)
00642         {
00643             this->setConnected(0);
00644         }
00645         else if (!this->m_node_sock->isConnected())
00646         {
00647             this->setConnected(0);
00648         }
00649     }
00650  
00651     return phSUCCESS;
00652 error:
00653     return phFAIL;
00654 }
00655 
00656 /* ------------------------------------------------------------------------- *
00657  * wakeup: 
00658  * ------------------------------------------------------------------------- */
00659 int phNodeComm::wakeup()
00660 {
00661     phFUNCTION("phNodeComm::wakeup")
00662     int locked = 0;
00663 
00664     /* phTHIS_LOCK(locked); */
00665     
00666     phPROGRESS("id:%u\n",this->m_id);
00667     if (this->m_server != NULL)
00668     {
00669         this->m_server->wakeup();
00670     }
00671     if (this->m_node_sock != NULL)
00672     {
00673         this->m_node_sock->wakeup();
00674     }
00675     
00676     /* phTHIS_UNLOCK(locked); */
00677     
00678     return phSUCCESS;
00679 }
00680     
00681 /* ------------------------------------------------------------------------- *
00682  * cleanup:
00683  * ------------------------------------------------------------------------- */
00684 int phNodeComm::cleanup()
00685 {
00686     phFUNCTION("phNodeComm::cleanup")
00687     int locked = 0;
00688 
00689     phTHIS_LOCK(locked);
00690     
00691     /* phPROGRESS("\n"); */
00692     
00693     phTHIS_UNLOCK(locked);
00694     
00695     return phSUCCESS;
00696 error:
00697     phTHIS_ERROR_UNLOCK(locked);
00698     
00699     return phFAIL;
00700 }
00701     
00702 
00703 /* ------------------------------------------------------------------------- * 
00704  * error:
00705  * ------------------------------------------------------------------------- */
00706 int phNodeComm::error()
00707 {
00708     phFUNCTION("phNodeComm::error")
00709     int locked = 0;
00710 
00711     phTHIS_LOCK(locked);
00712 
00713     /* phPROGRESS("\n"); */
00714    
00715     phTHIS_UNLOCK(locked);
00716     
00717     return phSUCCESS;
00718 error:
00719     phTHIS_ERROR_UNLOCK(locked);
00720     
00721     return phFAIL;
00722 }
00723 
00724 




Copyright (C) 2002 - 2007 Philip D.S. Thoren ( pthoren@users.sourceforge.net )
University Of Massachusetts at Lowell
Robotics Lab
SourceForge.net Logo

Generated on Sat Jun 16 02:44:06 2007 for phission by  doxygen 1.4.4