00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "TaskManagerBundle.h"
00037 #include <core/ObjectFactory.h>
00038 #include <core/PersistenceManager.h>
00039 #include <core/BusManager.h>
00040 #include <core/FileSystem.h>
00041 #include <core/WoshKernel.h>
00042 #include <core/MethodsCommon.h>
00043 #include <core/DataSet.h>
00044 #include <core/BusRing.h>
00045 #include <core/SystemInfo.h>
00046
00047
00048 using namespace std;
00049 namespace wosh {
00050 namespace bundles {
00051
// Two registrations of TaskManagerBundle: one against the TaskManager interface and one
// against BundleGeneric, each with its own trailing identifier token.
// NOTE(review): WOSH_REGISTER is defined outside this file; presumably it makes the class
// creatable through ObjectFactory under the given base type -- confirm against its definition.
00052 WOSH_REGISTER(wosh::bundles::TaskManagerBundle, wosh::interfaces::automations::TaskManager, _TaskManager_VERSION, bundles_TaskManagerBundle1)
00053 WOSH_REGISTER(wosh::bundles::TaskManagerBundle, wosh::BundleGeneric, _TaskManager_VERSION, bundles_TaskManagerBundle2)
00054
00055
00056
00057
00058 TaskManagerBundle::TaskManagerBundle()
00059 : BundleGeneric(), scheduler(NULL) {
00060 BundleGeneric::setName( _TaskManager_NAME, false );
00061 Properties.update( _KEY_Version, _TaskManager_VERSION );
00062 Requirements.add<wosh::xtended::Scheduler>();
00063
00064 Object::getURI().fromString( _TaskManager_URI );
00065 Object::getURI().setKernelLocal();
00066 Object::getURI().registerGlobal();
00067
00068
00069
00070 Log(LOG_DEBUG, " Setting default properties and permissions.." );
00071 Properties.set( _TaskManager_KEY_DB, "$DATABASE/automations/", Permission::RW_RW_R__() );
00072
00073
00074
00075
00076
00077
00078
00079 Log(LOG_DEBUG, " Registering methods.." );
00080 MethodMessageResponse* mmExecTask = Methods.create<MethodMessageResponse>( _TaskManager_METHOD_exec, "exec Task", Permission::R_XR_XR__() );
00081 mmExecTask->setMethod( this, (MethodMessageResponsePtr)&TaskManagerBundle::mmDoExecTask );
00082
00083 MethodRequest* mmReloadTasks = Methods.create<MethodRequest>( _TaskManager_METHOD_reload, "Reload Tasks", Permission::R_XR__R__() );
00084 mmReloadTasks->setMethod( this, (MethodRequestPtr)&TaskManagerBundle::mmDoReloadTasks );
00085
00086 MethodMessageResponse* mmScheduleTask = Methods.create<MethodMessageResponse>( _TaskManager_METHOD_schedule, "schedule Task", Permission(Permission::RX) );
00087 mmScheduleTask->setMethod( this, (MethodMessageResponsePtr)&TaskManagerBundle::mmDoScheduleTask );
00088
00089 MethodDataSetFields* mmListTasks = Methods.create<MethodDataSetFields>( _METHOD_List, "List Tasks", Permission(Permission::RX) );
00090 mmListTasks->setRetriever( DataSetFieldsReader_X<Automation>::createFrom(&this->automations) );
00091
00092 MethodRetrieve* mmRetrieveTasks = Methods.create<MethodRetrieve>( _METHOD_Retrieve, "Retrieve Task(s)", Permission::R_XR_XR__() );
00093 mmRetrieveTasks->setRetriever( DataSetRetriever_X<Automation>::createFrom<int64>(&this->automations, &Automation::getID ) );
00094
00095 Log(LOG_DEBUG, " Setting up Scheduler.." );
00096 this->scheduler = ObjectFactory::createTypeOf<wosh::xtended::Scheduler>();
00097 if ( this->scheduler != NULL ) {
00098 this->scheduler->setListener(this);
00099 this->scheduler->setOwnerObject(this);
00100 }
00101
00102 setBundleState(Bundle::STATE_CREATED, false);
00103 }
00104
00105 TaskManagerBundle::~TaskManagerBundle() {
00106 Log(LOG_INFO, " Destroying.." );
00107 if ( isBundleRunning() ) {
00108 Log(LOG_WARNING, "~TaskManagerBundle() : Destroying while RUNNING! Trying to stop.." );
00109 bundleStop();
00110 }
00111 Log(LOG_DEBUG, ":~TaskManagerBundle() : Destroying Scheduler.." );
00112 ObjectFactory::destroy(this->scheduler, true);
00113
00114 this->automations.transactionBeginWrite();
00115 Log(LOG_DEBUG, ":~TaskManagerBundle() Freeing Automations [%d]..", this->automations.size() );
00116 ObjectFactory::destroyContainerSecond(this->automations);
00117 this->automations.clear();
00118 this->automations.transactionEnd();
00119
00120 Log(LOG_VERBOSE, ":~TaskManagerBundle() : Destroyed." );
00121 }
00122
00123
00124
00125
00126
00127 WRESULT TaskManagerBundle::bundleStart() {
00128 if ( !BundleGeneric::bundleValidate_StartStop(Bundle::STATE_STARTING) ) return WRET_ERR_WRONG_STATE;
00129 setBundleState( Bundle::STATE_STARTING );
00130
00131 if ( this->scheduler == NULL ) {
00132 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00133 "Internal Error",
00134 "bundleStart() FAILED Creating Scheduler.." );
00135 }
00136 WRESULT bus_ok = WRET_OK;
00137 if ( !WoshKernel::getInstance()->busses().existsBus(_Bus_Automation) ) {
00138 Log(LOG_INFO, "bundleStart() : Initializing BusRing ("_Bus_Automation").." );
00139 bus_ok = WRET_ERR_INTERNAL;
00140 Bus* automationBus = ObjectFactory::createTypeOf<wosh::BusRing>();
00141 if ( automationBus == NULL ) {
00142 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00143 "Unable to Create Bus",
00144 "bundleStart() FAILED#%d Creating BusRing instance!" );
00145 }
00146 else {
00147 automationBus->setName(_Bus_Automation);
00148 automationBus->setParentObject(this);
00149 automationBus->setSharedLocal(true);
00150 automationBus->setSharedNetwork(true);
00151 bus_ok = WoshKernel::getInstance()->busses().registerBus(automationBus);
00152 if ( WFAILED(bus_ok) ) {
00153 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00154 "Unable to Create Bus",
00155 "bundleStart() FAILED#%d Creating Bus ("_Bus_Automation")..", bus_ok );
00156 }
00157 else {
00158 Log(LOG_INFO, "bundleStart() : Created BusRing ("_Bus_Automation")." );
00159 }
00160 }
00161 }
00162 else {
00163 Log(LOG_VERBOSE, "bundleStart() : "_Bus_Automation" exists already." );
00164 }
00165 BusAutomation.setMessageHandler(this);
00166 bus_ok = BusAutomation.connect( _Bus_Automation, Bus::ACTION_DEFERRED );
00167 if ( WFAILED(bus_ok) ) {
00168 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00169 "Unable to Connect Bus",
00170 "bundleStart() FAILED#%d : Connecting Bus ("_Bus_Automation")..", bus_ok );
00171 }
00172 else
00173 Log(LOG_DEBUG, " Connected to "_Bus_Automation"." );
00174
00175 WRESULT ret = WRET_OK;
00176 ret += this->scheduler->setThreadPoolCount(3);
00177 ret += this->scheduler->startScheduler(30000);
00178
00179 loadAutomations();
00180
00181 if ( WSUCCEEDED(ret) )
00182 setBundleState( Bundle::STATE_STARTED );
00183 return ret;
00184 }
00185
00186 WRESULT TaskManagerBundle::bundleStop() {
00187 if ( !BundleGeneric::bundleValidate_StartStop(Bundle::STATE_STOPPING) ) return WRET_ERR_WRONG_STATE;
00188 setBundleState( Bundle::STATE_STOPPING );
00189
00190 saveAutomations();
00191
00192 WRESULT ret = this->scheduler->stopScheduler(30000);
00193
00194 if ( WSUCCEEDED(ret) )
00195 setBundleState( Bundle::STATE_STOPPED );
00196 return ret;
00197 }
00198
00199
00200
00201
00202 void TaskManagerBundle::busMessage( const Message& message, const Bus* source ) {
00203 if ( message.isEmpty() ) return;
00204 if ( !MessageFilter::filterReplica(message, &BusCore) ) return;
00205 BundleGeneric::busMessage(message, source);
00206 }
00207
00208
00209
00210
00211 bool TaskManagerBundle::updatingProperty( bool& do_update, const Variant& value_proposed, Property& property_current, const PropertiesProvider* source ) {
00212 return BundleGeneric::updatingProperty(do_update, value_proposed, property_current, source);
00213 }
00214
00215
00216
00217
00218
00219 WRESULT TaskManagerBundle::addAutomation( Automation* automation ) {
00220 this->automations.transactionBeginWrite();
00221 WRESULT ret = addAutomation_(automation);
00222 this->automations.transactionEnd();
00223 return ret;
00224 }
00225
00226 WRESULT TaskManagerBundle::addAutomation_( Automation* automation ) {
00227 if ( automation == NULL ) return WRET_ERR_PARAM;
00228 automation->setParentObject(this);
00229 automation->getURI().setKernelLocal();
00230 automation->getURI().setParent( this->getURI() );
00231 automation->getURI().setName( automation->getName() );
00232 automation->getURI().registerGlobal();
00233 if ( !automation->isInitialized() ) {
00234 Log(LOG_VERBOSE, ":addAutomation(%s) Forcing initialzation", automation->getName().c_str() );
00235 automation->init();
00236 }
00237 if ( this->automations.exists(automation->getID()) ) {
00238 Log(LOG_WARNING, ":addAutomation(%s) Duplicate ID found: %"PRId64" [Overwriting]", automation->getName().c_str(), automation->getID() );
00239 }
00240 else
00241 Log(LOG_INFO, ":addAutomation(%s) Added", automation->getName().c_str() );
00242 this->automations.set(automation->getID(), automation);
00243 return WRET_OK;
00244 }
00245
00246 void TaskManagerBundle::taskEvent( wosh::xtended::SchedulerTask* task, int64 interrupt, wosh::xtended::Scheduler* ) {
00247 if ( task == NULL ) return;
00248 if ( task->getID() == 0 ) {
00249 Log(LOG_WARNING, ":taskEvent() Task ID is 0. Removing Task" );
00250 return;
00251 }
00252 this->automations.transactionBeginRead();
00253 Automation* automation = this->automations.valueOf(task->getID());
00254 if ( automation == NULL ) {
00255 this->automations.transactionEnd();
00256 Log(LOG_CRITICAL, ":taskEvent(%"PRId64") : NO linked-Automation found. Removing Task", task->getID() );
00257 return;
00258 }
00259 automation->execute(NULL, NULL, interrupt);
00260 this->automations.transactionEnd();
00261 return;
00262 }
00263
00264
00265
00266
00267 WRESULT TaskManagerBundle::loadAutomations() {
00268 FilePath xmlFile;
00269 if ( !xmlFile.set( Properties.getValueOf(_TaskManager_KEY_DB).toString() ) ) {
00270 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00271 "Unable to load Automations",
00272 "loadAutomations() FAILED#1 Invalid Archive File [%s]", xmlFile.getPath().c_str() );
00273 return WRET_ERR_PARAM;
00274 }
00275 if ( !xmlFile.exists() ) {
00276 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00277 "Unable to load Automations",
00278 "loadAutomations() FAILED#1 Archive File doesn't exists %s [%s]!", xmlFile.getPath().c_str(), xmlFile.getPathLocal().c_str() );
00279 return WRET_ERR_PARAM;
00280 }
00281
00282 Log(LOG_VERBOSE, ":loadAutomations() File %s ..", xmlFile.getPathLocal().c_str() );
00283 std::vector<Automation*> objects;
00284 WRESULT ret = PersistenceManager::loadObjects( objects, "XML", xmlFile.getPathLocal() );
00285 if ( WFAILED(ret) ) {
00286 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00287 "Unable to load Automations",
00288 "loadAutomations() FAILED#%d loading Database %s..", ret, xmlFile.getPathLocal().c_str() );
00289 }
00290 if ( objects.empty() ) {
00291 Log(LOG_WARNING, ":loadAutomations() : No Items found in database.." );
00292 return WRET_OK;
00293 }
00294
00295 this->automations.transactionBeginWrite();
00296 std::vector<Automation*>::iterator it;
00297 for ( it=objects.begin(); it!=objects.end(); it++ ) {
00298 if ( *it == NULL ) continue;
00299 addAutomation_( *it );
00300 }
00301 this->automations.transactionEnd();
00302 return ret;
00303 }
00304
00305 WRESULT TaskManagerBundle::saveAutomations() {
00306 FilePath xmlFile;
00307 xmlFile.set("$DATABASE/automations/automations2.bak");
00308 Log(LOG_VERBOSE, ":saveAutomations() File %s ..", xmlFile.getPathLocal().c_str() );
00309 this->automations.transactionBeginRead();
00310 WRESULT ret = PersistenceManager::saveObjects( this->automations.getContainer(), "XML", xmlFile.getPathLocal() );
00311 this->automations.transactionEnd();
00312 if ( WFAILED(ret) ) {
00313 SystemInfo::raise(this, &Log, SystemInfo::TYPE_ERROR, SystemInfo::MODE_PERMANENT, SystemInfo::SCOPE_SYSTEM, SystemInfo::PRIORITY_FAILURE,
00314 "Unable to save Automations",
00315 "saveAutomations() FAILED#%d saving Database %s..", ret, xmlFile.getPathLocal().c_str() );
00316 }
00317 return ret;
00318 }
00319
00320
00321
00322
00323 Response* TaskManagerBundle::mmDoExecTask( const Message* messageRequest ) {
00324 if ( messageRequest == NULL || messageRequest->isEmpty() ) return NULL;
00325 const Request* request = messageRequest->getContent()->asRequest();
00326 if ( request == NULL ) return NULL;
00327 WRESULT ret = WRET_OK;
00328 Variant taskID;
00329 request->extractArguments(taskID);
00330 if ( taskID.isNumeric() ) {
00331 int64 id = taskID.toInt64(0);
00332 if ( id == 0 )
00333 return request->replyResponse(WRET_ERR_PARAM, "Invalid id (0) [int64]");
00334 Log(LOG_VERBOSE, ":mmDoExecTask(%"PRId64") (by ID)", id );
00335 this->automations.transactionBeginRead();
00336 Automation* automation = this->automations.valueOf(id);
00337 if ( automation == NULL ) {
00338 this->automations.transactionEnd();
00339 return request->replyResponse(WRET_ERR_PARAM, "Automation not found [int64]");
00340 }
00341 ret = automation->execute(NULL, NULL, 0);
00342 this->automations.transactionEnd();
00343 }
00344 else if ( taskID.isStringNotEmpty() ) {
00345 this->automations.transactionBeginRead();
00346 Log(LOG_VERBOSE, ":mmDoExecTask(%s) (by Name)", taskID.asString().c_str() );
00347 tInt64AutomationMap::ConstIterator it;
00348 const tInt64AutomationMap::ConstIterator it_end = this->automations.end();
00349 for ( it=this->automations.begin(); it!=it_end; ++it ) {
00350 if ( it->second == NULL ) continue;
00351 if ( it->second->getName() != taskID.asString() ) continue;
00352 ret = it->second->execute(NULL, NULL, 0);
00353 break;
00354 }
00355 this->automations.transactionEnd();
00356 }
00357 else
00358 return request->replyResponse(WRET_ERR_PARAM, "Invalid parameters: id[int64] | name[str]");
00359 return request->replyResponse(ret);
00360 }
00361
00362 Response* TaskManagerBundle::mmDoScheduleTask( const Message* messageRequest ) {
00363 if ( messageRequest == NULL || messageRequest->isEmpty() ) return NULL;
00364 const Request* request = messageRequest->getContent()->asRequest();
00365 if ( request == NULL ) return NULL;
00366 Variant taskID, nextInterrupt;
00367 request->extractArguments(taskID, nextInterrupt);
00368 if ( !taskID.isInt64() || (!nextInterrupt.isDateTime() && !nextInterrupt.isInt64()) )
00369 return request->replyResponse(WRET_ERR_PARAM, "Invalid Param [int64, int64]");
00370 Log(LOG_VERBOSE, ":mmScheduleTask(%"PRId64",%"PRId64")", taskID.asInt64(), nextInterrupt.asInt64() );
00371 WRESULT ret = this->scheduler->setupTask(taskID.asInt64(), nextInterrupt.asInt64());
00372 return request->replyResponse(ret);
00373 }
00374
00375
00376
00377 Response* TaskManagerBundle::mmDoReloadTasks( const Request* request ) {
00378 if ( request == NULL ) return NULL;
00379 std::string option = request->getArgument().toString();
00380 Log(LOG_VERBOSE, ":mmDoReloadBObj(%s)", option.c_str() );
00381 this->automations.free();
00382 WRESULT ret = loadAutomations();
00383 return request->replyResponse(ret);
00384 }
00385
00386
00387
00388 };
00389 };