Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

phLamportSystem.cpp

Go to the documentation of this file.
00001 /* ---------------------------------------------------------------------------
00002     Phission : 
00003         Realtime Vision Processing System
00004 
00005     Copyright (C) 2003 Philip D.S. Thoren (pthoren@cs.uml.edu)
00006     University of Massachusetts at Lowell,
00007     Laboratory for Artificial Intelligence and Robotics
00008 
00009     This file is part of Phission.
00010 
00011  ---------------------------------------------------------------------------*/
00012 #include <phClientHandler.h>
00013 #include <phNodeComm.h>
00014 #include <phLamportSystem.h>
00015 
00016 
00017 /* -------------------------------------------------------------------------- */
00018 phLamportSystem::phLamportSystem()
00019 {
00020     phFUNCTION("phLamportSystem::phLamportSystem")
00021     int locked = 0;
00022 
00023     phTHIS_LOCK(locked);
00024 
00025     this->m_node_id = phID_INVALID;
00026     this->setName("phLamportSystem");
00027 error:
00028     phTHIS_ERROR_UNLOCK(locked);
00029 }
00030 
00031 /* -------------------------------------------------------------------------- */
00032 phLamportSystem::~phLamportSystem()
00033 {
00034     phFUNCTION("phLamportSystem::~phLamportSystem")
00035     int locked = 0;
00036 
00037     phTHIS_LOOSE_LOCK(locked);
00038     
00039 }
00040 /* -------------------------------------------------------------------------- *
00041  * setNodeId:
00042  * -------------------------------------------------------------------------- */
00043 int phLamportSystem::setNodeId(uint32_t node_id)
00044 {
00045     phFUNCTION("phLamportSystem::setNodeId")
00046     int locked = 0;
00047 
00048     phTHIS_LOCK(locked);
00049     this->m_node_id = node_id;
00050     
00051     phTHIS_UNLOCK(locked);
00052     
00053     return phSUCCESS;
00054 error:
00055     phTHIS_ERROR_UNLOCK(locked);
00056     
00057     return phFAIL;
00058 }
00059 /* -------------------------------------------------------------------------- *
00060  * 
00061  * -------------------------------------------------------------------------- */
00062 uint32_t phLamportSystem::getNodeId() 
00063 { 
00064     return this->m_node_id; 
00065 }
00066 /* -------------------------------------------------------------------------- */
00067 int phLamportSystem::registerResource( phClientHandler *client )
00068 {
00069     phFUNCTION("phLamportSystem::registerResource")
00070     int locked = 0;
00071     phObjectNode *node = NULL;
00072 
00073     phTHIS_LOCK(locked);
00074 
00075     node = this->m_client_list.pushObject( client );
00076     phCHECK_NULLPTR(node,NULL,"client_list.pushObject");
00077 
00078     rc = node->disableAutoDelete(); 
00079     phPRINT_RC(rc,NULL,"disableAutoDelete failed.");
00080 
00081     phTHIS_UNLOCK(locked);
00082 
00083     return phSUCCESS;
00084 error:
00085     phTHIS_ERROR_UNLOCK(locked);
00086 
00087     return phFAIL;
00088 }
00089 
00090 /* -------------------------------------------------------------------------- */
00091 int phLamportSystem::removeResource( phClientHandler *client )
00092 {
00093     phFUNCTION("phLamportSystem::removeResource")
00094     int locked = 0;
00095     phObject *obj = NULL;
00096 
00097     phTHIS_LOCK(locked);
00098 
00099     obj = this->m_client_list.removeObject( client );
00100     phCHECK_NULLPTR(obj,NULL,"m_client_list.removeObject failed.");
00101     
00102     phTHIS_UNLOCK(locked);
00103 
00104     return phSUCCESS;
00105 error:
00106     phTHIS_ERROR_UNLOCK(locked);
00107 
00108     return phFAIL;
00109 }
00110     
00111 /* -------------------------------------------------------------------------- */
00112 phClientHandler *phLamportSystem::getClient( uint32_t id )
00113 {
00114     phFUNCTION("phLamportSystem::getClient")
00115     int locked = 0;
00116     phClientHandler *client = NULL;
00117     phObjectNode    *node = NULL;
00118 
00119     phTHIS_LOCK(locked);
00120 
00121     node = this->m_client_list.getHeadObject();
00122     
00123     /*  Loop through all the displays in the list */
00124     while (node != NULL)
00125     {
00126         client = (phClientHandler *)node->getObject();
00127         
00128         if (client != NULL)
00129         {
00130             if (client->getClientId() == id)
00131             {
00132                 goto success;
00133             }
00134 
00135             node = node->getNextObject();
00136         }
00137     }
00138    
00139     /* we didn't find the match, return NULL */
00140     client = NULL;
00141 success:
00142     phTHIS_UNLOCK(locked);
00143 
00144     return client;
00145 error:
00146     phTHIS_ERROR_UNLOCK(locked);
00147 
00148     return NULL;
00149 }
00150     
00151 /* -------------------------------------------------------------------------- */
00152 int phLamportSystem::registerResource( phNodeComm *node )
00153 {
00154     phFUNCTION("phLamportSystem::registerResource")
00155     int locked = 0;
00156 
00157     phObjectNode *n = NULL;
00158 
00159     phTHIS_LOCK(locked);
00160 
00161     n = this->m_node_list.pushObject( node );
00162     phCHECK_NULLPTR(node,NULL,"node_list.pushObject");
00163 
00164     rc = n->disableAutoDelete(); 
00165     phPRINT_RC(rc,NULL,"disableAutoDelete failed.");
00166 
00167     phTHIS_UNLOCK(locked);
00168 
00169     return phSUCCESS;
00170 error:
00171     phTHIS_ERROR_UNLOCK(locked);
00172 
00173     return phFAIL;
00174 }
00175 
00176 /* -------------------------------------------------------------------------- */
00177 int phLamportSystem::removeResource( phNodeComm *node )
00178 {
00179     phFUNCTION("phLamportSystem::removeResource")
00180     int locked = 0;
00181     phObject *obj = NULL;
00182 
00183     phTHIS_LOCK(locked);
00184 
00185     obj = this->m_node_list.removeObject( node );
00186     phCHECK_NULLPTR(obj,NULL,"m_node_list.removeObject failed.");
00187     
00188     phTHIS_UNLOCK(locked);
00189 
00190     return phSUCCESS;
00191 error:
00192     phTHIS_ERROR_UNLOCK(locked);
00193 
00194     return phFAIL;
00195 }
00196     
00197 /* -------------------------------------------------------------------------- */
00198 phNodeComm *phLamportSystem::getNode( uint32_t id )
00199 {
00200     phFUNCTION("phLamportSystem::getNode")
00201     int locked = 0;
00202     phNodeComm      *node = NULL;
00203     phObjectNode    *n = NULL;
00204 
00205 
00206     phTHIS_LOCK(locked);
00207 
00208     n = this->m_node_list.getHeadObject();
00209     
00210     /*  Loop through all the displays in the list */
00211     while (n != NULL)
00212     {
00213         node = (phNodeComm *)n->getObject();
00214         
00215         if (node != NULL)
00216         {
00217             if (node->getId() == id)
00218             {
00219                 goto success;
00220             }
00221 
00222             n = n->getNextObject();
00223         }
00224     }
00225 
00226     /* we didn't find the match, return NULL */
00227     node = NULL;
00228 success:
00229     phTHIS_UNLOCK(locked);
00230 
00231     return node;
00232 error:
00233     phTHIS_ERROR_UNLOCK(locked);
00234 
00235     return NULL;
00236 }
00237 
00238 /* ------------------------------------------------------------------------ */
00239 int phLamportSystem::broadcast_message(uint32_t message_id,
00240                                        uint32_t mutex_id,
00241                                        uint32_t timestamp )
00242 {
00243     phFUNCTION("phLamportSystem::broadcast_message")
00244     int locked = 0;
00245     
00246     phMessage   message;
00247     uint32_t    i = 0;
00248     
00249     phObjectNode    *n          = NULL;
00250     phNodeComm      *node       = NULL;
00251     
00252     phTHIS_LOCK(locked);
00253     
00254     /* 1.) Setup the message to be sent to another node by setting
00255      * the appropriate settings, HA, blah blah etc. */
00256     
00257     /* a.) Set the srcId, src class, dst class, message_id */
00258     rc = message.set(this->getNodeId(), phID_INVALID,
00259                      phCLASS_NODE, phCLASS_NODE,
00260                      message_id, mutex_id,
00261                      timestamp );
00262     phPRINT_RC(rc,NULL,"message.set");
00263         
00264     /* 2.) Send a NODE_REQUEST out through all the node 
00265      * comm channels */
00266     n = this->m_node_list.getHeadObject();
00267     
00268     /*  Loop through all the displays in the list */
00269     while (n != NULL)
00270     {
00271         node = (phNodeComm *)n->getObject();
00272         
00273         if (node != NULL)
00274         {
00275             /* set the destination id to that of the node */
00276             rc = message.setDstId(node->getOtherSideId());
00277             phPRINT_RC(rc,NULL,"message.setDstId()");
00278 
00279             DEBUG_PRINT("Sending to node[%u]:%u\n",
00280                      i,
00281                      node->getOtherSideId());
00282 
00283             rc = message.send(node->getSocket());
00284             phPRINT_RC(rc,NULL,
00285              "message.send(node->getSocket():%p for node[%u]:%u failed",
00286              node->getSocket(), i, node->getOtherSideId());
00287 
00288             message.print(stderr,2);
00289             
00290             DEBUG_PRINT("Sent to node[%u]:%u\n",
00291                      i,node->getOtherSideId());
00292             
00293             n = n->getNextObject();
00294             i++;
00295         }
00296     }
00297 
00298 success:
00299     phTHIS_UNLOCK(locked);
00300 
00301     return phSUCCESS;
00302 error:
00303     phTHIS_ERROR_UNLOCK(locked);
00304     
00305     return phFAIL;
00306 }
00307 
00308 /* ------------------------------------------------------------------------ */
00309 int phLamportSystem::send_message(uint32_t message_id,
00310                                   uint32_t dst_id,
00311                                   uint32_t mutex_id,
00312                                   uint32_t timestamp )
00313 {
00314     phFUNCTION("phLamportSystem::send_message")
00315     int locked = 0;
00316     
00317     phMessage   message;
00318     uint32_t    i = 0;
00319     
00320     phObjectNode    *n          = NULL;
00321     phNodeComm      *node       = NULL;
00322     
00323     phTHIS_LOCK(locked);
00324     
00325     /* 1.) Setup the message to be sent to another node by setting
00326      * the appropriate settings, HA, blah blah etc. */
00327     
00328     /* a.) Set the srcId, src class, dst class, message_id */
00329     rc = message.set(this->getNodeId(), dst_id,
00330                      phCLASS_NODE, phCLASS_NODE,
00331                      message_id, mutex_id,
00332                      timestamp );
00333     phPRINT_RC(rc,NULL,"message.set");
00334         
00335     /* 2.) Send a NODE_REQUEST out through all the node 
00336      * comm channels */
00337     n = this->m_node_list.getHeadObject();
00338     
00339     /*  Loop through all the displays in the list */
00340     while (n != NULL)
00341     {
00342         node = (phNodeComm *)n->getObject();
00343         
00344         if (node != NULL)
00345         {
00346             if (node->getOtherSideId() == dst_id)
00347             {
00348                 DEBUG_PRINT("Sending to node[%u]:%u\n",
00349                          i,
00350                          node->getOtherSideId());
00351 
00352                 rc = message.send(node->getSocket());
00353                 phPRINT_RC(rc,NULL,
00354                  "message.send(node->getSocket():%p for node[%u]:%u failed",
00355                  node->getSocket(), i, node->getOtherSideId());
00356             
00357                 DEBUG_PRINT("Sent to node[%u]:%u\n",
00358                          i,node->getOtherSideId());
00359             }
00360 
00361             n = n->getNextObject();
00362             i++;
00363         }
00364     }
00365 
00366 success:
00367     phTHIS_UNLOCK(locked);
00368 
00369     return phSUCCESS;
00370 error:
00371     phTHIS_ERROR_UNLOCK(locked);
00372     
00373     return phFAIL;
00374 }
00375 
00376 /* -------------------------------------------------------------------------- */
00377 int phLamportSystem::handle_message( phMessage message )
00378 {
00379     phFUNCTION("phLamportSystem::handle_message")
00380     int locked = 0;
00381     
00382     uint32_t        i           = 0;
00383     uint32_t        dstId       = phID_INVALID;
00384     phRequestList   *reqlist    = NULL;
00385     phMessageNode   *msgNode    = NULL;
00386     int             check_head_node = 0;
00387     char action[255];
00388    
00389     sprintf(action,"None");
00390     
00391     phTHIS_LOCK(locked);
00392 
00393     if (!((message.getMessageId() == phCLIENT_REQUEST) ||
00394           (message.getMessageId() == phCLIENT_RELEASE) ||
00395           (message.getMessageId() == phNODE_REQUEST) ||
00396           (message.getMessageId() == phNODE_REPLY) ||
00397           (message.getMessageId() == phNODE_RELEASE)))
00398     {
00399         phPROGRESS("*** WARNING ***: Unrecognized message\n");
00400         rc = message.print(stderr);
00401         phPRINT_RC(rc,NULL,"message.print(stderr)");
00402         goto success;
00403     }
00404     
00405     DEBUG_PRINT("phLamportSystem handling message:\n");
00406     
00407     /* 1.) retreive the list */
00408     reqlist = this->m_db.getRequestList(message.getMutexId());
00409     phCHECK_NULLPTR(reqlist,NULL,
00410         "m_db.getRequestList(%d)",
00411         message.getMutexId());
00412 
00413     if (message.getMessageId() == phCLIENT_REQUEST)
00414     {
00415         DEBUG_PRINT("\n");
00416 
00417         /*
00418         rc = message.print(stderr);
00419         phPRINT_RC(rc,NULL,"message.print(stderr)");
00420         */
00421         /* Tick the clock, set the timestamp to the new value */
00422         rc = message.setTimestamp( reqlist->tick() );
00423         phPRINT_RC(rc,NULL,"message.setTimestamp:%d",
00424                 reqlist->getTimestamp());
00425 
00426         sprintf(action,"C-REQUEST:%u",message.getTimestamp());
00427         
00428         /* 2.) Add the message with timestamp to the list */
00429         
00430         /* This is coming from a client to the node so we need to 
00431          * swap the IDs of each */
00432         /* dstId - client origin */
00433         dstId = message.getSrcId();
00434         
00435         rc = message.setSrcId(this->getNodeId());
00436         phPRINT_RC(rc,NULL,"setSrcId");
00437 
00438         /* set the client origin */
00439         rc = message.setDstId(dstId);
00440         phPRINT_RC(rc,NULL,"setDstId");
00441        
00442         rc = message.setSrcClass(phCLASS_NODE);
00443         phPRINT_RC(rc,NULL,"message.setSrcClass");
00444         
00445         rc = message.setDstClass(phCLASS_CLIENT);
00446         phPRINT_RC(rc,NULL,"message.setDstClass");
00447         
00448         /*
00449         phPROGRESS("adding:\n");
00450         rc = message.print(stderr);
00451         phPRINT_RC(rc,NULL,"message.print(stderr)");
00452         */
00453 
00454         /* Add the request to the request list */
00455         msgNode = reqlist->addMessage(message,
00456                                       this->m_node_list.length());
00457         phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed",
00458                       this->m_node_list.length());
00459       
00460         /* 3.) Broadcast the request message to everyone else */
00461         rc = this->broadcast_message(phNODE_REQUEST,
00462                                      message.getMutexId(),
00463                                      message.getTimestamp() );
00464         phPRINT_RC(rc,NULL,"broadcast_message failed.");
00465 
00466         /*
00467         phPROGRESS("Waiting on:\n");
00468         msgNode->getMessage().print(stderr);
00469         */
00470         
00471         reqlist->print(stderr,action);
00472         
00473         /* 4.) Finally, this is a CLIENT_REQUEST so we need
00474          * to wait to get permission to continue */
00475         /* unlock the clock, we'll not be changing anything 
00476          * else in the clock and the msgNode won't be
00477          * removed until we've sent a CLIENT_RELEASE */
00478         phTHIS_UNLOCK(locked);
00479 
00480         rc = msgNode->messageWait();
00481         phPRINT_RC(rc,NULL,"msgNode->messageWait failed.");
00482         
00483         /*
00484         phPROGRESS("Awake on:\n");
00485         msgNode->getMessage().print(stderr);
00486         */
00487         
00488     }
00489     else if (message.getMessageId() == phCLIENT_RELEASE)
00490     {
00491         DEBUG_PRINT("\n");
00492         /* 
00493         rc = message.print(stderr);
00494         phPRINT_RC(rc,NULL,"message.print(stderr)");
00495         */
00496 
00497         /* Tick the clock, set the timestamp to the new value */
00498         rc = message.setTimestamp( reqlist->tick() );
00499         phPRINT_RC(rc,NULL,"message.setTimestamp:%d",
00500                  reqlist->getTimestamp());
00501 
00502         sprintf(action,"C-RELEASE:%u",message.getTimestamp());
00503         
00504         /* remove the head request to the request list */
00505         rc = reqlist->removeHeadRequest();
00506         phCHECK_RC(rc,NULL,"reqlist->removeHeadRequest() failed");
00507       
00508         /* Broadcast the request message to everyone else */
00509         rc = this->broadcast_message(phNODE_RELEASE,
00510                                      message.getMutexId(),
00511                                      message.getTimestamp() );
00512         phPRINT_RC(rc,NULL,"broadcast_message failed.");
00513 
00514         check_head_node = 1;
00515     }
00516     else if (message.getMessageId() == phNODE_REQUEST)
00517     {
00518         sprintf(action,"N-REQUEST:%u",message.getTimestamp());
00519         DEBUG_PRINT("\n");
00520         /*
00521         rc = message.print(stderr);
00522         phPRINT_RC(rc,NULL,"message.print(stderr)");
00523         */
00524         
00525         /* Adjust the timestamp */
00526         reqlist->tick(message.getTimestamp());
00527         
00528         /* This is coming from a client to the node so we need to 
00529          * swap the IDs of each */
00530         /* dstId - our current node */
00531         if (message.getDstId() != this->getNodeId())
00532         {
00533             phPROGRESS("** Warning ***: received node request w/o our id\n");
00534         }
00535         
00536         /* Add the request to the request list */
00537         msgNode = reqlist->addMessage(message,
00538                                       this->m_node_list.length());
00539         phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed",
00540                       this->m_node_list.length());
00541       
00542         /* 3.) Broadcast the request message to everyone else */
00543         rc = this->send_message(phNODE_REPLY,
00544                                 message.getSrcId(),
00545                                 message.getMutexId(),
00546                                 reqlist->getTimestamp() );
00547         phPRINT_RC(rc,NULL,"send_message failed.");
00548     }
00549     else if (message.getMessageId() == phNODE_REPLY)
00550     {
00551         sprintf(action,"N-REPLY:%u",message.getTimestamp());
00552         DEBUG_PRINT("\n");
00553         /*
00554         rc = message.print(stderr);
00555         phPRINT_RC(rc,NULL,"message.print(stderr)");
00556         */
00557         reqlist->tick(message.getTimestamp());
00558         
00559         /* This is coming from a client to the node so we need to 
00560          * swap the IDs of each */
00561         /* dstId - our current node */
00562         if (message.getDstId() != this->getNodeId())
00563         {
00564             phPROGRESS("** Warning ***: received node request w/o our id\n");
00565         }
00566         
00567         /* Add the request to the request list */
00568         msgNode = reqlist->addMessage(message,
00569                                       this->m_node_list.length());
00570         phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed",
00571                       this->m_node_list.length());
00572 
00573         check_head_node = 1;
00574       
00575     }
00576     else if (message.getMessageId() == phNODE_RELEASE)
00577     {
00578         sprintf(action,"N-RELEASE:%u",message.getTimestamp());
00579         DEBUG_PRINT("\n");
00580         /*
00581         rc = message.print(stderr);
00582         phPRINT_RC(rc,NULL,"message.print(stderr)");
00583         */
00584         reqlist->tick(message.getTimestamp());
00585 
00586         /* remove the head request to the request list */
00587         rc = reqlist->removeHeadRequest();
00588         phCHECK_RC(rc,NULL,"reqlist->removeHeadRequest() failed");
00589       
00590         check_head_node = 1;
00591     }
00592 
00593     /* reqlist will be set correctly if this condition is set */
00594     if (check_head_node)
00595     {
00596         /* Check the head node, see if it's belongs to this node,
00597          * release it */
00598         msgNode = reqlist->getHeadRequest();
00599         if (msgNode != NULL)
00600         {
00601             if ((msgNode->getTotalReplies() >= this->m_node_list.length()) &&
00602                 (msgNode->getMessage().getSrcId() ==this->getNodeId()))
00603             {
00604                 /*
00605                 phPROGRESS("Waking:\n");
00606                 msgNode->getMessage().print(stderr);
00607                 */
00608                 rc = msgNode->messageRelease();
00609                 phPRINT_RC(rc,NULL,"msgNode->messageRelease");
00610             }
00611         }            
00612     }
00613 
00614 success:
00615     if (locked)
00616     {
00617         reqlist->print(stderr,action);
00618     }
00619     phTHIS_UNLOCK(locked);
00620 
00621     return phSUCCESS;
00622 error:
00623     phTHIS_ERROR_UNLOCK(locked);
00624     
00625     return phFAIL;
00626 }
00627 
00628 
00629 
00630 




Copyright (C) 2002 - 2007 Philip D.S. Thoren ( pthoren@users.sourceforge.net )
University Of Massachusetts at Lowell
Robotics Lab
SourceForge.net Logo

Generated on Sat Jun 16 02:44:05 2007 for phission by  doxygen 1.4.4