Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

NodeController.cpp

Go to the documentation of this file.
00001 /* ---------------------------------------------------------------------------
00002     Phission : 
00003         Realtime Vision Processing System
00004 
00005     Copyright (C) 2003 Philip D.S. Thoren (pthoren@cs.uml.edu)
00006     University of Massachusetts at Lowell,
00007     Laboratory for Artificial Intelligence and Robotics
00008 
00009     This file is part of Phission.
00010 
00011  ---------------------------------------------------------------------------*/
00012 #include <phission.h>
00013 #include <phNodeComm.h>
00014 #include <phClientListener.h>
00015 #include <phLamportSystem.h>
00016 #include <signal.h>
00017 
/* Process-wide state shared between main(), init_sig_handler() and
 * sig_handler().
 *
 *  glbl_main_pid     - id of the thread that called init_sig_handler();
 *                      sig_handler() compares the current thread id against
 *                      it to tell the main thread from the worker threads.
 *  glbl_in_main_loop - set to 1 by main() just before it enters its
 *                      supervision loop; only reported by sig_handler().
 *  glbl_cleanup      - count of signals seen by the main thread; main()
 *                      polls it and shuts down once it is non-zero.
 *
 * The two flags are written on one side of a signal handler and read on the
 * other, so they are declared 'volatile sig_atomic_t' (from <signal.h>,
 * which this file already includes).
 * NOTE(review): if another translation unit declares these as plain 'int',
 * that declaration has to be updated to match. */
unsigned int          glbl_main_pid       = 0;
volatile sig_atomic_t glbl_in_main_loop   = 0;
volatile sig_atomic_t glbl_cleanup        = 0;

