00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "CommunicationManagerImpl.h"
00037 #include <core/UserManager.h>
00038 #include <core/SecurityManager.h>
00039 #include <core/Response.h>
00040 #include <core/Variant.h>
00041 #include <stdlib.h>
00042 #include <time.h>
00043 #include <algorithm>
00044
00045
00046 using namespace std;
00047 using namespace wosh;
00048
00049
00050
00051
00052 CommunicationManagerImpl::CommunicationManagerImpl( BundleGeneric& bundle ) : BundleGenericWorker(bundle)
00053 {
00054 this->listener = NULL;
00055
00056 this->flowTimeout = 0;
00057 this->flowRetryTime = 20;
00058 this->flowRetryCount = 10;
00059
00060 this->messageArchiveSize = -1;
00061 }
00062
00063
00064 CommunicationManagerImpl::~CommunicationManagerImpl() {
00065 if ( isThreadRunning() ) {
00066 quitThread(10000);
00067 }
00068
00069 if ( this->comms.count() != 0 ) {
00070 Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Communicators.." );
00071 this->comms.clear();
00072 }
00073 if ( this->transl.count() != 0 ) {
00074 Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Interpreters.." );
00075 this->transl.clear();
00076 }
00077 if ( this->messages.count() != 0 ) {
00078 Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Messages.." );
00079 this->messages.clear();
00080 }
00081 if ( this->messagesArchived.count() != 0 ) {
00082 Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Archived Messages.." );
00083 this->messagesArchived.clear();
00084 }
00085
00086 }
00087
00088
00089
00090
00091
00092 void CommunicationManagerImpl::runThread()
00093 {
00094 Log(LOG_VERBOSE, ":runThread(): STARTING" );
00095
00096 bool cProcessed = false;
00097 while( this->running )
00098 {
00099 ThreadImpl::sleepForMSec(100);
00100 setThreadAlive();
00101
00102
00103 this->messages.transactionBeginWrite();
00104 tLongNotificationFlowMap::Iterator it;
00105 tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00106 for ( it=this->messages.begin(); it!=it_end; ++it ) {
00107 if ( it->second == NULL ) {
00108 this->messages.erase(it);
00109 continue;
00110 }
00111 cProcessed = processNotification_( it->second );
00112 if ( !cProcessed ) break;
00113 }
00114 this->messages.transactionEnd();
00115 }
00116
00117 Log(LOG_VERBOSE, ":runThread(): EXITING" );
00118 }
00119
00120
00121
00122
00123
00124 bool CommunicationManagerImpl::processNotification_( NotificationFlow* notification )
00125 {
00126 if ( notification == NULL ) return false;
00127 if ( !notification->hasOriginal() ) {
00128 notification->raiseError();
00129 Log(LOG_CRITICAL, ":processNotification_(%ld) : Notification MISSING [%ld errors] [ARCHIVING]", notification->getID(), notification->getErrorsCount() );
00130 archiveNotification_( notification );
00131 return false;
00132 }
00133
00134 if ( !notification->isProcessing() ) {
00135
00136 Log(LOG_VERBOSE, ":processNotification_(%ld) : Starting Processing! [of %ld]", notification->getID(), notification->getOriginal()->getID() );
00137
00138 notification->setProcessing(true);
00139 sendNotification_(notification);
00140 return true;
00141 }
00142
00143
00144
00145
00146
00147
00148
00149
00150 if ( this->flowRetryCount != 0 ) {
00151 if ( notification->getProcessingCount() >= this->flowRetryCount ) {
00152 Log(LOG_WARNING, ":processNotification_(%ld) : ARCHIVING due to retry-max [%ld] (sent=%ld)", notification->getID(), notification->getProcessingCount(), notification->getSentCount() );
00153 notification->raiseError();
00154 archiveNotification_( notification );
00155 return false;
00156 }
00157 }
00158
00159 if ( this->flowTimeout != 0 ) {
00160 long timedout = (DateTime::std_time_ms() - notification->getOriginal()->getTimeStamp()) / 1000;
00161 if ( timedout > this->flowTimeout ) {
00162 Log(LOG_WARNING, ":processNotification_(%ld) : ARCHIVING due to timeout [%lds] (sent=%ld)", notification->getID(), timedout, notification->getSentCount() );
00163 notification->raiseError();
00164
00165 archiveNotification_( notification );
00166 return false;
00167 }
00168 }
00169
00170
00171 if ( this->flowRetryTime != 0 ) {
00172 long timespan = (DateTime::std_time_ms() - notification->getProcessingTs()) / 1000;
00173 if ( timespan <= this->flowRetryTime ) {
00174 Log(LOG_VERBOSE, ":processNotification_(%ld) : WAITING due to reply-span [%lds] (sent=%ld)", notification->getID(), timespan, notification->getSentCount() );
00175 return true;
00176 }
00177 }
00178
00179
00180 notification->setProcessing(true);
00181
00182 sendNotification_(notification);
00183 return true;
00184 }
00185
00186
00187
00188
00189
00190 bool CommunicationManagerImpl::sendNotification_( NotificationFlow* notification )
00191 {
00192 if ( notification == NULL ) return false;
00193
00194 Notification* selectedNotification = notification->getFinal();
00195 if ( selectedNotification == NULL ) return false;
00196
00197 Log(LOG_VERBOSE, ":sendNotification_(%ld) : Sending.. [%ld]", notification->getID(), notification->getSentCount() );
00198
00199
00200
00201
00202
00203
00204
00205 UserInfoCommmunicators* usrComms = getCommunicatorsOf( selectedNotification->getRecipent_User() );
00206
00207
00208 string preferredCommURI = selectedNotification->getPreferredCommunicator().toString();
00209 if ( !preferredCommURI.empty() ) {
00210
00211 URI fcu(preferredCommURI);
00212 preferredCommURI = fcu.toString();
00213
00214 usrComms->updateRankingOf(preferredCommURI, 0.2, true);
00215 }
00216
00217
00218 string forceCommURI = UserManager::getUserProperty(selectedNotification->getRecipent_User(), "CommunicatorForce").toString();
00219 if ( !forceCommURI.empty() ) {
00220
00221 URI fcu(forceCommURI);
00222 forceCommURI = fcu.toString();
00223
00224 usrComms->updateRankingOf(forceCommURI, 1000, true);
00225 }
00226
00227 usrComms->filterTimedOut(0);
00228 if ( usrComms->getCommmunicators().empty() ) {
00229 Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED NO ONLINE communicators for user %s", notification->getID(), selectedNotification->getRecipent_User().c_str() );
00230 notification->raiseError();
00231 return false;
00232 }
00233
00234
00235 std::string forceProto = UserManager::getUserProperty(selectedNotification->getRecipent_User(), "ProtocolForce").toString();
00236 std::string location = UserManager::getUserLocation( selectedNotification->getRecipent_User() );
00237
00238 intersectCommunicators_(usrComms, selectedNotification, location );
00239 if ( usrComms->getCommmunicators().empty() ) {
00240 Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED No FILTERED communicators for user %s", notification->getID(), selectedNotification->getRecipent_User().c_str() );
00241 delete usrComms; usrComms = NULL;
00242 notification->raiseError();
00243 return false;
00244 }
00245
00246 Communicator* communicator = NULL;
00247 string communicatorURI = "";
00248 UserInfoCommmunicatorData communicatorData;
00249 usrComms->getCommmunicatorRank(communicatorURI, communicatorData);
00250
00251 this->comms.transactionBeginRead();
00252 communicator = this->comms.find( communicatorURI );
00253 if ( communicator == NULL ) {
00254 this->comms.transactionEnd();
00255 UserManager::setUserCommunicator( selectedNotification->getRecipent_User(), communicatorURI, 0, -0.15 );
00256 Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED accessing communicator %s for user %s ", notification->getID(), communicatorURI.c_str(), selectedNotification->getRecipent_User().c_str() );
00257 delete usrComms; usrComms = NULL;
00258 notification->raiseError();
00259 return false;
00260 }
00261
00262 Log(LOG_VERBOSE, ":sendNotification_(%ld) : Selected %s (out of %d) for user %s ", notification->getID(), communicatorURI.c_str(), usrComms->getCommmunicators().size(), selectedNotification->getRecipent_User().c_str() );
00263 delete usrComms; usrComms = NULL;
00264
00265 string interpeterURI = "";
00266
00267
00268 bool iRequired = isInterpreterRequired_( selectedNotification, communicator );
00269 if ( iRequired ) {
00270 this->transl.transactionBeginRead();
00271 Interpreter* inter = selectInterpreter_( selectedNotification, communicator );
00272 if ( inter == NULL ) {
00273 Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED selecting Interpreter! [for %s]", notification->getID(), communicatorURI.c_str() );
00274 }
00275 else {
00276 interpeterURI = inter->getURI().toString();
00277 Log(LOG_VERBOSE, ":sendNotification_(%ld) : Selected Interpreter %s [for %s]", notification->getID(), interpeterURI.c_str(), communicatorURI.c_str() );
00278 }
00279 this->transl.transactionEnd();
00280 }
00281
00282 this->comms.transactionEnd();
00283
00284 Message* message = new Message();
00285 message->setSource(this->getOwner()->getURI());
00286 message->setDestinationBus( _Bus_Core );
00287 SecurityManager::getCredentialImplicit().signMessage(message, notification->getOriginal()->getSender_User());
00288
00289 if ( !interpeterURI.empty() && communicator != NULL ) {
00290 Log(LOG_INFO, ":sendNotification_(%ld) : Flow [%s=>%s] selected", notification->getID(), interpeterURI.c_str(), communicatorURI.c_str() );
00291
00292 NotificationConversion* conv = new NotificationConversion();
00293 conv->setOriginal( selectedNotification->clone() );
00294 conv->setProtocol(communicator->getProtocol());
00295
00296 Request* request = new Request(_InterpreterService_METHOD_convert, conv );
00297 message->setContent(request);
00298 message->setDestination( interpeterURI );
00299 }
00300 else if ( !communicatorURI.empty() ) {
00301 Log(LOG_INFO, ":sendNotification_(%ld) : Communicator %s selected, sending[%ld]..", notification->getID(), communicatorURI.c_str(), notification->getSentCount() );
00302
00303 notification->sending();
00304 notification->setSenderCommunicator(communicatorURI);
00305 message->setContent( selectedNotification->clone() );
00306 message->setDestination( communicatorURI );
00307 BusCore.postMessage(message);
00308 return true;
00309 }
00310 else {
00311 Log(LOG_CRITICAL, ":sendNotification_(%ld) : Internal ERROR!", notification->getID() );
00312 delete message;
00313 notification->raiseError();
00314 return false;
00315 }
00316 return false;
00317 }
00318
00319
00320
00321
00322
00323 void CommunicationManagerImpl::evalMessage( const Message& message )
00324 {
00325 if ( message.isEmpty() ) return;
00326
00327 if ( message.getContent()->isNotification() ) {
00328 const Notification* notification = message.getContent()->asNotification();
00329 this->messages.transactionBeginWrite();
00330 evalNotification_(notification, &message);
00331 this->messages.transactionEnd();
00332 return;
00333 }
00334 else if ( message.getContent()->isResponse() ) {
00335 const Response* response = message.getContent()->asResponse();
00336 if ( !response->hasData() ) return;
00337
00338 if ( response->getMethod() == _Communicator_RESPONSE_notif_sent ) {
00339
00340 long not_id = 0;
00341 if ( response->getData()->isKindOf<Variant>() ) {
00342 not_id = dynamic_cast<const Variant*>(response->getData())->toLong(0);
00343 }
00344 if ( not_id == 0 ) {
00345 Log(LOG_WARNING, ":evalResponse() : Unsupported/Missing value in send_message response [by %s]", message.getSource().toString().c_str() );
00346 }
00347 else
00348 communicator_reply_sent(response->getReturnValue(), not_id, message.getSource() );
00349 return;
00350 }
00351 else if ( response->getMethod() == _Communicator_RESPONSE_notif_read ) {
00352
00353 long not_id = 0;
00354 if ( response->getData()->isKindOf<Variant>() ) {
00355 not_id = dynamic_cast<const Variant*>(response->getData())->toLong(0);
00356 }
00357 if ( not_id == 0 ) {
00358 Log(LOG_WARNING, ":evalResponse() : Unsupported/Missing value in send_message response [by %s]", message.getSource().toString().c_str() );
00359 }
00360 else
00361 communicator_reply_read(response->getReturnValue(), not_id, message.getSource() );
00362 return;
00363 }
00364 }
00365 }
00366
00367
00368
00369 NotificationFlow* CommunicationManagerImpl::evalNotification_( const Notification* notification, const Message* message )
00370 {
00371 if ( notification == NULL || message == NULL ) return NULL;
00372
00373 NotificationFlow* notificationEx = NULL;
00374 if ( notification->getFlowID() == 0 ) {
00375
00376 notificationEx = new NotificationFlow( notification->clone() );
00377 notificationEx->setSourceCommunicator(message->getSource());
00378
00379 Log(LOG_VERBOSE, ":evalNotification_(%ld) : New FLOW#%ld from %s to %s [%s]",
00380 notification->getID(), notificationEx->getID(), notification->getSender_User().c_str(), notification->getRecipent_User().c_str(),
00381 notification->getNotificationTypeAsString() );
00382
00383 this->messages.set( notificationEx->getID(), notificationEx );
00384 UserManager::setUserCommunicator( notification->getSender_User(), message->getSource().toString(), DateTime::std_time_ms(), 0.01 );
00385 }
00386 else {
00387
00388 notificationEx = this->messages.find(notification->getFlowID());
00389 if ( notificationEx == NULL ) {
00390 Log(LOG_WARNING, ":evalNotification_(%ld) : FAILED Flow#%ld not found!", notification->getID(), notification->getFlowID() );
00391 notificationEx = this->messagesArchived.find(notification->getFlowID());
00392 if ( notificationEx == NULL ) {
00393 Log(LOG_WARNING, ":evalNotification_(%ld) : FAILED Flow#%ld not found [even in Archived Cache]!", notification->getID(), notification->getFlowID() );
00394 return NULL;
00395 }
00396 }
00397
00398 Log(LOG_VERBOSE, ":evalNotification_(%ld) : Updating Flow#%ld!", notification->getID(), notification->getFlowID() );
00399 notificationEx->addInterpreted(notification->clone());
00400 notificationEx->setProcessing(false);
00401 }
00402
00403 return notificationEx;
00404 }
00405
00406
00407
00408 void CommunicationManagerImpl::communicator_reply_read( WRESULT ret, long notification_id, const URI& source )
00409 {
00410 this->messages.transactionBeginWrite();
00411 tLongNotificationFlowMap::Iterator it;
00412 tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00413 for ( it=this->messages.begin(); it!=it_end; ++it ) {
00414 if ( it->second == NULL || !it->second->hasOriginal() ) continue;
00415 if ( it->second->getOriginal()->getID() != notification_id ) continue;
00416 if ( WSUCCEEDED(ret) ) {
00417 it->second->getOriginal()->setRead(true);
00418 UserManager::setUserCommunicator( it->second->getOriginal()->getRecipent_User(), source.toString(), DateTime::std_time_ms(), 0.10 );
00419 Log(LOG_INFO, ":communicator_reply_read(%ld) : Flow#%ld Archived", notification_id, it->second->getID() );
00420 archiveNotification_( it->second );
00421 }
00422 else {
00423 it->second->getOriginal()->setRead(false);
00424 Log(LOG_WARNING, ":communicator_reply_read(%ld) : ERROR#%d by %s", notification_id, ret, source.toString().c_str() );
00425 it->second->raiseError();
00426 }
00427 break;
00428 }
00429 this->messages.transactionEnd();
00430 }
00431
00432 void CommunicationManagerImpl::communicator_reply_sent( WRESULT ret, long notification_id, const URI& source )
00433 {
00434 this->messages.transactionBeginWrite();
00435 tLongNotificationFlowMap::Iterator it;
00436 tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00437 for ( it=this->messages.begin(); it!=it_end; ++it ) {
00438 if ( it->second == NULL || !it->second->hasOriginal() ) continue;
00439 if ( it->second->getOriginal()->getID() != notification_id ) continue;
00440 if ( WSUCCEEDED(ret) ) {
00441 it->second->getOriginal()->setSent(true);
00442 it->second->setSenderCommunicator(source);
00443 Log(LOG_VERBOSE, ":communicator_reply_sent() : Notification#%ld of Flow#%ld has been Sent!", notification_id, it->second->getID() );
00444
00445
00446
00447 }
00448 else {
00449 Log(LOG_WARNING, ":communicator_reply_sent(%ld) : ERROR#%d by %s", notification_id, ret, source.toString().c_str() );
00450 it->second->raiseError();
00451 UserManager::setUserCommunicator( it->second->getOriginal()->getRecipent_User(), source.toString(), DateTime::std_time_ms(), -0.2 );
00452 }
00453 break;
00454 }
00455 this->messages.transactionEnd();
00456 }
00457
00458
00459
00460
00461 void CommunicationManagerImpl::archiveNotification_( NotificationFlow* notification )
00462 {
00463 if ( notification == NULL ) return;
00464 Log(LOG_INFO, ":archiveNotification_(%ld) : Archiving Notification [%ld errors]", notification->getID(), notification->getErrorsCount() );
00465
00466 notification->setProcessing(false);
00467 notification->setArchived(true);
00468 this->messages.erase( notification->getID(), false );
00469 this->messagesArchived.set( notification->getID(), notification );
00470 }
00471
00472
00473