Intel(R) Threading Building Blocks Doxygen Documentation
version 4.2.3
|
Go to the documentation of this file.
17 #ifndef __TBB__concurrent_queue_impl_H
18 #define __TBB__concurrent_queue_impl_H
20 #ifndef __TBB_concurrent_queue_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 #include "../tbb_stddef.h"
25 #include "../tbb_machine.h"
26 #include "../atomic.h"
27 #include "../spin_mutex.h"
28 #include "../cache_aligned_allocator.h"
29 #include "../tbb_exception.h"
30 #include "../tbb_profiling.h"
32 #include __TBB_STD_SWAP_HEADER
37 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
40 namespace strict_ppl {
41 template<
typename T,
typename A>
class concurrent_queue;
44 template<
typename T,
typename A>
class concurrent_bounded_queue;
49 namespace strict_ppl {
73 static const size_t phi = 3;
77 static const size_t n_queue = 8;
103 return uintptr_t(
p)>1;
121 #if _MSC_VER && !defined(__INTEL_COMPILER)
123 #pragma warning( push )
124 #pragma warning( disable: 4146 )
133 typedef void (*item_constructor_t)(T* location,
const void* src);
145 void copy_item(
page& dst,
size_t dindex,
const void* src, item_constructor_t construct_item ) {
146 construct_item( &get_ref(dst, dindex), src );
150 item_constructor_t construct_item )
152 T& src_item = get_ref( const_cast<page&>(src), sindex );
153 construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
157 T& from = get_ref(src,index);
177 return (&static_cast<padded_page*>(static_cast<void*>(&
p))->
last)[index];
189 item_constructor_t construct_item ) ;
194 item_constructor_t construct_item ) ;
197 size_t end_in_page,
ticket& g_index, item_constructor_t construct_item ) ;
199 void invalidate_page_and_rethrow(
ticket k ) ;
216 item_constructor_t construct_item )
226 ++base.
my_rep->n_invalid_entries;
227 invalidate_page_and_rethrow( k );
233 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.
my_rep );
249 copy_item( *
p, index, item, construct_item );
255 ++base.
my_rep->n_invalid_entries;
272 bool success =
false;
275 if(
p->mask & uintptr_t(1)<<index ) {
277 assign_and_destroy_item( dst, *
p, index );
279 --base.
my_rep->n_invalid_entries;
287 item_constructor_t construct_item )
294 ticket g_index = head_counter;
298 size_t end_in_first_page = (index+n_items<base.
my_rep->items_per_page)?(index+n_items):base.
my_rep->items_per_page;
300 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
301 page* cur_page = head_page;
305 cur_page->
next = make_copy( base, srcp, 0, base.
my_rep->items_per_page, g_index, construct_item );
306 cur_page = cur_page->
next;
311 if( last_index==0 ) last_index = base.
my_rep->items_per_page;
313 cur_page->
next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
314 cur_page = cur_page->
next;
316 tail_page = cur_page;
318 invalidate_page_and_rethrow( g_index );
321 head_page = tail_page = NULL;
329 page* invalid_page = (
page*)uintptr_t(1);
335 q->
next = invalid_page;
337 head_page = invalid_page;
338 tail_page = invalid_page;
346 ticket& g_index, item_constructor_t construct_item )
350 new_page->
next = NULL;
352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353 if( new_page->
mask & uintptr_t(1)<<begin_in_page )
354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
367 my_ticket(k), my_queue(queue), my_page(
p), allocator(b)
378 my_queue.head_page = q;
380 my_queue.tail_page = NULL;
385 allocator.deallocate_page(
p );
389 #if _MSC_VER && !defined(__INTEL_COMPILER)
390 #pragma warning( pop )
391 #endif // warning 4146 is back
406 return k*phi%n_queue;
411 return array[index(k)];
441 return reinterpret_cast<page*>(allocate_block ( n ));
447 deallocate_block( reinterpret_cast<void*>(
p), n );
451 virtual void *allocate_block(
size_t n ) = 0;
454 virtual void deallocate_block(
void *
p,
size_t n ) = 0;
462 for(
size_t i=0; i<nq; i++ )
463 __TBB_ASSERT( my_rep->
array[i].tail_page==NULL,
"pages were not freed properly" );
472 r.
choose(k).push( src, k, *
this, construct_item );
477 bool internal_try_pop(
void* dst ) ;
480 size_t internal_size()
const ;
483 bool internal_empty()
const ;
487 void internal_finish_clear() ;
497 #if __TBB_CPP11_RVALUE_REF_PRESENT
507 const size_t item_size =
sizeof(T);
514 my_rep->item_size = item_size;
515 my_rep->items_per_page = item_size<= 8 ? 32 :
516 item_size<= 16 ? 16 :
536 #if defined(_MSC_VER) && defined(_Wp64)
537 #pragma warning (push)
538 #pragma warning (disable: 4267)
541 #if defined(_MSC_VER) && defined(_Wp64)
542 #pragma warning (pop)
548 }
while( !r.
choose( k ).pop( dst, k, *
this ) );
555 __TBB_ASSERT(
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
560 ptrdiff_t sz = tc-hc-nie;
561 return sz<0 ? 0 : size_t(sz);
577 for(
size_t i=0; i<nq; ++i ) {
581 deallocate_page( tp );
582 r.
array[i].tail_page = NULL;
601 for(
size_t i = 0; i < r.
n_queue; ++i )
602 r.
array[i].assign( src.
my_rep->array[i], *
this, construct_item);
605 "the source concurrent queue should not be concurrently modified." );
618 head_counter(queue.my_rep->head_counter),
621 for(
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
622 array[k] = queue.
my_rep->array[k].head_page;
626 bool get_item( T*& item,
size_t k ) ;
631 if( k==my_queue.my_rep->tail_counter ) {
639 return (
p->mask & uintptr_t(1)<<i)!=0;
645 template<
typename Value>
651 template<
typename C,
typename T,
typename U>
654 template<
typename C,
typename T,
typename U>
662 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
669 : my_rep(NULL), my_item(NULL) {
694 template<
typename Value>
699 if( !my_rep->get_item(my_item, k) ) advance();
702 template<
typename Value>
704 if( my_rep!=other.
my_rep ) {
717 template<
typename Value>
719 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
720 size_t k = my_rep->head_counter;
724 my_rep->get_item(tmp,k);
733 my_rep->head_counter = ++k;
734 if( !my_rep->get_item(my_item, k) ) advance();
747 template<
typename Container,
typename Value>
749 public std::iterator<std::forward_iterator_tag,Value> {
750 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
751 template<
typename T,
class A>
752 friend class ::tbb::strict_ppl::concurrent_queue;
779 return *static_cast<Value*>(this->my_item);
792 Value* result = &operator*();
799 template<
typename C,
typename T,
typename U>
804 template<
typename C,
typename T,
typename U>
854 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN
868 virtual void copy_item(
page& dst,
size_t index,
const void* src ) = 0;
869 virtual void assign_and_destroy_item(
void* dst,
page& src,
size_t index ) = 0;
915 #if __TBB_CPP11_RVALUE_REF_PRESENT
926 void internal_insert_item(
const void* src, copy_specifics op_type );
929 bool internal_insert_if_not_full(
const void* src, copy_specifics op_type );
934 virtual void copy_page_item(
page& dst,
size_t dindex,
const page& src,
size_t sindex ) = 0;
955 virtual void move_item(
page& dst,
size_t index,
const void* src ) = 0;
965 template<
typename C,
typename T,
typename U>
968 template<
typename C,
typename T,
typename U>
1011 template<
typename Container,
typename Value>
1013 public std::iterator<std::forward_iterator_tag,Value> {
1015 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016 template<
typename T,
class A>
1017 friend class ::tbb::concurrent_bounded_queue;
1045 return *static_cast<Value*>(
my_item);
1065 template<
typename C,
typename T,
typename U>
1070 template<
typename C,
typename T,
typename U>
Value * operator++(int)
Post increment.
virtual page * allocate_page() __TBB_override
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
void internal_finish_clear()
free any remaining pages
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
A lock that occupies a single byte.
micro_queue< T >::padded_page padded_page
Value * operator->() const
static size_t index(ticket k)
Map ticket to an array index.
parts of concurrent_queue_rep that do not have references to micro_queue
concurrent_queue_rep * my_rep
Internal representation.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
bool is_valid_page(const concurrent_queue_rep_base::page *p)
Base class for types that should not be copied or assigned.
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
concurrent_queue_rep_base::page page
Class that implements exponential backoff.
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
Abstract class to define interface for page allocation/deallocation.
virtual void move_item(page &dst, size_t index, const void *src)=0
Value * my_item
Pointer to current item.
concurrent_queue_iterator()
representation of concurrent_queue_base
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Value * operator->() const
virtual ~concurrent_queue_base_v3()
concurrent_queue_base_v8(size_t item_sz)
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
micro_queue< T > array[n_queue]
Internal representation of a ConcurrentQueue.
void swap(atomic< T > &lhs, atomic< T > &rhs)
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
A queue using simple locking.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
size_t items_per_page
Always a power of 2.
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
micro_queue< T > & choose(ticket k)
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
Similar to C++0x std::remove_cv.
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
concurrent_queue_page_allocator & allocator
concurrent_queue_base_v3()
void internal_throw_exception() const
Obsolete.
const concurrent_queue_base_v3< T > & my_queue
ptrdiff_t my_capacity
Capacity of the queue.
~concurrent_queue_iterator_base_v3()
Destructor.
void pause()
Pause for a while.
Constness-independent portion of concurrent_queue_iterator.
concurrent_queue_iterator & operator++()
Advance to next item in queue.
static const size_t n_queue
concurrent_queue_rep< T >::page page
concurrent_queue_rep< T > * my_rep
Internal representation.
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
concurrent_queue_rep_base::page page
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
micro_queue< T > & my_queue
micro_queue< T >::padded_page padded_page
void assign_and_destroy_item(void *dst, page &src, size_t index)
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
concurrent_queue_iterator & operator++()
Advance to next item in queue.
atomic< page * > tail_page
virtual ~concurrent_queue_page_allocator()
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
void move(tbb_thread &t1, tbb_thread &t2)
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
atomic< ticket > head_counter
T last
Must be last field.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Type-independent portion of concurrent_queue_iterator.
bool internal_empty() const
check if the queue is empty; thread safe
micro_queue< T >::item_constructor_t item_constructor_t
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
atomic< page * > head_page
size_t item_size
Size of an item.
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
size_t items_per_page
Always a power of 2.
atomic< ticket > head_counter
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Class used to ensure exception-safety of method "pop".
Value & operator*() const
Reference to current item.
padded_page()
Not defined anywhere - exists to quiet warnings.
void itt_hide_store_word(T &dst, T src)
Meets requirements of a forward iterator for STL.
Value & operator*() const
Reference to current item.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Base class for types that should not be assigned.
void invalidate_page_and_rethrow(ticket k)
T last
Must be last field.
Value * operator++(int)
Post increment.
void advance()
Advance iterator one step towards tail of queue.
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
concurrent_queue_iterator_base_v3()
Default constructor.
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
static T & get_ref(page &p, size_t index)
base class of concurrent_queue
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
~micro_queue_pop_finalizer()
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
atomic< ticket > tail_counter
Meets requirements of a forward iterator for STL.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
Identifiers declared inside namespace internal should never be used directly by client code.
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
auto last(Container &c) -> decltype(begin(c))
void const char const char int ITT_FORMAT __itt_group_sync p
void * my_item
Pointer to current item.
#define __TBB_EXPORTED_METHOD
A queue using simple locking.
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
void call_itt_notify(notify_type, void *)
size_t item_size
Size of an item.
concurrent_queue_iterator()
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Represents acquisition of a mutex.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
concurrent_queue_iterator_base_v3()
Default constructor.
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
atomic< ticket > tail_counter
#define __TBB_compiler_fence()
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
virtual concurrent_queue_rep_base::page * allocate_page()=0
Copyright © 2005-2020 Intel Corporation. All Rights Reserved.
Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are
registered trademarks or trademarks of Intel Corporation or its
subsidiaries in the United States and other countries.
* Other names and brands may be claimed as the property of others.