OpenMAXBellagio  0.9.3
omx_base_source.c
Go to the documentation of this file.
1 
29 #include <omxcore.h>
30 #include <omx_base_source.h>
31 
33 
35  omx_base_source_PrivateType* omx_base_source_Private;
36 
37  if (openmaxStandComp->pComponentPrivate) {
38  omx_base_source_Private = (omx_base_source_PrivateType*)openmaxStandComp->pComponentPrivate;
39  } else {
40  omx_base_source_Private = calloc(1,sizeof(omx_base_source_PrivateType));
41  if (!omx_base_source_Private) {
43  }
44  }
45 
46  // we could create our own port structures here
47  // fixme maybe the base class could use a "port factory" function pointer?
48  err = omx_base_component_Constructor(openmaxStandComp, cComponentName);
49 
50  /* here we can override whatever defaults the base_component constructor set
51  * e.g. we can override the function pointers in the private struct */
52  omx_base_source_Private = openmaxStandComp->pComponentPrivate;
53  omx_base_source_Private->BufferMgmtFunction = omx_base_source_BufferMgmtFunction;
54 
55  return err;
56 }
57 
59 
60  return omx_base_component_Destructor(openmaxStandComp);
61 }
62 
69 
70  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
71  omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
72  omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
73  omx_base_PortType *pOutPort = (omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
74  tsem_t* pOutputSem = pOutPort->pBufferSem;
75  queue_t* pOutputQueue = pOutPort->pBufferQueue;
76  OMX_BUFFERHEADERTYPE* pOutputBuffer = NULL;
77  OMX_COMPONENTTYPE* target_component;
78  OMX_BOOL isOutputBufferNeeded = OMX_TRUE;
79  int outBufExchanged = 0;
80 
81  omx_base_source_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
82  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_source_Private->bellagioThreads->nThreadBufferMngtID);
83 
84  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
85  while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||
86  omx_base_component_Private->state == OMX_StatePause || omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
87 
88  /*Wait till the ports are being flushed*/
89  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
90  while( PORT_IS_BEING_FLUSHED(pOutPort)) {
91  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
92 
93  if(isOutputBufferNeeded == OMX_FALSE) {
94  pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
95  outBufExchanged--;
96  pOutputBuffer = NULL;
97  isOutputBufferNeeded = OMX_TRUE;
98  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output buffer\n");
99  }
100  DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
101 
102  tsem_up(omx_base_source_Private->flush_all_condition);
103  tsem_down(omx_base_source_Private->flush_condition);
104  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
105  }
106  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
107 
108  /*No buffer to process. So wait here*/
109  if((isOutputBufferNeeded==OMX_TRUE && pOutputSem->semval==0) &&
110  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
111  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer \n");
112  tsem_down(omx_base_source_Private->bMgmtSem);
113  }
114 
115  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
116  DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
117  break;
118  }
119 
120  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer semval=%d \n",pOutputSem->semval);
121  if(pOutputSem->semval > 0 && isOutputBufferNeeded == OMX_TRUE ) {
122  tsem_down(pOutputSem);
123  if(pOutputQueue->nelem>0){
124  outBufExchanged++;
125  isOutputBufferNeeded = OMX_FALSE;
126  pOutputBuffer = dequeue(pOutputQueue);
127  if(pOutputBuffer == NULL){
128  DEBUG(DEB_LEV_ERR, "In %s Had NULL output buffer!!\n",__func__);
129  break;
130  }
131  }
132  }
133 
134  if(isOutputBufferNeeded == OMX_FALSE) {
135  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
136  pOutputBuffer->nFlags = 0;
137  }
138 
139  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
140  pOutputBuffer->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
141  pOutputBuffer->pMarkData = omx_base_source_Private->pMark.pMarkData;
142  omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
143  omx_base_source_Private->pMark.pMarkData = NULL;
144  }
145 
146  target_component = (OMX_COMPONENTTYPE*)pOutputBuffer->hMarkTargetComponent;
147  if(target_component == (OMX_COMPONENTTYPE *)openmaxStandComp) {
148  /*Clear the mark and generate an event*/
149  (*(omx_base_component_Private->callbacks->EventHandler))
150  (openmaxStandComp,
151  omx_base_component_Private->callbackData,
152  OMX_EventMark, /* The command was completed */
153  1, /* The commands was a OMX_CommandStateSet */
154  0, /* The state has been changed in message->messageParam2 */
155  pOutputBuffer->pMarkData);
156  } else if(pOutputBuffer->hMarkTargetComponent != NULL) {
157  /*If this is not the target component then pass the mark*/
158  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
159  }
160 
161  if(omx_base_source_Private->state == OMX_StateExecuting) {
162  if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer->nFilledLen == 0) {
163  (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer);
164  } else {
165  /*It no buffer management call back then don't produce any output buffer*/
166  pOutputBuffer->nFilledLen = 0;
167  }
168  } else {
169  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
170  }
171  if(omx_base_source_Private->state == OMX_StatePause && !PORT_IS_BEING_FLUSHED(pOutPort)) {
172  /*Waiting at paused state*/
173  tsem_wait(omx_base_source_Private->bStateSem);
174  }
175 
176  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
177  DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in output buffer\n");
178 
179  (*(omx_base_component_Private->callbacks->EventHandler))
180  (openmaxStandComp,
181  omx_base_component_Private->callbackData,
182  OMX_EventBufferFlag, /* The command was completed */
183  0, /* The commands was a OMX_CommandStateSet */
184  pOutputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
185  NULL);
186  //pOutputBuffer->nFlags = 0;
187  omx_base_source_Private->bIsEOSReached = OMX_TRUE;
188  }
189 
190  /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
191  if((pOutputBuffer->nFilledLen != 0) || ((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) || (omx_base_source_Private->bIsEOSReached == OMX_TRUE)) {
192  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)
193  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s nFlags=%x Name=%s \n", __func__, (int)pOutputBuffer->nFlags, omx_base_source_Private->name);
194  pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
195  outBufExchanged--;
196  pOutputBuffer = NULL;
197  isOutputBufferNeeded = OMX_TRUE;
198  }
199  }
200  }
201  DEBUG(DEB_LEV_SIMPLE_SEQ, "Exiting Buffer Management Thread\n");
202  return NULL;
203 }
204 
212  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
213  omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
214  omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
215  omx_base_PortType *pOutPort[2];
216  tsem_t* pOutputSem[2];
217  queue_t* pOutputQueue[2];
218  OMX_BUFFERHEADERTYPE* pOutputBuffer[2];
219  OMX_COMPONENTTYPE* target_component;
220  OMX_BOOL isOutputBufferNeeded[2];
221  int i,outBufExchanged[2];
222 
223  pOutPort[0]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
224  pOutPort[1]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1];
225  pOutputSem[0] = pOutPort[0]->pBufferSem;
226  pOutputSem[1] = pOutPort[1]->pBufferSem;
227  pOutputQueue[0] = pOutPort[0]->pBufferQueue;
228  pOutputQueue[1] = pOutPort[1]->pBufferQueue;
229  pOutputBuffer[1]= pOutputBuffer[0]=NULL;
230  isOutputBufferNeeded[0]=isOutputBufferNeeded[1]=OMX_TRUE;
231  outBufExchanged[0]=outBufExchanged[1]=0;
232 
233  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
234  while(omx_base_source_Private->state == OMX_StateIdle || omx_base_source_Private->state == OMX_StateExecuting || omx_base_source_Private->state == OMX_StatePause ||
235  omx_base_source_Private->transientState == OMX_TransStateLoadedToIdle){
236 
237  /*Wait till the ports are being flushed*/
238  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
239  while( PORT_IS_BEING_FLUSHED(pOutPort[0]) ||
240  PORT_IS_BEING_FLUSHED(pOutPort[1])) {
241  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
242 
243  DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
244  __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
245 
246  if(isOutputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[1])) {
247  pOutPort[1]->ReturnBufferFunction(pOutPort[1],pOutputBuffer[1]);
248  outBufExchanged[1]--;
249  pOutputBuffer[1]=NULL;
250  isOutputBufferNeeded[1]=OMX_TRUE;
251  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 1 buffer\n");
252  }
253 
254  if(isOutputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[0])) {
255  pOutPort[0]->ReturnBufferFunction(pOutPort[0],pOutputBuffer[0]);
256  outBufExchanged[0]--;
257  pOutputBuffer[0]=NULL;
258  isOutputBufferNeeded[0]=OMX_TRUE;
259  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 0 buffer\n");
260  }
261 
262  DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
263  __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
264 
265  tsem_up(omx_base_source_Private->flush_all_condition);
266  tsem_down(omx_base_source_Private->flush_condition);
267  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
268  }
269  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
270 
271  /*No buffer to process. So wait here*/
272  if((isOutputBufferNeeded[0]==OMX_TRUE && pOutputSem[0]->semval==0) &&
273  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
274  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
275  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 0\n");
276  tsem_down(omx_base_source_Private->bMgmtSem);
277 
278  }
279  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
280  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
281  break;
282  }
283  if((isOutputBufferNeeded[1]==OMX_TRUE && pOutputSem[1]->semval==0) &&
284  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid) &&
285  !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
286  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
287  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 1\n");
288  tsem_down(omx_base_source_Private->bMgmtSem);
289 
290  }
291  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
292  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
293  break;
294  }
295 
296  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer 0 semval=%d \n",pOutputSem[0]->semval);
297  if(pOutputSem[0]->semval>0 && isOutputBufferNeeded[0]==OMX_TRUE ) {
298  tsem_down(pOutputSem[0]);
299  if(pOutputQueue[0]->nelem>0){
300  outBufExchanged[0]++;
301  isOutputBufferNeeded[0]=OMX_FALSE;
302  pOutputBuffer[0] = dequeue(pOutputQueue[0]);
303  if(pOutputBuffer[0] == NULL){
304  DEBUG(DEB_LEV_ERR, "Had NULL output buffer!!\n");
305  break;
306  }
307  }
308  }
309  /*When we have input buffer to process then get one output buffer*/
310  if(pOutputSem[1]->semval>0 && isOutputBufferNeeded[1]==OMX_TRUE) {
311  tsem_down(pOutputSem[1]);
312  if(pOutputQueue[1]->nelem>0){
313  outBufExchanged[1]++;
314  isOutputBufferNeeded[1]=OMX_FALSE;
315  pOutputBuffer[1] = dequeue(pOutputQueue[1]);
316  if(pOutputBuffer[1] == NULL){
317  DEBUG(DEB_LEV_ERR, "Had NULL output buffer!! op is=%d,iq=%d\n",pOutputSem[1]->semval,pOutputQueue[1]->nelem);
318  break;
319  }
320  }
321  }
322 
323  for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts +
324  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
325  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
326  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts -1);i++) {
327 
328  if(omx_base_source_Private->ports[i]->sPortParam.eDomain!=OMX_PortDomainOther){ /* clock ports are not to be processed */
329  /*Process Output buffer of Port i */
330  if(isOutputBufferNeeded[i]==OMX_FALSE) {
331 
332  /*Pass the Mark to all outgoing buffers*/
333  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
334  pOutputBuffer[i]->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
335  pOutputBuffer[i]->pMarkData = omx_base_source_Private->pMark.pMarkData;
336  }
337 
338  target_component=(OMX_COMPONENTTYPE*)pOutputBuffer[i]->hMarkTargetComponent;
339  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
340  /*Clear the mark and generate an event*/
341  (*(omx_base_source_Private->callbacks->EventHandler))
342  (openmaxStandComp,
343  omx_base_source_Private->callbackData,
344  OMX_EventMark, /* The command was completed */
345  1, /* The commands was a OMX_CommandStateSet */
346  i, /* The state has been changed in message->messageParam2 */
347  pOutputBuffer[i]->pMarkData);
348  } else if(pOutputBuffer[i]->hMarkTargetComponent!=NULL){
349  /*If this is not the target component then pass the mark*/
350  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
351  }
352 
353  if(omx_base_source_Private->state == OMX_StateExecuting) {
354  if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer[i]->nFilledLen == 0) {
355  (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer[i]);
356  } else {
357  /*If no buffer management call back then don't produce any output buffer*/
358  pOutputBuffer[i]->nFilledLen = 0;
359  }
360  } else {
361  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
362  }
363 
364  if((pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pOutputBuffer[i]->nFilledLen==0) {
365  DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pOutputBuffer[i]->nFilledLen);
366  (*(omx_base_source_Private->callbacks->EventHandler))
367  (openmaxStandComp,
368  omx_base_source_Private->callbackData,
369  OMX_EventBufferFlag, /* The command was completed */
370  i, /* The commands was a OMX_CommandStateSet */
371  pOutputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
372  NULL);
373  }
374  if(omx_base_source_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
375  /*Waiting at paused state*/
376  tsem_wait(omx_base_component_Private->bStateSem);
377  }
378 
379  /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
380  if(pOutputBuffer[i]->nFilledLen!=0 || (pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS){
381  pOutPort[i]->ReturnBufferFunction(pOutPort[i],pOutputBuffer[i]);
382  outBufExchanged[i]--;
383  pOutputBuffer[i]=NULL;
384  isOutputBufferNeeded[i]=OMX_TRUE;
385  }
386  }
387  }
388  }
389 
390  /*Clear the Mark*/
391  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
392  omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
393  omx_base_source_Private->pMark.pMarkData = NULL;
394  }
395  }
396  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
397  return NULL;
398 }
void tsem_wait(tsem_t *tsem)
Definition: tsemaphore.c:131
OMX_ERRORTYPE omx_base_component_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
The base constructor for the OpenMAX ST components.
#define DEB_LEV_SIMPLE_SEQ
#define OMX_BASE_SOURCE_OUTPUTPORT_INDEX
#define DEBUG(n, fmt, args...)
queue_t * pBufferQueue
void *(* BufferMgmtFunction)(void *param)
char * OMX_STRING
Definition: OMX_Types.h:206
OMX_BOOL
Definition: OMX_Types.h:189
#define DEB_LEV_ERR
OMX_ERRORTYPE(* EventHandler)(OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_EVENTTYPE eEvent, OMX_IN OMX_U32 nData1, OMX_IN OMX_U32 nData2, OMX_IN OMX_PTR pEventData)
Definition: OMX_Core.h:530
#define OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1
void tsem_up(tsem_t *tsem)
Definition: tsemaphore.c:110
OMX_ERRORTYPE omx_base_source_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
void tsem_down(tsem_t *tsem)
Definition: tsemaphore.c:97
Definition: queue.h:43
void * omx_base_source_twoport_BufferMgmtFunction(void *param)
OMX_ERRORTYPE omx_base_source_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
#define OMX_BUFFERFLAG_EOS
Definition: OMX_Core.h:299
OMX_ERRORTYPE omx_base_component_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
The base destructor for ST OpenMAX components.
OMX_ERRORTYPE err
#define DEB_LEV_FULL_SEQ
OMX_HANDLETYPE hMarkTargetComponent
Definition: OMX_Core.h:417
OMX_PTR pComponentPrivate
#define DEB_LEV_FUNCTION_NAME
void * omx_base_source_BufferMgmtFunction(void *param)
OMX_ERRORTYPE(* ReturnBufferFunction)(omx_base_PortType *openmaxStandPort, OMX_BUFFERHEADERTYPE *pBuffer)
#define PORT_IS_BEING_FLUSHED(pPort)
Definition: omx_base_port.h:39
void * dequeue(queue_t *queue)
Definition: queue.c:122
OMX_PORT_PARAM_TYPE sPortTypesParam[4]
OMX_ERRORTYPE
Definition: OMX_Core.h:126

Generated for OpenMAX Bellagio rel. 0.9.3 by  doxygen 1.5.1
SourceForge.net Logo