SeqAn3  3.2.0
The Modern C++ library for sequence analysis.
suspendable_queue.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
14 #pragma once
15 
16 #include <algorithm>
17 #include <cassert>
18 #include <condition_variable>
19 #include <iterator>
20 #include <mutex>
21 #include <ranges>
22 #include <span>
23 #include <thread>
24 #include <vector>
25 
26 #include <seqan3/core/platform.hpp>
27 
28 namespace seqan3::contrib
29 {
30 
31 // ============================================================================
32 // Forwards
33 // ============================================================================
34 
35 // ============================================================================
36 // Classes
37 // ============================================================================
38 
39 // ----------------------------------------------------------------------------
40 // Class ConcurrentQueue
41 // ----------------------------------------------------------------------------
42 
// Empty tag type used to select overloads and specialisations at compile time.
template <class TSpec = void>
struct Tag
{};

// Specialisation marker for the suspendable queue; declared only, never defined here.
template <class TSpec = Tag<void>>
struct Suspendable;

// Primary queue template; it is specialised for Suspendable<TSpec> further down.
template <class TValue, class TSpec = Suspendable<>>
class ConcurrentQueue;

// Tag selecting the fixed-capacity ("limit") flavour of the queue.
struct Limit_;
using Limit = Tag<Limit_>;
54 
55 template <typename TValue, typename TSpec>
56 class ConcurrentQueue<TValue, Suspendable<TSpec> >
57 {
58 public:
59  typedef std::vector<TValue> TString;
60  typedef typename TString::size_type TSize;
61 
62  size_t readerCount;
63  size_t writerCount;
64 
65  TString data;
66  TSize occupied;
67  TSize back;
68  TSize front;
69 
70  std::mutex cs;
72 
73  bool virgin;
74 
75  ConcurrentQueue():
76  readerCount(0),
77  writerCount(0),
78  occupied(0),
79  back(0),
80  front(0),
81  virgin(true)
82  {}
83 
84  ~ConcurrentQueue()
85  {
86  assert(writerCount == 0u);
87 
88  // wait for all pending readers to finish
89  while (readerCount != 0u)
90  {}
91  }
92 };
93 
94 template <typename TValue>
95 class ConcurrentQueue<TValue, Suspendable<Limit> >:
96  public ConcurrentQueue<TValue, Suspendable<> >
97 {
98 public:
99  typedef ConcurrentQueue<TValue, Suspendable<> > TBase;
100  typedef typename TBase::TString TString;
101  typedef typename TBase::TSize TSize;
102 
104 
105  ConcurrentQueue(TSize maxSize):
106  TBase()
107  {
108  this->data.resize(maxSize);
109  // reserve(this->data, maxSize, Exact());
110  // _setLength(this->data, maxSize);
111  }
112 
113  ConcurrentQueue(ConcurrentQueue const & other):
114  TBase((TBase const &)other)
115  {}
116 };
117 
118 template <typename TValue, typename TSpec>
119 inline void
120 lockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
121 {}
122 
123 template <typename TValue, typename TSpec>
124 inline void
125 unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
126 {
127  {
129  if (--me.readerCount != 0u)
130  return;
131  }
132  me.less.notify_all(); // publish the condition that reader count is 0.
133 }
134 
135 template <typename TValue, typename TSpec>
136 inline void
137 lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
138 {}
139 
140 template <typename TValue, typename TSpec>
141 inline void
142 unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
143 {
144  {
145  std::lock_guard<std::mutex> lk(me.cs);
146  if (--me.writerCount != 0u)
147  return;
148  }
149  me.more.notify_all(); // publish the condition, that writer count is 0.
150 }
151 
152 template <typename TValue, typename TSize, typename TSpec>
153 inline void
154 setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize readerCount)
155 {
157  me.readerCount = readerCount;
158 }
159 
160 template <typename TValue, typename TSize, typename TSpec>
161 inline void
162 setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize writerCount)
163 {
165  me.writerCount = writerCount;
166 }
167 
168 template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
169 inline void
170 setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize1 readerCount, TSize2 writerCount)
171 {
173  me.readerCount = readerCount;
174  me.writerCount = writerCount;
175 }
176 
177 template <typename TValue, typename TSize, typename TSpec>
178 inline bool
179 waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
180  TSize minSize)
181 {
183  while (me.occupied < minSize && me.writerCount > 0u)
184  me.more.wait(lock);
185  return me.occupied >= minSize;
186 }
187 
188 template <typename TValue, typename TSpec>
189 inline bool
190 empty(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
191 {
192  return me.occupied == 0;
193 }
194 
195 template <typename TValue, typename TSpec>
196 inline typename ConcurrentQueue<TValue, Suspendable<TSpec> >::SizeType
197 length(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
198 {
199  return me.occupied;
200 }
201 
202 template <typename TValue, typename TSpec>
203 inline bool
204 _popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
206 {
207  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
208  typedef typename TQueue::TString TString;
209  typedef typename TString::size_type TSize;
210 
211  TSize cap = me.data.size();
212 
213  while (me.occupied == 0u && me.writerCount > 0u)
214  me.more.wait(lk);
215 
216  if (me.occupied == 0u)
217  return false;
218 
219  assert(me.occupied > 0u);
220 
221  // extract value and destruct it in the data string
222  // TIter it = me.data.begin() + me.front;
223  result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
224  // std::swap(result, *it);
225  // valueDestruct(it);
226 
227  me.front = (me.front + 1) % cap;
228  me.occupied--;
229 
230  /* now: either me.occupied > 0 and me.nextout is the index
231  of the next occupied slot in the buffer, or
232  me.occupied == 0 and me.nextout is the index of the next
233  (empty) slot that will be filled by a producer (such as
234  me.nextout == me.nextin) */
235 
236  return true;
237 }
238 
239 template <typename TValue, typename TSpec>
240 inline bool
241 _popBack(TValue & result,
242  ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
244 {
245  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
246  typedef typename TQueue::TString TString;
247  typedef typename TString::size_type TSize;
248 
249  TSize cap = me.data.size();
250 
251  while (me.occupied == 0u && me.writerCount > 0u)
252  me.more.wait(lk);
253 
254  if (me.occupied == 0u)
255  return false;
256 
257  assert(me.occupied > 0u);
258 
259  me.back = (me.back + cap - 1) % cap;
260 
261  // extract value and destruct it in the data string
262  // TIter it = me.data.begin() + me.back;
263  result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
264  // std::swap(result, *it);
265  // valueDestruct(it);
266 
267  me.occupied--;
268 
269  /* now: either me.occupied > 0 and me.nextout is the index
270  of the next occupied slot in the buffer, or
271  me.occupied == 0 and me.nextout is the index of the next
272  (empty) slot that will be filled by a producer (such as
273  me.nextout == me.nextin) */
274 
275  return true;
276 }
277 
278 template <typename TValue, typename TSpec>
279 inline bool
280 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
281 {
283  return _popFront(result, me, lock);
284 }
285 
286 template <typename TValue>
287 inline bool
288 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
289 {
290  {
292  if (!_popFront(result, me, lk))
293  return false;
294  }
295  me.less.notify_all();
296  return true;
297 }
298 
299 template <typename TValue, typename TSpec>
300 inline bool
301 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
302 {
304  return _popBack(result, me, lk);
305 }
306 
307 template <typename TValue>
308 inline bool
309 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
310 {
311  {
313  if (!_popBack(result, me, lk))
314  return false;
315  }
316  me.less.notify_all();
317  return true;
318 }
319 
320 
321 template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
322 inline bool
323 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
324  TValue2 && val,
325  [[maybe_unused]] Tag<TExpand> expandTag)
326 {
327  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
328  typedef typename TQueue::TString TString;
329  typedef typename TString::size_type TSize;
330 
331  {
333  TSize cap = me.data.size();
334 
335  if (me.occupied >= cap)
336  {
337  // increase capacity
338  // _setLength(me.data, cap);
339  // reserve(me.data, cap + 1, expandTag);
340  me.data.resize(cap + 1);
341  TSize delta = me.data.size() - cap;
342  assert(delta == 1);
343 
344  // create a gap of delta many values between tail and head
345  // Why?
346  // _clearSpace(me.data, delta, me.back, me.back, expandTag);
347  std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
348  me.data.data() + me.data.size());
349  if (me.occupied != 0 && me.back <= me.front)
350  me.front += delta;
351 
352  cap += delta;
353  }
354 
355  // valueConstruct(begin(me.data, Standard()) + me.back, val);
356  *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
357  me.back = (me.back + 1) % cap;
358 
359  ++me.occupied;
360  }
361 
362  /* now: either me.occupied < BSIZE and me.nextin is the index
363  of the next empty slot in the buffer, or
364  me.occupied == BSIZE and me.nextin is the index of the
365  next (occupied) slot that will be emptied by a consumer
366  (such as me.nextin == me.nextout) */
367  me.more.notify_all();
368  return true;
369 }
370 
371 template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
372 inline bool
373 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
374  TValue2 && val,
375  Tag<TExpand> expandTag);
376 
377 template <typename TValue, typename TValue2>
378 inline bool
379 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
380  TValue2 && val,
381  Limit)
382 {
383  typedef ConcurrentQueue<TValue, Suspendable<Limit> > TQueue;
384  typedef typename TQueue::TString TString;
385  typedef typename TString::size_type TSize;
386 
387  {
389  TSize cap = me.data.size();
390 
391  while (me.occupied >= cap && me.readerCount > 0u)
392  me.less.wait(lock);
393 
394  if (me.occupied >= cap)
395  return false;
396 
397  assert(me.occupied < cap);
398 
399  // valueConstruct(begin(me.data, Standard()) + me.back, val);
400  *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
401  me.back = (me.back + 1) % cap;
402  me.occupied++;
403  }
404 
405  /* now: either me.occupied < BSIZE and me.nextin is the index
406  of the next empty slot in the buffer, or
407  me.occupied == BSIZE and me.nextin is the index of the
408  next (occupied) slot that will be emptied by a consumer
409  (such as me.nextin == me.nextout) */
410  me.more.notify_all();
411  return true;
412 }
413 
414 template <typename TValue, typename TValue2, typename TSpec>
415 inline bool
416 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
417  TValue2 && val)
418 {
419  return appendValue(me, std::forward<TValue2>(val), TSpec{});
420 }
421 
422 } // namespace seqan3::contrib
T data(T... args)
T empty(T... args)
T lock(T... args)
Provides platform and dependency checks.
The <ranges> header from C++20's standard library.
Provides std::span from the C++20 standard library.