/home/fwph/code/wurde/rde/mcp/mcp.cpp

Go to the documentation of this file.
00001 #include <mcp.H>
00002 #include <unistd.h>
00003 #include <sys/types.h>
00004 #include <sys/wait.h>
00005 
00006 using namespace WURDE;
00007 using namespace MCP;
00008 
00020 vector<Module> modules;
00021 
00022 void handleBeat(HeartbeatDataStruct beat);
00023 void handleInfo(HeartbeatInfoStruct info);
00024 void updateModules(Heartbeat &myHeartbeat,CommsManager &myManager);
00025 McpRequestInfoStruct handleRequest(McpRequestInfoStruct request,Heartbeat &myHeartbeat,CommsManager &myManager);
00026 void probeModules(CommsManager &myManager);
00027 
00028 int main(int argc, char *argv[]){
00029 
00030        string configfile,mapfile;
00031        CommsManager myManager("MCPManager");
00032        
00033        myManager.setRealName("Master Control Program");
00034        /* \todo Maybe instead of keeping mapping/config MCP files separate, 
00035           we should be able to parse multiple mcp config files. Later config
00036           files will override previously specified mappings.
00037        */
00038        //       myManager.setHelpString("MCP-specific options:\n\t-c [config file] Use [config file] as the MCP configuration file\n");
00039 
00040        myManager.parseOptions(argc,argv,"c:");
00041        Logger myLogger("MCPLogger");
00042        if(g_globalConfiguration.haveOption("c")){
00043               configfile=g_globalConfiguration.getOption("c");
00044        }else{
00045               configfile=g_globalConfiguration.getMCPConfigFile();
00046               if(configfile[0]!='/'){
00047                      configfile=g_globalConfiguration.getConfigDirectory()+"/"
00048                             +g_globalConfiguration.getMCPConfigFile();
00049               }
00050        }
00051 
00052        /*       if(g_globalConfiguration.haveOption("m")){
00053               mapfile=g_globalConfiguration.getOption("m");
00054        }else{
00055               mapfile=g_globalConfiguration.getMCPMappingsFile();
00056               if(mapfile[0]!='/'){
00057                      mapfile=g_globalConfiguration.getConfigDirectory()+"/"
00058                             +g_globalConfiguration.getMCPMappingsFile();
00059               }
00060               }*/
00061 
00062 
00063        Heartbeat myHeartbeat("Any");
00064        MCPControl myController("Any");
00065        bool running=true;
00066        HeartbeatDataStruct currBeat;
00067        HeartbeatInfoStruct currInfo;
00068        Time lastControl;
00069        McpRequest myMCPRequests("Any");
00070        
00071        // parse the config file
00072        modules=parseConfigFile(configfile);
00073 
00074        /*       if(mapfile!=""){
00075               parseMapFile(mapfile, modules);
00076               }*/
00077 
00085        myHeartbeat.setQueueMode(true);
00086        myHeartbeat.setAutoTag(false);
00087        //       myHeartbeat.setGblName("MCP");
00088 
00089        myMCPRequests.setStreamName("MCP");
00090        myMCPRequests.setGlobalName("MCP");
00091        myMCPRequests.setQueueMode(true);      
00092        myMCPRequests.setAutoTag(false);
00093 
00094        myManager.setNoHeartbeat();
00095        myManager.setMcpRequestMode(false);
00096        myManager.setIdleDataProcessing(true);
00097 
00098        myManager.registerSupplier(&myLogger);
00099        myManager.registerSupplier(&myMCPRequests);
00100        myManager.registerConsumer(&myHeartbeat);
00101        myManager.registerConsumer(&myController);
00102        myManager.setSleep(0.5);
00103        myManager.setPreSleep();
00104 
00105        probeModules(myManager);
00106 
00107        g_loginfo << "MCP initialized. Entering main loop..." << endl;
00108        
00109        while(running){
00110               
00111               if(myController.newData()){
00112                      if(myController.data.state==STATE_QUIT){
00113                             running=false;
00114                      }
00115               }
00116               
00117               while(myHeartbeat.newData()){
00118                      handleBeat(myHeartbeat.getNextData());
00119               }
00120               
00121               while(myHeartbeat.newInfo()){
00122                      //              g_debug("Received new info message.");
00123                      handleInfo(myHeartbeat.getNextInfo());
00124               }
00125 
00126               while(myMCPRequests.newRequest()){
00127                      //              g_debug("Received new request.");
00128                      myMCPRequests.info=handleRequest(myMCPRequests.getNextRequest(),myHeartbeat,myManager);                 
00129                      myMCPRequests.publishInfo();
00130               }
00131               
00132               updateModules(myHeartbeat,myManager);
00133               
00134               myManager.runUpdate();
00135               
00136               //let processes clean up
00137               waitpid(-1,NULL,WNOHANG);
00138        }
00139 
00140        g_info("MCP is shutting down cleanly.");
00141        
00142        myManager.cleanUp();
00143 
00144        waitpid(-1,NULL,WNOHANG);
00145 
00146        return 0;
00147 }
00148 
00149 void handleBeat(HeartbeatDataStruct beat){
00150        vector<Module>::iterator modIter;
00151        bool found=false;
00152        
00153        for(modIter=modules.begin();modIter!=modules.end();modIter++){
00154               if(beat.source==modIter->getModuleName()){
00155                      modIter->beat(beat);
00156                      found=true;
00157                      break;
00158           }
00159      }
00160 }
00161 
00162 void handleInfo(HeartbeatInfoStruct info){
00163      vector<Module>::iterator modIter;
00164      bool found=false;
00165      //     cout << "Received info from " << info.source << endl;
00166      for(modIter=modules.begin();modIter!=modules.end();modIter++){
00167           if(info.source==modIter->getModuleName()){
00168                modIter->refresh(info);
00169                found=true;
00170                break;
00171           }
00172      }
00173 
00174      if(!found&&info.managedType.getValue()!=PROC_UNMANAGED){
00175           Module newmod(info);
00176           modules.push_back(newmod);
00177      }
00178 }
00179 
00180 void updateModules(Heartbeat &myHeartbeat,CommsManager &myManager){
00181      
00182      vector<Module>::iterator modIter;
00183      string buffer;
00184      RunState currState;
00185 
00186      for(modIter=modules.begin(); modIter!=modules.end();modIter++){
00187             currState=modIter->getState();
00188 
00189             if(modIter->stale()&&(currState==STATE_RUN||
00190                                   currState==STATE_IDLE)){
00191                    modIter->setState(STATE_ZOMBIE);
00192                    g_logwarn << "Module " << modIter->getModuleName() << " is a zombie." << endl;
00193                    
00194                    //send out an informational message
00195                    HeartbeatInfoStruct zombie;
00196                    zombie.source="MCP";
00197                    zombie.target="Any";
00198                    zombie.mType=MESSAGE_INFO;
00199                    zombie.moduleName.setValue(modIter->getModuleName());
00200                    zombie.state.setValue(STATE_ZOMBIE);
00201                    zombie.managedType.setValue(modIter->getType());
00202                    zombie.locale.setValue(modIter->getLocale());
00203                    myHeartbeat.requests=zombie;
00204                    myHeartbeat.publishRequest();
00205                    //if it's a critical module, order everyone to idle
00206                    if(modIter->getType()==PROC_CRITICAL){
00207                           zombie.mType=MESSAGE_REQUEST;
00208                           zombie.moduleName.setValue("Any");
00209                           zombie.state.setValue(STATE_IDLE);
00210                           myHeartbeat.requests=zombie;
00211                           myHeartbeat.publishRequest();
00212                    }
00213                    
00214             }else if(!modIter->stale()){
00215                    //g_debug("Not stale.");
00216                    switch(modIter->getState()){
00217                    case STATE_NULL:
00218                    case STATE_ZOMBIE:
00219                    case STATE_RESTART:
00220                    case STATE_RESET:
00221                    case STATE_FAIL:
00222                           //all we know is that the process is responsive
00223                           modIter->setState(STATE_IDLE);
00224                           break;
00225                    case STATE_INFO:
00226                           modIter->setState(STATE_INACTIVE);
00227                           break;
00228                    default:
00229                           break;
00230                    }
00231             }else if(modIter->getState()==STATE_ZOMBIE){
00232 
00241                    if(modIter->m_attemptedStarts < modIter->m_maxAttempts||
00242                       modIter->m_maxAttempts<0){
00243                           modIter->m_attemptedStarts++;
00244                           modIter->startUp(myManager,"-o");
00245                    }else{
00246                           modIter->setState(STATE_FAIL);
00247                           HeartbeatInfoStruct failure;
00248                           failure.source="MCP";
00249                           failure.target="Any";
00250                           failure.mType=MESSAGE_INFO;
00251                           failure.moduleName.setValue(modIter->getModuleName());
00252                           failure.state.setValue(STATE_FAIL);
00253                           failure.managedType.setValue(modIter->getType());
00254                           failure.locale.setValue(modIter->getLocale());
00255                           myHeartbeat.requests=failure;
00256                           myHeartbeat.publishRequest();
00257                                 
00258                    }
00259                    
00260             }
00261             
00262      }
00263      
00264 }
00265 
00266 McpRequestInfoStruct handleRequest(McpRequestInfoStruct request,Heartbeat &myHeartbeat,CommsManager &myManager){
00267        vector<Module>::iterator iter,iter2;
00268        string mapping="",modulename;
00269        bool done=false;
00270        
00271        if(request.status.getValue()==STAT_REQUEST){
00272               for(iter=modules.begin();iter!=modules.end()&&!done;iter++){
00273                      mapping="";
00274 
00275                      if(iter->getModuleName()==request.source){
00276                             if(request.strategy.getValue()!=STRAT_NORMAL){
00277                                    mapping=iter->findObjectMapping(request.gblname.getValue(),request.type.getValue(),request.strategy.getValue());                               
00278                             }else{
00279                                    // if normal is used, we're just using module auto-loading
00280                                    mapping=request.gblname.getValue();
00281                             }
00282                             //now we have to find the module which contains this stream, and make sure we can 
00283                             //start it up.
00284                             if(mapping!=""){
00285                                    
00286                                    for(iter2=modules.begin();iter2!=modules.end()&&!done;iter2++){
00287                                           if(iter2->getModuleName()!=request.source&&iter2->hasSupplier(mapping,request.type.getValue())){
00288                                                  g_debug("Found the right source.");
00289                                                  //start it up
00290                                                  if(iter2->getState()==STATE_IDLE){
00291                                                         myHeartbeat.requests.source="MCP";
00292                                                         myHeartbeat.requests.mType=MESSAGE_REQUEST;
00293                                                         myHeartbeat.requests.target=iter->getModuleName();
00294                                                         myHeartbeat.requests.moduleName.setValue(iter->getModuleName());
00295                                                         myHeartbeat.requests.state.setValue(STATE_RUN);
00296                                                         myHeartbeat.publishRequest();
00297                                                         modulename=iter2->getModuleName();
00298                                                         done=true;
00299                                                         break;
00300                                                  }else if(iter2->getState()==STATE_RUN||iter2->getState()==STATE_RESET||iter2->getState()==STATE_REQUEST||
00301                                                                iter2->getState()==STATE_STARTUP||iter2->getState()==STATE_RESTART){
00302                                                         modulename=iter2->getModuleName();
00303                                                         char buffer[512];
00304                                                         sprintf(buffer,"%s is already running.",modulename.c_str());
00305                                                         g_debug(buffer);
00306                                                         done=true;
00307                                                         break;
00308                                                  }else if(iter2->startUp(myManager," ")){
00309                                                         modulename=iter2->getModuleName();
00310                                                         done=true;
00311                                                         break;
00312                                                  }else{
00313                                                         g_debug("How did I get here??");
00314                                                         done=false;
00315                                                  }
00316                                           }else{
00317                                                  g_debug("Doesn't actually seem to have the stream...");
00318                                           }
00319                                    }                                    
00320                                    if(done){
00321                                           break;
00322                                    }else{
00323                                           
00324                                           mapping="";
00325                                    }
00326                             }else{
00327                                    g_warn("Couldn't find the mapping.");
00328                             }
00329                      }else{
00330                             char buffer[512];
00331                             sprintf(buffer,"%s is not this module: %s",request.source.c_str(),iter->getModuleName().c_str());
00332                             g_debug(buffer);
00333                      }
00334               }
00335               
00336               if(mapping==""){
00337                      g_warn("Failed to fill a request.");
00338                      request.target=request.source;
00339                      request.mType=MESSAGE_INFO;
00340                      request.source="MCP";
00341                      request.stream.setValue("NULL");
00342                      request.status.setValue(STAT_FAIL);
00343               }else{
00344                      g_debug("Filled a request successfully.");
00345                      request.target=request.source;
00346                      request.mType=MESSAGE_INFO;
00347                      request.source="MCP";
00348                      request.stream.setValue(mapping);
00349                      request.status.setValue(STAT_OKAY);
00350                      request.robotModule.setValue(modulename);
00351               }
00352               
00353        }
00354 
00355        return request;
00356 
00357 }
00358 
00362 void probeModules(CommsManager &myManager){
00363        
00364        vector<Module>::iterator iter;
00365        
00366        for(iter=modules.begin();iter!=modules.end();iter++){
00367               if((iter->getType()==PROC_MANAGED||iter->getType()==PROC_CRITICAL)
00368                  &&iter->getLocale()==LOCALE_INTERNAL&&iter->getState()==STATE_INACTIVE){
00369                      //              cout << "Probing " << iter->getModuleName() << endl;
00370                      if(! iter->startUp(myManager,"-r")){
00371                             //    cout << "Error starting " << iter->getModuleName() << endl;
00372                      }
00373               }
00374        }
00375 
00376 }

Generated on Thu Feb 1 15:31:54 2007 for WURDE by  doxygen 1.5.1