ThreadWeaver
JobCollection.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "JobCollection.h"
00030 #include "JobCollection_p.h"
00031
00032 #include "WeaverInterface.h"
00033 #include "DebuggingAids.h"
00034
00035 #include <QtCore/QList>
00036 #include <QtCore/QObject>
00037 #include <QtCore/QPointer>
00038
00039 #include "DependencyPolicy.h"
00040
00041 using namespace ThreadWeaver;
00042
00043 JobCollectionJobRunner::JobCollectionJobRunner ( JobCollection* collection, Job* payload, QObject* parent )
00044 : Job( parent )
00045 , m_payload( payload )
00046 , m_collection( collection )
00047 {
00048 Q_ASSERT ( payload );
00049
00050 if ( ! m_payload->objectName().isEmpty() )
00051 {
00052 setObjectName( tr( "JobRunner executing " ) + m_payload->objectName() );
00053 } else {
00054 setObjectName( tr( "JobRunner (unnamed payload)" ) );
00055 }
00056 }
00057
00058 bool JobCollectionJobRunner::canBeExecuted()
00059 {
00060 return m_payload->canBeExecuted();
00061 }
00062
00063 Job* JobCollectionJobRunner::payload ()
00064 {
00065 return m_payload;
00066 }
00067
00068 void JobCollectionJobRunner::aboutToBeQueued ( WeaverInterface *weaver )
00069 {
00070 m_payload->aboutToBeQueued( weaver );
00071 }
00072
00073 void JobCollectionJobRunner::aboutToBeDequeued ( WeaverInterface *weaver )
00074 {
00075 m_payload->aboutToBeDequeued( weaver );
00076 }
00077
00078 void JobCollectionJobRunner::execute ( Thread *t )
00079 {
00080 if ( m_payload )
00081 {
00082 m_payload->execute ( t );
00083 m_collection->internalJobDone ( m_payload);
00084 } else {
00085 debug ( 1, "JobCollection: job in collection has been deleted." );
00086 }
00087 Job::execute ( t );
00088 }
00089
00090 int JobCollectionJobRunner::priority () const
00091 {
00092 return m_payload->priority();
00093 }
00094
00095 void JobCollectionJobRunner::run ()
00096 {
00097 }
00098
00099 class JobList : public QList <JobCollectionJobRunner*> {};
00100
00101 class JobCollection::Private
00102 {
00103 public:
00104
00105 Private()
00106 : elements ( new JobList() )
00107 , weaver ( 0 )
00108 , jobCounter (0)
00109 {}
00110
00111 ~Private()
00112 {
00113 delete elements;
00114 }
00115
00116
00117 JobList* elements;
00118
00119
00120 WeaverInterface *weaver;
00121
00122
00123
00124
00125
00126 int jobCounter;
00127
00128 QMutex mutex;
00129 };
00130
00131 JobCollection::JobCollection ( QObject *parent )
00132 : Job ( parent )
00133 , d (new Private)
00134 {
00135 }
00136
00137 JobCollection::~JobCollection()
00138 {
00139 if ( d->weaver != 0 )
00140 dequeueElements();
00141
00142 delete d;
00143 }
00144
00145 void JobCollection::addJob ( Job *job )
00146 {
00147 REQUIRE( d->weaver == 0 );
00148 REQUIRE( job != 0);
00149
00150 JobCollectionJobRunner* runner = new JobCollectionJobRunner( this, job, this );
00151 d->elements->append ( runner );
00152 connect( runner , SIGNAL(done(ThreadWeaver::Job*)) , this , SLOT(jobRunnerDone()) );
00153 }
00154
00155 void JobCollection::stop( Job *job )
00156 {
00157
00158 Q_UNUSED( job );
00159 if ( d->weaver != 0 )
00160 {
00161 debug( 4, "JobCollection::stop: dequeueing %p.\n", (void*)this);
00162 d->weaver->dequeue( this );
00163 }
00164
00165 }
00166
00167 void JobCollection::aboutToBeQueued ( WeaverInterface *weaver )
00168 {
00169 REQUIRE ( d->weaver == 0 );
00170
00171 d->weaver = weaver;
00172
00173 if ( d->elements->size() > 0 )
00174 {
00175 d->elements->at( 0 )->aboutToBeQueued( weaver );
00176 }
00177
00178 ENSURE(d->weaver != 0);
00179 }
00180
00181 void JobCollection::aboutToBeDequeued( WeaverInterface* weaver )
00182 {
00183
00184
00185
00186 if ( d->weaver )
00187 {
00188 dequeueElements();
00189
00190 d->elements->at( 0 )->aboutToBeDequeued( weaver );
00191 }
00192
00193 d->weaver = 0;
00194 ENSURE ( d->weaver == 0 );
00195 }
00196
00197 void JobCollection::execute ( Thread *t )
00198 {
00199 REQUIRE ( d->weaver != 0);
00200
00201
00202
00203 emit (started (this));
00204
00205 if ( d->elements->isEmpty() )
00206 {
00207 Job::execute( t );
00208 return;
00209 }
00210
00211 {
00212
00213
00214 QMutexLocker l ( & d->mutex );
00215 d->jobCounter = d->elements->size();
00216
00217
00218 for (int index = 1; index < d->elements->size(); ++index)
00219 {
00220 d->weaver->enqueue (d->elements->at(index));
00221 }
00222 }
00223
00224
00225
00226
00227
00228 d->elements->at( 0 )->execute ( t );
00229
00230
00231
00232
00233
00234 }
00235
00236 Job* JobCollection::jobAt( int i )
00237 {
00238 QMutexLocker l( &d->mutex );
00239 REQUIRE ( i >= 0 && i < d->elements->size() );
00240 return d->elements->at( i )->payload();
00241 }
00242
00243 const int JobCollection::jobListLength()
00244 {
00245 QMutexLocker l( &d->mutex );
00246 return d->elements->size();
00247 }
00248
00249 bool JobCollection::canBeExecuted()
00250 {
00251 bool inheritedCanRun = true;
00252
00253 QMutexLocker l( &d->mutex );
00254
00255 if ( d->elements->size() > 0 )
00256 {
00257 inheritedCanRun = d->elements->at( 0 )->canBeExecuted();
00258 }
00259
00260 return Job::canBeExecuted() && inheritedCanRun;
00261 }
00262
00263 void JobCollection::jobRunnerDone()
00264 {
00265
00266
00267
00268 bool emitDone = false;
00269
00270 {
00271 QMutexLocker l(&d->mutex);
00272
00273 if ( d->jobCounter == 0 )
00274 {
00275
00276 d->weaver = 0;
00277 return;
00278 }
00279
00280 --d->jobCounter;
00281
00282 ENSURE (d->jobCounter >= 0);
00283
00284 if (d->jobCounter == 0)
00285 {
00286 if (! success())
00287 {
00288 emit failed(this);
00289 }
00290
00291 finalCleanup();
00292 emitDone = true;
00293 }
00294 }
00295
00296 if (emitDone)
00297 emit done(this);
00298 }
00299 void JobCollection::internalJobDone ( Job* job )
00300 {
00301 REQUIRE( job != 0 );
00302 Q_UNUSED (job);
00303 }
00304
00305 void JobCollection::finalCleanup()
00306 {
00307 freeQueuePolicyResources();
00308 setFinished(true);
00309 d->weaver = 0;
00310 }
00311
00312 void JobCollection::dequeueElements()
00313 {
00314
00315
00316
00317
00318 bool emitDone = false;
00319
00320 {
00321
00322 QMutexLocker l( &d->mutex );
00323
00324 if ( d->weaver != 0 )
00325 {
00326 for ( int index = 1; index < d->elements->size(); ++index )
00327 {
00328 if ( d->elements->at( index ) && ! d->elements->at( index )->isFinished() )
00329 {
00330 debug( 4, "JobCollection::dequeueElements: dequeueing %p.\n",
00331 (void*)d->elements->at( index ) );
00332 d->weaver->dequeue ( d->elements->at( index ) );
00333 } else {
00334 debug( 4, "JobCollection::dequeueElements: not dequeueing %p, already finished.\n",
00335 (void*)d->elements->at( index ) );
00336 }
00337 }
00338
00339 if (d->jobCounter != 0)
00340 {
00341
00342
00343 finalCleanup();
00344 }
00345 d->jobCounter = 0;
00346 }
00347 }
00348 if (emitDone)
00349 emit done(this);
00350 }
00351
00352 #include "JobCollection.moc"
00353 #include "JobCollection_p.moc"