00001
00029 #include <Heartbeat.H>
00030 #include <CommsManager.H>
00031 #include <Logger.H>
00032
00033
00034
00035 using namespace WURDE;
00036
00037 Heartbeat::Heartbeat(const std::string & streamname){
00038 init(streamname,STRAT_NORMAL,"Any");
00039
00040
00041 m_haveGlobalname=false;
00042 }
00043
00044 Heartbeat::Heartbeat(const ConnectionStrategy & strategy,const std::string & gblName){
00045 init("Any",strategy,gblName);
00046
00047 if(strategy==STRAT_NORMAL){
00048
00049
00050
00051 m_haveGlobalname=false;
00052 }
00053
00054 m_haveStream=false;
00055
00056 }
00057
00058 Heartbeat::Heartbeat(const std::string &streamname,const std::string &gblName){
00059 init(streamname,STRAT_NORMAL,gblName);
00060 }
00061
00062 void Heartbeat::init(const std::string &streamname,const ConnectionStrategy &strategy,const std::string &gblName){
00063 m_globalname=gblName;
00064 m_streamname=streamname;
00065
00066 myCOMObject=NULL;
00067
00068
00069 requests=info;
00070
00071 m_queue=false;
00072 m_supplier=false;
00073 m_doTimestamp=true;
00074 m_doTag=true;
00075
00076 m_doPublishData=false;
00077 m_doPublishInfo=false;
00078
00079 m_initialize=false;
00080 m_strategy=strategy;
00081
00082 m_haveStream=true;
00083 m_haveGlobalname=true;
00084 m_doPublishData=false;
00085 m_doPublishInfo=false;
00086
00087 m_doAutoPing=true;
00088
00089 info.source=streamname;
00090 requests.source=m_globalname;
00091 info.target="Any";
00092 requests.target=streamname;
00093 info.timestamp.now();
00094 requests.timestamp.now();
00095
00096 m_streamRequest=STAT_NULL;
00097
00098 m_sourceModule="Unknown";
00099 }
00100
00101
00102 Heartbeat::~Heartbeat(){
00103 myManager=NULL;
00104 if(myCOMObject){
00105
00106
00107 }
00108 }
00109
00110
00111 void Heartbeat::activateSupplier(){
00112 if(m_haveStream){
00113 m_supplier=true;
00114 m_globalname=m_streamname;
00115 m_haveGlobalname=true;
00116 m_initialize=false;
00117 if(myCOMObject){
00118
00119 myCOMObject->setGlobalName(m_globalname);
00120 myCOMObject->setStreamName(m_streamname);
00121 myCOMObject->setInfo(&requests);
00122 myCOMObject->setRequests(&info);
00123 myCOMObject->setStaging(&data);
00124 myCOMObject->activateSupplier();
00125 myCOMObject->setAutoPing(m_doAutoPing);
00126 myCOMObject->setInfoVector(&m_requestVector);
00127 myCOMObject->setRequestVector(&m_infoVector);
00128 myCOMObject->setDataVector(&m_dataVector);
00129
00130 data.source=m_streamname;
00131 info.source=m_streamname;
00132
00133 if(m_queue){
00134 myCOMObject->setQueueMode(true);
00135 }
00136
00137 }else{
00138 g_fatal("Attempted to activateSupplier before setting a COMObject Please just use registerSupplier.");
00139 exit(-1);
00140 }
00141 data.TTL=0;
00142 if(m_supplier){
00143 info.moduleName.setCMode(C_SUPPLIER);
00144 }else{
00145 info.moduleName.setCMode(C_CONSUMER);
00146 }
00147 if(m_supplier){
00148 info.locale.setCMode(C_SUPPLIER);
00149 }else{
00150 info.locale.setCMode(C_CONSUMER);
00151 }
00152 if(m_supplier){
00153 info.state.setCMode(C_SUPPLIER);
00154 }else{
00155 info.state.setCMode(C_CONSUMER);
00156 }
00157 if(m_supplier){
00158 info.interfaces.setCMode(C_SUPPLIER);
00159 }else{
00160 info.interfaces.setCMode(C_CONSUMER);
00161 }
00162 if(m_supplier){
00163 info.managedType.setCMode(C_SUPPLIER);
00164 }else{
00165 info.managedType.setCMode(C_CONSUMER);
00166 }
00167 }else{
00168
00169
00170 m_initialize=true;
00171 m_supplier=true;
00172 }
00173
00174 requests=info;
00175
00176 }
00177
00178 void Heartbeat::activateConsumer(){
00179 std::string localname;
00180
00181 if(m_haveStream){
00182 m_initialize=false;
00183 m_supplier=false;
00184 if(myCOMObject&&myManager){
00185
00186 if(!m_haveGlobalname&&m_strategy==STRAT_NORMAL){
00187 m_globalname=myManager->getName()+HEARTBEAT+m_streamname+"Consumer";
00188 m_haveGlobalname=true;
00189 }
00190
00191 myCOMObject->setGlobalName(m_globalname);
00192 myCOMObject->setStreamName(m_streamname);
00193 requests.source=m_globalname;
00194 data.source=m_streamname;
00195 info.source=m_streamname;
00196 myCOMObject->setInfo(&info);
00197 myCOMObject->setRequests(&requests);
00198 myCOMObject->setStaging(&data);
00199 myCOMObject->setAutoPing(m_doAutoPing);
00200 myCOMObject->activateConsumer();
00201 myCOMObject->setInfoVector(&m_infoVector);
00202 myCOMObject->setRequestVector(&m_requestVector);
00203 myCOMObject->setDataVector(&m_dataVector);
00204
00205 if(m_queue){
00206 myCOMObject->setQueueMode(true);
00207 }
00208 }else{
00209 g_fatal("Attempted to activateConsumer before setting a COMObject and CommsManager. Please use registerConsumer instead.");
00210 exit(-1);
00211 }
00212
00213 data.TTL=0;
00214 if(m_supplier){
00215 info.moduleName.setCMode(C_SUPPLIER);
00216 }else{
00217 info.moduleName.setCMode(C_CONSUMER);
00218 }
00219 if(m_supplier){
00220 info.locale.setCMode(C_SUPPLIER);
00221 }else{
00222 info.locale.setCMode(C_CONSUMER);
00223 }
00224 if(m_supplier){
00225 info.state.setCMode(C_SUPPLIER);
00226 }else{
00227 info.state.setCMode(C_CONSUMER);
00228 }
00229 if(m_supplier){
00230 info.interfaces.setCMode(C_SUPPLIER);
00231 }else{
00232 info.interfaces.setCMode(C_CONSUMER);
00233 }
00234 if(m_supplier){
00235 info.managedType.setCMode(C_SUPPLIER);
00236 }else{
00237 info.managedType.setCMode(C_CONSUMER);
00238 }
00239 }else{
00240
00241
00242 m_initialize=true;
00243 }
00244
00245 }
00246
00247 void Heartbeat::runUpdate(){
00248 if(myCOMObject){
00249
00250 myCOMObject->runUpdate(m_doPublishData,m_doPublishInfo);
00251
00252 m_doPublishData=false;
00253 m_doPublishInfo=false;
00254
00255 }
00256 }
00257
00258 void Heartbeat::setManager(CommsManager *myManager_){
00259
00260 if(!myCOMObject){
00261 myManager=myManager_;
00262 }
00263
00264 }
00265
00266 void Heartbeat::setQueueMode(const bool & val){
00267
00268 if(!myCOMObject){
00269 m_queue=val;
00270 }else if(!m_queue&&val){
00271 m_queue=val;
00272 myCOMObject->setDataVector(&m_dataVector);
00273 myCOMObject->setQueueMode(true);
00274
00275 }else if(!val&&m_queue){
00276 m_queue=val;
00277 myCOMObject->setQueueMode(false);
00278 while(!m_dataVector.empty()){
00279 m_dataVector.pop();
00280 }
00281 while(!m_infoVector.empty()){
00282 m_infoVector.pop();
00283 }
00284 while(!m_requestVector.empty()){
00285 m_requestVector.pop();
00286 }
00287 }
00288 }
00289
00290 bool Heartbeat::newData(){
00291 if(m_queue){
00292 return !m_dataVector.empty();
00293 }else{
00294 if(myCOMObject){
00295 return myCOMObject->newData();
00296 }
00297 }
00298
00299 return false;
00300 }
00301
00302 bool Heartbeat::newInfo(){
00303 if(m_queue){
00304 if(m_supplier){
00305 return !m_requestVector.empty();
00306 }
00307
00308 return !m_infoVector.empty();
00309 }else{
00310 if(myCOMObject){
00311 return myCOMObject->newInfo();
00312 }
00313 }
00314
00315
00316 return false;
00317 }
00318
00319 void Heartbeat::publishData(){
00320 if(m_supplier){
00321 if(m_doTimestamp){
00322 data.timestamp.now();
00323 }
00324 if(m_doTag){
00325 data.source=m_globalname;
00326 }
00327 if(m_queue){
00328 HeartbeatDataStruct temp;
00329 temp.TTL=0;
00330 temp.source=data.source;
00331 temp.timestamp=data.timestamp;
00332 temp.TTL=data.TTL;
00333
00334 m_dataVector.push(temp);
00335
00336 }
00337 m_doPublishData=true;
00338 }
00339 }
00340
00341 void Heartbeat::publishInfo(){
00342 if(m_supplier){
00343
00344 info.mType=MESSAGE_INFO;
00345 if(m_doTimestamp){
00346 info.timestamp.now();
00347 }
00348
00349 if(m_doTag){
00350 info.source=m_globalname;
00351 info.target="Any";
00352 }
00353
00354 if(m_queue){
00355 HeartbeatInfoStruct temp;
00356
00357
00358 temp.moduleName=info.moduleName;
00359 temp.source=info.source;
00360 temp.locale=info.locale;
00361 temp.state=info.state;
00362 temp.mType=info.mType;
00363 temp.target=info.target;
00364 temp.timestamp=info.timestamp;
00365 temp.interfaces=info.interfaces;
00366 temp.managedType=info.managedType;
00367
00368 m_infoVector.push(temp);
00369
00370
00371 }
00372
00373 m_doPublishInfo=true;
00374 }else{
00375
00376 requests.mType=MESSAGE_REQUEST;
00377
00378 if(m_doTimestamp){
00379 requests.timestamp.now();
00380 }
00381
00382 if(m_doTag){
00383 requests.source=m_globalname;
00384 requests.target=m_streamname;
00385 }
00386
00387 if(m_queue){
00388 HeartbeatInfoStruct temp;
00389 if(m_supplier){
00390 temp.moduleName.setCMode(C_SUPPLIER);
00391 }else{
00392 temp.moduleName.setCMode(C_CONSUMER);
00393 }
00394 if(m_supplier){
00395 temp.locale.setCMode(C_SUPPLIER);
00396 }else{
00397 temp.locale.setCMode(C_CONSUMER);
00398 }
00399 if(m_supplier){
00400 temp.state.setCMode(C_SUPPLIER);
00401 }else{
00402 temp.state.setCMode(C_CONSUMER);
00403 }
00404 if(m_supplier){
00405 temp.interfaces.setCMode(C_SUPPLIER);
00406 }else{
00407 temp.interfaces.setCMode(C_CONSUMER);
00408 }
00409 if(m_supplier){
00410 temp.managedType.setCMode(C_SUPPLIER);
00411 }else{
00412 temp.managedType.setCMode(C_CONSUMER);
00413 }
00414 temp.moduleName=requests.moduleName;
00415 temp.source=requests.source;
00416 temp.locale=requests.locale;
00417 temp.state=requests.state;
00418 temp.mType=requests.mType;
00419 temp.target=requests.target;
00420 temp.interfaces=requests.interfaces;
00421 temp.timestamp=requests.timestamp;
00422 temp.managedType=requests.managedType;
00423
00424 m_requestVector.push(temp);
00425
00426
00427 }
00428
00429 m_doPublishInfo=true;
00430 }
00431 }
00432
00433 HeartbeatDataStruct Heartbeat::getNextData(){
00434 if(m_queue){
00435 HeartbeatDataStruct retval;
00436 retval.TTL=0;
00437 retval.source="NULL";
00438 if(m_dataVector.empty()){
00439 return data;
00440 }
00441
00442 retval.source=m_dataVector.front().source;
00443 retval.timestamp=m_dataVector.front().timestamp;
00444 retval.TTL=m_dataVector.front().TTL;
00445
00446
00447 m_dataVector.pop();
00448 data=retval;
00449
00450 return retval;
00451 }else{
00452 myCOMObject->resetData();
00453 return data;
00454 }
00455 }
00456
00457 HeartbeatInfoStruct Heartbeat::getNextInfo(){
00458 if(m_queue){
00459 HeartbeatInfoStruct retval;
00460 if(m_supplier){
00461 retval.moduleName.setCMode(C_SUPPLIER);
00462 }else{
00463 retval.moduleName.setCMode(C_CONSUMER);
00464 }
00465 if(m_supplier){
00466 retval.locale.setCMode(C_SUPPLIER);
00467 }else{
00468 retval.locale.setCMode(C_CONSUMER);
00469 }
00470 if(m_supplier){
00471 retval.state.setCMode(C_SUPPLIER);
00472 }else{
00473 retval.state.setCMode(C_CONSUMER);
00474 }
00475 if(m_supplier){
00476 retval.interfaces.setCMode(C_SUPPLIER);
00477 }else{
00478 retval.interfaces.setCMode(C_CONSUMER);
00479 }
00480 if(m_supplier){
00481 retval.managedType.setCMode(C_SUPPLIER);
00482 }else{
00483 retval.managedType.setCMode(C_CONSUMER);
00484 }
00485 retval.source="NULL";
00486 if(m_supplier){
00487 if(m_requestVector.empty()){
00488 return requests;
00489 }
00490 retval.moduleName=m_requestVector.front().moduleName;
00491 retval.source=m_requestVector.front().source;
00492 retval.locale=m_requestVector.front().locale;
00493 retval.state=m_requestVector.front().state;
00494 retval.mType=m_requestVector.front().mType;
00495 retval.target=m_requestVector.front().target;
00496 retval.interfaces=m_requestVector.front().interfaces;
00497 retval.timestamp=m_requestVector.front().timestamp;
00498 retval.managedType=m_requestVector.front().managedType;
00499
00500
00501 if(m_supplier){
00502 retval.moduleName.setCMode(C_SUPPLIER);
00503 }else{
00504 retval.moduleName.setCMode(C_CONSUMER);
00505 }
00506 if(m_supplier){
00507 retval.locale.setCMode(C_SUPPLIER);
00508 }else{
00509 retval.locale.setCMode(C_CONSUMER);
00510 }
00511 if(m_supplier){
00512 retval.state.setCMode(C_SUPPLIER);
00513 }else{
00514 retval.state.setCMode(C_CONSUMER);
00515 }
00516 if(m_supplier){
00517 retval.interfaces.setCMode(C_SUPPLIER);
00518 }else{
00519 retval.interfaces.setCMode(C_CONSUMER);
00520 }
00521 if(m_supplier){
00522 retval.managedType.setCMode(C_SUPPLIER);
00523 }else{
00524 retval.managedType.setCMode(C_CONSUMER);
00525 }
00526 m_requestVector.pop();
00527 requests=retval;
00528 return retval;
00529 }else{
00530 if(m_infoVector.empty()){
00531 return info;
00532 }
00533 retval.moduleName=m_infoVector.front().moduleName;
00534 retval.source=m_infoVector.front().source;
00535 retval.locale=m_infoVector.front().locale;
00536 retval.state=m_infoVector.front().state;
00537 retval.mType=m_infoVector.front().mType;
00538 retval.target=m_infoVector.front().target;
00539 retval.interfaces=m_infoVector.front().interfaces;
00540 retval.timestamp=m_infoVector.front().timestamp;
00541 retval.managedType=m_infoVector.front().managedType;
00542
00543 if(m_supplier){
00544 retval.moduleName.setCMode(C_SUPPLIER);
00545 }else{
00546 retval.moduleName.setCMode(C_CONSUMER);
00547 }
00548 if(m_supplier){
00549 retval.locale.setCMode(C_SUPPLIER);
00550 }else{
00551 retval.locale.setCMode(C_CONSUMER);
00552 }
00553 if(m_supplier){
00554 retval.state.setCMode(C_SUPPLIER);
00555 }else{
00556 retval.state.setCMode(C_CONSUMER);
00557 }
00558 if(m_supplier){
00559 retval.interfaces.setCMode(C_SUPPLIER);
00560 }else{
00561 retval.interfaces.setCMode(C_CONSUMER);
00562 }
00563 if(m_supplier){
00564 retval.managedType.setCMode(C_SUPPLIER);
00565 }else{
00566 retval.managedType.setCMode(C_CONSUMER);
00567 }
00568 m_infoVector.pop();
00569 info=retval;
00570
00571
00572 requests=info;
00573
00574 return retval;
00575 }
00576 }else{
00577 myCOMObject->resetInfo();
00578 if(m_supplier){
00579 if(m_supplier){
00580 requests.moduleName.setCMode(C_SUPPLIER);
00581 }else{
00582 requests.moduleName.setCMode(C_CONSUMER);
00583 }
00584 if(m_supplier){
00585 requests.locale.setCMode(C_SUPPLIER);
00586 }else{
00587 requests.locale.setCMode(C_CONSUMER);
00588 }
00589 if(m_supplier){
00590 requests.state.setCMode(C_SUPPLIER);
00591 }else{
00592 requests.state.setCMode(C_CONSUMER);
00593 }
00594 if(m_supplier){
00595 requests.interfaces.setCMode(C_SUPPLIER);
00596 }else{
00597 requests.interfaces.setCMode(C_CONSUMER);
00598 }
00599 if(m_supplier){
00600 requests.managedType.setCMode(C_SUPPLIER);
00601 }else{
00602 requests.managedType.setCMode(C_CONSUMER);
00603 }
00604 return requests;
00605 }else{
00606 if(m_supplier){
00607 info.moduleName.setCMode(C_SUPPLIER);
00608 }else{
00609 info.moduleName.setCMode(C_CONSUMER);
00610 }
00611 if(m_supplier){
00612 info.locale.setCMode(C_SUPPLIER);
00613 }else{
00614 info.locale.setCMode(C_CONSUMER);
00615 }
00616 if(m_supplier){
00617 info.state.setCMode(C_SUPPLIER);
00618 }else{
00619 info.state.setCMode(C_CONSUMER);
00620 }
00621 if(m_supplier){
00622 info.interfaces.setCMode(C_SUPPLIER);
00623 }else{
00624 info.interfaces.setCMode(C_CONSUMER);
00625 }
00626 if(m_supplier){
00627 info.managedType.setCMode(C_SUPPLIER);
00628 }else{
00629 info.managedType.setCMode(C_CONSUMER);
00630 }
00631
00632
00633 requests=info;
00634 return info;
00635 }
00636 }
00637 }
00638
00639 void Heartbeat::setStreamName(const std::string & name){
00640
00641 if(m_supplier&&name=="Any"){
00642 return;
00643 }
00644
00645 m_streamname=name;
00646 m_haveStream=true;
00647
00648 if(m_supplier){
00649 m_globalname=name;
00650 }
00651
00652 if(myCOMObject){
00653 myCOMObject->setStreamName(name);
00654 }
00655
00656 if(m_initialize){
00657 if(m_supplier){
00658 activateSupplier();
00659 }else{
00660 activateConsumer();
00661 }
00662 }
00663
00664 m_streamRequest=STAT_OKAY;
00665 }
00666
00667 void Heartbeat::setGlobalName(const std::string & name){
00668
00669 if(name=="Any"){
00670 return;
00671 }
00672
00673 m_globalname=name;
00674 m_haveGlobalname=true;
00675 if(m_supplier){
00676 m_streamname=name;
00677 }
00678
00679 if(myCOMObject){
00680 myCOMObject->setGlobalName(name);
00681 }
00682 }
00683
00684 void Heartbeat::doPing(){
00685 if(myCOMObject){
00686 myCOMObject->doPing();
00687 }
00688 }
00689
00690 void Heartbeat::setAutoPing(const bool & val){
00691 m_doAutoPing=val;
00692 if(myCOMObject){
00693 myCOMObject->setAutoPing(val);
00694 }
00695 }
00696
00697