SeqAn3  3.2.0
The Modern C++ library for sequence analysis.
algorithm_executor_blocking.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <functional>
16 #include <optional>
17 #include <ranges>
18 #include <type_traits>
19 
22 
23 namespace seqan3::detail
24 {
25 
54 template <std::ranges::viewable_range resource_t,
55  std::semiregular algorithm_t,
56  std::semiregular algorithm_result_t,
57  typename execution_handler_t = execution_handler_sequential>
58  requires std::ranges::forward_range<resource_t>
59  && std::invocable<algorithm_t,
60  std::ranges::range_reference_t<resource_t>,
61  std::function<void(algorithm_result_t)>>
62 class algorithm_executor_blocking
63 {
64 private:
69  using resource_type = std::views::all_t<resource_t>;
71  using resource_iterator_type = std::ranges::iterator_t<resource_type>;
73  using resource_difference_type = std::iter_difference_t<resource_iterator_type>;
75 
80  using bucket_type = std::vector<algorithm_result_t>;
82  using bucket_iterator_type = std::ranges::iterator_t<bucket_type>;
84  using buffer_type = std::vector<bucket_type>;
86  using buffer_iterator_type = std::ranges::iterator_t<buffer_type>;
88 
90  enum fill_status
91  {
92  non_empty_buffer,
93  empty_buffer,
94  end_of_resource
95  };
96 
97 public:
103  algorithm_executor_blocking() = delete;
105  algorithm_executor_blocking(algorithm_executor_blocking const &) = delete;
106 
124  algorithm_executor_blocking(algorithm_executor_blocking && other) noexcept :
125  algorithm_executor_blocking{std::move(other), other.resource_position()}
126  {}
127 
129  algorithm_executor_blocking & operator=(algorithm_executor_blocking const &) = delete;
130 
133  algorithm_executor_blocking & operator=(algorithm_executor_blocking && other)
134  {
135  auto old_resource_position = other.resource_position();
136 
137  resource = std::move(other.resource);
138  move_initialise(std::move(other), old_resource_position);
139  return *this;
140  }
141 
143  ~algorithm_executor_blocking() = default;
144 
159  algorithm_executor_blocking(resource_t resource,
160  algorithm_t algorithm,
161  algorithm_result_t const SEQAN3_DOXYGEN_ONLY(result) = algorithm_result_t{},
162  execution_handler_t && exec_handler = execution_handler_t{}) :
163  exec_handler{std::move(exec_handler)},
164  resource{std::forward<resource_t>(resource)},
165  resource_it{std::ranges::begin(this->resource)},
166  algorithm{std::move(algorithm)}
167  {
168  if constexpr (std::same_as<execution_handler_t, execution_handler_parallel>)
169  buffer_size = static_cast<size_t>(std::ranges::distance(this->resource));
170 
171  buffer.resize(buffer_size);
172  buffer_it = buffer.end();
173  buffer_end_it = buffer_it;
174  }
176 
193  {
194  fill_status status;
195  // Each invocation of the algorithm might produce zero results (e.g. a search might not find a query)
196  // this repeats the algorithm until it produces the first result or the input resource was consumed.
197  do
198  {
199  status = fill_buffer();
200  }
201  while (status == fill_status::empty_buffer);
202 
203  if (status == fill_status::end_of_resource)
204  return {std::nullopt};
205 
206  assert(status == fill_status::non_empty_buffer);
207  assert(bucket_it != buffer_it->end());
208 
209  std::optional<algorithm_result_t> result = std::ranges::iter_move(bucket_it);
210  go_to_next_result(); // Go to next buffered result.
211  return result;
212  }
213 
215  bool is_eof() noexcept
216  {
217  return resource_it == std::ranges::end(resource);
218  }
219 
220 private:
226  algorithm_executor_blocking(algorithm_executor_blocking && other,
227  resource_difference_type old_resource_position) noexcept :
228  resource{std::move(other.resource)}
229  {
230  move_initialise(std::move(other), old_resource_position);
231  }
232 
239  resource_difference_type resource_position()
240  {
241  // Get the old resource position.
242  auto position = std::ranges::distance(std::ranges::begin(resource), resource_it);
243  assert(position >= 0);
244  return position;
245  }
246 
248  fill_status fill_buffer()
249  {
250  if (!is_buffer_empty()) // Not everything consumed yet.
251  return fill_status::non_empty_buffer;
252 
253  if (is_eof()) // Case: reached end of resource.
254  return fill_status::end_of_resource;
255 
256  // Reset the buckets and the buffer iterator.
257  reset_buffer();
258 
259  // Execute the algorithm (possibly asynchronous) and fill the buckets in this pre-assigned order.
260  for (buffer_end_it = buffer_it; buffer_end_it != buffer.end() && !is_eof(); ++buffer_end_it, ++resource_it)
261  {
262  exec_handler.execute(algorithm,
263  *resource_it,
264  [target_buffer_it = buffer_end_it](auto && algorithm_result)
265  {
266  target_buffer_it->push_back(std::move(algorithm_result));
267  });
268  }
269 
270  exec_handler.wait();
271 
272  // Move the results iterator to the next available result. (This skips empty results of the algorithm)
273  find_next_non_empty_bucket();
274 
275  if (is_buffer_empty())
276  return fill_status::empty_buffer;
277 
278  return fill_status::non_empty_buffer;
279  }
280 
284  bool is_buffer_empty() const
285  {
286  return buffer_it == buffer_end_it;
287  }
288 
297  void reset_buffer()
298  {
299  // Clear all buckets
300  for (auto & bucket : buffer)
301  bucket.clear();
302 
303  // Reset the iterator over the buckets.
304  buffer_it = buffer.begin();
305  }
306 
315  void find_next_non_empty_bucket()
316  {
317  assert(buffer_it <= buffer_end_it);
318  // find first buffered bucket that contains at least one element
319  buffer_it = std::find_if(buffer_it,
320  buffer_end_it,
321  [](auto const & buffer)
322  {
323  return !buffer.empty();
324  });
325 
326  if (buffer_it != buffer_end_it)
327  bucket_it = buffer_it->begin();
328  }
329 
337  void go_to_next_result()
338  {
339  if (++bucket_it == buffer_it->end())
340  {
341  ++buffer_it;
342  find_next_non_empty_bucket();
343  }
344  }
345 
347  void move_initialise(algorithm_executor_blocking && other, resource_difference_type old_resource_position) noexcept
348  {
349  algorithm = std::move(other.algorithm);
350  buffer_size = std::move(other.buffer_size);
351  exec_handler = std::move(other.exec_handler);
352  // Move the resource and set the iterator state accordingly.
353  resource_it = std::ranges::next(std::ranges::begin(resource), old_resource_position);
354 
355  // Get the old buffer and bucket iterator positions.
356  auto buffer_it_position = other.buffer_it - other.buffer.begin();
357  auto buffer_end_it_position = other.buffer_end_it - other.buffer.begin();
358 
359  std::ptrdiff_t bucket_it_position = 0;
360  if (buffer_it_position != buffer_end_it_position)
361  bucket_it_position = other.bucket_it - other.buffer_it->begin();
362 
363  // Move the buffer and set the buffer and bucket iterator accordingly.
364  buffer = std::move(other.buffer);
365  buffer_it = buffer.begin() + buffer_it_position;
366  buffer_end_it = buffer.begin() + buffer_end_it_position;
367 
368  if (buffer_it_position != buffer_end_it_position)
369  bucket_it = buffer_it->begin() + bucket_it_position;
370  }
371 
373  execution_handler_t exec_handler{};
374 
376  resource_type resource; // a std::ranges::view
378  resource_iterator_type resource_it{};
380  algorithm_t algorithm{};
381 
383  buffer_type buffer{};
385  buffer_iterator_type buffer_it{};
387  buffer_iterator_type buffer_end_it{};
389  bucket_iterator_type bucket_it{};
391  size_t buffer_size{1};
392 };
393 
400 template <typename resource_rng_t, std::semiregular algorithm_t, std::semiregular algorithm_result_t>
401 algorithm_executor_blocking(resource_rng_t &&, algorithm_t, algorithm_result_t const &)
402  -> algorithm_executor_blocking<resource_rng_t, algorithm_t, algorithm_result_t, execution_handler_sequential>;
404 } // namespace seqan3::detail
Provides seqan3::detail::execution_handler_parallel.
Provides seqan3::detail::execution_handler_sequential.
T find_if(T... args)
requires requires
The rank_type of the semi-alphabet; defined as the return type of seqan3::to_rank....
Definition: alphabet/concept.hpp:164
constexpr auto is_eof
Checks whether a given letter is equal to the EOF constant defined in <cstdio>.
Definition: predicate.hpp:75
The <ranges> header from C++20's standard library.