ThreadWeaver
Job.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "Job.h"
00030 #include "Job_p.h"
00031
00032 #include <QtCore/QSet>
00033 #include <QtCore/QList>
00034 #include <QtCore/QMutex>
00035 #include <QtCore/QObject>
00036 #include <QtCore/QMap>
00037 #include <QtCore/QArgument>
00038 #include <QtCore/QWaitCondition>
00039 #include <DebuggingAids.h>
00040 #include <Thread.h>
00041
00042 #include "QueuePolicy.h"
00043 #include "DependencyPolicy.h"
00044
00045 using namespace ThreadWeaver;
00046
00047 class ThreadWeaver::QueuePolicyList : public QList<QueuePolicy*> {};
00048
00049 class Job::Private
00050 {
00051 public:
00052 Private ()
00053 : thread (0)
00054 , queuePolicies ( new QueuePolicyList )
00055 , mutex (new QMutex (QMutex::NonRecursive) )
00056 , finished (false)
00057 {}
00058
00059 ~Private()
00060 {
00061 delete queuePolicies;
00062 delete mutex;
00063 }
00064
00065
00066 Thread * thread;
00067
00068
00069 QueuePolicyList* queuePolicies;
00070
00071 QMutex *mutex;
00072
00073 bool finished;
00074 };
00075
00076 Job::Job ( QObject *parent )
00077 : QObject (parent)
00078 , d(new Private())
00079 {
00080 }
00081
00082 Job::~Job()
00083 {
00084 for ( int index = 0; index < d->queuePolicies->size(); ++index )
00085 {
00086 d->queuePolicies->at( index )->destructed( this );
00087 }
00088
00089 delete d;
00090 }
00091
00092 ThreadWeaver::JobRunHelper::JobRunHelper()
00093 : QObject ( 0 )
00094 {
00095 }
00096
00097 void ThreadWeaver::JobRunHelper::runTheJob ( Thread* th, Job* job )
00098 {
00099 P_ASSERT ( th == thread() );
00100 job->d->mutex->lock();
00101 job->d->thread = th;
00102 job->d->mutex->unlock();
00103
00104 emit ( started ( job ) );
00105
00106 job->run();
00107
00108 job->d->mutex->lock();
00109 job->d->thread = 0;
00110 job->setFinished (true);
00111 job->d->mutex->unlock();
00112 job->freeQueuePolicyResources();
00113
00114 if ( ! job->success() )
00115 {
00116 emit ( failed( job ) );
00117 }
00118
00119 emit ( done( job ) );
00120 }
00121
00122 void Job::execute(Thread *th)
00123 {
00124
00125 JobRunHelper helper;
00126 connect ( &helper, SIGNAL ( started ( ThreadWeaver::Job* ) ),
00127 SIGNAL ( started ( ThreadWeaver::Job* ) ) );
00128 connect ( &helper, SIGNAL ( done ( ThreadWeaver::Job* ) ),
00129 SIGNAL ( done ( ThreadWeaver::Job* ) ) );
00130 connect ( &helper, SIGNAL( failed( ThreadWeaver::Job* ) ),
00131 SIGNAL( failed( ThreadWeaver::Job* ) ) );
00132
00133 debug(3, "Job::execute: executing job of type %s %s in thread %i.\n",
00134 metaObject()->className(), objectName().isEmpty() ? "" : qPrintable( objectName() ), th->id());
00135 helper.runTheJob( th, this );
00136 debug(3, "Job::execute: finished execution of job in thread %i.\n", th->id());
00137 }
00138
00139 int Job::priority () const
00140 {
00141 return 0;
00142 }
00143
00144 bool Job::success () const
00145 {
00146 return true;
00147 }
00148
00149 void Job::freeQueuePolicyResources()
00150 {
00151 for ( int index = 0; index < d->queuePolicies->size(); ++index )
00152 {
00153 d->queuePolicies->at( index )->free( this );
00154 }
00155 }
00156
00157 void Job::aboutToBeQueued ( WeaverInterface* )
00158 {
00159 }
00160
00161 void Job::aboutToBeDequeued ( WeaverInterface* )
00162 {
00163 }
00164
00165 bool Job::canBeExecuted()
00166 {
00167 QueuePolicyList acquired;
00168
00169 bool success = true;
00170
00171 if ( d->queuePolicies->size() > 0 )
00172 {
00173 debug( 4, "Job::canBeExecuted: acquiring permission from %i queue %s.\n",
00174 d->queuePolicies->size(), d->queuePolicies->size()==1 ? "policy" : "policies" );
00175 for ( int index = 0; index < d->queuePolicies->size(); ++index )
00176 {
00177 if ( d->queuePolicies->at( index )->canRun( this ) )
00178 {
00179 acquired.append( d->queuePolicies->at( index ) );
00180 } else {
00181 success = false;
00182 break;
00183 }
00184 }
00185
00186 debug( 4, "Job::canBeExecuted: queue policies returned %s.\n", success ? "true" : "false" );
00187
00188 if ( ! success )
00189 {
00190
00191 for ( int index = 0; index < acquired.size(); ++index )
00192 {
00193 acquired.at( index )->release( this );
00194 }
00195 }
00196 } else {
00197 debug( 4, "Job::canBeExecuted: no queue policies, this job can be executed.\n" );
00198 }
00199
00200 return success;
00201 }
00202
00203 void Job::assignQueuePolicy( QueuePolicy* policy )
00204 {
00205 if ( ! d->queuePolicies->contains( policy ) )
00206 {
00207 d->queuePolicies->append( policy );
00208 }
00209 }
00210
00211 void Job::removeQueuePolicy( QueuePolicy* policy )
00212 {
00213 int index = d->queuePolicies->indexOf( policy );
00214 if ( index != -1 )
00215 {
00216 d->queuePolicies->removeAt( index );
00217 }
00218 }
00219
00220 bool Job::isFinished() const
00221 {
00222 return d->finished;
00223 }
00224
00225 Thread* Job::thread()
00226 {
00227 return d->thread;
00228 }
00229
00230 void Job::setFinished ( bool status )
00231 {
00232 d->finished = status;
00233 }
00234
00235
00236
00237
00238
00239
00240 #include "Job.moc"
00241 #include "Job_p.moc"