ADTF  3.18.2
sample_collectors.h
Go to the documentation of this file.
1 
7 #pragma once
8 #include "samplereader.h"
9 
10 #include <mutex>
11 #include <deque>
12 #include <vector>
13 #include <memory>
14 #include <functional>
15 
16 namespace adtf
17 {
18 namespace streaming
19 {
20 namespace ant
21 {
22 
23 namespace detail
24 {
25 
27 {
28  private:
29  cStreamItem m_oLastReadSampleStreamItem;
30 
31  std::recursive_mutex m_oQueueMutex;
32  std::deque<cStreamItem> m_oItems;
33 
34  cExternalQueueSampleReader& m_oSampleReader;
35 
36  public:
39 
40  tResult GetSampleBeforeOrAt(tTimeStamp tmTime,
43  tResult GetNextSampleAndType(adtf::ucom::iobject_ptr<const ISample>& pSample,
45 
46  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tmTime) override;
47  tResult Pop(IStreamItem& oStreamItem) override;
48  void Clear() override;
49 
50  size_t IsEmpty();
51 
52  cExternalQueueSampleReader& GetReader();
53 };
54 
55 }
56 
63 {
64  private:
65  std::vector<cLastSampleReader*> m_oSampleReaders;
66 
67  public:
68 
75  {
76  m_oSampleReaders.push_back(&oSampleReader);
78  }
79 
88  template<typename ... T>
90  {
91  constexpr size_t nSize = sizeof...(T);
92  if (m_oSampleReaders.size() > nSize)
93  {
94  RETURN_IF_FAILED(m_oSampleReaders[m_oSampleReaders.size() - nSize - 1]->GetLastSample(pSample));
95  }
96  else
97  {
98  RETURN_ERROR(ERR_INVALID_ARG);
99  }
100  return GetLastSamples(samples...);
101  }
102 
103  tResult GetLastSamples()
104  {
106  }
107 };
108 
123 {
124  public:
125  typedef std::function<tResult(cExternalQueueSampleReader&, const adtf::ucom::iobject_ptr<const IStreamType>& pStreamType)> tStreamTypeChangeCallback;
126 
127  private:
128  tStreamTypeChangeCallback m_pTypeChangeCallback;
129 
130  struct tQueue
131  {
132  std::shared_ptr<detail::cExternalSampleReaderQueue> pSampleQueue;
136  };
137 
138  std::vector<tQueue> m_oQueues;
139 
140  public:
141  cSampleSetCollector() = default;
142 
148  cSampleSetCollector(tStreamTypeChangeCallback pTypeChangeCallback);
149 
150  cSampleSetCollector(const cSampleSetCollector&) = delete;
152 
153  cSampleSetCollector& operator=(const cSampleSetCollector&) = delete;
154  cSampleSetCollector& operator=(cSampleSetCollector&&) = default;
155 
156 
163 
167  void ClearQueues();
168 
178  template<typename ...T>
179  tResult GetNextSampleSet(T& ...samples)
180  {
181  if (m_oQueues.size() <= 0)
182  {
183  RETURN_ERROR(ERR_EMPTY);
184  }
185 
186  constexpr size_t nSize = sizeof...(T);
187  if (m_oQueues.size() < nSize)
188  {
189  RETURN_ERROR(ERR_INVALID_ARG);
190  }
191 
192  RETURN_IF_FAILED(RetrieveSamples());
193 
194  return GetSamples(samples...);
195  }
196 
197  private:
198  tResult RetrieveSamples();
199 
200  template<typename ... T>
201  tResult GetSamples(adtf::ucom::iobject_ptr<const ISample>& pSample, T & ... samples)
202  {
203  constexpr size_t nSize = sizeof...(T);
204 
205  RETURN_IF_FAILED(pSample.Reset(m_oQueues[m_oQueues.size() - nSize - 1].pFirstSample));
206 
207  return GetSamples(samples...);
208  }
209 
210  tResult GetSamples()
211  {
213  }
214 };
215 
230 {
231  public:
232  typedef std::function<tResult(cExternalQueueSampleReader&, const adtf::ucom::iobject_ptr<const IStreamType>& pStreamType)> tStreamTypeChangeCallback;
233 
234  private:
235  std::vector<std::shared_ptr<detail::cExternalSampleReaderQueue>> m_oQueues;
236  tStreamTypeChangeCallback m_pTypeChangeCallback;
237 
238  public:
239  cTimeBasedSampleSetCollector() = default;
240 
246  cTimeBasedSampleSetCollector(tStreamTypeChangeCallback pTypeChangeCallback);
247 
250 
253 
260 
264  void ClearQueues();
265 
277  template<typename ... T>
278  tResult GetSampleSetAt(tTimeStamp tmTime, T& ... samples)
279  {
280  constexpr size_t nSize = sizeof...(T);
281 
282  if (m_oQueues.size() < nSize)
283  {
284  RETURN_ERROR(ERR_INVALID_ARG);
285  }
286 
287  bool bSampleFound = false;
288  RETURN_IF_FAILED(GetSampleSetWithCheckAt(tmTime, bSampleFound, samples...));
289 
290  if (!bSampleFound)
291  {
292  RETURN_ERROR(ERR_NOT_FOUND);
293  }
294 
296  }
297 
298  private:
299  template<typename ... T>
300  tResult GetSampleSetWithCheckAt(tTimeStamp tmTime, bool& bSampleFound, adtf::ucom::iobject_ptr<const ISample>& pSample, T & ... samples)
301  {
302  constexpr size_t nSize = sizeof...(T);
303 
304  auto& pQueue = m_oQueues[m_oQueues.size() - nSize - 1];
306  bSampleFound |= IS_OK(pQueue->GetSampleBeforeOrAt(tmTime, pSample, pNewStreamType));
307 
308  if (pNewStreamType && m_pTypeChangeCallback)
309  {
310  RETURN_IF_FAILED(m_pTypeChangeCallback(pQueue->GetReader(), pNewStreamType));
311  }
312 
313  return GetSampleSetWithCheckAt(tmTime, bSampleFound, samples...);
314  }
315 
316  tResult GetSampleSetWithCheckAt(tTimeStamp /* tmTime */, bool& /* bSampleFound */)
317  {
319  }
320 };
321 
322 
323 } //namespace ant
324 
325 namespace flash
326 {
327 
328 namespace detail
329 {
330 
332 {
333  public:
335  virtual ~cExternalSampleReaderQueue();
336 
337  tResult GetSampleBeforeOrAt(tTimeStamp tmTime,
340  tResult GetNextSampleAndType(adtf::ucom::iobject_ptr<const ant::ISample>& pSample,
342 
343  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tmTime) override;
344  tResult Pop(IStreamItem& oStreamItem) override;
345  void Clear() override;
346 
347  size_t IsEmpty();
348 
349  ISampleReader& GetReader();
350 
351  private:
352  cStreamItem m_oLastReadSampleStreamItem;
353 
354  std::recursive_mutex m_oQueueMutex;
355  std::deque<cStreamItem> m_oItems;
356 
357  ISampleReader& m_oSampleReader;
358 };
359 
360 template<typename T>
362 {
363  using type = void;
364 };
365 
366 template<typename Ret, typename Class, typename... Args>
367 struct function_type_from_operator<Ret(Class::*)(Args...) const>
368 {
369  using type = std::function<Ret(Args...)>;
370 };
371 
372 template<typename F>
373 typename function_type_from_operator<decltype(&F::operator())>::type
374 function_from_lambda(F const &func)
375 {
376  return func;
377 }
378 
379 template <typename ReaderType>
381 get_type_change_callback_translator(std::function<tResult(ReaderType&, const adtf::ucom::iobject_ptr<const IStreamType>&)> fnCallback)
382 {
383  return [=](ISampleReader& oReader, const adtf::ucom::iobject_ptr<const IStreamType>& pStreamType) -> tResult
384  {
385  auto pReader = dynamic_cast<ReaderType*>(&oReader);
386  if (!pReader)
387  {
388  RETURN_ERROR_DESC(ERR_UNEXPECTED, "Reader is not of the expected class.");
389  }
390  return fnCallback(*pReader, pStreamType);
391  };
392 }
393 
394 template <typename Callable>
395 std::function<tResult(ISampleReader&, const adtf::ucom::iobject_ptr<const IStreamType>&)>
396 get_type_change_callback_translator(Callable&& fnCallback)
397 {
398  return get_type_change_callback_translator(function_from_lambda(fnCallback));
399 }
400 
401 }
402 
404 {
405  private:
406  std::vector<ISampleReader*> m_oSampleReaders;
407 
408  public:
409 
416  {
417  m_oSampleReaders.push_back(&oSampleReader);
419  }
420 
429  template<typename ... T>
431  {
432  constexpr size_t nSize = sizeof...(T);
433  if (m_oSampleReaders.size() > nSize)
434  {
435  RETURN_IF_FAILED(m_oSampleReaders[m_oSampleReaders.size() - nSize - 1]->GetLastSample(pSample));
436  }
437  else
438  {
439  RETURN_ERROR(ERR_INVALID_ARG);
440  }
441  return GetLastSamples(samples...);
442  }
443 
444  tResult GetLastSamples()
445  {
447  }
448 };
449 
451 {
452  public:
453  typedef std::function<tResult(ISampleReader&, const adtf::ucom::iobject_ptr<const IStreamType>& pStreamType)> tStreamTypeChangeCallback;
454 
455  public:
456  cSampleSetCollector() = default;
457 
458  template <typename Callback>
459  cSampleSetCollector(Callback pTypeChangeCallback):
461  {
462  m_pTypeChangeCallback = detail::get_type_change_callback_translator(pTypeChangeCallback);
463  }
464 
465  cSampleSetCollector(const cSampleSetCollector&) = delete;
467 
468  cSampleSetCollector& operator=(const cSampleSetCollector&) = delete;
469  cSampleSetCollector& operator=(cSampleSetCollector&&) = default;
470 
471  tResult CollectFrom(ISampleReader& oSampleReader);
472  void ClearQueues();
473 
474  template<typename ...T>
475  tResult GetNextSampleSet(T& ...samples)
476  {
477  if (m_oQueues.size() <= 0)
478  {
479  RETURN_ERROR(ERR_EMPTY);
480  }
481 
482  constexpr size_t nSize = sizeof...(T);
483  if (m_oQueues.size() < nSize)
484  {
485  RETURN_ERROR(ERR_INVALID_ARG);
486  }
487 
488  RETURN_IF_FAILED(RetrieveSamples());
489 
490  return GetSamples(samples...);
491  }
492 
493  private:
494  tResult RetrieveSamples();
495 
496  template<typename ... T>
497  tResult GetSamples(adtf::ucom::iobject_ptr<const ant::ISample>& pSample, T & ... samples)
498  {
499  constexpr size_t nSize = sizeof...(T);
500 
501  RETURN_IF_FAILED(pSample.Reset(m_oQueues[m_oQueues.size() - nSize - 1].pFirstSample));
502 
503  return GetSamples(samples...);
504  }
505 
506  tResult GetSamples()
507  {
509  }
510 
511  private:
512  tStreamTypeChangeCallback m_pTypeChangeCallback;
513 
514  struct tQueue
515  {
516  std::shared_ptr<detail::cExternalSampleReaderQueue> pSampleQueue;
520  };
521 
522  std::vector<tQueue> m_oQueues;
523 };
524 
526 {
527  public:
528  typedef std::function<tResult(ISampleReader&, const adtf::ucom::iobject_ptr<const IStreamType>& pStreamType)> tStreamTypeChangeCallback;
529 
530  public:
531  cTimeBasedSampleSetCollector() = default;
532 
533  template <typename Callback>
534  cTimeBasedSampleSetCollector(Callback pTypeChangeCallback):
536  {
537  m_pTypeChangeCallback = detail::get_type_change_callback_translator(pTypeChangeCallback);
538  }
539 
542 
545 
546  tResult CollectFrom(ISampleReader& oSampleReader);
547  void ClearQueues();
548 
549  template<typename ... T>
550  tResult GetSampleSetAt(tTimeStamp tmTime, T& ... samples)
551  {
552  constexpr size_t nSize = sizeof...(T);
553 
554  if (m_oQueues.size() < nSize)
555  {
556  RETURN_ERROR(ERR_INVALID_ARG);
557  }
558 
559  bool bSampleFound = false;
560  RETURN_IF_FAILED(GetSampleSetWithCheckAt(tmTime, bSampleFound, samples...));
561 
562  if (!bSampleFound)
563  {
564  RETURN_ERROR(ERR_NOT_FOUND);
565  }
566 
568  }
569 
570  private:
571  template<typename ... T>
572  tResult GetSampleSetWithCheckAt(tTimeStamp tmTime, bool& bSampleFound, adtf::ucom::iobject_ptr<const ant::ISample>& pSample, T & ... samples)
573  {
574  constexpr size_t nSize = sizeof...(T);
575 
576  auto& pQueue = m_oQueues[m_oQueues.size() - nSize - 1];
578  bSampleFound |= IS_OK(pQueue->GetSampleBeforeOrAt(tmTime, pSample, pNewStreamType));
579 
580  if (pNewStreamType && m_pTypeChangeCallback)
581  {
582  RETURN_IF_FAILED(m_pTypeChangeCallback(pQueue->GetReader(), pNewStreamType));
583  }
584 
585  return GetSampleSetWithCheckAt(tmTime, bSampleFound, samples...);
586  }
587 
588  tResult GetSampleSetWithCheckAt(tTimeStamp /* tmTime */, bool& /* bSampleFound */)
589  {
591  }
592 
593  private:
594  std::vector<std::shared_ptr<detail::cExternalSampleReaderQueue>> m_oQueues;
595  tStreamTypeChangeCallback m_pTypeChangeCallback;
596 };
597 
598 
599 }
600 
604 
605 } //namespace streaming
606 } //namespace adtf
A_UTILS_NS::cResult tResult
For backwards compatibility and to bring latest version into scope.
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
Interface to create a sample reader buffer.
The IStreamItem interface is the base type for all object which are passed through a stream.
Sample reader which allows the registration of external queue.
Definition: samplereader.h:498
Sample reader which always provides the last successful received sample.
Definition: samplereader.h:551
A class that collects the last samples from multiple sample readers.
tResult CollectFrom(cLastSampleReader &oSampleReader)
Adds a sample reader to the list of sample readers that data is read from.
tResult GetLastSamples(adtf::ucom::iobject_ptr< const ISample > &pSample, T &... samples)
Get the last samples from all sample readers if there is no new data in queue, the last sent data is ...
A class that collects the sample sets from multiple sample readers.
tResult GetNextSampleSet(T &...samples)
Get the next sample set from all sample readers at least one reader must have new data based on times...
void ClearQueues()
Clear all internal queues for all connected sample readers.
tResult CollectFrom(cExternalQueueSampleReader &oSampleReader)
Adds a sample reader to the list of sample readers that data is read from.
cSampleSetCollector(tStreamTypeChangeCallback pTypeChangeCallback)
Constructor with type change callback.
Default implementation of an StreamItem as container of the Sample Stream s Queue.
A class that collects the sample sets from multiple sample readers.
tResult GetSampleSetAt(tTimeStamp tmTime, T &... samples)
Get the next sample set from all sample readers.
void ClearQueues()
Clear all internal queues for all connected sample readers.
tResult CollectFrom(cExternalQueueSampleReader &oSampleReader)
Adds a sample reader to the list of sample readers that data is read from.
cTimeBasedSampleSetCollector(tStreamTypeChangeCallback pTypeChangeCallback)
Constructor with type change callback.
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime) override
Push a new value to the internal sample queue.
Interface for sample reads that read from sample streams via input pins.
tResult GetLastSamples(adtf::ucom::iobject_ptr< const ant::ISample > &pSample, T &... samples)
Get the last samples from all sample readers if there is no new data in queue, the last sent data is ...
tResult CollectFrom(ISampleReader &oSampleReader)
Adds a sample reader to the list of sample readers that data is read from.
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime) override
Push a new value to the internal sample queue.
virtual tResult Reset(const iobject_ptr< T > &i_oOther)=0
Reset this object_ptr<> with the content of another iobject_ptr<>
Base object pointer to realize binary compatible reference counting in interface methods.
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:163
Namespace for entire ADTF SDK.
Copyright © Audi Electronics Venture GmbH.