00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "WeaverImpl.h"
00032
00033 #include <QtCore/QObject>
00034 #include <QtCore/QMutex>
00035 #include <QtCore/QDebug>
00036
00037 #include "Job.h"
00038 #include "State.h"
00039 #include "Thread.h"
00040 #include "ThreadWeaver.h"
00041 #include "DebuggingAids.h"
00042 #include "WeaverObserver.h"
00043 #include "SuspendedState.h"
00044 #include "SuspendingState.h"
00045 #include "DestructedState.h"
00046 #include "WorkingHardState.h"
00047 #include "ShuttingDownState.h"
00048 #include "InConstructionState.h"
00049
00050 using namespace ThreadWeaver;
00051
00052 WeaverImpl::WeaverImpl( QObject* parent )
00053 : WeaverInterface(parent)
00054 , m_active(0)
00055 , m_inventoryMax( 4 )
00056 , m_mutex ( new QMutex( QMutex::Recursive ) )
00057 , m_finishMutex( new QMutex )
00058 , m_jobAvailableMutex ( new QMutex )
00059 , m_state (0)
00060 {
00061
00062 m_states[InConstruction] = new InConstructionState( this );
00063 setState ( InConstruction );
00064 m_states[WorkingHard] = new WorkingHardState( this );
00065 m_states[Suspending] = new SuspendingState( this );
00066 m_states[Suspended] = new SuspendedState( this );
00067 m_states[ShuttingDown] = new ShuttingDownState( this );
00068 m_states[Destructed] = new DestructedState( this );
00069
00070
00071 connect ( this, SIGNAL ( asyncThreadSuspended( ThreadWeaver::Thread* ) ),
00072 SIGNAL ( threadSuspended( ThreadWeaver::Thread* ) ),
00073 Qt::QueuedConnection );
00074 setState( WorkingHard );
00075 }
00076
00077 WeaverImpl::~WeaverImpl()
00078 {
00079
00080 REQUIRE( QThread::currentThread() == thread() );
00081 debug ( 3, "WeaverImpl dtor: destroying inventory.\n" );
00082 setState ( ShuttingDown );
00083
00084 m_jobAvailable.wakeAll();
00085
00086
00087
00088
00089
00090
00091
00092
00093 while (!m_inventory.isEmpty())
00094 {
00095 Thread* th=m_inventory.takeFirst();
00096 if ( !th->isFinished() )
00097 {
00098 for ( ;; )
00099 {
00100 m_jobAvailable.wakeAll();
00101 if ( th->wait( 100 ) ) break;
00102 debug ( 1, "WeaverImpl::~WeaverImpl: thread %i did not exit as expected, "
00103 "retrying.\n", th->id() );
00104 }
00105 }
00106 emit ( threadExited ( th ) );
00107 delete th;
00108 }
00109
00110 m_inventory.clear();
00111 delete m_mutex;
00112 delete m_finishMutex;
00113 delete m_jobAvailableMutex;
00114 debug ( 3, "WeaverImpl dtor: done\n" );
00115 setState ( Destructed );
00116
00117
00118 }
00119
00120 void WeaverImpl::setState ( StateId id )
00121 {
00122 if ( m_state==0 || m_state->stateId() != id )
00123 {
00124 m_state = m_states[id];
00125 debug ( 2, "WeaverImpl::setState: state changed to \"%s\".\n",
00126 m_state->stateName().toAscii().constData() );
00127 if ( id == Suspended )
00128 {
00129 emit ( suspended() );
00130 }
00131
00132 m_state->activated();
00133
00134 emit ( stateChanged ( m_state ) );
00135 }
00136 }
00137
00138 const State& WeaverImpl::state() const
00139 {
00140 return *m_state;
00141 }
00142
00143 void WeaverImpl::setMaximumNumberOfThreads( int cap )
00144 {
00145 Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." );
00146 QMutexLocker l (m_mutex);
00147 m_inventoryMax = cap;
00148 }
00149
00150 int WeaverImpl::maximumNumberOfThreads() const
00151 {
00152 QMutexLocker l (m_mutex);
00153 return m_inventoryMax;
00154 }
00155
00156 int WeaverImpl::currentNumberOfThreads () const
00157 {
00158 QMutexLocker l (m_mutex);
00159 return m_inventory.count ();
00160 }
00161
00162 void WeaverImpl::registerObserver ( WeaverObserver *ext )
00163 {
00164 connect ( this, SIGNAL ( stateChanged ( ThreadWeaver::State* ) ),
00165 ext, SIGNAL ( weaverStateChanged ( ThreadWeaver::State* ) ) );
00166 connect ( this, SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ),
00167 ext, SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) );
00168 connect ( this, SIGNAL ( threadBusy( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ),
00169 ext, SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) );
00170 connect ( this, SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ),
00171 ext, SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ) );
00172 connect ( this, SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) ,
00173 ext, SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) );
00174 }
00175
00176 void WeaverImpl::enqueue(Job* job)
00177 {
00178 adjustInventory ( 1 );
00179 if (job)
00180 {
00181 debug ( 3, "WeaverImpl::enqueue: queueing job %p of type %s.\n",
00182 (void*)job, job->metaObject()->className() );
00183 QMutexLocker l (m_mutex);
00184 job->aboutToBeQueued ( this );
00185
00186
00187
00188
00189 int i = m_assignments.size();
00190 if (i > 0)
00191 {
00192 while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i;
00193 m_assignments.insert( i, (job) );
00194 } else {
00195 m_assignments.append (job);
00196 }
00197 assignJobs();
00198 }
00199 }
00200
00201 void WeaverImpl::adjustInventory ( int numberOfNewJobs )
00202 {
00203 QMutexLocker l (m_mutex);
00204
00205
00206 const int reserve = m_inventoryMax - m_inventory.count();
00207
00208 if ( reserve > 0 )
00209 {
00210 for ( int i = 0; i < qMin ( reserve, numberOfNewJobs ); ++i )
00211 {
00212 Thread *th = createThread();
00213 th->moveToThread( th );
00214 m_inventory.append(th);
00215 connect ( th, SIGNAL ( jobStarted ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ),
00216 SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) );
00217 connect ( th, SIGNAL ( jobDone( ThreadWeaver::Job* ) ),
00218 SIGNAL ( jobDone( ThreadWeaver::Job* ) ) );
00219 connect ( th, SIGNAL ( started ( ThreadWeaver::Thread* ) ),
00220 SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) );
00221
00222 th->start ();
00223 debug ( 2, "WeaverImpl::adjustInventory: thread created, "
00224 "%i threads in inventory.\n", currentNumberOfThreads() );
00225 }
00226 }
00227 }
00228
00229 Thread* WeaverImpl::createThread()
00230 {
00231 return new Thread( this );
00232 }
00233
00234 bool WeaverImpl::dequeue ( Job* job )
00235 {
00236 bool result;
00237 {
00238 QMutexLocker l (m_mutex);
00239
00240 int i = m_assignments.indexOf ( job );
00241 if ( i != -1 )
00242 {
00243 job->aboutToBeDequeued( this );
00244
00245 m_assignments.removeAt( i );
00246 result = true;
00247 debug( 3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n",
00248 (void*)job, m_assignments.size() );
00249 } else {
00250 debug( 3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void*)job );
00251 result = false;
00252 }
00253 }
00254
00255
00256
00257 m_jobFinished.wakeOne();
00258 return result;
00259 }
00260
00261 void WeaverImpl::dequeue ()
00262 {
00263 debug( 3, "WeaverImpl::dequeue: dequeueing all jobs.\n" );
00264 QMutexLocker l (m_mutex);
00265 for ( int index = 0; index < m_assignments.size(); ++index )
00266 {
00267 m_assignments.at( index )->aboutToBeDequeued( this );
00268 }
00269 m_assignments.clear();
00270
00271 ENSURE ( m_assignments.isEmpty() );
00272 }
00273
00274 void WeaverImpl::suspend ()
00275 {
00276 m_state->suspend();
00277 }
00278
00279 void WeaverImpl::resume ( )
00280 {
00281 m_state->resume();
00282 }
00283
00284 void WeaverImpl::assignJobs()
00285 {
00286 m_jobAvailable.wakeAll();
00287 }
00288
00289 bool WeaverImpl::isEmpty() const
00290 {
00291 QMutexLocker l (m_mutex);
00292 return m_assignments.isEmpty();
00293 }
00294
00295
00296 void WeaverImpl::incActiveThreadCount()
00297 {
00298 adjustActiveThreadCount ( 1 );
00299 }
00300
00301 void WeaverImpl::decActiveThreadCount()
00302 {
00303 adjustActiveThreadCount ( -1 );
00304
00305
00306 m_jobFinished.wakeAll();
00307 }
00308
00309 void WeaverImpl::adjustActiveThreadCount( int diff )
00310 {
00311 QMutexLocker l (m_mutex);
00312 m_active += diff;
00313 debug ( 4, "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
00314 " in queue).\n", m_active, queueLength() );
00315
00316 if ( m_assignments.isEmpty() && m_active == 0)
00317 {
00318 P_ASSERT ( diff < 0 );
00319 emit ( finished() );
00320 }
00321 }
00322
00323 int WeaverImpl::activeThreadCount()
00324 {
00325 QMutexLocker l (m_mutex);
00326 return m_active;
00327 }
00328
00329 Job* WeaverImpl::takeFirstAvailableJob()
00330 {
00331 QMutexLocker l (m_mutex);
00332 Job *next = 0;
00333 for (int index = 0; index < m_assignments.size(); ++index)
00334 {
00335 if ( m_assignments.at(index)->canBeExecuted() )
00336 {
00337 next = m_assignments.at(index);
00338 m_assignments.removeAt (index);
00339 break;
00340 }
00341 }
00342 return next;
00343 }
00344
00345 Job* WeaverImpl::applyForWork(Thread *th, Job* previous)
00346 {
00347 if (previous)
00348 {
00349 decActiveThreadCount();
00350 }
00351 return m_state->applyForWork ( th, 0 );
00352 }
00353
00354 void WeaverImpl::waitForAvailableJob(Thread* th)
00355 {
00356 m_state->waitForAvailableJob ( th );
00357 }
00358
00359 void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th )
00360 {
00361 Q_UNUSED ( th );
00362 debug ( 4, "WeaverImpl::blockThread...: thread %i blocked.\n", th->id());
00363 emit asyncThreadSuspended ( th );
00364 QMutexLocker l( m_jobAvailableMutex );
00365 m_jobAvailable.wait( m_jobAvailableMutex );
00366 debug ( 4, "WeaverImpl::blockThread...: thread %i resumed.\n", th->id());
00367 }
00368
00369 int WeaverImpl::queueLength() const
00370 {
00371 QMutexLocker l (m_mutex);
00372 return m_assignments.count();
00373 }
00374
00375 bool WeaverImpl::isIdle () const
00376 {
00377 QMutexLocker l (m_mutex);
00378 return isEmpty() && m_active == 0;
00379 }
00380
00381 void WeaverImpl::finish()
00382 {
00383 #ifdef QT_NO_DEBUG
00384 const int MaxWaitMilliSeconds = 200;
00385 #else
00386 const int MaxWaitMilliSeconds = 2000;
00387 #endif
00388
00389 while ( !isIdle() )
00390 {
00391 debug (2, "WeaverImpl::finish: not done, waiting.\n" );
00392 QMutexLocker l( m_finishMutex );
00393 if ( m_jobFinished.wait( m_finishMutex, MaxWaitMilliSeconds ) == false )
00394 {
00395 debug ( 2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n",
00396 queueLength() );
00397 m_jobAvailable.wakeAll();
00398 }
00399 }
00400 debug (2, "WeaverImpl::finish: done.\n\n\n" );
00401 }
00402
00403 void WeaverImpl::requestAbort()
00404 {
00405 QMutexLocker l (m_mutex);
00406 for ( int i = 0; i<m_inventory.size(); ++i )
00407 {
00408 m_inventory[i]->requestAbort();
00409 }
00410 }
00411
00412 void WeaverImpl::dumpJobs()
00413 {
00414 QMutexLocker l (m_mutex);
00415 debug( 0, "WeaverImpl::dumpJobs: current jobs:\n" );
00416 for ( int index = 0; index < m_assignments.size(); ++index )
00417 {
00418 debug( 0, "--> %4i: %p %s (priority %i)\n", index, (void*)m_assignments.at( index ),
00419 m_assignments.at( index )->metaObject()->className(),
00420 m_assignments.at(index)->priority() );
00421 }
00422 }
00423
00424 #include "WeaverImpl.moc"