00001 /* --------------------------------------------------------------------------- 00002 Phission : 00003 Realtime Vision Processing System 00004 00005 Copyright (C) 2003 Philip D.S. Thoren (pthoren@cs.uml.edu) 00006 University of Massachusetts at Lowell, 00007 Laboratory for Artificial Intelligence and Robotics 00008 00009 This file is part of Phission. 00010 00011 ---------------------------------------------------------------------------*/ 00012 #include <phClientHandler.h> 00013 #include <phNodeComm.h> 00014 #include <phLamportSystem.h> 00015 00016 00017 /* -------------------------------------------------------------------------- */ 00018 phLamportSystem::phLamportSystem() 00019 { 00020 phFUNCTION("phLamportSystem::phLamportSystem") 00021 int locked = 0; 00022 00023 phTHIS_LOCK(locked); 00024 00025 this->m_node_id = phID_INVALID; 00026 this->setName("phLamportSystem"); 00027 error: 00028 phTHIS_ERROR_UNLOCK(locked); 00029 } 00030 00031 /* -------------------------------------------------------------------------- */ 00032 phLamportSystem::~phLamportSystem() 00033 { 00034 phFUNCTION("phLamportSystem::~phLamportSystem") 00035 int locked = 0; 00036 00037 phTHIS_LOOSE_LOCK(locked); 00038 00039 } 00040 /* -------------------------------------------------------------------------- * 00041 * setNodeId: 00042 * -------------------------------------------------------------------------- */ 00043 int phLamportSystem::setNodeId(uint32_t node_id) 00044 { 00045 phFUNCTION("phLamportSystem::setNodeId") 00046 int locked = 0; 00047 00048 phTHIS_LOCK(locked); 00049 this->m_node_id = node_id; 00050 00051 phTHIS_UNLOCK(locked); 00052 00053 return phSUCCESS; 00054 error: 00055 phTHIS_ERROR_UNLOCK(locked); 00056 00057 return phFAIL; 00058 } 00059 /* -------------------------------------------------------------------------- * 00060 * 00061 * -------------------------------------------------------------------------- */ 00062 uint32_t phLamportSystem::getNodeId() 00063 { 00064 return 
this->m_node_id; 00065 } 00066 /* -------------------------------------------------------------------------- */ 00067 int phLamportSystem::registerResource( phClientHandler *client ) 00068 { 00069 phFUNCTION("phLamportSystem::registerResource") 00070 int locked = 0; 00071 phObjectNode *node = NULL; 00072 00073 phTHIS_LOCK(locked); 00074 00075 node = this->m_client_list.pushObject( client ); 00076 phCHECK_NULLPTR(node,NULL,"client_list.pushObject"); 00077 00078 rc = node->disableAutoDelete(); 00079 phPRINT_RC(rc,NULL,"disableAutoDelete failed."); 00080 00081 phTHIS_UNLOCK(locked); 00082 00083 return phSUCCESS; 00084 error: 00085 phTHIS_ERROR_UNLOCK(locked); 00086 00087 return phFAIL; 00088 } 00089 00090 /* -------------------------------------------------------------------------- */ 00091 int phLamportSystem::removeResource( phClientHandler *client ) 00092 { 00093 phFUNCTION("phLamportSystem::removeResource") 00094 int locked = 0; 00095 phObject *obj = NULL; 00096 00097 phTHIS_LOCK(locked); 00098 00099 obj = this->m_client_list.removeObject( client ); 00100 phCHECK_NULLPTR(obj,NULL,"m_client_list.removeObject failed."); 00101 00102 phTHIS_UNLOCK(locked); 00103 00104 return phSUCCESS; 00105 error: 00106 phTHIS_ERROR_UNLOCK(locked); 00107 00108 return phFAIL; 00109 } 00110 00111 /* -------------------------------------------------------------------------- */ 00112 phClientHandler *phLamportSystem::getClient( uint32_t id ) 00113 { 00114 phFUNCTION("phLamportSystem::getClient") 00115 int locked = 0; 00116 phClientHandler *client = NULL; 00117 phObjectNode *node = NULL; 00118 00119 phTHIS_LOCK(locked); 00120 00121 node = this->m_client_list.getHeadObject(); 00122 00123 /* Loop through all the displays in the list */ 00124 while (node != NULL) 00125 { 00126 client = (phClientHandler *)node->getObject(); 00127 00128 if (client != NULL) 00129 { 00130 if (client->getClientId() == id) 00131 { 00132 goto success; 00133 } 00134 00135 node = node->getNextObject(); 00136 } 00137 
} 00138 00139 /* we didn't find the match, return NULL */ 00140 client = NULL; 00141 success: 00142 phTHIS_UNLOCK(locked); 00143 00144 return client; 00145 error: 00146 phTHIS_ERROR_UNLOCK(locked); 00147 00148 return NULL; 00149 } 00150 00151 /* -------------------------------------------------------------------------- */ 00152 int phLamportSystem::registerResource( phNodeComm *node ) 00153 { 00154 phFUNCTION("phLamportSystem::registerResource") 00155 int locked = 0; 00156 00157 phObjectNode *n = NULL; 00158 00159 phTHIS_LOCK(locked); 00160 00161 n = this->m_node_list.pushObject( node ); 00162 phCHECK_NULLPTR(node,NULL,"node_list.pushObject"); 00163 00164 rc = n->disableAutoDelete(); 00165 phPRINT_RC(rc,NULL,"disableAutoDelete failed."); 00166 00167 phTHIS_UNLOCK(locked); 00168 00169 return phSUCCESS; 00170 error: 00171 phTHIS_ERROR_UNLOCK(locked); 00172 00173 return phFAIL; 00174 } 00175 00176 /* -------------------------------------------------------------------------- */ 00177 int phLamportSystem::removeResource( phNodeComm *node ) 00178 { 00179 phFUNCTION("phLamportSystem::removeResource") 00180 int locked = 0; 00181 phObject *obj = NULL; 00182 00183 phTHIS_LOCK(locked); 00184 00185 obj = this->m_node_list.removeObject( node ); 00186 phCHECK_NULLPTR(obj,NULL,"m_node_list.removeObject failed."); 00187 00188 phTHIS_UNLOCK(locked); 00189 00190 return phSUCCESS; 00191 error: 00192 phTHIS_ERROR_UNLOCK(locked); 00193 00194 return phFAIL; 00195 } 00196 00197 /* -------------------------------------------------------------------------- */ 00198 phNodeComm *phLamportSystem::getNode( uint32_t id ) 00199 { 00200 phFUNCTION("phLamportSystem::getNode") 00201 int locked = 0; 00202 phNodeComm *node = NULL; 00203 phObjectNode *n = NULL; 00204 00205 00206 phTHIS_LOCK(locked); 00207 00208 n = this->m_node_list.getHeadObject(); 00209 00210 /* Loop through all the displays in the list */ 00211 while (n != NULL) 00212 { 00213 node = (phNodeComm *)n->getObject(); 00214 00215 if 
(node != NULL) 00216 { 00217 if (node->getId() == id) 00218 { 00219 goto success; 00220 } 00221 00222 n = n->getNextObject(); 00223 } 00224 } 00225 00226 /* we didn't find the match, return NULL */ 00227 node = NULL; 00228 success: 00229 phTHIS_UNLOCK(locked); 00230 00231 return node; 00232 error: 00233 phTHIS_ERROR_UNLOCK(locked); 00234 00235 return NULL; 00236 } 00237 00238 /* ------------------------------------------------------------------------ */ 00239 int phLamportSystem::broadcast_message(uint32_t message_id, 00240 uint32_t mutex_id, 00241 uint32_t timestamp ) 00242 { 00243 phFUNCTION("phLamportSystem::broadcast_message") 00244 int locked = 0; 00245 00246 phMessage message; 00247 uint32_t i = 0; 00248 00249 phObjectNode *n = NULL; 00250 phNodeComm *node = NULL; 00251 00252 phTHIS_LOCK(locked); 00253 00254 /* 1.) Setup the message to be sent to another node by setting 00255 * the appropriate settings, HA, blah blah etc. */ 00256 00257 /* a.) Set the srcId, src class, dst class, message_id */ 00258 rc = message.set(this->getNodeId(), phID_INVALID, 00259 phCLASS_NODE, phCLASS_NODE, 00260 message_id, mutex_id, 00261 timestamp ); 00262 phPRINT_RC(rc,NULL,"message.set"); 00263 00264 /* 2.) 
Send a NODE_REQUEST out through all the node 00265 * comm channels */ 00266 n = this->m_node_list.getHeadObject(); 00267 00268 /* Loop through all the displays in the list */ 00269 while (n != NULL) 00270 { 00271 node = (phNodeComm *)n->getObject(); 00272 00273 if (node != NULL) 00274 { 00275 /* set the destination id to that of the node */ 00276 rc = message.setDstId(node->getOtherSideId()); 00277 phPRINT_RC(rc,NULL,"message.setDstId()"); 00278 00279 DEBUG_PRINT("Sending to node[%u]:%u\n", 00280 i, 00281 node->getOtherSideId()); 00282 00283 rc = message.send(node->getSocket()); 00284 phPRINT_RC(rc,NULL, 00285 "message.send(node->getSocket():%p for node[%u]:%u failed", 00286 node->getSocket(), i, node->getOtherSideId()); 00287 00288 message.print(stderr,2); 00289 00290 DEBUG_PRINT("Sent to node[%u]:%u\n", 00291 i,node->getOtherSideId()); 00292 00293 n = n->getNextObject(); 00294 i++; 00295 } 00296 } 00297 00298 success: 00299 phTHIS_UNLOCK(locked); 00300 00301 return phSUCCESS; 00302 error: 00303 phTHIS_ERROR_UNLOCK(locked); 00304 00305 return phFAIL; 00306 } 00307 00308 /* ------------------------------------------------------------------------ */ 00309 int phLamportSystem::send_message(uint32_t message_id, 00310 uint32_t dst_id, 00311 uint32_t mutex_id, 00312 uint32_t timestamp ) 00313 { 00314 phFUNCTION("phLamportSystem::send_message") 00315 int locked = 0; 00316 00317 phMessage message; 00318 uint32_t i = 0; 00319 00320 phObjectNode *n = NULL; 00321 phNodeComm *node = NULL; 00322 00323 phTHIS_LOCK(locked); 00324 00325 /* 1.) Setup the message to be sent to another node by setting 00326 * the appropriate settings, HA, blah blah etc. */ 00327 00328 /* a.) Set the srcId, src class, dst class, message_id */ 00329 rc = message.set(this->getNodeId(), dst_id, 00330 phCLASS_NODE, phCLASS_NODE, 00331 message_id, mutex_id, 00332 timestamp ); 00333 phPRINT_RC(rc,NULL,"message.set"); 00334 00335 /* 2.) 
Send a NODE_REQUEST out through all the node 00336 * comm channels */ 00337 n = this->m_node_list.getHeadObject(); 00338 00339 /* Loop through all the displays in the list */ 00340 while (n != NULL) 00341 { 00342 node = (phNodeComm *)n->getObject(); 00343 00344 if (node != NULL) 00345 { 00346 if (node->getOtherSideId() == dst_id) 00347 { 00348 DEBUG_PRINT("Sending to node[%u]:%u\n", 00349 i, 00350 node->getOtherSideId()); 00351 00352 rc = message.send(node->getSocket()); 00353 phPRINT_RC(rc,NULL, 00354 "message.send(node->getSocket():%p for node[%u]:%u failed", 00355 node->getSocket(), i, node->getOtherSideId()); 00356 00357 DEBUG_PRINT("Sent to node[%u]:%u\n", 00358 i,node->getOtherSideId()); 00359 } 00360 00361 n = n->getNextObject(); 00362 i++; 00363 } 00364 } 00365 00366 success: 00367 phTHIS_UNLOCK(locked); 00368 00369 return phSUCCESS; 00370 error: 00371 phTHIS_ERROR_UNLOCK(locked); 00372 00373 return phFAIL; 00374 } 00375 00376 /* -------------------------------------------------------------------------- */ 00377 int phLamportSystem::handle_message( phMessage message ) 00378 { 00379 phFUNCTION("phLamportSystem::handle_message") 00380 int locked = 0; 00381 00382 uint32_t i = 0; 00383 uint32_t dstId = phID_INVALID; 00384 phRequestList *reqlist = NULL; 00385 phMessageNode *msgNode = NULL; 00386 int check_head_node = 0; 00387 char action[255]; 00388 00389 sprintf(action,"None"); 00390 00391 phTHIS_LOCK(locked); 00392 00393 if (!((message.getMessageId() == phCLIENT_REQUEST) || 00394 (message.getMessageId() == phCLIENT_RELEASE) || 00395 (message.getMessageId() == phNODE_REQUEST) || 00396 (message.getMessageId() == phNODE_REPLY) || 00397 (message.getMessageId() == phNODE_RELEASE))) 00398 { 00399 phPROGRESS("*** WARNING ***: Unrecognized message\n"); 00400 rc = message.print(stderr); 00401 phPRINT_RC(rc,NULL,"message.print(stderr)"); 00402 goto success; 00403 } 00404 00405 DEBUG_PRINT("phLamportSystem handling message:\n"); 00406 00407 /* 1.) 
retreive the list */ 00408 reqlist = this->m_db.getRequestList(message.getMutexId()); 00409 phCHECK_NULLPTR(reqlist,NULL, 00410 "m_db.getRequestList(%d)", 00411 message.getMutexId()); 00412 00413 if (message.getMessageId() == phCLIENT_REQUEST) 00414 { 00415 DEBUG_PRINT("\n"); 00416 00417 /* 00418 rc = message.print(stderr); 00419 phPRINT_RC(rc,NULL,"message.print(stderr)"); 00420 */ 00421 /* Tick the clock, set the timestamp to the new value */ 00422 rc = message.setTimestamp( reqlist->tick() ); 00423 phPRINT_RC(rc,NULL,"message.setTimestamp:%d", 00424 reqlist->getTimestamp()); 00425 00426 sprintf(action,"C-REQUEST:%u",message.getTimestamp()); 00427 00428 /* 2.) Add the message with timestamp to the list */ 00429 00430 /* This is coming from a client to the node so we need to 00431 * swap the IDs of each */ 00432 /* dstId - client origin */ 00433 dstId = message.getSrcId(); 00434 00435 rc = message.setSrcId(this->getNodeId()); 00436 phPRINT_RC(rc,NULL,"setSrcId"); 00437 00438 /* set the client origin */ 00439 rc = message.setDstId(dstId); 00440 phPRINT_RC(rc,NULL,"setDstId"); 00441 00442 rc = message.setSrcClass(phCLASS_NODE); 00443 phPRINT_RC(rc,NULL,"message.setSrcClass"); 00444 00445 rc = message.setDstClass(phCLASS_CLIENT); 00446 phPRINT_RC(rc,NULL,"message.setDstClass"); 00447 00448 /* 00449 phPROGRESS("adding:\n"); 00450 rc = message.print(stderr); 00451 phPRINT_RC(rc,NULL,"message.print(stderr)"); 00452 */ 00453 00454 /* Add the request to the request list */ 00455 msgNode = reqlist->addMessage(message, 00456 this->m_node_list.length()); 00457 phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed", 00458 this->m_node_list.length()); 00459 00460 /* 3.) 
Broadcast the request message to everyone else */ 00461 rc = this->broadcast_message(phNODE_REQUEST, 00462 message.getMutexId(), 00463 message.getTimestamp() ); 00464 phPRINT_RC(rc,NULL,"broadcast_message failed."); 00465 00466 /* 00467 phPROGRESS("Waiting on:\n"); 00468 msgNode->getMessage().print(stderr); 00469 */ 00470 00471 reqlist->print(stderr,action); 00472 00473 /* 4.) Finally, this is a CLIENT_REQUEST so we need 00474 * to wait to get permission to continue */ 00475 /* unlock the clock, we'll not be changing anything 00476 * else in the clock and the msgNode won't be 00477 * removed until we've sent a CLIENT_RELEASE */ 00478 phTHIS_UNLOCK(locked); 00479 00480 rc = msgNode->messageWait(); 00481 phPRINT_RC(rc,NULL,"msgNode->messageWait failed."); 00482 00483 /* 00484 phPROGRESS("Awake on:\n"); 00485 msgNode->getMessage().print(stderr); 00486 */ 00487 00488 } 00489 else if (message.getMessageId() == phCLIENT_RELEASE) 00490 { 00491 DEBUG_PRINT("\n"); 00492 /* 00493 rc = message.print(stderr); 00494 phPRINT_RC(rc,NULL,"message.print(stderr)"); 00495 */ 00496 00497 /* Tick the clock, set the timestamp to the new value */ 00498 rc = message.setTimestamp( reqlist->tick() ); 00499 phPRINT_RC(rc,NULL,"message.setTimestamp:%d", 00500 reqlist->getTimestamp()); 00501 00502 sprintf(action,"C-RELEASE:%u",message.getTimestamp()); 00503 00504 /* remove the head request to the request list */ 00505 rc = reqlist->removeHeadRequest(); 00506 phCHECK_RC(rc,NULL,"reqlist->removeHeadRequest() failed"); 00507 00508 /* Broadcast the request message to everyone else */ 00509 rc = this->broadcast_message(phNODE_RELEASE, 00510 message.getMutexId(), 00511 message.getTimestamp() ); 00512 phPRINT_RC(rc,NULL,"broadcast_message failed."); 00513 00514 check_head_node = 1; 00515 } 00516 else if (message.getMessageId() == phNODE_REQUEST) 00517 { 00518 sprintf(action,"N-REQUEST:%u",message.getTimestamp()); 00519 DEBUG_PRINT("\n"); 00520 /* 00521 rc = message.print(stderr); 00522 
phPRINT_RC(rc,NULL,"message.print(stderr)"); 00523 */ 00524 00525 /* Adjust the timestamp */ 00526 reqlist->tick(message.getTimestamp()); 00527 00528 /* This is coming from a client to the node so we need to 00529 * swap the IDs of each */ 00530 /* dstId - our current node */ 00531 if (message.getDstId() != this->getNodeId()) 00532 { 00533 phPROGRESS("** Warning ***: received node request w/o our id\n"); 00534 } 00535 00536 /* Add the request to the request list */ 00537 msgNode = reqlist->addMessage(message, 00538 this->m_node_list.length()); 00539 phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed", 00540 this->m_node_list.length()); 00541 00542 /* 3.) Broadcast the request message to everyone else */ 00543 rc = this->send_message(phNODE_REPLY, 00544 message.getSrcId(), 00545 message.getMutexId(), 00546 reqlist->getTimestamp() ); 00547 phPRINT_RC(rc,NULL,"send_message failed."); 00548 } 00549 else if (message.getMessageId() == phNODE_REPLY) 00550 { 00551 sprintf(action,"N-REPLY:%u",message.getTimestamp()); 00552 DEBUG_PRINT("\n"); 00553 /* 00554 rc = message.print(stderr); 00555 phPRINT_RC(rc,NULL,"message.print(stderr)"); 00556 */ 00557 reqlist->tick(message.getTimestamp()); 00558 00559 /* This is coming from a client to the node so we need to 00560 * swap the IDs of each */ 00561 /* dstId - our current node */ 00562 if (message.getDstId() != this->getNodeId()) 00563 { 00564 phPROGRESS("** Warning ***: received node request w/o our id\n"); 00565 } 00566 00567 /* Add the request to the request list */ 00568 msgNode = reqlist->addMessage(message, 00569 this->m_node_list.length()); 00570 phCHECK_NULLPTR(msgNode,NULL,"reqlist->addMessage(msg,%u) failed", 00571 this->m_node_list.length()); 00572 00573 check_head_node = 1; 00574 00575 } 00576 else if (message.getMessageId() == phNODE_RELEASE) 00577 { 00578 sprintf(action,"N-RELEASE:%u",message.getTimestamp()); 00579 DEBUG_PRINT("\n"); 00580 /* 00581 rc = message.print(stderr); 00582 
phPRINT_RC(rc,NULL,"message.print(stderr)"); 00583 */ 00584 reqlist->tick(message.getTimestamp()); 00585 00586 /* remove the head request to the request list */ 00587 rc = reqlist->removeHeadRequest(); 00588 phCHECK_RC(rc,NULL,"reqlist->removeHeadRequest() failed"); 00589 00590 check_head_node = 1; 00591 } 00592 00593 /* reqlist will be set correctly if this condition is set */ 00594 if (check_head_node) 00595 { 00596 /* Check the head node, see if it's belongs to this node, 00597 * release it */ 00598 msgNode = reqlist->getHeadRequest(); 00599 if (msgNode != NULL) 00600 { 00601 if ((msgNode->getTotalReplies() >= this->m_node_list.length()) && 00602 (msgNode->getMessage().getSrcId() ==this->getNodeId())) 00603 { 00604 /* 00605 phPROGRESS("Waking:\n"); 00606 msgNode->getMessage().print(stderr); 00607 */ 00608 rc = msgNode->messageRelease(); 00609 phPRINT_RC(rc,NULL,"msgNode->messageRelease"); 00610 } 00611 } 00612 } 00613 00614 success: 00615 if (locked) 00616 { 00617 reqlist->print(stderr,action); 00618 } 00619 phTHIS_UNLOCK(locked); 00620 00621 return phSUCCESS; 00622 error: 00623 phTHIS_ERROR_UNLOCK(locked); 00624 00625 return phFAIL; 00626 } 00627 00628 00629 00630
/* Copyright (C) 2002 - 2007
 * Philip D.S. Thoren (pthoren@users.sourceforge.net),
 * University of Massachusetts at Lowell Robotics Lab */