/home/fwph/code/wurde/rde/core/capabilities/Heartbeat.cpp

Go to the documentation of this file.
00001 
00029 #include <Heartbeat.H>
00030 #include <CommsManager.H>
00031 #include <Logger.H>
00032 
00033 // $Id: Type.cpp 68 2007-02-01 17:56:57Z fwph $
00034 
00035 using namespace WURDE;
00036 
00037 Heartbeat::Heartbeat(const std::string & streamname){
00038        init(streamname,STRAT_NORMAL,"Any");
00039 
00040        //global name will be set later
00041        m_haveGlobalname=false;
00042 }
00043 
00044 Heartbeat::Heartbeat(const ConnectionStrategy & strategy,const std::string & gblName){
00045        init("Any",strategy,gblName);
00046 
00047        if(strategy==STRAT_NORMAL){
00048               //then the global name really specifies the streamname,
00049               // and the global name should be assigned automagically once we have
00050               // confirmation that the stream has been started
00051               m_haveGlobalname=false;   
00052        }
00053 
00054        m_haveStream=false;
00055        
00056 }
00057 
00058 Heartbeat::Heartbeat(const std::string &streamname,const std::string &gblName){
00059        init(streamname,STRAT_NORMAL,gblName);
00060 }
00061 
00062 void Heartbeat::init(const std::string &streamname,const ConnectionStrategy &strategy,const std::string &gblName){
00063   m_globalname=gblName;
00064   m_streamname=streamname;
00065 
00066   myCOMObject=NULL;
00067   
00068   //  data.source=streamname.c_str();
00069   requests=info;
00070 
00071   m_queue=false;
00072   m_supplier=false;
00073   m_doTimestamp=true;
00074   m_doTag=true;
00075 
00076   m_doPublishData=false;
00077   m_doPublishInfo=false;
00078 
00079   m_initialize=false;
00080   m_strategy=strategy;
00081 
00082   m_haveStream=true;
00083   m_haveGlobalname=true;
00084   m_doPublishData=false;
00085   m_doPublishInfo=false;
00086 
00087   m_doAutoPing=true;
00088 
00089   info.source=streamname;
00090   requests.source=m_globalname;
00091   info.target="Any";
00092   requests.target=streamname;
00093   info.timestamp.now();
00094   requests.timestamp.now();
00095 
00096   m_streamRequest=STAT_NULL;
00097   
00098   m_sourceModule="Unknown";
00099 }
00100 
00101 
00102 Heartbeat::~Heartbeat(){
00103   myManager=NULL;
00104   if(myCOMObject){
00105 
00106          //      delete myCOMObject;
00107   }
00108 }
00109 
00110 
00111 void Heartbeat::activateSupplier(){
00112        if(m_haveStream){
00113               m_supplier=true;
00114               m_globalname=m_streamname;
00115               m_haveGlobalname=true;
00116               m_initialize=false;
00117               if(myCOMObject){
00118                       //                     myCOMObject=new CMUIPCRobotProtocol::CMUIPCHeartbeat(m_globalname);
00119                      myCOMObject->setGlobalName(m_globalname);
00120                      myCOMObject->setStreamName(m_streamname);
00121                      myCOMObject->setInfo(&requests);
00122                      myCOMObject->setRequests(&info);
00123                      myCOMObject->setStaging(&data);
00124                      myCOMObject->activateSupplier();
00125                      myCOMObject->setAutoPing(m_doAutoPing);
00126                      myCOMObject->setInfoVector(&m_requestVector);
00127                      myCOMObject->setRequestVector(&m_infoVector);
00128                      myCOMObject->setDataVector(&m_dataVector);
00129 
00130                      data.source=m_streamname;
00131                      info.source=m_streamname;
00132 
00133                      if(m_queue){
00134                             myCOMObject->setQueueMode(true);
00135                      }
00136 
00137               }else{
00138                       g_fatal("Attempted to activateSupplier before setting a COMObject Please just use registerSupplier.");
00139                       exit(-1);
00140               }
00141         data.TTL=0;
00142         if(m_supplier){
00143                 info.moduleName.setCMode(C_SUPPLIER);
00144         }else{
00145                 info.moduleName.setCMode(C_CONSUMER);
00146         }
00147         if(m_supplier){
00148                 info.locale.setCMode(C_SUPPLIER);
00149         }else{
00150                 info.locale.setCMode(C_CONSUMER);
00151         }
00152         if(m_supplier){
00153                 info.state.setCMode(C_SUPPLIER);
00154         }else{
00155                 info.state.setCMode(C_CONSUMER);
00156         }
00157         if(m_supplier){
00158                 info.interfaces.setCMode(C_SUPPLIER);
00159         }else{
00160                 info.interfaces.setCMode(C_CONSUMER);
00161         }
00162         if(m_supplier){
00163                 info.managedType.setCMode(C_SUPPLIER);
00164         }else{
00165                 info.managedType.setCMode(C_CONSUMER);
00166         }
00167        }else{
00168               //this shouldn't happen, but there's no real problem with it.
00169               //it'd just be dumb.
00170               m_initialize=true;
00171               m_supplier=true;
00172        }
00173        
00174        requests=info;
00175 
00176 }
00177 
00178 void Heartbeat::activateConsumer(){
00179        std::string localname;
00180        
00181        if(m_haveStream){
00182               m_initialize=false;
00183               m_supplier=false;
00184               if(myCOMObject&&myManager){
00185                       /* global name so we don't have conflicts with the naming service */
00186                       if(!m_haveGlobalname&&m_strategy==STRAT_NORMAL){
00187                               m_globalname=myManager->getName()+HEARTBEAT+m_streamname+"Consumer";
00188                               m_haveGlobalname=true;
00189                       }
00190 
00191                      myCOMObject->setGlobalName(m_globalname);
00192                      myCOMObject->setStreamName(m_streamname);
00193                      requests.source=m_globalname;
00194                      data.source=m_streamname;
00195                      info.source=m_streamname;
00196                      myCOMObject->setInfo(&info);
00197                      myCOMObject->setRequests(&requests);
00198                      myCOMObject->setStaging(&data);
00199                      myCOMObject->setAutoPing(m_doAutoPing);
00200                      myCOMObject->activateConsumer();
00201                      myCOMObject->setInfoVector(&m_infoVector);
00202                      myCOMObject->setRequestVector(&m_requestVector);
00203                      myCOMObject->setDataVector(&m_dataVector);
00204 
00205                      if(m_queue){
00206                             myCOMObject->setQueueMode(true);
00207                      }
00208               }else{
00209                       g_fatal("Attempted to activateConsumer before setting a COMObject and CommsManager. Please use registerConsumer instead.");
00210                      exit(-1);
00211               }
00212 
00213         data.TTL=0;
00214         if(m_supplier){
00215                 info.moduleName.setCMode(C_SUPPLIER);
00216         }else{
00217                 info.moduleName.setCMode(C_CONSUMER);
00218         }
00219         if(m_supplier){
00220                 info.locale.setCMode(C_SUPPLIER);
00221         }else{
00222                 info.locale.setCMode(C_CONSUMER);
00223         }
00224         if(m_supplier){
00225                 info.state.setCMode(C_SUPPLIER);
00226         }else{
00227                 info.state.setCMode(C_CONSUMER);
00228         }
00229         if(m_supplier){
00230                 info.interfaces.setCMode(C_SUPPLIER);
00231         }else{
00232                 info.interfaces.setCMode(C_CONSUMER);
00233         }
00234         if(m_supplier){
00235                 info.managedType.setCMode(C_SUPPLIER);
00236         }else{
00237                 info.managedType.setCMode(C_CONSUMER);
00238         }
00239        }else{
00240               //no stream, so we can't connect yet. tell the system 
00241               //to initialize as soon as we've got one
00242               m_initialize=true;
00243        }
00244 
00245 }
00246 
00247 void Heartbeat::runUpdate(){
00248        if(myCOMObject){
00249 
00250               myCOMObject->runUpdate(m_doPublishData,m_doPublishInfo);
00251               
00252               m_doPublishData=false;
00253               m_doPublishInfo=false;
00254               
00255        }
00256 }
00257 
00258 void Heartbeat::setManager(CommsManager *myManager_){
00259 
00260   if(!myCOMObject){
00261     myManager=myManager_;
00262   }
00263 
00264 }
00265 
00266 void Heartbeat::setQueueMode(const bool & val){
00267 
00268        if(!myCOMObject){
00269               m_queue=val;
00270        }else if(!m_queue&&val){
00271               m_queue=val;
00272               myCOMObject->setDataVector(&m_dataVector);
00273               myCOMObject->setQueueMode(true);
00274 
00275        }else if(!val&&m_queue){
00276               m_queue=val;
00277               myCOMObject->setQueueMode(false);
00278               while(!m_dataVector.empty()){
00279                      m_dataVector.pop();
00280               }
00281               while(!m_infoVector.empty()){
00282                      m_infoVector.pop();
00283               }
00284               while(!m_requestVector.empty()){
00285                      m_requestVector.pop();
00286               }
00287        }
00288 }
00289 
00290 bool Heartbeat::newData(){
00291        if(m_queue){
00292               return !m_dataVector.empty();
00293        }else{
00294               if(myCOMObject){
00295                      return myCOMObject->newData();
00296               }
00297        }
00298 
00299        return false;
00300 }
00301 
00302 bool Heartbeat::newInfo(){
00303        if(m_queue){
00304               if(m_supplier){
00305                      return !m_requestVector.empty();
00306               }
00307 
00308               return !m_infoVector.empty();
00309        }else{
00310               if(myCOMObject){
00311                      return myCOMObject->newInfo();
00312               }
00313        }
00314 
00315 
00316        return false;
00317 }
00318 
00319 void Heartbeat::publishData(){
00320        if(m_supplier){
00321               if(m_doTimestamp){
00322                      data.timestamp.now();
00323               }
00324               if(m_doTag){
00325                      data.source=m_globalname;
00326               }
00327               if(m_queue){
00328                      HeartbeatDataStruct temp;
00329         temp.TTL=0;
00330                      temp.source=data.source;
00331 temp.timestamp=data.timestamp;
00332 temp.TTL=data.TTL;
00333  
00334                      m_dataVector.push(temp);
00335                               //                     m_dataVector.push(data);
00336               }
00337               m_doPublishData=true;
00338        }
00339 }
00340 
00341 void Heartbeat::publishInfo(){
00342        if(m_supplier){
00343 
00344               info.mType=MESSAGE_INFO;
00345               if(m_doTimestamp){
00346                      info.timestamp.now();
00347               }
00348 
00349               if(m_doTag){
00350                      info.source=m_globalname;
00351                      info.target="Any";
00352               }
00353 
00354               if(m_queue){
00355                      HeartbeatInfoStruct temp;
00356                      
00357                     
00358                      temp.moduleName=info.moduleName;
00359 temp.source=info.source;
00360 temp.locale=info.locale;
00361 temp.state=info.state;
00362 temp.mType=info.mType;
00363 temp.target=info.target;
00364 temp.timestamp=info.timestamp;
00365 temp.interfaces=info.interfaces;
00366 temp.managedType=info.managedType;
00367 
00368                      m_infoVector.push(temp);
00369                     
00370                      //              m_infoVector.push(info);
00371               }
00372 
00373               m_doPublishInfo=true;
00374        }else{
00375 
00376               requests.mType=MESSAGE_REQUEST;
00377               
00378               if(m_doTimestamp){
00379                      requests.timestamp.now();
00380               }
00381 
00382               if(m_doTag){
00383                      requests.source=m_globalname;
00384                      requests.target=m_streamname;
00385               }
00386 
00387               if(m_queue){
00388                      HeartbeatInfoStruct temp;
00389         if(m_supplier){
00390                 temp.moduleName.setCMode(C_SUPPLIER);
00391         }else{
00392                 temp.moduleName.setCMode(C_CONSUMER);
00393         }
00394         if(m_supplier){
00395                 temp.locale.setCMode(C_SUPPLIER);
00396         }else{
00397                 temp.locale.setCMode(C_CONSUMER);
00398         }
00399         if(m_supplier){
00400                 temp.state.setCMode(C_SUPPLIER);
00401         }else{
00402                 temp.state.setCMode(C_CONSUMER);
00403         }
00404         if(m_supplier){
00405                 temp.interfaces.setCMode(C_SUPPLIER);
00406         }else{
00407                 temp.interfaces.setCMode(C_CONSUMER);
00408         }
00409         if(m_supplier){
00410                 temp.managedType.setCMode(C_SUPPLIER);
00411         }else{
00412                 temp.managedType.setCMode(C_CONSUMER);
00413         }
00414                      temp.moduleName=requests.moduleName;
00415 temp.source=requests.source;
00416 temp.locale=requests.locale;
00417 temp.state=requests.state;
00418 temp.mType=requests.mType;
00419 temp.target=requests.target;
00420 temp.interfaces=requests.interfaces;
00421 temp.timestamp=requests.timestamp;
00422 temp.managedType=requests.managedType;
00423 
00424                      m_requestVector.push(temp);
00425 
00426                      //              m_requestVector.push(requests);
00427               }
00428 
00429               m_doPublishInfo=true;
00430        }
00431 }
00432 
00433 HeartbeatDataStruct Heartbeat::getNextData(){
00434        if(m_queue){
00435               HeartbeatDataStruct retval;
00436         retval.TTL=0;
00437               retval.source="NULL";
00438               if(m_dataVector.empty()){
00439                      return data;
00440               }
00441              
00442               retval.source=m_dataVector.front().source;
00443 retval.timestamp=m_dataVector.front().timestamp;
00444 retval.TTL=m_dataVector.front().TTL;
00445 
00446                        //             retval=m_dataVector.front();
00447               m_dataVector.pop();
00448               data=retval;
00449        
00450               return retval;
00451        }else{
00452               myCOMObject->resetData();
00453               return data;
00454        }
00455 }
00456 
00457 HeartbeatInfoStruct Heartbeat::getNextInfo(){
00458        if(m_queue){
00459               HeartbeatInfoStruct retval;
00460         if(m_supplier){
00461                 retval.moduleName.setCMode(C_SUPPLIER);
00462         }else{
00463                 retval.moduleName.setCMode(C_CONSUMER);
00464         }
00465         if(m_supplier){
00466                 retval.locale.setCMode(C_SUPPLIER);
00467         }else{
00468                 retval.locale.setCMode(C_CONSUMER);
00469         }
00470         if(m_supplier){
00471                 retval.state.setCMode(C_SUPPLIER);
00472         }else{
00473                 retval.state.setCMode(C_CONSUMER);
00474         }
00475         if(m_supplier){
00476                 retval.interfaces.setCMode(C_SUPPLIER);
00477         }else{
00478                 retval.interfaces.setCMode(C_CONSUMER);
00479         }
00480         if(m_supplier){
00481                 retval.managedType.setCMode(C_SUPPLIER);
00482         }else{
00483                 retval.managedType.setCMode(C_CONSUMER);
00484         }
00485               retval.source="NULL";
00486               if(m_supplier){
00487                      if(m_requestVector.empty()){
00488                             return requests;
00489                      }
00490                      retval.moduleName=m_requestVector.front().moduleName;
00491 retval.source=m_requestVector.front().source;
00492 retval.locale=m_requestVector.front().locale;
00493 retval.state=m_requestVector.front().state;
00494 retval.mType=m_requestVector.front().mType;
00495 retval.target=m_requestVector.front().target;
00496 retval.interfaces=m_requestVector.front().interfaces;
00497 retval.timestamp=m_requestVector.front().timestamp;
00498 retval.managedType=m_requestVector.front().managedType;
00499 
00500                               //                     retval=m_requestVector.front();
00501         if(m_supplier){
00502                 retval.moduleName.setCMode(C_SUPPLIER);
00503         }else{
00504                 retval.moduleName.setCMode(C_CONSUMER);
00505         }
00506         if(m_supplier){
00507                 retval.locale.setCMode(C_SUPPLIER);
00508         }else{
00509                 retval.locale.setCMode(C_CONSUMER);
00510         }
00511         if(m_supplier){
00512                 retval.state.setCMode(C_SUPPLIER);
00513         }else{
00514                 retval.state.setCMode(C_CONSUMER);
00515         }
00516         if(m_supplier){
00517                 retval.interfaces.setCMode(C_SUPPLIER);
00518         }else{
00519                 retval.interfaces.setCMode(C_CONSUMER);
00520         }
00521         if(m_supplier){
00522                 retval.managedType.setCMode(C_SUPPLIER);
00523         }else{
00524                 retval.managedType.setCMode(C_CONSUMER);
00525         }
00526                      m_requestVector.pop();
00527                      requests=retval;
00528                      return retval;
00529               }else{
00530                      if(m_infoVector.empty()){
00531                             return info;
00532                      }
00533                      retval.moduleName=m_infoVector.front().moduleName;
00534 retval.source=m_infoVector.front().source;
00535 retval.locale=m_infoVector.front().locale;
00536 retval.state=m_infoVector.front().state;
00537 retval.mType=m_infoVector.front().mType;
00538 retval.target=m_infoVector.front().target;
00539 retval.interfaces=m_infoVector.front().interfaces;
00540 retval.timestamp=m_infoVector.front().timestamp;
00541 retval.managedType=m_infoVector.front().managedType;
00542 
00543         if(m_supplier){
00544                 retval.moduleName.setCMode(C_SUPPLIER);
00545         }else{
00546                 retval.moduleName.setCMode(C_CONSUMER);
00547         }
00548         if(m_supplier){
00549                 retval.locale.setCMode(C_SUPPLIER);
00550         }else{
00551                 retval.locale.setCMode(C_CONSUMER);
00552         }
00553         if(m_supplier){
00554                 retval.state.setCMode(C_SUPPLIER);
00555         }else{
00556                 retval.state.setCMode(C_CONSUMER);
00557         }
00558         if(m_supplier){
00559                 retval.interfaces.setCMode(C_SUPPLIER);
00560         }else{
00561                 retval.interfaces.setCMode(C_CONSUMER);
00562         }
00563         if(m_supplier){
00564                 retval.managedType.setCMode(C_SUPPLIER);
00565         }else{
00566                 retval.managedType.setCMode(C_CONSUMER);
00567         }
00568                      m_infoVector.pop();
00569                      info=retval;
00570                      // the requests struct really should get the values in 
00571                      // info
00572                      requests=info;
00573                      
00574                      return retval;
00575               }
00576        }else{
00577               myCOMObject->resetInfo();
00578               if(m_supplier){
00579         if(m_supplier){
00580                 requests.moduleName.setCMode(C_SUPPLIER);
00581         }else{
00582                 requests.moduleName.setCMode(C_CONSUMER);
00583         }
00584         if(m_supplier){
00585                 requests.locale.setCMode(C_SUPPLIER);
00586         }else{
00587                 requests.locale.setCMode(C_CONSUMER);
00588         }
00589         if(m_supplier){
00590                 requests.state.setCMode(C_SUPPLIER);
00591         }else{
00592                 requests.state.setCMode(C_CONSUMER);
00593         }
00594         if(m_supplier){
00595                 requests.interfaces.setCMode(C_SUPPLIER);
00596         }else{
00597                 requests.interfaces.setCMode(C_CONSUMER);
00598         }
00599         if(m_supplier){
00600                 requests.managedType.setCMode(C_SUPPLIER);
00601         }else{
00602                 requests.managedType.setCMode(C_CONSUMER);
00603         }
00604                      return requests;
00605               }else{
00606         if(m_supplier){
00607                 info.moduleName.setCMode(C_SUPPLIER);
00608         }else{
00609                 info.moduleName.setCMode(C_CONSUMER);
00610         }
00611         if(m_supplier){
00612                 info.locale.setCMode(C_SUPPLIER);
00613         }else{
00614                 info.locale.setCMode(C_CONSUMER);
00615         }
00616         if(m_supplier){
00617                 info.state.setCMode(C_SUPPLIER);
00618         }else{
00619                 info.state.setCMode(C_CONSUMER);
00620         }
00621         if(m_supplier){
00622                 info.interfaces.setCMode(C_SUPPLIER);
00623         }else{
00624                 info.interfaces.setCMode(C_CONSUMER);
00625         }
00626         if(m_supplier){
00627                 info.managedType.setCMode(C_SUPPLIER);
00628         }else{
00629                 info.managedType.setCMode(C_CONSUMER);
00630         }
00631                      // the requests struct really should get the values in 
00632                      // info
00633                      requests=info;
00634                      return info;
00635               }
00636        }
00637 }
00638 
00639 void Heartbeat::setStreamName(const std::string & name){
00640 
00641        if(m_supplier&&name=="Any"){
00642               return;
00643        }
00644 
00645        m_streamname=name;
00646        m_haveStream=true;
00647 
00648        if(m_supplier){
00649               m_globalname=name;
00650        }
00651        
00652        if(myCOMObject){
00653               myCOMObject->setStreamName(name);
00654        }
00655 
00656        if(m_initialize){
00657               if(m_supplier){
00658                      activateSupplier();
00659               }else{
00660                      activateConsumer();
00661               }
00662        }
00663 
00664        m_streamRequest=STAT_OKAY;
00665 }
00666 
00667 void Heartbeat::setGlobalName(const std::string & name){
00668        
00669        if(name=="Any"){
00670               return;
00671        }
00672 
00673        m_globalname=name;
00674        m_haveGlobalname=true;
00675        if(m_supplier){
00676               m_streamname=name;
00677        }
00678 
00679        if(myCOMObject){
00680               myCOMObject->setGlobalName(name);
00681        }
00682 }
00683 
00684 void Heartbeat::doPing(){
00685        if(myCOMObject){
00686               myCOMObject->doPing();
00687        }
00688 }
00689 
00690 void Heartbeat::setAutoPing(const bool & val){
00691        m_doAutoPing=val; 
00692        if(myCOMObject){
00693               myCOMObject->setAutoPing(val);
00694        }
00695 }
00696 
00697 

Generated on Thu Feb 1 15:31:53 2007 for WURDE by  doxygen 1.5.1