00001
00028 #include <Capability.H>
00029 #include <CommsManager.H>
00030 #include <map>
00031
00032 #ifdef HAVE_CONFIG_H
00033 #include <config.h>
00034 #else
00035 #define PACKAGE_STRING "WURDE"
00036 #endif
00037
00038 using namespace WURDE;
00039
00040 std::map<std::string, commshelperloader_ptr *, std::less<std::string> > commsHelperFactory;
00041 namespace WURDE{
00042 CommsManager *g_globalCommsManager_ptr;
00043
00044 CommsManager::CommsManager(string name) :
00045 m_mcpRequest("MCP"),
00046 m_heartbeat(name)
00047 {
00048 g_globalCommsManager_ptr=this;
00049 m_helpstring="\n";
00050 m_realName=name;
00051
00052
00053
00054
00055 m_name=name;
00056 defaultProtocol=PROTOCOL_CMUIPC;
00057 initialized=false;
00058 m_doSleep=true;
00059 sleepInterval.tv_sec=0;
00060 sleepInterval.tv_nsec=100000000;
00061 m_heartbeat.data.TTL=0.2;
00062 m_doHeartbeat=true;
00063 preSleep=false;
00064 m_heartbeat.info.state.setValue(STATE_RUN);
00065 m_currState=STATE_RUN;
00066 onlyTrigger=false;
00067
00068 m_laxRequests=false;
00069 m_doMcpRequests=true;
00070 m_doIdleProcessing=false;
00071 m_nameServiceOverride=false;
00072
00073 if(commsHelperFactory.find("CMUIPC")==commsHelperFactory.end()){
00074 cout << "Error! Couldn't find the CMUIPC adaptor!\n";
00075 exit(1);
00076 }else{
00077 myCommsHelper=commsHelperFactory["CMUIPC"](name);
00078 }
00079
00080
00081 char *buffer;
00082 buffer=getenv("WURDE_CONFIG");
00083 if(buffer){
00084 m_globalConfigFile=buffer;
00085 }else{
00086 m_globalConfigFile="wurde-config.xml";
00087 }
00088 endsleep.now();
00089 }
00090
00091 CommsManager::CommsManager(string name, Protocols defaultProt) :
00092 m_mcpRequest("MCP"),
00093 m_heartbeat(name)
00094 {
00095 g_globalCommsManager_ptr=this;
00096 m_helpstring="\n";
00097 m_realName=name;
00098
00099
00100
00101
00102
00103
00104 m_name=name;
00105 defaultProtocol=defaultProt;
00106 initialized=false;
00107 m_doSleep=true;
00108 sleepInterval.tv_sec=0;
00109 sleepInterval.tv_nsec=100000000;
00110 m_heartbeat.data.TTL=0.2;
00111 m_doHeartbeat=true;
00112 preSleep=false;
00113 m_heartbeat.info.state.setValue(STATE_RUN);
00114 m_currState=STATE_RUN;
00115 onlyTrigger=false;
00116
00117 m_laxRequests=false;
00118 m_doMcpRequests=true;
00119 m_doIdleProcessing=false;
00120 m_nameServiceOverride=false;
00121
00122
00123 myCommsHelper=commsHelperFactory["CMUIPC"](name);
00124
00125 char *buffer;
00126 buffer=getenv("WURDE_CONFIG");
00127 if(buffer){
00128 m_globalConfigFile=buffer;
00129 }else{
00130 m_globalConfigFile="wurde-config.xml";
00131 }
00132
00133 endsleep.now();
00134 }
00135
00136
00137
00138 void CommsManager::initialize(){
00139
00140 setMinSleep(.0001);
00141 initialized=true;
00142 myCommsHelper->initialize(0,NULL,false);
00143 if(m_doMcpRequests){
00144 m_mcpRequest.setQueueMode(true);
00145 m_mcpRequest.setStreamName("MCP");
00146 m_mcpRequest.setAutoTag(false);
00147 registerConsumer(&m_mcpRequest);
00148 }
00149
00150 if(m_doHeartbeat){
00151 m_heartbeat.setStreamName(m_name);
00152 m_heartbeat.info.source=m_name;
00153 m_heartbeat.info.target="MCP";
00154 m_heartbeat.info.mType=MESSAGE_INFO;
00155 m_heartbeat.info.state.setValue(m_currState);
00156 m_heartbeat.info.managedType.setValue(PROC_MANAGED);
00157 InterfaceStream temp;
00158 vector<InterfaceStream> tempvector;
00159 temp.type="";
00160 temp.streamname="";
00161 tempvector.push_back(temp);
00162 m_heartbeat.info.interfaces.setValue(tempvector);
00163 m_heartbeat.setAutoTag(false);
00164 registerSupplier(&m_heartbeat);
00165 m_heartbeat.publishInfo();
00166 }else if(m_currState==STATE_INFO){
00167 m_heartbeat.setStreamName(m_name);
00168 registerSupplier(&m_heartbeat);
00169 m_heartbeat.info.source=m_name;
00170 m_heartbeat.info.target="MCP";
00171 m_heartbeat.info.mType=MESSAGE_INFO;
00172 m_heartbeat.info.state.setValue(m_currState);
00173 m_heartbeat.info.managedType.setValue(PROC_MANAGED);
00174 }
00175
00176
00177 }
00178
00179
00180 RunState CommsManager::runUpdate(){
00185 while(true){
00186 if(m_currState==STATE_QUIT||m_currState==STATE_RESET||m_currState==STATE_RESTART){
00187 break;
00188 }else if(m_currState==STATE_INFO){
00189
00190 sendInfo();
00191
00192 return STATE_QUIT;
00193
00194 }
00195
00196
00197 resetTriggers();
00198
00199
00200 if(preSleep&&m_doSleep){
00201
00202 this->sleep();
00203 }
00204
00205
00206 if(m_doHeartbeat){
00207
00208 m_heartbeat.data.timestamp.now();
00209 m_heartbeat.publishData();
00210 }
00211
00212
00213
00214 myCommsHelper->runUpdate();
00215
00216 if(m_currState==STATE_IDLE&&m_doIdleProcessing==false){
00217
00218 if(m_doHeartbeat)
00219 m_heartbeat.runUpdate();
00220
00221 if(m_doMcpRequests)
00222 m_mcpRequest.runUpdate();
00223 }else{
00224 updateObjects();
00225 }
00226
00227 if(m_doMcpRequests){
00228 handleMCP();
00229 }
00230
00231 if(m_doHeartbeat){
00232 if(m_heartbeat.newInfo()||m_heartbeat.newRequest()){
00233 m_heartbeat.getNextInfo();
00234 if(m_heartbeat.requests.mType==MESSAGE_REQUEST&&
00235 (m_heartbeat.requests.moduleName.getValue()==m_name||
00236 m_heartbeat.requests.moduleName.getValue()=="Any")){
00237 RunState temp=m_heartbeat.requests.state.getValue();
00238
00239 switch (temp){
00240 case STATE_QUIT:
00241 case STATE_RESET:
00242 case STATE_RESTART:
00243 case STATE_IDLE:
00244 case STATE_RUN:
00245 m_currState=temp;
00246 default:
00247 break;
00248 }
00249
00250 }else if(m_heartbeat.requests.mType==MESSAGE_INFO){
00251
00252
00253
00254
00255 #ifdef FULL_DEBUG
00256 #warning We need to handle heartbeat informational messages at some point
00257 #endif
00258 }
00259 }
00260 }
00261
00262
00263 if(m_doSleep&&!preSleep){
00264 this->sleep();
00265 }
00266
00267
00268 if(!onlyTrigger||activeTrigger()||triggerList.empty()){
00269 break;
00270 }
00271 }
00272
00273 return m_currState;
00274 }
00275
00276 void CommsManager::registerObject(Capability *someObject){
00277 g_logdebug << "Registering an object" << endl;
00278
00279 if(!initialized){
00280 initialize();
00281 }
00282
00283 someObject->setManager(this);
00284 someObject->setCOMObject(myCommsHelper->getCOMBase(someObject->getInterfaceName(),someObject->getGlobalName()));
00285
00286 objectList.push_back(someObject);
00287 }
00288
00289 void CommsManager::deregister(Capability *someObject){
00290 vector<Capability *>::iterator iter;
00291 bool found=false;
00292
00293 for(iter=objectList.begin();iter!=objectList.end();iter++){
00294 if(*iter==someObject){
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304 someObject->disconnect();
00305 found=true;
00306 break;
00307 }
00308 }
00309
00310 if(found){
00311 objectList.erase(iter);
00312 }
00313 }
00314
00315 void CommsManager::registerSupplier(Capability *someObject){
00316 registerObject(someObject);
00317 someObject->activateSupplier();
00318
00319
00320
00321
00322
00323 }
00324
00325 void CommsManager::registerConsumer(Capability *someObject){
00326 registerObject(someObject);
00327 someObject->activateConsumer();
00328
00329
00330
00331
00332
00333 }
00334
00335 void CommsManager::cleanUp(){
00336 if(m_doHeartbeat){
00337 g_debug("Sending quit message");
00338
00339 m_heartbeat.info.target="Any";
00340 m_heartbeat.info.state.setValue(STATE_QUIT);
00341 m_heartbeat.publishInfo();
00342
00343 runUpdate();
00344 m_heartbeat.runUpdate();
00345 myCommsHelper->runUpdate();
00346
00347 }
00348 initialized=false;
00349 for(unsigned int i=0;i<objectList.size();++i){
00350 objectList[i]->disconnect();
00351 }
00352
00353 objectList.clear();
00354
00355 myCommsHelper->disconnect();
00356
00357 }
00358
00359 void CommsManager::setSleep(double seconds){
00360 int count=0;
00361
00362 m_heartbeat.data.TTL=seconds*3;
00363 while(seconds>=1){
00364 count++;
00365 seconds--;
00366 }
00367
00368 sleepInterval.tv_sec=count;
00369 sleepInterval.tv_nsec=(long int) (seconds*1000000000);
00370
00371 }
00372
00373 void CommsManager::setMinSleep(double seconds){
00374 int count=0;
00375
00376 while(seconds>=1){
00377 count++;
00378 seconds--;
00379 }
00380
00381 minSleep.setSeconds(count);
00382 minSleep.setUSeconds((long int) (seconds*1000000));
00383
00384 }
00385
00386
00387 void CommsManager::setSleepOnly(double seconds){
00388 int count=0;
00389
00390 while(seconds>=1){
00391 count++;
00392 seconds--;
00393 }
00394
00395 sleepInterval.tv_sec=count;
00396 sleepInterval.tv_nsec=(long int) (seconds*1000000000);
00397
00398 }
00399
00400 void CommsManager::setHeartbeatOnly(double seconds){
00401 m_heartbeat.data.TTL=seconds;
00402 }
00403
00404 bool CommsManager::checkTriggers(Capability *someobject){
00405 bool retval=false;
00406
00407 vector<Trigger>::iterator iter;
00408
00409 for(iter=triggerList.begin();iter!=triggerList.end();iter++){
00410 if(iter->activateObject(someobject)){
00411 retval=true;
00412 }
00413 }
00414
00415 return retval;
00416 }
00417
00418 bool CommsManager::activeTrigger(){
00419 vector<Trigger>::iterator iter;
00420
00421 for(iter=triggerList.begin();iter!=triggerList.end();iter++){
00422 if(iter->active()){
00423 return true;
00424 }
00425 }
00426
00427 return false;
00428 }
00429
00430 void CommsManager::resetTriggers(){
00431 vector<Trigger>::iterator iter;
00432
00433 for(iter=triggerList.begin();iter!=triggerList.end();iter++){
00434 iter->reset();
00435 }
00436
00437 }
00438 void CommsManager::setIdle(){
00439 m_heartbeat.info.state.setValue(STATE_IDLE);
00440 m_heartbeat.publishInfo();
00441 m_currState=STATE_IDLE;
00442 }
00443 void CommsManager::setQuit(){
00444 m_heartbeat.info.state.setValue(STATE_QUIT);
00445 m_heartbeat.publishInfo();
00446 m_currState=STATE_QUIT;
00447 }
00448 void CommsManager::setReset(){
00449 m_heartbeat.info.state.setValue(STATE_RESET);
00450 m_heartbeat.publishInfo();
00451 m_currState=STATE_RESET;
00452 }
00453
00454 void CommsManager::setRun(){
00455 m_heartbeat.info.state.setValue(STATE_RUN);
00456 m_heartbeat.publishInfo();
00457 m_currState=STATE_RUN;
00458 }
00459
00460 void CommsManager::setRestart(){
00461 m_heartbeat.info.state.setValue(STATE_RESTART);
00462 m_heartbeat.publishInfo();
00463 m_currState=STATE_RESTART;
00464 }
00465
00466 void CommsManager::setInfoState(){
00467
00468 m_heartbeat.info.state.setValue(STATE_INFO);
00469 m_currState=STATE_INFO;
00470 m_doHeartbeat=false;
00471 m_doSleep=false;
00472 };
00473
00474 void CommsManager::parseOptions(int argc, char *argv[]){
00475 parseOptions(argc,argv,NULL);
00476 }
00477
00478 void CommsManager::parseOptions(int argc, char *argv[],char *optstring){
00479
00480 char *temp;
00481 temp=getenv("WURDE_CONFIG");
00482 if(temp){
00483 setGlobalConfigFile(temp);
00484 }
00485
00486 char curroption;
00487 string fulloptstring;
00488 if(optstring){
00489 fulloptstring=(string)"ron:g:h"+(string)optstring;
00490 }else{
00491 fulloptstring=(string)"ron:g:h";
00492 }
00493 curroption=getopt(argc,argv,fulloptstring.c_str());
00494 string tmpstring,tmpoption;
00495 while(curroption!=-1){
00496 switch(curroption){
00497 case 'r':
00498 setInfoState();
00499 g_debug("Set info state");
00500 break;
00501 case 'o':
00502 m_nameServiceOverride=true;
00503 break;
00504 case 'n':
00505 tmpstring=optarg;
00506 if(tmpstring[0]==' '){
00507 tmpstring.erase(0,1);
00508 }
00509 setName(tmpstring);
00510 break;
00511 case 'g':
00512 tmpstring=optarg;
00513 if(tmpstring[0]==' '){
00514 tmpstring.erase(0,1);
00515 }
00516 setGlobalConfigFile(tmpstring);
00517 break;
00518 case 'h':
00519 cout << "\n";
00520 cout << m_realName << ", part of "<< PACKAGE_STRING << endl;
00521 cout << "Invoked as " << m_name << endl;
00522 cout << "\nAvailable options:\n";
00523 cout << "\t-r: Registration mode. Submits module and interface information to MCP.\n\t\t Also prints available interfaces to stdout." << endl;
00524 cout << "\t-o: Set name service override (rebind) mode. Not applicable to all\n\t\t communication adaptors. Currently does not work with CMU IPC." << endl;
00525 cout << "\t-n: Name override. Changes the name of module as it binds to the name\n\t\t service. This may affect the internal configuration of the\n\t\tmodule."<< endl;
00526 cout << "\t-h: This message.\n";
00527 cout << m_helpstring;
00528
00529 exit(0);
00530 break;
00531 case '?':
00532 g_warn("Unknown command line option.");
00533 break;
00534 default:
00535 g_logdebug << "Option is " << curroption << endl;
00536 tmpoption=curroption;
00537 if(optarg){
00538 g_logdebug << "Has an argument: " << optarg << endl;
00539 tmpstring=optarg;
00540 if(tmpstring[0]==' '){
00541 tmpstring.erase(0,1);
00542 }
00543 g_globalConfiguration.setOption(tmpoption,tmpstring);
00544 }else{
00545 g_globalConfiguration.setOption(tmpoption,"true");
00546 }
00547 }
00548 curroption=getopt(argc,argv,fulloptstring.c_str());
00549 }
00550
00551 parseWURDEConfig(m_globalConfigFile);
00552 g_globalConfiguration.readEnvironment();
00553 if(g_robotLoggerPtr){
00554 g_robotLoggerPtr->setLogLevel(g_globalConfiguration.getLogLevel());
00555 }
00556
00557 }
00558
00559 void CommsManager::setName(std::string name){
00560 m_name=name;
00561 myCommsHelper->setName(name.c_str());
00562 m_heartbeat.setStreamName(name);
00563
00564 }
00565
00570 void CommsManager::sendInfo(){
00571
00572 vector<Capability*>::iterator iter;
00573 vector<InterfaceStream> interfaces;
00574 InterfaceStream temp;
00575
00576 cout << "Module Name: " << m_name << endl;
00577 cout << "===============================================" << endl;
00578 for(iter=objectList.begin();iter!=objectList.end();iter++){
00579
00580 if((*iter)->isSupplier()){
00581 temp.type=(*iter)->getInterfaceName();
00582 temp.streamname=(*iter)->getStreamName();
00583 interfaces.push_back(temp);
00584 cout << "Supplier: " << temp.streamname << " - Type: " << temp.type << endl;
00585 }else{
00586 cout << "Consumer: " << (*iter)->getGlobalName() << " - Type: " << (*iter)->getInterfaceName() << endl;
00587
00588 }
00589 }
00590 m_heartbeat.info.mType=MESSAGE_INFO;
00591 m_heartbeat.info.target="MCP";
00592 m_heartbeat.info.source=m_name;
00593 m_heartbeat.info.interfaces.setValue(interfaces);
00594 m_heartbeat.info.state.setValue(STATE_INFO);
00595 m_heartbeat.publishInfo();
00596 m_heartbeat.runUpdate();
00597
00598 }
00599
00600 void CommsManager::handleMCPReply(McpRequestInfoStruct someinfo){
00601
00602 for(unsigned int i=0;i<objectList.size();++i){
00603 if(!objectList[i]->isSupplier()&&
00604 objectList[i]->getGlobalName()==m_mcpRequest.info.gblname.getValue()&&
00605 objectList[i]->getInterfaceName()==m_mcpRequest.info.type.getValue()&&
00606 !objectList[i]->hasStream()){
00607 g_debug("Found the object. Setting it up.");
00608 objectList[i]->setStreamName(m_mcpRequest.info.stream.getValue());
00609 objectList[i]->setSourceModuleName(m_mcpRequest.info.robotModule.getValue());
00610 break;
00611 }
00612 }
00613
00614 }
00615
00622 void CommsManager::sleep(){
00623 struct timespec rem;
00624 Time remainder, temp(sleepInterval);
00625
00626
00627
00628
00629 remainder.now();
00630 remainder=remainder-endsleep;
00631
00632 if(remainder<temp){
00633 remainder=temp-remainder;
00634 }else{
00635 remainder.zero();
00636 }
00637
00638
00639 if(remainder>minSleep){
00640 nanosleep(&remainder.timespec(),NULL);
00641 }else{
00642 nanosleep(&minSleep.timespec(),NULL);
00643 }
00644
00645
00646 endsleep.now();
00647 }
00648
00649 void CommsManager::updateObjects(){
00650
00651 for(unsigned int i=0;i<objectList.size();++i){
00652
00653 if(objectList[i]->isSupplier()||objectList[i]->hasStream()){
00654
00655 objectList[i]->runUpdate();
00656
00657 if(onlyTrigger&&(objectList[i]->newData()||objectList[i]->newInfo())){
00658 checkTriggers(objectList[i]);
00659 }
00660 }else if(!objectList[i]->isSupplier()&&!objectList[i]->hasStream()){
00661 if(m_doMcpRequests){
00662 if(objectList[i]->getRequestStatus()==STAT_NULL){
00663 g_debug("Sending a request to find an appropriate stream...");
00664 cout << objectList[i]->getRequestStatus() << endl;
00665 m_mcpRequest.requests.source=m_name;
00666 m_mcpRequest.requests.target="MCP";
00667 m_mcpRequest.requests.mType=MESSAGE_REQUEST;
00668 m_mcpRequest.requests.status.setValue(STAT_REQUEST);
00669 m_mcpRequest.requests.type.setValueAndLock(objectList[i]->getInterfaceName());
00670 m_mcpRequest.requests.gblname.setValueAndLock(objectList[i]->getGlobalName());
00671 m_mcpRequest.requests.strategy.setValueAndLock(objectList[i]->getConnectionStrategy());
00672 m_mcpRequest.publishRequest();
00673 objectList[i]->setRequestStatus(STAT_PENDING);
00674 }
00675 }
00676 }
00677 }
00678
00679 }
00680
00681 void CommsManager::handleMCP(){
00682
00683 while(m_mcpRequest.newInfo()){
00684
00685 m_mcpRequest.getNextInfo();
00686 if(m_mcpRequest.info.mType==MESSAGE_INFO&&m_mcpRequest.info.target==m_name){
00687
00688 if(m_mcpRequest.info.status.getValue()==STAT_FAIL){
00689 if(m_mcpRequest.info.strategy.getValue()==STRAT_ASSIGNED&&m_laxRequests){
00690 m_mcpRequest.requests=m_mcpRequest.info;
00691 m_mcpRequest.requests.mType=MESSAGE_REQUEST;
00692 m_mcpRequest.requests.source=m_name;
00693 m_mcpRequest.requests.target="MCP";
00694 m_mcpRequest.requests.status.setValue(STAT_REQUEST);
00695 m_mcpRequest.requests.strategy.setValue(STRAT_AUTO);
00696 m_mcpRequest.publishRequest();
00697 }else{
00698
00699 #ifdef FULL_DEBUG
00700 #warning Need appropriate failure mode in CommsManager McpRequest handling.
00701 #endif
00702 g_error("MCP couldn't fill a module request!");
00703 }
00704 }else if(m_mcpRequest.info.status.getValue()==STAT_OKAY){
00705 g_debug("MCP found a source for us.");
00706 handleMCPReply(m_mcpRequest.info);
00707 }
00708 }else{
00709
00710 }
00711 }
00712
00713 }
00714
00715 bool registerSupplier(Capability *someObject){
00716 if(g_globalCommsManager_ptr){
00717 g_globalCommsManager_ptr->registerSupplier(someObject);
00718 return true;
00719 }
00720 g_logerror << "Attempted to register a supplier through global interface, but there's no CommsManager yet." << std::endl;
00721 return false;
00722 }
00723
00724 bool registerConsumer(Capability *someObject){
00725 if(g_globalCommsManager_ptr){
00726 g_globalCommsManager_ptr->registerConsumer(someObject);
00727 return true;
00728 }
00729 g_logerror << "Attempted to register a consumer through global interface, but there's no CommsManager yet." << std::endl;
00730 return false;
00731 }
00732 }