CommunicationManagerImpl.cpp

Go to the documentation of this file.
00001 /** @file    CommunicationManagerImpl.cpp
00002  *  @author  Alessandro Polo
00003  *  @version $Id: CommunicationManagerImpl.cpp 3775 2011-01-01 16:38:17Z alex $
00004  *  @brief
00005  * File containing methods for the CommunicationManagerImpl class.
00006  * The header for this class can be found in CommunicationManagerImpl.h, check that file
00007  * for class description.
00008  ****************************************************************************/
00009 /* Copyright (c) 2007-2011, WOSH - Wide Open Smart Home 
00010  * by Alessandro Polo - OpenSmartHome.com
00011  * All rights reserved.
00012  *
00013  * Redistribution and use in source and binary forms, with or without
00014  * modification, are permitted provided that the following conditions are met:
00015  *     * Redistributions of source code must retain the above copyright
00016  *       notice, this list of conditions and the following disclaimer.
00017  *     * Redistributions in binary form must reproduce the above copyright
00018  *       notice, this list of conditions and the following disclaimer in the
00019  *       documentation and/or other materials provided with the distribution.
00020  *     * Neither the name of the OpenSmartHome.com WOSH nor the
00021  *       names of its contributors may be used to endorse or promote products
00022  *       derived from this software without specific prior written permission.
00023  *
00024  * THIS SOFTWARE IS PROVIDED BY Alessandro Polo ''AS IS'' AND ANY
00025  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00026  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00027  * DISCLAIMED. IN NO EVENT SHALL Alessandro Polo BE LIABLE FOR ANY
00028  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00029  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00030  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00031  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00032  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00033  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00034  ****************************************************************************/
00035 
00036  #include "CommunicationManagerImpl.h"
00037  #include <core/UserManager.h>
00038  #include <core/SecurityManager.h>
00039  #include <core/Response.h>
00040  #include <core/Variant.h>
00041  #include <stdlib.h>
00042  #include <time.h>
00043  #include <algorithm>
00044 
00045 
00046 using namespace std;
00047 using namespace wosh;
00048 
00049 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00050 //////////////////////////////////////////////////////////////////////////////////////////////// CONSTRUCTORS
00051 
00052 CommunicationManagerImpl::CommunicationManagerImpl( BundleGeneric& bundle ) : BundleGenericWorker(bundle)
00053  {
00054     this->listener = NULL;
00055 
00056     this->flowTimeout = 0;
00057     this->flowRetryTime = 20;
00058     this->flowRetryCount = 10;
00059     
00060     this->messageArchiveSize = -1;
00061  }
00062 
00063 
00064 CommunicationManagerImpl::~CommunicationManagerImpl() {
00065     if ( isThreadRunning() ) {
00066         quitThread(10000);
00067      }
00068 
00069     if ( this->comms.count() != 0 ) {
00070         Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Communicators.." );
00071         this->comms.clear();
00072      }
00073     if ( this->transl.count() != 0 ) {
00074         Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Interpreters.." );
00075         this->transl.clear();
00076      }
00077     if ( this->messages.count() != 0 ) {
00078         Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Messages.." );
00079         this->messages.clear();
00080      }
00081     if ( this->messagesArchived.count() != 0 ) {
00082         Log(LOG_VERBOSE, ":~CommunicationManagerImpl() : Freeing Archived Messages.." );
00083         this->messagesArchived.clear();
00084      }
00085 
00086  }
00087 
00088 //////////////////////////////////////////////////////////////////////////////////////////////// CONSTRUCTORS
00089 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00090 ////////////////////////////////////////////////////////////////////////////////////////////////// RUN THREAD
00091 
00092 void CommunicationManagerImpl::runThread()
00093  {
00094     Log(LOG_VERBOSE, ":runThread(): STARTING" );
00095 
00096     bool cProcessed = false;
00097     while( this->running )
00098      {
00099         ThreadImpl::sleepForMSec(100);
00100         setThreadAlive();
00101 
00102         // walk the notification queue
00103         this->messages.transactionBeginWrite();
00104         tLongNotificationFlowMap::Iterator it;
00105         tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00106         for ( it=this->messages.begin(); it!=it_end; ++it ) {
00107             if ( it->second == NULL ) {
00108                 this->messages.erase(it);
00109                 continue;
00110              }
00111             cProcessed = processNotification_( it->second );
00112             if ( !cProcessed ) break; // reset the iterator
00113          }
00114         this->messages.transactionEnd();
00115      }
00116 
00117     Log(LOG_VERBOSE, ":runThread(): EXITING" );
00118  }
00119 
00120 ////////////////////////////////////////////////////////////////////////////////////////////////// RUN THREAD
00121 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00122 ////////////////////////////////////////////////////////////////////////////////////////////////// PROCESSING
00123 
00124 bool CommunicationManagerImpl::processNotification_( NotificationFlow* notification )
00125  {
00126     if ( notification == NULL ) return false;
00127     if ( !notification->hasOriginal() ) {
00128         notification->raiseError();
00129         Log(LOG_CRITICAL, ":processNotification_(%ld) : Notification MISSING [%ld errors] [ARCHIVING]", notification->getID(), notification->getErrorsCount() );
00130         archiveNotification_( notification );
00131         return false;
00132      }
00133 
00134     if ( !notification->isProcessing() ) {
00135         // start the processing for first time!
00136         Log(LOG_VERBOSE, ":processNotification_(%ld) : Starting Processing! [of %ld]", notification->getID(), notification->getOriginal()->getID() );
00137         // flag it as under-processing and try to send (process) it
00138         notification->setProcessing(true);
00139         sendNotification_(notification);
00140         return true;
00141      }
00142 
00143     // we are/were already processing the flow!
00144     // cases are:
00145     //  - we picked a communicator/interpreter and waiting for response.
00146     //  - we are waiting for READ flag 
00147     //  - something went wrong!?
00148 
00149     // check if we processed the flow too many times
00150     if ( this->flowRetryCount != 0 ) {
00151         if ( notification->getProcessingCount() >= this->flowRetryCount ) {
00152             Log(LOG_WARNING, ":processNotification_(%ld) : ARCHIVING due to retry-max [%ld] (sent=%ld)", notification->getID(), notification->getProcessingCount(), notification->getSentCount() );
00153             notification->raiseError();
00154             archiveNotification_( notification );
00155             return false;
00156          }
00157      }
00158     // check if the flow is timed out
00159     if ( this->flowTimeout != 0 ) {
00160         long timedout = (DateTime::std_time_ms() - notification->getOriginal()->getTimeStamp()) / 1000;
00161         if ( timedout > this->flowTimeout ) {
00162             Log(LOG_WARNING, ":processNotification_(%ld) : ARCHIVING due to timeout [%lds] (sent=%ld)", notification->getID(), timedout, notification->getSentCount() );
00163             notification->raiseError();
00164             ///@todo reply to sender that it is (čartially) archived
00165             archiveNotification_( notification );
00166             return false;
00167          }
00168      }
00169 
00170     // check if we 'just' (a while ago) processed the notification
00171     if ( this->flowRetryTime != 0 ) {
00172         long timespan = (DateTime::std_time_ms() - notification->getProcessingTs()) / 1000;
00173         if ( timespan <= this->flowRetryTime ) {
00174             Log(LOG_VERBOSE, ":processNotification_(%ld) : WAITING due to reply-span [%lds] (sent=%ld)", notification->getID(), timespan, notification->getSentCount() );
00175             return true;
00176          }
00177      }
00178     // retry again !?
00179 
00180     notification->setProcessing(true);
00181 //  notification->processing();
00182     sendNotification_(notification);
00183     return true;
00184  }
00185 
00186 ////////////////////////////////////////////////////////////////////////////////////////////////// PROCESSING
00187 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00188 ///////////////////////////////////////////////////////////////////////////////////////////////////// SENDING
00189 
00190 bool CommunicationManagerImpl::sendNotification_( NotificationFlow* notification )
00191  {
00192     if ( notification == NULL ) return false;
00193 
00194     Notification* selectedNotification = notification->getFinal();
00195     if ( selectedNotification == NULL ) return false;
00196 
00197     Log(LOG_VERBOSE, ":sendNotification_(%ld) : Sending.. [%ld]", notification->getID(), notification->getSentCount() );
00198 
00199     // FLOW:
00200     // - get user's communicator and merge with options (forced comm, preferred comm)
00201     // - filter results with local database and: communicator vs. notification check
00202     // - select communicator with best ranking
00203     // - use an interpreter when required
00204 
00205     UserInfoCommmunicators* usrComms = getCommunicatorsOf( selectedNotification->getRecipent_User() );
00206 
00207     // does the notification suggest a communicator? 
00208     string preferredCommURI = selectedNotification->getPreferredCommunicator().toString();
00209     if ( !preferredCommURI.empty() ) {
00210         // force URI intepreter
00211         URI fcu(preferredCommURI);
00212         preferredCommURI = fcu.toString();
00213         // update (increase) ranking of the communicator  (won't update timestamp)
00214         usrComms->updateRankingOf(preferredCommURI, 0.2, true);
00215      }
00216 
00217     // does the user force a communicator
00218     string forceCommURI = UserManager::getUserProperty(selectedNotification->getRecipent_User(), "CommunicatorForce").toString();
00219     if ( !forceCommURI.empty() ) {
00220         // force URI intepreter
00221         URI fcu(forceCommURI);
00222         forceCommURI = fcu.toString();
00223         // update (increase) ranking of the communicator  (won't update timestamp)
00224         usrComms->updateRankingOf(forceCommURI, 1000, true);
00225      }
00226 
00227     usrComms->filterTimedOut(0);
00228     if ( usrComms->getCommmunicators().empty() ) {
00229         Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED NO ONLINE communicators for user %s", notification->getID(), selectedNotification->getRecipent_User().c_str() );
00230 notification->raiseError();
00231         return false;
00232      }
00233 
00234     // does the user WANT a custom protocol? (example: 'WOSH' may talk only using PROTOCOL_SHELL!)
00235     std::string forceProto = UserManager::getUserProperty(selectedNotification->getRecipent_User(), "ProtocolForce").toString();
00236     std::string location = UserManager::getUserLocation( selectedNotification->getRecipent_User() );
00237 
00238     intersectCommunicators_(usrComms, selectedNotification, location );
00239     if ( usrComms->getCommmunicators().empty() ) {
00240         Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED No FILTERED communicators for user %s", notification->getID(), selectedNotification->getRecipent_User().c_str() );
00241         delete usrComms; usrComms = NULL;
00242 notification->raiseError();
00243         return false;
00244      }
00245 
00246     Communicator* communicator = NULL;
00247     string communicatorURI = "";
00248     UserInfoCommmunicatorData communicatorData;
00249     usrComms->getCommmunicatorRank(communicatorURI, communicatorData);
00250 
00251     this->comms.transactionBeginRead();
00252     communicator = this->comms.find( communicatorURI );
00253     if ( communicator == NULL ) {
00254         this->comms.transactionEnd();
00255         UserManager::setUserCommunicator( selectedNotification->getRecipent_User(), communicatorURI, 0, -0.15 );
00256         Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED accessing communicator %s for user %s ", notification->getID(), communicatorURI.c_str(), selectedNotification->getRecipent_User().c_str() );
00257         delete usrComms; usrComms = NULL;
00258 notification->raiseError();
00259         return false;
00260      }
00261 
00262     Log(LOG_VERBOSE, ":sendNotification_(%ld) : Selected %s (out of %d) for user %s ", notification->getID(), communicatorURI.c_str(), usrComms->getCommmunicators().size(), selectedNotification->getRecipent_User().c_str() );
00263     delete usrComms; usrComms = NULL;
00264 
00265     string interpeterURI = "";
00266 
00267     // interpeter needed? (rule: minimize use)
00268     bool iRequired = isInterpreterRequired_( selectedNotification, communicator );
00269     if ( iRequired ) {
00270         this->transl.transactionBeginRead();
00271         Interpreter* inter = selectInterpreter_( selectedNotification, communicator );
00272         if ( inter == NULL ) {
00273             Log(LOG_WARNING, ":sendNotification_(%ld) : FAILED selecting Interpreter! [for %s]", notification->getID(), communicatorURI.c_str() );
00274          }
00275         else {
00276             interpeterURI = inter->getURI().toString();
00277             Log(LOG_VERBOSE, ":sendNotification_(%ld) : Selected Interpreter %s [for %s]", notification->getID(), interpeterURI.c_str(), communicatorURI.c_str() );
00278          }
00279         this->transl.transactionEnd();
00280      }
00281 
00282     this->comms.transactionEnd();
00283 
00284     Message* message = new Message();
00285     message->setSource(this->getOwner()->getURI());
00286     message->setDestinationBus( _Bus_Core );
00287     SecurityManager::getCredentialImplicit().signMessage(message, notification->getOriginal()->getSender_User());
00288 
00289     if ( !interpeterURI.empty() && communicator != NULL ) {
00290         Log(LOG_INFO, ":sendNotification_(%ld) : Flow [%s=>%s] selected", notification->getID(), interpeterURI.c_str(), communicatorURI.c_str() );
00291 
00292         NotificationConversion* conv = new NotificationConversion();
00293         conv->setOriginal( selectedNotification->clone() );
00294         conv->setProtocol(communicator->getProtocol());
00295         //conv->setNotificationType(communicator->getChannelTypes());
00296         Request* request = new Request(_InterpreterService_METHOD_convert, conv );
00297         message->setContent(request);
00298         message->setDestination( interpeterURI );
00299      }
00300     else if ( !communicatorURI.empty() ) {
00301         Log(LOG_INFO, ":sendNotification_(%ld) : Communicator %s selected, sending[%ld]..", notification->getID(), communicatorURI.c_str(), notification->getSentCount() );
00302 
00303         notification->sending();
00304         notification->setSenderCommunicator(communicatorURI);
00305         message->setContent( selectedNotification->clone() );
00306         message->setDestination( communicatorURI );
00307         BusCore.postMessage(message);
00308         return true;
00309      }
00310     else {
00311         Log(LOG_CRITICAL, ":sendNotification_(%ld) : Internal ERROR!", notification->getID() );
00312         delete message;
00313 notification->raiseError();
00314         return false;
00315      }
00316     return false;
00317  }
00318 
00319 ///////////////////////////////////////////////////////////////////////////////////////////////////// SENDING
00320 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00321 //////////////////////////////////////////////////////////////////////////////////////////////////// UPDATING
00322 
00323 void CommunicationManagerImpl::evalMessage( const Message& message )
00324  {
00325     if ( message.isEmpty() ) return;
00326 
00327     if ( message.getContent()->isNotification() ) {
00328         const Notification* notification = message.getContent()->asNotification();
00329         this->messages.transactionBeginWrite();
00330         evalNotification_(notification, &message);
00331         this->messages.transactionEnd();
00332         return;
00333      }
00334     else if ( message.getContent()->isResponse() ) {
00335         const Response* response = message.getContent()->asResponse();
00336         if ( !response->hasData() ) return;
00337 
00338         if ( response->getMethod() == _Communicator_RESPONSE_notif_sent ) {
00339             // seems a communicator replied to our sending, means it was SENT
00340             long not_id = 0;
00341             if ( response->getData()->isKindOf<Variant>() ) {
00342                 not_id = dynamic_cast<const Variant*>(response->getData())->toLong(0);
00343              }
00344             if ( not_id == 0 ) {
00345                 Log(LOG_WARNING, ":evalResponse() : Unsupported/Missing value in send_message response [by %s]", message.getSource().toString().c_str() );
00346              }
00347             else
00348                 communicator_reply_sent(response->getReturnValue(), not_id, message.getSource() );
00349             return;
00350          }
00351         else if ( response->getMethod() == _Communicator_RESPONSE_notif_read ) {
00352             // seems a communicator replied to our sending, means it was READ
00353             long not_id = 0;
00354             if ( response->getData()->isKindOf<Variant>() ) {
00355                 not_id = dynamic_cast<const Variant*>(response->getData())->toLong(0);
00356              }
00357             if ( not_id == 0 ) {
00358                 Log(LOG_WARNING, ":evalResponse() : Unsupported/Missing value in send_message response [by %s]", message.getSource().toString().c_str() );
00359              }
00360             else
00361                 communicator_reply_read(response->getReturnValue(), not_id, message.getSource() );
00362             return;
00363          }
00364      }
00365  }
00366 
00367 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00368 
00369 NotificationFlow* CommunicationManagerImpl::evalNotification_( const Notification* notification, const Message* message )
00370  {
00371     if ( notification == NULL || message == NULL ) return NULL;
00372 
00373     NotificationFlow* notificationEx = NULL;
00374     if ( notification->getFlowID() == 0 ) {
00375         // notification is NEW (at least for myself)! => create a new flow and set as original (associating uid)
00376         notificationEx = new NotificationFlow( notification->clone() );
00377         notificationEx->setSourceCommunicator(message->getSource());
00378 
00379         Log(LOG_VERBOSE, ":evalNotification_(%ld) : New FLOW#%ld from %s to %s [%s]",
00380                         notification->getID(), notificationEx->getID(), notification->getSender_User().c_str(), notification->getRecipent_User().c_str(),
00381                         notification->getNotificationTypeAsString() );
00382 
00383         this->messages.set( notificationEx->getID(), notificationEx );
00384         UserManager::setUserCommunicator( notification->getSender_User(), message->getSource().toString(), DateTime::std_time_ms(), 0.01 );
00385      }
00386     else {
00387         // so, maybe we are updating a known Notification
00388         notificationEx = this->messages.find(notification->getFlowID());
00389         if ( notificationEx == NULL ) {
00390             Log(LOG_WARNING, ":evalNotification_(%ld) : FAILED Flow#%ld not found!", notification->getID(), notification->getFlowID() );
00391             notificationEx = this->messagesArchived.find(notification->getFlowID());
00392             if ( notificationEx == NULL ) {
00393                 Log(LOG_WARNING, ":evalNotification_(%ld) : FAILED Flow#%ld not found [even in Archived Cache]!", notification->getID(), notification->getFlowID() );
00394                 return NULL;
00395              }
00396          }
00397 
00398         Log(LOG_VERBOSE, ":evalNotification_(%ld) : Updating Flow#%ld!", notification->getID(), notification->getFlowID() );
00399         notificationEx->addInterpreted(notification->clone());
00400         notificationEx->setProcessing(false);
00401      }
00402 
00403     return notificationEx;
00404  }
00405 
00406 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00407 
00408 void CommunicationManagerImpl::communicator_reply_read( WRESULT ret, long notification_id, const URI& source )
00409  {
00410     this->messages.transactionBeginWrite();
00411     tLongNotificationFlowMap::Iterator it;
00412     tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00413     for ( it=this->messages.begin(); it!=it_end; ++it ) {
00414         if ( it->second == NULL || !it->second->hasOriginal() ) continue;
00415         if ( it->second->getOriginal()->getID() != notification_id ) continue;
00416         if ( WSUCCEEDED(ret) ) {
00417             it->second->getOriginal()->setRead(true);
00418             UserManager::setUserCommunicator( it->second->getOriginal()->getRecipent_User(), source.toString(), DateTime::std_time_ms(), 0.10 );
00419             Log(LOG_INFO, ":communicator_reply_read(%ld) : Flow#%ld Archived", notification_id, it->second->getID() );
00420             archiveNotification_( it->second );
00421          }
00422         else {
00423             it->second->getOriginal()->setRead(false);
00424             Log(LOG_WARNING, ":communicator_reply_read(%ld) : ERROR#%d by %s", notification_id, ret, source.toString().c_str() );
00425             it->second->raiseError();
00426          }
00427         break;
00428      }
00429     this->messages.transactionEnd();
00430  }
00431 
00432 void CommunicationManagerImpl::communicator_reply_sent( WRESULT ret, long notification_id, const URI& source )
00433  {
00434     this->messages.transactionBeginWrite();
00435     tLongNotificationFlowMap::Iterator it;
00436     tLongNotificationFlowMap::ConstIterator it_end = this->messages.end();
00437     for ( it=this->messages.begin(); it!=it_end; ++it ) {
00438         if ( it->second == NULL || !it->second->hasOriginal() ) continue;
00439         if ( it->second->getOriginal()->getID() !=  notification_id ) continue;
00440         if ( WSUCCEEDED(ret) ) {
00441             it->second->getOriginal()->setSent(true);
00442             it->second->setSenderCommunicator(source);
00443             Log(LOG_VERBOSE, ":communicator_reply_sent() : Notification#%ld of Flow#%ld has been Sent!", notification_id, it->second->getID() );
00444 //          Log(LOG_INFO, ":communicator_reply_sent(%ld) : Flow#%ld Archived", notification_id, it->second->getID() );
00445 //          it->second->setArchived(true);
00446 //archiveNotification_( it->second );
00447          }
00448         else {
00449             Log(LOG_WARNING, ":communicator_reply_sent(%ld) : ERROR#%d by %s", notification_id, ret, source.toString().c_str() );
00450             it->second->raiseError();
00451             UserManager::setUserCommunicator( it->second->getOriginal()->getRecipent_User(), source.toString(), DateTime::std_time_ms(), -0.2 );
00452          }
00453         break;
00454      }
00455     this->messages.transactionEnd();
00456  }
00457 
00458 //////////////////////////////////////////////////////////////////////////////////////////////////// UPDATING
00459 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00460 
00461 void CommunicationManagerImpl::archiveNotification_( NotificationFlow* notification )
00462  {
00463     if ( notification == NULL ) return;
00464     Log(LOG_INFO, ":archiveNotification_(%ld) : Archiving Notification [%ld errors]", notification->getID(), notification->getErrorsCount() );
00465 
00466     notification->setProcessing(false);
00467     notification->setArchived(true);
00468     this->messages.erase( notification->getID(), false );
00469     this->messagesArchived.set( notification->getID(), notification );
00470  }
00471 
00472 //////////////////////////////////////////////////////////////////////////////////////////////////////////////
00473 //////////////////////////////////////////////////////////////////////////////////////////////////////////////

Generated on Tue Feb 8 2011 09:32:48 for WOSH system 0.8.888 [wolf] by Alessandro Polo, using DoxyGen 1.7.2 hosted by WOSH Framework