class phLamportSystem;
00023 
00024 /* -------------------------------------------------------------------------- *
00025  * sig_handler:
00026  * -------------------------------------------------------------------------- */
00027 void sig_handler(int sig)
00028 {
00029     phFUNCTION("sig_handler")
00030     
00031     phPROGRESS("sig_handler(signal#%d) count:%d pid:%d main:%d main_loop:%d\n",
00032             sig,
00033             glbl_cleanup,
00034             phGetCurrentThreadId(),
00035             glbl_main_pid,
00036             glbl_in_main_loop);
00037 
00038     if (glbl_main_pid == phGetCurrentThreadId())
00039     {
00040         if ((sig == SIGQUIT) || (sig == SIGTERM) || (sig == SIGINT))
00041         {
00042             glbl_cleanup++;
00043         
00044         }
00045         else 
00046         {
00047             glbl_cleanup++;
00048             exit(1);
00049         }
00050     }
00051     else if ((sig != SIGQUIT) && 
00052              (sig != SIGTERM) && 
00053              (sig != SIGINT))
00054     {
00055         phThread::forcedExit();
00056     }
00057            
00058     if (glbl_cleanup >= 3)
00059     {
00060         phThread::forcedExit();
00061     }
00062     
00063     return;
00064 }
00065 
#ifdef WIN32
#include <windows.h>
/* ----------------------------------------------------------------------- */
/* SignalHandler:                                                          */
/*  Console control handler with the Win32 HandlerRoutine signature        */
/*  ( BOOL WINAPI HandlerRoutine( DWORD dwCtrlType ); ). It is registered  */
/*  by init_sig_handler() through SetConsoleCtrlHandler().                 */
/*                                                                         */
/*  ctrl - the control event being delivered (CTRL_C_EVENT, ...).          */
/*                                                                         */
/*  The event type is only used to pick a debug message: every event,      */
/*  whatever its type, ends the process with exit(1). The function         */
/*  therefore never actually returns to the console subsystem.             */
/* ----------------------------------------------------------------------- */
BOOL WINAPI SignalHandler( DWORD ctrl )
{
    /* Declared by hand because phFUNCTION is not used here.
     * NOTE(review): presumably DEBUG_PRINT reads 'function' - confirm. */
    const char *function = "SignalHandler";

    /* Report which console event arrived */
    switch (ctrl)
    {
        case CTRL_C_EVENT:
            DEBUG_PRINT("CTRL_C_EVENT\n");
            break;

        case CTRL_CLOSE_EVENT:
            DEBUG_PRINT("CTRL_CLOSE_EVENT\n");
            break;

        case CTRL_BREAK_EVENT:
            DEBUG_PRINT("CTRL_BREAK_EVENT\n");
            break;

        case CTRL_LOGOFF_EVENT:
            DEBUG_PRINT("CTRL_LOGOFF_EVENT\n");
            break;

        case CTRL_SHUTDOWN_EVENT:
            DEBUG_PRINT("CTRL_SHUTDOWN_EVENT\n");
            break;

        default:
            DEBUG_PRINT("OTHER EVENT?\n");
            break;
    }

    DEBUG_PRINT("Signaled cleanup. Wait for it...\n\n");

    exit(1);

    /* Not reached; keeps compilers that do not treat exit() as
     * non-returning from complaining about a missing return value. */
    return FALSE;
}
#endif
00122 
00123 /* ------------------------------------------------------------------------- *
00124  * init_sig_handler:
00125  * ------------------------------------------------------------------------- */
00126 void init_sig_handler()
00127 {
00128     phFUNCTION("init_sig_handler")
00129 
00130     int                 i       = 0;
00131     int                 nsigs   = 0;
00132     struct sigaction    new_act;
00133     sigset_t            mask_sigs;
00134     sigset_t            all_signals;
00135     
00136     int                 sigs [] = { SIGBUS, 
00137                                     SIGSEGV, 
00138                                     SIGFPE, 
00139                                     SIGINT, 
00140                                     SIGTERM, 
00141                                     SIGQUIT };
00142     /* SIGHUP, SIGINT, SIGQUIT, SIGBUS, SIGTERM, SIGSEGV, SIGFPE }; */
00143 #ifdef WIN32
00144     BOOL bResult = TRUE;
00145 #endif
00146 
00147     /* Get the size of the sigs[] array */
00148     nsigs = sizeof (sigs) / sizeof (int);
00149 
00150     /* Set all the signals in the mask */
00151     rc = sigfillset ( &all_signals );
00152     phPRINT_RC(rc,"sigfillset","sigfillset");
00153     
00154     /* Remove the sigs[i] signals from the BLOCK set */
00155     for ( i = 0; i < nsigs; i++ )
00156     {
00157         rc = sigdelset ( &all_signals, sigs [i] );
00158         phPRINT_RC(rc,"sigdelset", "sigdelset");
00159     }
00160 
00161     /* Set the BLOCK mask to be used with the current mask */ 
00162     rc = sigprocmask ( SIG_BLOCK, &all_signals, NULL );
00163     phPRINT_RC(rc,"sigprocmask","sigprocmask" );
00164     
00165     rc = sigfillset (&all_signals );
00166     phPRINT_RC(rc,"sigfillset","sigfillset");
00167     
00168     for(i = 0; i <  nsigs; i++)
00169     {
00170         new_act.sa_handler  = sig_handler;
00171         new_act.sa_mask     = all_signals;
00172         new_act.sa_flags    = 0;
00173         
00174         rc = sigaction (sigs [i], &new_act, NULL);
00175         phCHECK_RC(rc,"sigaction","sigaction can't set signals.");
00176 
00177         glbl_main_pid=phGetCurrentThreadId();
00178     }
00179     
00180 #ifdef WIN32
00181     /* BOOL SetConsoleCtrlHandler( PHANDLER_ROUTINE HandlerRoutine,
00182      *                             BOOL Add ); */
00183 
00184     bResult = SetConsoleCtrlHandler(  (PHANDLER_ROUTINE)SignalHandler, TRUE );
00185     if (!bResult)
00186     {
00187         phCHECK_RC(-1,"SetConsoleCtrlHandler","SetConsoleCtrlHandler failed to set signal handler.");
00188     }
00189 #endif
00190     
00191     return;
00192 error:
00193     exit(1);
00194 }
00195 
/* -------------------------------------------------------------------------- *
 * usage:
 *
 *  Prints the command line synopsis to stdout and ends the process with
 *  exit status 1; it never returns. main() registers it as the "--help"
 *  callback and also calls it directly when an argument fails validation.
 *
 *  The option list mirrors the options main() registers with its argument
 *  table: --id, --host0..2, --port0..2, --client_port and --help.
 * -------------------------------------------------------------------------- */
