00001 #include <mcp.H>
00002 #include <unistd.h>
00003 #include <sys/types.h>
00004 #include <sys/wait.h>
00005
00006 using namespace WURDE;
00007 using namespace MCP;
00008
00020 vector<Module> modules;
00021
00022 void handleBeat(HeartbeatDataStruct beat);
00023 void handleInfo(HeartbeatInfoStruct info);
00024 void updateModules(Heartbeat &myHeartbeat,CommsManager &myManager);
00025 McpRequestInfoStruct handleRequest(McpRequestInfoStruct request,Heartbeat &myHeartbeat,CommsManager &myManager);
00026 void probeModules(CommsManager &myManager);
00027
00028 int main(int argc, char *argv[]){
00029
00030 string configfile,mapfile;
00031 CommsManager myManager("MCPManager");
00032
00033 myManager.setRealName("Master Control Program");
00034
00035
00036
00037
00038
00039
00040 myManager.parseOptions(argc,argv,"c:");
00041 Logger myLogger("MCPLogger");
00042 if(g_globalConfiguration.haveOption("c")){
00043 configfile=g_globalConfiguration.getOption("c");
00044 }else{
00045 configfile=g_globalConfiguration.getMCPConfigFile();
00046 if(configfile[0]!='/'){
00047 configfile=g_globalConfiguration.getConfigDirectory()+"/"
00048 +g_globalConfiguration.getMCPConfigFile();
00049 }
00050 }
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 Heartbeat myHeartbeat("Any");
00064 MCPControl myController("Any");
00065 bool running=true;
00066 HeartbeatDataStruct currBeat;
00067 HeartbeatInfoStruct currInfo;
00068 Time lastControl;
00069 McpRequest myMCPRequests("Any");
00070
00071
00072 modules=parseConfigFile(configfile);
00073
00074
00075
00076
00077
00085 myHeartbeat.setQueueMode(true);
00086 myHeartbeat.setAutoTag(false);
00087
00088
00089 myMCPRequests.setStreamName("MCP");
00090 myMCPRequests.setGlobalName("MCP");
00091 myMCPRequests.setQueueMode(true);
00092 myMCPRequests.setAutoTag(false);
00093
00094 myManager.setNoHeartbeat();
00095 myManager.setMcpRequestMode(false);
00096 myManager.setIdleDataProcessing(true);
00097
00098 myManager.registerSupplier(&myLogger);
00099 myManager.registerSupplier(&myMCPRequests);
00100 myManager.registerConsumer(&myHeartbeat);
00101 myManager.registerConsumer(&myController);
00102 myManager.setSleep(0.5);
00103 myManager.setPreSleep();
00104
00105 probeModules(myManager);
00106
00107 g_loginfo << "MCP initialized. Entering main loop..." << endl;
00108
00109 while(running){
00110
00111 if(myController.newData()){
00112 if(myController.data.state==STATE_QUIT){
00113 running=false;
00114 }
00115 }
00116
00117 while(myHeartbeat.newData()){
00118 handleBeat(myHeartbeat.getNextData());
00119 }
00120
00121 while(myHeartbeat.newInfo()){
00122
00123 handleInfo(myHeartbeat.getNextInfo());
00124 }
00125
00126 while(myMCPRequests.newRequest()){
00127
00128 myMCPRequests.info=handleRequest(myMCPRequests.getNextRequest(),myHeartbeat,myManager);
00129 myMCPRequests.publishInfo();
00130 }
00131
00132 updateModules(myHeartbeat,myManager);
00133
00134 myManager.runUpdate();
00135
00136
00137 waitpid(-1,NULL,WNOHANG);
00138 }
00139
00140 g_info("MCP is shutting down cleanly.");
00141
00142 myManager.cleanUp();
00143
00144 waitpid(-1,NULL,WNOHANG);
00145
00146 return 0;
00147 }
00148
00149 void handleBeat(HeartbeatDataStruct beat){
00150 vector<Module>::iterator modIter;
00151 bool found=false;
00152
00153 for(modIter=modules.begin();modIter!=modules.end();modIter++){
00154 if(beat.source==modIter->getModuleName()){
00155 modIter->beat(beat);
00156 found=true;
00157 break;
00158 }
00159 }
00160 }
00161
00162 void handleInfo(HeartbeatInfoStruct info){
00163 vector<Module>::iterator modIter;
00164 bool found=false;
00165
00166 for(modIter=modules.begin();modIter!=modules.end();modIter++){
00167 if(info.source==modIter->getModuleName()){
00168 modIter->refresh(info);
00169 found=true;
00170 break;
00171 }
00172 }
00173
00174 if(!found&&info.managedType.getValue()!=PROC_UNMANAGED){
00175 Module newmod(info);
00176 modules.push_back(newmod);
00177 }
00178 }
00179
00180 void updateModules(Heartbeat &myHeartbeat,CommsManager &myManager){
00181
00182 vector<Module>::iterator modIter;
00183 string buffer;
00184 RunState currState;
00185
00186 for(modIter=modules.begin(); modIter!=modules.end();modIter++){
00187 currState=modIter->getState();
00188
00189 if(modIter->stale()&&(currState==STATE_RUN||
00190 currState==STATE_IDLE)){
00191 modIter->setState(STATE_ZOMBIE);
00192 g_logwarn << "Module " << modIter->getModuleName() << " is a zombie." << endl;
00193
00194
00195 HeartbeatInfoStruct zombie;
00196 zombie.source="MCP";
00197 zombie.target="Any";
00198 zombie.mType=MESSAGE_INFO;
00199 zombie.moduleName.setValue(modIter->getModuleName());
00200 zombie.state.setValue(STATE_ZOMBIE);
00201 zombie.managedType.setValue(modIter->getType());
00202 zombie.locale.setValue(modIter->getLocale());
00203 myHeartbeat.requests=zombie;
00204 myHeartbeat.publishRequest();
00205
00206 if(modIter->getType()==PROC_CRITICAL){
00207 zombie.mType=MESSAGE_REQUEST;
00208 zombie.moduleName.setValue("Any");
00209 zombie.state.setValue(STATE_IDLE);
00210 myHeartbeat.requests=zombie;
00211 myHeartbeat.publishRequest();
00212 }
00213
00214 }else if(!modIter->stale()){
00215
00216 switch(modIter->getState()){
00217 case STATE_NULL:
00218 case STATE_ZOMBIE:
00219 case STATE_RESTART:
00220 case STATE_RESET:
00221 case STATE_FAIL:
00222
00223 modIter->setState(STATE_IDLE);
00224 break;
00225 case STATE_INFO:
00226 modIter->setState(STATE_INACTIVE);
00227 break;
00228 default:
00229 break;
00230 }
00231 }else if(modIter->getState()==STATE_ZOMBIE){
00232
00241 if(modIter->m_attemptedStarts < modIter->m_maxAttempts||
00242 modIter->m_maxAttempts<0){
00243 modIter->m_attemptedStarts++;
00244 modIter->startUp(myManager,"-o");
00245 }else{
00246 modIter->setState(STATE_FAIL);
00247 HeartbeatInfoStruct failure;
00248 failure.source="MCP";
00249 failure.target="Any";
00250 failure.mType=MESSAGE_INFO;
00251 failure.moduleName.setValue(modIter->getModuleName());
00252 failure.state.setValue(STATE_FAIL);
00253 failure.managedType.setValue(modIter->getType());
00254 failure.locale.setValue(modIter->getLocale());
00255 myHeartbeat.requests=failure;
00256 myHeartbeat.publishRequest();
00257
00258 }
00259
00260 }
00261
00262 }
00263
00264 }
00265
00266 McpRequestInfoStruct handleRequest(McpRequestInfoStruct request,Heartbeat &myHeartbeat,CommsManager &myManager){
00267 vector<Module>::iterator iter,iter2;
00268 string mapping="",modulename;
00269 bool done=false;
00270
00271 if(request.status.getValue()==STAT_REQUEST){
00272 for(iter=modules.begin();iter!=modules.end()&&!done;iter++){
00273 mapping="";
00274
00275 if(iter->getModuleName()==request.source){
00276 if(request.strategy.getValue()!=STRAT_NORMAL){
00277 mapping=iter->findObjectMapping(request.gblname.getValue(),request.type.getValue(),request.strategy.getValue());
00278 }else{
00279
00280 mapping=request.gblname.getValue();
00281 }
00282
00283
00284 if(mapping!=""){
00285
00286 for(iter2=modules.begin();iter2!=modules.end()&&!done;iter2++){
00287 if(iter2->getModuleName()!=request.source&&iter2->hasSupplier(mapping,request.type.getValue())){
00288 g_debug("Found the right source.");
00289
00290 if(iter2->getState()==STATE_IDLE){
00291 myHeartbeat.requests.source="MCP";
00292 myHeartbeat.requests.mType=MESSAGE_REQUEST;
00293 myHeartbeat.requests.target=iter->getModuleName();
00294 myHeartbeat.requests.moduleName.setValue(iter->getModuleName());
00295 myHeartbeat.requests.state.setValue(STATE_RUN);
00296 myHeartbeat.publishRequest();
00297 modulename=iter2->getModuleName();
00298 done=true;
00299 break;
00300 }else if(iter2->getState()==STATE_RUN||iter2->getState()==STATE_RESET||iter2->getState()==STATE_REQUEST||
00301 iter2->getState()==STATE_STARTUP||iter2->getState()==STATE_RESTART){
00302 modulename=iter2->getModuleName();
00303 char buffer[512];
00304 sprintf(buffer,"%s is already running.",modulename.c_str());
00305 g_debug(buffer);
00306 done=true;
00307 break;
00308 }else if(iter2->startUp(myManager," ")){
00309 modulename=iter2->getModuleName();
00310 done=true;
00311 break;
00312 }else{
00313 g_debug("How did I get here??");
00314 done=false;
00315 }
00316 }else{
00317 g_debug("Doesn't actually seem to have the stream...");
00318 }
00319 }
00320 if(done){
00321 break;
00322 }else{
00323
00324 mapping="";
00325 }
00326 }else{
00327 g_warn("Couldn't find the mapping.");
00328 }
00329 }else{
00330 char buffer[512];
00331 sprintf(buffer,"%s is not this module: %s",request.source.c_str(),iter->getModuleName().c_str());
00332 g_debug(buffer);
00333 }
00334 }
00335
00336 if(mapping==""){
00337 g_warn("Failed to fill a request.");
00338 request.target=request.source;
00339 request.mType=MESSAGE_INFO;
00340 request.source="MCP";
00341 request.stream.setValue("NULL");
00342 request.status.setValue(STAT_FAIL);
00343 }else{
00344 g_debug("Filled a request successfully.");
00345 request.target=request.source;
00346 request.mType=MESSAGE_INFO;
00347 request.source="MCP";
00348 request.stream.setValue(mapping);
00349 request.status.setValue(STAT_OKAY);
00350 request.robotModule.setValue(modulename);
00351 }
00352
00353 }
00354
00355 return request;
00356
00357 }
00358
00362 void probeModules(CommsManager &myManager){
00363
00364 vector<Module>::iterator iter;
00365
00366 for(iter=modules.begin();iter!=modules.end();iter++){
00367 if((iter->getType()==PROC_MANAGED||iter->getType()==PROC_CRITICAL)
00368 &&iter->getLocale()==LOCALE_INTERNAL&&iter->getState()==STATE_INACTIVE){
00369
00370 if(! iter->startUp(myManager,"-r")){
00371
00372 }
00373 }
00374 }
00375
00376 }