void usage()
{
    printf("\nUsage:\n"
           "\t./NodeController.bin --id <node id>\n"
           "\t                     --host0 <host> --port0 <portnumber>\n"
           "\t                     --host1 <host> --port1 <portnumber>\n"
           "\t                     --host2 <host> --port2 <portnumber>\n"
           "\t                     --client_port <portnumber>\n"
           "\t./NodeController.bin --help\n\n");
    fflush(stdout);
    exit(1);
}
00205 
00206 /* -------------------------------------------------------------------------- *
00207  * main:
00208  * -------------------------------------------------------------------------- */
00209 #define NODES 3
00210 int main( int argc, char *argv[] )
00211 {
00212     phFUNCTION("main::NodeController")
00213     int retrc = phSUCCESS;
00214 
00215     phArgTable      *arg_parser = new phArgTable();
00216 
00217     int i = 0;
00218     int j = 0;
00219     int nComms = 0;
00220     int node_threads_running = 0;
00221    
00222     int     id              = 0;
00223     int     client_port     = 0;
00224     int     port[NODES]     = {0};
00225     char    *host[NODES]    = {NULL};
00226     int     tempport        = 0;
00227     char    *temphost       = NULL;
00228     int     type            = 0;
00229    
00230     /* Allocate a a new array for node comm thread objects */
00231     /* Fully connected network  = N - 1 connections out */
00232     phNodeComm          **node_comm_thread  = new phNodeComm *[NODES-1];
00233     phClientListener    *node_client_thread = NULL;
00234     phLamportSystem     *lamport_system     = NULL;
00235 
00236     phCHECK_NULLPTR(node_comm_thread,"new",
00237         "new (phNodeComm *)[NODES-1:%d] failed",
00238         NODES-1);
00239    
00240     /* initialize the signal handler */
00241     init_sig_handler();
00242    
00243     /* Setup and parse the command line arguments */
00244     /* phARG_CHAR, phARG_INT, phARG_UINT, phARG_LONG,
00245      * phARG_ULONG, phARG_BOOL, phARG_FUNC */
00246     rc = arg_parser->add("--help",(void*)&usage,phARG_FUNC);
00247     phCHECK_RC(rc,NULL,"arg_parser->add");
00248 
00249     rc = arg_parser->add("--id",&id,phARG_INT);
00250     phCHECK_RC(rc,NULL,"arg_parser->add");
00251 
00252     rc = arg_parser->add("--host0",&host[0],phARG_CHAR);
00253     phCHECK_RC(rc,NULL,"arg_parser->add-host0");
00254 
00255     rc = arg_parser->add("--port0",&port[0],phARG_INT);
00256     phCHECK_RC(rc,NULL,"arg_parser->add-port0");
00257 
00258     rc = arg_parser->add("--host1",&host[1],phARG_CHAR);
00259     phCHECK_RC(rc,NULL,"arg_parser->add-host1");
00260 
00261     rc = arg_parser->add("--port1",&port[1],phARG_INT);
00262     phCHECK_RC(rc,NULL,"arg_parser->add-port1");
00263 
00264     rc = arg_parser->add("--host2",&host[2],phARG_CHAR);
00265     phCHECK_RC(rc,NULL,"arg_parser->add-host2");
00266 
00267     rc = arg_parser->add("--port2",&port[2],phARG_INT);
00268     phCHECK_RC(rc,NULL,"arg_parser->add-port2");
00269 
00270     rc = arg_parser->add("--client_port",&client_port,phARG_INT);
00271     phCHECK_RC(rc,NULL,"arg_parser->add-client_port");
00272 
00273     rc = arg_parser->parse(argc,argv);
00274     phCHECK_RC(rc,NULL,"arg_parser->parse");
00275 
00276     for (i = 0; i < NODES; i++)
00277     {
00278         if ((port[i] <= 0) || (port[i] > 65535))
00279         {
00280             printf("Error: Port[%d] value is invalid - valid values (1,65535)\n",
00281                     port[i]);
00282             usage();
00283         }
00284     }
00285     
00286     
00287     /* 1.) Create the distributed mutual exclusion system object */
00288     
00289 
00290     /* The Lamport System is used by all client threads and by
00291      * the node comm threads.
00292      * 
00293      * When a message is received by a client handler, it calls
00294      * the lamport system's routines to handle it.
00295      * 
00296      * This maintains the Lamport Clock and it maintains the
00297      * lists for each mutex available in the system.
00298      * 
00299      * If a mutex list hasn't been created for a particular 
00300      * mutex id yet, this will create it dynamically at the
00301      * current node */
00302     lamport_system = new phLamportSystem();
00303     phCHECK_NULLPTR(lamport_system,
00304                   "new",
00305                   "new phLamportSystem failed.");
00306    
00307     /* set the node id for the lamport clock system */
00308     rc = lamport_system->setNodeId(id);
00309     phPRINT_RC(rc,NULL,"lamport_system->setNodeId(id:%d)", id);
00310             
00311     
00312     /* 2.) Create and start up the node system */
00313 
00314     
00315     /* Start the node_comm threads */
00316     nComms = 0;
00317     /* All connections to nodes before this 'id' are established
00318      * using the Waitor/server method. They connect here. */
00319     /* If this is intialized to Connector then:
00320      *  0 Server <- 1 Connector
00321      *  0 Server <- 2 Connector
00322      *  1 Server <- 2 Connector
00323      * otherwise
00324      *  0 Connector -> 1 Server
00325      *  0 Connector -> 2 Server
00326      *  1 Connector -> 2 Server
00327      */
00328     type = phNodeController_Waiter;
00329     
00330     for (i = (NODES - 1); i >= 0; i-- )
00331     {
00332         DEBUG_PRINT("id:%d i:%d j:%d\n",id,i,j);
00333         
00334         /* Don't try to connect to oneself */
00335         if (id != i) 
00336         {
00337             node_comm_thread[nComms] = new phNodeComm();
00338            
00339             rc = node_comm_thread[nComms]->setSystem(lamport_system);
00340             phPRINT_RC(rc,NULL,
00341              "ncthread[nComms:%d]->setSystem(%p) failed.",
00342              nComms,
00343              lamport_system);
00344             
00345             rc = node_comm_thread[nComms]->setId(id);
00346             phPRINT_RC(rc,NULL,
00347              "ncthread[nComms:%d]->setId(id:%d) failed.",
00348              nComms,id);
00349             
00350             rc = node_comm_thread[nComms]->setOtherSideId(i);
00351             phPRINT_RC(rc,NULL,
00352              "ncthread[nComms:%d]->setOtheSideId(id:%d) failed.",
00353              nComms,i);
00354           
00355             DEBUG_PRINT("Set Port and Host-Comms:%d i:%d j:%d\n",
00356                         nComms, i, j);
00357             
00358             if (type == phNodeController_Waiter)
00359             {
00360                 DEBUG_PRINT("Waiter: %s:%d\n",host[id],port[id]+i);
00361                 tempport = port[id]+i;
00362                 temphost = host[id];
00363             }
00364             else if (type == phNodeController_Connector)
00365             {
00366                 DEBUG_PRINT("Connector: %s:%d\n",host[i],port[i]+id);
00367                 tempport = port[i]+id;
00368                 temphost = host[i];
00369             }
00370             else
00371             {
00372                 DEBUG_PRINT("Unknown: %s:%d\n",host[i],port[i]+id);
00373                 tempport = port[i]+id;
00374                 temphost = host[i];
00375             }
00376                 
00377             rc = node_comm_thread[nComms]->setPort(tempport);
00378             phPRINT_RC(rc,NULL,"");
00379             
00380             rc = node_comm_thread[nComms]->setHost(temphost);
00381 
00382             
00383             DEBUG_PRINT("Set Connection Type\n");
00384             rc = node_comm_thread[nComms]->setConnectionType(type);
00385             
00386             DEBUG_PRINT("Start\n");
00387             rc = node_comm_thread[nComms]->start();
00388             phCHECK_RC(rc,NULL,"node_comm_thread[nComms:%d]->start",
00389                      nComms);
00390             DEBUG_PRINT("Started\n");
00391 
00392             nComms++;
00393         }
00394         /* Once we've reached our id in the loop, 
00395          * all other
00396          * connections are going to be the other type.
00397          * This allows testing of order for servers/clients
00398          * starting up */
00399         if (id == i)
00400         {
00401             if (type == phNodeController_Connector)
00402             {
00403                 type = phNodeController_Waiter;
00404             }
00405             else if (type == phNodeController_Waiter)
00406             {
00407                 type = phNodeController_Connector;
00408             }
00409         }
00410     }
00411     
00412     
00413     /* 3.) Wait for the node system to be fully connected:
00414      *      - we can do a simple list traversal because there's
00415      *      only three nodes here. A large system would require 
00416      *      some kind linked list from which we could pop 
00417      *      connected nodes, to decrease repeat comparisons on
00418      *      nodes that are already started. */
00419     
00420     
00421     /* Wait for the network to be up */
00422     /* This is necessary for the System to work correctly...
00423      * ... the Node system needs to be up before requests are
00424      * started. */
00425     do
00426     {
00427         node_threads_running = 1;
00428 
00429         for (i = 0; i < nComms; i++)
00430         {
00431             int running = node_comm_thread[i]->isRunning();
00432             bool connected = node_comm_thread[i]->isConnected();
00433 
00434             if ((!running) || (!connected))
00435             {
00436                 node_threads_running = 0;
00437             }
00438         }
00439         
00440         phYield();
00441     }
00442     while ((!node_threads_running) && (glbl_cleanup == 0));
00443  
00444     if (glbl_cleanup) goto cleanup;
00445     
00446     
00447     /* 4.) Create and start the client listener. 
00448      *  - Once the system is up, we can handle clients coming 
00449      *  and going, requesting mutexes and releasing them. */
00450     
00451     
00452     DEBUG_PRINT("Starting client listener...\n");
00453     
00454     /* Create the client listener for producer and consumer messages */
00455     node_client_thread = new phClientListener(id,client_port,host[id]);
00456     phCHECK_NULLPTR(node_client_thread,"new",
00457             "new phClientListener failed.");
00458 
00459     /* Assign the global-ish mutex agreement system to the client
00460      * server thread so it can hand it off to all the clients when
00461      * they connect. It allows the clients to send messages
00462      * over the network requesting a mutex and allows the 
00463      * system to pass a message back to the client to give it
00464      * the mutex. */
00465     rc = node_client_thread->setSystem(lamport_system);
00466     phCHECK_RC(rc,NULL,"nclient_thread->setSystem failed.");
00467     
00468     /* Start the client listener thread */
00469     DEBUG_PRINT("Start listener\n");
00470     rc = node_client_thread->start();
00471     phCHECK_RC(rc,NULL,"node_client_thread->start");
00472     
00473     
00474     /* 5.) Enter the main loop to wait for any of the threads to die.
00475      *      - It is assumed that if any of the threads exit, some error
00476      *      occured and the system will become inoperable. So we close 
00477      *      down everything which includes disconnecting clients. */
00478     
00479     
00480     phPROGRESS("Main loop\n");
00481     glbl_in_main_loop = 1;
00482     /* Main loop. Waits for a signal to cleanup when either the server
00483      * stops on it's own or a signal is recieved */
00484     node_threads_running = 1;
00485     do
00486     {
00487         /* loop through all the comm threads and check 
00488          * the client thread for status. If any of them die,
00489          * fall out of the loop and shutdown the system */
00490         for (i = 0; i < nComms; i++)
00491         {
00492             if (!(node_comm_thread[i]->isRunning()))
00493             {
00494                 node_threads_running = 0;
00495             }
00496         }
00497         if (!(node_client_thread->isRunning()))
00498         {
00499             node_threads_running = 0;
00500         }
00501         
00502         phYield();
00503     }
00504     while (node_threads_running && (glbl_cleanup == 0));
00505 
00506     if (glbl_cleanup) goto cleanup;
00507 
00508 
00509     /* 6.) Cleanup. Print status */
00510 
00511     
00512     goto success;
00513    
00514 cleanup:
00515     phPROGRESS("Cleanup signal caught\n");
00516     goto return_jump;
00517     
00518 error:
00519     phPROGRESS("Finished w/Failure.\n");
00520     retrc = phFAIL;
00521     goto return_jump;
00522     
00523 success:
00524     phPROGRESS("Finished Successfully. \n");
00525     
00526 return_jump:
00527     /* If we get here, stop the node comm threads, or try to */
00528     if (node_comm_thread != NULL)
00529     {
00530         for (i = 0; i < nComms; i++)
00531         {
00532             /* if the comm thread isn't NULL,
00533              * stop the node comm thread */
00534             if (node_comm_thread[i] != NULL)
00535             {
00536                 rc = node_comm_thread[i]->stop();
00537                 phPRINT_RC(rc,NULL,"node_comm_thread[i%d]->stop()",i);
00538             }
00539             
00540             /* Delete the particular comm thread instance */
00541             phDelete(node_comm_thread[i]);
00542         }
00543     }
00544 
00545     /* Delete the comm thread array */
00546     phDelete(node_comm_thread);
00547 
00548     /* Stop the client server thread */
00549     if (node_client_thread != NULL)
00550     {
00551         rc = node_client_thread->stop();
00552         phPRINT_RC(rc,NULL,"node_client_thread->stop()");
00553     }
00554     
00555     /* Destroy the client server thread */
00556     phDelete(node_client_thread);
00557     /* Destroy the process global-ish mutual exclusion system */
00558     phDelete(lamport_system);
00559     
00560     phDelete(arg_parser);
00561 
00562     return retrc;    
00563 }
00564 




Copyright (C) 2002 - 2007 Philip D.S. Thoren ( pthoren@users.sourceforge.net )
University Of Massachusetts at Lowell
Robotics Lab
SourceForge.net Logo

Generated on Sat Jun 16 02:44:05 2007 for phission by  doxygen 1.4.4