ADTF  3.14.3
samplereader.h
Go to the documentation of this file.
1 
7 #pragma once
8 #include "samplestreamer_intf.h"
9 #include "streamtype_intf.h"
10 #include "sample_intf.h"
11 #include "sample.h"
12 #include "streamtype.h"
14 #include "streammetatypeplain.h"
15 #include "pin.h"
16 #include "sampleinstream_intf.h"
17 #include "samplestream_intf.h"
18 #include "streamitem_intf.h"
20 #include "streamingrequests_intf.h"
21 
22 #include <adtf_utils.h>
23 #include <adtfucom3/adtf_ucom3.h>
24 #include <adtfbase/string_intf.h>
25 
26 namespace adtf
27 {
28 namespace streaming
29 {
30 namespace ant
31 {
32 
39 template <typename INTERFACE, typename PINTYPE>
40 class sample_streamer : public INTERFACE
41 {
42  public:
43  typedef PINTYPE pin_type;
44 protected:
46  adtf_util::cString m_strName;
51 
53  sample_streamer(const sample_streamer&) = delete;
60 
61 public:
63  sample_streamer() = default;
67  sample_streamer(const char* strName,
68  const ucom::ant::iobject_ptr<const IStreamType>& pStreamType) : m_strName(strName), m_pStreamType(pStreamType)
69  {
70  }
72  virtual ~sample_streamer()
73  {
74  ResetPin();
75  }
76 
77 public:
78 
84  void SetName(const char* strName)
85  {
86  m_strName = strName;
87  }
88 
96  {
97  if (m_poPin)
98  {
99  ResetPin();
100  }
101  if (pPin)
102  {
103  pPin->RegisterStreamer(*this);
104  m_poPin = pPin;
105  }
106  else
107  {
108  ResetPin();
109  }
110  }
116  void ResetPin()
117  {
118  if (m_poPin)
119  {
120  m_poPin->UnregisterStreamer(*this);
121  m_poPin = nullptr;
122  }
123  }
129  tResult GetName(base::ant::IString&& strName) const
130  {
131  return strName.Set(m_strName);
132  }
133 
141  {
142  if (pStreamType.Get())
143  {
144  m_pStreamType = ucom::make_object_ptr<cStreamType>(*pStreamType.Get());
145  }
146  else
147  {
148  m_pStreamType = nullptr;
149  }
150  RETURN_NOERROR;
151  }
152 
153 public: //implements ISampleStreamer
154  tResult GetType(ucom::ant::iobject_ptr<const IStreamType>& pStreamType) const override
155  {
156  RETURN_IF_FAILED(pStreamType.Reset(m_pStreamType));
157  RETURN_NOERROR;
158  }
159 
160  tResult EndStreaming() override { RETURN_NOERROR; }
161 };
162 
171 class cSampleReader : public sample_streamer<ISampleReader, cInPin>,
172  public ISampleReaderQueue,
173  public base::ant::runnable<base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink>
174 {
175 private:
179  std::mutex m_oReadLock;
183  std::function<tResult(const ucom::iobject_ptr<const IStreamType>& pStreamType)> m_fnAcceptTypeCallback;
185  std::function<tResult(tResult oStreamError)> m_fnStreamErrorCallback;
186 
191 
196  bool m_bValidType = true;
201 
204 
205 public:
212  {
213  }
214 
222  m_eAccessMode(eAccessMode)
223  {
224  }
225 
241  virtual tResult AcceptType(const ucom::ant::iobject_ptr<const IStreamType>& pStreamType)
242  {
243  m_pLastReadStreamType = pStreamType;
244  // may use a user implementation! which is set by function
246  {
247  return m_fnAcceptTypeCallback(pStreamType);
248  }
249  else if (m_pStreamType)
250  {
252  RETURN_IF_FAILED(m_pStreamType->GetMetaType(pMetaType));
253  return pMetaType->IsCompatible(*pStreamType.Get(), *m_pStreamType.Get());
254  }
255  else
256  {
257  RETURN_NOERROR;
258  }
259  }
260 
261 public:
262  tResult BeginStreaming(ISampleStream& oSampleStream) override
263  {
264  EndStreaming();
265 
266  // async mode is not working yet.
267  if (m_eAccessMode != ISampleStreamAccess::tMode::PushRead)
268  {
269  RETURN_ERROR_DESC(ERR_NOT_SUPPORTED, "Currently only PushRead mode is supported.");
270  }
271 
272  {
273  std::lock_guard<std::mutex> _sync(m_oReadLock);
274 
275  ISampleStream::IPushReadEventSink* pSink = this;
276  //@TODO : Something for TYPE Checking !!!
277  RETURN_IF_FAILED(oSampleStream.Open(m_strName,
278  m_pInStream,
280  pSink,
282  0));
283  }
284 
285  RETURN_NOERROR;
286  }
287 
288  tResult EndStreaming() override
289  {
290  {
291  std::lock_guard<std::mutex> _sync(m_oReadLock);
292  //this will automatically close
293  m_pInStream.Reset();
294  Clear();
295  }
296  RETURN_NOERROR;
297  }
298 
306  void SetAcceptTypeCallback(const std::function<
307  tResult(const ucom::iobject_ptr<const IStreamType>& pStreamType)> & fnAcceptTypeCallback )
308  {
309  m_fnAcceptTypeCallback = fnAcceptTypeCallback;
310  }
311 
316  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback)
317  {
318  m_fnStreamErrorCallback = fnStreamErrorCallback;
319  }
320 
321 public:
323  tResult SetStreamError(const tResult& oError)
324  {
325  std::lock_guard<std::mutex> _sync(m_oReadLock);
326  if (m_pInStream)
327  {
328  //report the error !!
329  return m_pInStream->SetStreamError(oError);
330  }
331  RETURN_ERROR(ERR_NOT_CONNECTED);
332  }
333 
338  {
339  pSampleInStream.Reset(m_pInStream);
340  }
341 
346  {
348  {
350  }
351  else
352  {
353  pType.Reset(m_pStreamType);
354  }
355  }
356 
365  {
366  return ReadNextSample(pSample);
367  }
368 
369 protected:
371  tResult Push(tTimeStamp tmTimeofActivation)
372  {
373  //implementing Push
374  //this will be done sync
375  #ifdef _PIPES_DEBUG_LOG
376  LOG_DUMP("Run Push Event in Reader");
377  #endif
378  if (!m_pInStream)
379  {
380  RETURN_ERROR(ERR_NOT_CONNECTED);
381  }
382 
383  while (true)
384  {
385  cStreamItem oItem;
386  {
387  //very short LOCK !!
388  std::lock_guard<std::mutex> _sync(m_oReadLock);
389  if (IS_FAILED(m_pInStream->Read(oItem, tTimeStamp(0))))
390  {
391  break;
392  }
393  }
394 
395  RETURN_IF_FAILED(Push(oItem, tmTimeofActivation));
396  }
397 
398 
399  RETURN_NOERROR;
400  }
401 
402 protected:
413  {
414  if (!m_pInStream)
415  {
416  return ERR_NOT_CONNECTED;
417  }
418 
419  while (true)
420  {
421  cStreamItem oItem;
423  {
424  RETURN_IF_FAILED(m_pInStream->Read(oItem, tTimeStamp(0)));
425  }
426  else
427  {
428  RETURN_IF_FAILED(Pop(oItem));
429  }
430 
431  if (IS_OK(ProcessStreamItem(oItem)))
432  {
433  if (oItem.GetType() == IStreamItem::tType::Sample)
434  {
435  return oItem.GetSample(pSample);
436  }
437  }
438  }
439  }
440 
452  tResult ProcessStreamItem(const IStreamItem& oStreamItem)
453  {
454  switch (oStreamItem.GetType())
455  {
457  {
459  RETURN_IF_FAILED(oStreamItem.GetStreamType(pHelper));
460  tResult nError = AcceptType(pHelper);
461  if (IS_FAILED(nError))
462  {
463  m_bValidType = false;
464  RETURN_IF_FAILED(HandleStreamError(ADTF_BASE_COMPOSED_RESULT(nError, "Stream type is not accepted by reader.")));
465  }
466  else
467  {
468  m_bValidType = true;
469  }
470  break;
471  }
472  default:
473  {
474  if (!m_bValidType)
475  {
476  RETURN_ERROR(ERR_INVALID_TYPE);
477  }
478  break;
479  }
480  }
481 
482  RETURN_NOERROR;
483  }
484 
485 public:
486  void Reset() override
487  {
488  m_pLastReadStreamType.Reset();
489  Clear();
490  }
491 
492 public:
496 
497 protected:
498  tResult HandleStreamError(tResult nError)
499  {
501  {
502  return m_fnStreamErrorCallback(nError);
503  }
504  else
505  {
506  return SetStreamError(nError);
507  }
508  }
509 };
510 
511 
516 {
517 private:
519  std::set<ISampleReaderQueue*> m_lstExternalQueues;
520 
521 public:
523 
524  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
525  {
526  for (auto pQueue : m_lstExternalQueues)
527  {
528  RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
529  }
530 
531  RETURN_NOERROR;
532  };
533 
534  void Clear() override
535  {
536  for (auto pQueue: m_lstExternalQueues)
537  {
538  pQueue->Clear();
539  }
540  }
541 
542  tResult Pop(IStreamItem& /* oStreamItem */) override
543  {
544  RETURN_ERROR(ERR_EMPTY);
545  }
546 
547  tResult ReadAllAvailableItems()
548  {
549  return cSampleReader::Push(0);
550  }
551 
552 public:
553  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
554  {
555  m_lstExternalQueues.insert(pExternalBuffer);
556  }
557 
558  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
559  {
560  m_lstExternalQueues.erase(pExternalBuffer);
561  }
562 
563 };
564 
569 {
570 protected:
573 
574 public:
575 
577 
587  {
589  while (IS_OK(ReadNextSample(pDummy)));
590 
591  if (!m_pLastValidSample)
592  {
593  RETURN_ERROR(ERR_EMPTY);
594  }
595 
596  return pSample.Reset(m_pLastValidSample);
597  }
598 
599  void Reset() override
600  {
601  m_pLastValidSample = nullptr;
602  cSampleReader::Reset();
603  }
604 
605 protected:
606 
611  {
613  m_pLastValidSample.Reset(pSample);
614  RETURN_NOERROR;
615  }
616 
617 };
618 
623  public ISampleReaderQueue,
624  private adtf_util::lock_free_queue<cStreamItem>
625 {
627  typedef adtf_util::lock_free_queue<cStreamItem> base_type;
628 
629 public: // implements ISampleReaderQueue
630  tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
631  {
632  return base_type::Push(oStreamItem);
633  }
634 
635  void Clear() override
636  {
637  cStreamItem oItem;
638  while (IS_OK(base_type::Pop(&oItem)));
639  }
640 
641  tResult Pop(IStreamItem& oStreamItem) override
642  {
643  cStreamItem oQueueItem;
644  auto oResult = base_type::Pop(&oQueueItem);
645  // this is a performance optimization in order to prevent the annotation
646  // in the RETURN_IF_FAILED macro below.
647  if (oResult == ERR_EMPTY)
648  {
649  return oResult;
650  }
651  RETURN_IF_FAILED(oResult);
652  return oQueueItem.CopyTo(oStreamItem);
653  }
654 
655 };
656 
657 
659  public ISampleReaderQueue
660 {
661  private:
662  std::mutex m_oQueueMutex;
663  std::deque<cStreamItem> m_oItems;
664  cStreamItem m_oLastType;
665  size_t m_nSampleCount = 0;
666 
667  public:
668  tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
669  {
670  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
671  m_oItems.emplace_back(oStreamItem);
672  if (oStreamItem.GetType() == IStreamItem::tType::Sample)
673  {
674  ++m_nSampleCount;
675  }
676  CheckQueue(m_oItems);
677  RETURN_NOERROR;
678  }
679 
680  void Clear() override
681  {
682  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
683  m_oItems.clear();
684  m_oLastType = cStreamItem();
685  m_nSampleCount = 0;
686  }
687 
688  tResult Pop(IStreamItem& oItem)
689  {
690  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
691  if (m_oLastType.GetType() == IStreamItem::tType::StreamType)
692  {
693  RETURN_IF_FAILED(m_oLastType.CopyTo(oItem));
694  m_oLastType = cStreamItem();
695  }
696  else
697  {
698  if (m_oItems.empty())
699  {
700  RETURN_ERROR(ERR_EMPTY);
701  }
702 
703  RETURN_IF_FAILED(m_oItems.front().CopyTo(oItem));
704  if (m_oItems.front().GetType() == IStreamItem::tType::Sample)
705  {
706  --m_nSampleCount;
707  }
708  m_oItems.pop_front();
709  }
710 
711  RETURN_NOERROR;
712  }
713 
714  protected:
715  virtual void CheckQueue(const std::deque<cStreamItem>& oItems) = 0;
716  void PopFront()
717  {
718  if (m_oItems.front().GetType() == IStreamItem::tType::StreamType)
719  {
720  m_oLastType = m_oItems.front();
721  }
722  else
723  {
724  --m_nSampleCount;
725  }
726  m_oItems.pop_front();
727  }
728 
729  size_t GetSampleCount()
730  {
731  return m_nSampleCount;
732  }
733 };
734 
735 template<size_t MaxSize>
738 {
739  protected:
740  void CheckQueue(const std::deque<cStreamItem>& oItems) override
741  {
742  while (!oItems.empty() &&
743  (GetSampleCount() > MaxSize ||
744  oItems.front().GetType() == IStreamItem::tType::StreamType))
745  {
746  PopFront();
747  }
748  }
749 };
750 
751 template<tTimeStamp TimeRange>
754 {
755  protected:
756  void CheckQueue(const std::deque<cStreamItem>& oItems) override
757  {
758  if (oItems.back().GetType() == IStreamItem::tType::Sample)
759  {
761  if (IS_OK(oItems.back().GetSample(pLastSample)))
762  {
763  tTimeStamp nLastTime = pLastSample->GetTime();
764  while (oItems.size() > 1)
765  {
766  auto& oItem = oItems.front();
767  if (oItem.GetType() == IStreamItem::tType::Sample)
768  {
770  if (IS_OK(oItem.GetSample(pSample)))
771  {
772  if (nLastTime - pSample->GetTime() < TimeRange)
773  {
774  break;
775  }
776  }
777 
778  }
779 
780  PopFront();
781  }
782  }
783  }
784  }
785 };
786 
792 template<typename INTERNAL_QUEUE,
793  bool STORE_LAST_SAMPLE = true,
794  ISampleStreamAccess::tMode ACCESS_MODE = ISampleStreamAccess::tMode::PushRead>
795 class sample_reader : public std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type
796 {
797 protected:
799  INTERNAL_QUEUE m_oInternalQueue;
800 
802  typedef typename std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type base_class;
803 public:
804 
805  sample_reader(): base_class(ACCESS_MODE)
806  {
807  }
808 
809  tResult Push(const IStreamItem& oItem, tTimeStamp tsTime) override
810  {
811  return m_oInternalQueue.Push(oItem, tsTime);
812  };
813 
814  void Clear() override
815  {
816  m_oInternalQueue.Clear();
817  };
818 
819  tResult Pop(IStreamItem& oItem) override
820  {
821  return m_oInternalQueue.Pop(oItem);
822  }
823 };
824 
832 
841 template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
843 
852 template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
854 
860 
870 {
872  pSample.Reset(pEmpty);
873  oSampleReader.GetNextSample(pSample);
874  return oSampleReader;
875 }
876 
885 template<typename DATATYPE>
887 {
889  if (IS_OK(oSampleReader.GetNextSample(pSample)))
890  {
891  oSampleData.Reset(pSample);
892  }
893  else
894  {
895  oSampleData.Reset();
896  }
897  return oSampleReader;
898 }
908 {
909  oSampleReader.GetLastType(pType);
910  return oSampleReader;
911 }
920 inline cSampleReader& operator>>(cSampleReader& oSampleReader, cSampleReader& (*pStreamfunc)(cSampleReader&))
921 {
922  return pStreamfunc(oSampleReader);
923 }
924 
932 inline tResult make_sample_reader(cSampleReader & oReader,
933  const char* strNameOfReader,
934  const ucom::iobject_ptr<const IStreamType>& pStreamType)
935 {
936  oReader.SetName(strNameOfReader);
937  return oReader.SetType(pStreamType);
938 }
939 
946 {
947 public:
948  using cExternalQueueSampleReader::cExternalQueueSampleReader;
949 
950  [[deprecated("The class 'cExternelQueueSampleReader' is deprecated. Please use 'cExternalQueueSampleReader' instead.")]]
953  {
954 
955  }
956 };
957 
958 } //namespace ant
959 
960 namespace flash
961 {
962 
967 {
968  public:
971  bool bStoreLastSample);
972 
973  ~cSampleReader() override;
974 
975  void SetName(const char* strName) override;
976  tResult GetName(base::ant::IString&& strName) override;
977 
978  tResult SetType(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType) override;
979  tResult GetType(ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType) const override;
980 
981  tResult BeginStreaming(ISampleStream& oSampleStream) override;
982  tResult EndStreaming() override;
983 
984  tResult SetStreamerPin(const ucom::ant::iobject_ptr<IStreamerPin>& pStreamerPin) override;
985 
989  void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
990 
994  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
995  tResult SetStreamError(tResult oError) override;
996 
999 
1001  void Reset();
1002 
1003  tResult ReadAllAvailableItems() override;
1004 
1009  uint32_t nSubStreamId,
1010  const base::ant::IProperties* pRequestProperties = nullptr);
1011  void SetSynchronousTypeUpdateCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)>& fnRequestTypeUpdateCallback);
1012 
1013  protected:
1014  class cImplementation;
1015  std::unique_ptr<cImplementation> m_pImplementation;
1016 
1017 };
1018 
1026 template<typename INTERNAL_QUEUE,
1027  bool STORE_LAST_SAMPLE = true,
1028  ant::ISampleStreamAccess::tMode ACCESS_MODE = ant::ISampleStreamAccess::tMode::Push>
1030 {
1031  protected:
1032  INTERNAL_QUEUE m_oQueue;
1033 
1034  public:
1035  sample_reader(): cSampleReader(m_oQueue, ACCESS_MODE, STORE_LAST_SAMPLE)
1036  {}
1037 };
1038 
1039 
1040 
1042 {
1043  public:
1044  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1045  {
1046  m_oExternalQueues.insert(pExternalBuffer);
1047  }
1048 
1049  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1050  {
1051  m_oExternalQueues.erase(pExternalBuffer);
1052  }
1053 
1054  public:
1055  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
1056  {
1057  for (auto pQueue : m_oExternalQueues)
1058  {
1059  RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
1060  }
1061 
1062  RETURN_NOERROR;
1063  };
1064 
1065  void Clear() override
1066  {
1067  for (auto pQueue: m_oExternalQueues)
1068  {
1069  pQueue->Clear();
1070  }
1071  }
1072 
1073  tResult Pop(IStreamItem& /* oStreamItem */) override
1074  {
1075  RETURN_ERROR(ERR_EMPTY);
1076  }
1077 
1078  protected:
1080  std::set<ISampleReaderQueue*> m_oExternalQueues;
1081 };
1082 
1084 {
1085  protected:
1086  virtual ~IExternalReaderQueues() = default;
1087 
1088  public:
1089  virtual void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1090  virtual void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1091 };
1092 
1093 class cExternalQueueSampleReader : public sample_reader<cExternalQueuesWrapper, false>,
1094  public IExternalReaderQueues
1095 {
1096  public:
1097  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1098  {
1099  m_oQueue.RegisterExternalQueue(pExternalBuffer);
1100  }
1101 
1102  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1103  {
1104  m_oQueue.UnregisterExternalQueue(pExternalBuffer);
1105  }
1106 };
1107 
1113 
1114 
1119 template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
1121 
1126 template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
1128 
1134 
1135 inline tResult make_sample_reader(ISampleReader& oReader,
1136  const char* strNameOfReader,
1137  const ucom::iobject_ptr<const IStreamType>& pStreamType)
1138 {
1139  oReader.SetName(strNameOfReader);
1140  return oReader.SetType(pStreamType);
1141 }
1142 
1143 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::ISample>& pSample)
1144 {
1146  pSample.Reset(pEmpty);
1147  oSampleReader.GetNextSample(pSample);
1148  return oSampleReader;
1149 }
1150 
1151 template<typename DATATYPE>
1152 ISampleReader& operator>>(ISampleReader& oSampleReader, sample_data<DATATYPE>& oSampleData)
1153 {
1155  if (IS_OK(oSampleReader.GetNextSample(pSample)))
1156  {
1157  oSampleData.Reset(pSample);
1158  }
1159  else
1160  {
1161  oSampleData.Reset();
1162  }
1163  return oSampleReader;
1164 }
1165 
1166 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::IStreamType>& pType)
1167 {
1168  oSampleReader.GetLastType(pType);
1169  return oSampleReader;
1170 }
1171 
1172 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ISampleReader& (*pStreamfunc)(ISampleReader&))
1173 {
1174  return pStreamfunc(oSampleReader);
1175 }
1176 
1177 }
1178 
1179 namespace kiwi
1180 {
1181 
1186 {
1187 public:
1192 
1196  ~cNullReader() override;
1197 
1198  void SetName(const char* strName) override;
1199  tResult GetName(base::ant::IString&& strName) override;
1200  tResult SetType(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType) override;
1201  tResult GetType(ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType) const override;
1202  tResult BeginStreaming(ant::ISampleStream& pStream) override;
1203  tResult EndStreaming() override;
1204  tResult SetStreamerPin(const ucom::ant::iobject_ptr<flash::IStreamerPin>& pStreamerPin) override;
1205 
1209  tResult SetStreamError(tResult oError) override;
1210  tResult ReadAllAvailableItems() override;
1211 
1212  void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
1213  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
1214 
1215 protected:
1216  class cImplementation;
1217  std::unique_ptr<cImplementation> m_pImplementation;
1218 };
1219 
1220 }
1221 
1225 
1226 using flash::cSampleReader;
1229 using flash::sample_reader;
1234 
1235 using flash::make_sample_reader;
1236 
1237 using kiwi::cNullReader;
1238 
1239 
1240 } //namespace streaming
1241 } //namespace adtf
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_BASE_COMPOSED_RESULT(_result,...)
for internal use
Definition of a property set container interface.
The IString interface provides methods for getting and setting strings through abstract interfaces.
Definition: string_intf.h:24
Runnable helper implementation template.
Definition: runnable.h:58
Interface to create a sample reader buffer.
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
Access definition for ISampleStream::Open.
@ AsyncQueue
Asynchronous operation mode for ISampleStream::Open (not implemented yet). See Asynchronous Mode - Asyn...
Interface of the SampleStream.
virtual tResult Open(const char *strName, adtf::ucom::ant::iobject_ptr< ISampleInStream > &pInStream, const adtf::ucom::ant::iobject_ptr< const IStreamType > &pInitialAcceptedStreamType, IPushReadEventSink *&pPushEventSink, ISampleStreamAccess::tMode ui32Mode, size_t szQueueSize)=0
Opens The SampleStream for reading access.
The IStreamItem interface is the base type for all object which are passed through a stream.
virtual tType GetType() const =0
Retrieves the type of the sample item.
virtual tResult GetStreamType(ucom::ant::iobject_ptr< const IStreamType > &pStreamType) const =0
Retrieves the StreamType of the StreamItem if GetType is tType::StreamType.
@ StreamType
Item is an IStreamType. Note: all stream type changes are queued as well.
@ Sample
Item is a queue item that contains an ISample.
Implementation of a adtf::streaming::ant::ISampleReaderQueue with dynamic growing sample queue.
Definition: samplereader.h:625
adtf_util::lock_free_queue< cStreamItem > base_type
base type of cDynamicSampleReaderQueue
Definition: samplereader.h:627
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
Definition: samplereader.h:641
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
Definition: samplereader.h:630
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:635
Sample reader which allows the registration of external queues.
Definition: samplereader.h:516
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
Definition: samplereader.h:542
std::set< ISampleReaderQueue * > m_lstExternalQueues
A set of other registered buffers.
Definition: samplereader.h:519
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
Definition: samplereader.h:524
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:534
Sample reader which allows the registration of external queues.
Definition: samplereader.h:946
Sample reader which always provides the last successful received sample.
Definition: samplereader.h:569
tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample) override
Collect the next sample by overriding the adtf::streaming::ant::cSampleReader::ReadNextSample.
Definition: samplereader.h:610
ucom::object_ptr< const ISample > m_pLastValidSample
Last Sample Reference.
Definition: samplereader.h:572
tResult GetLastSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Returns the last Sample received.
Definition: samplereader.h:586
tResult Pop(IStreamItem &oItem)
Returns the next sample from the queue.
Definition: samplereader.h:688
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
Definition: samplereader.h:668
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:680
The default sample reader reads the incoming stream of an IInPin.
Definition: samplereader.h:174
ISampleStreamAccess::tMode m_eAccessMode
current access method
Definition: samplereader.h:203
void SetAcceptTypeCallback(const std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
Definition: samplereader.h:306
sample_streamer< ISampleReader, cInPin > base_type
base type of cSampleReader
Definition: samplereader.h:181
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Gets the next Sample within internal queue which not has been read.
Definition: samplereader.h:364
void GetSampleInStream(ucom::ant::iobject_ptr< ISampleInStream > &pSampleInStream)
Returns the connected ISampleInStream.
Definition: samplereader.h:337
std::mutex m_oReadLock
Read synchronization.
Definition: samplereader.h:179
ucom::object_ptr< ISampleInStream > m_pInStream
Reader where to read.
Definition: samplereader.h:177
std::function< tResult(tResult oStreamError)> m_fnStreamErrorCallback
Callback to react on stream errors.
Definition: samplereader.h:185
bool m_bValidType
States whether the current type is valid; it is set to false if AcceptType fails.
Definition: samplereader.h:196
base::ant::runnable< base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink > runnable_type
internal base type.
Definition: samplereader.h:190
ucom::object_ptr< const IStreamType > m_pLastReadStreamType
Last accepted incoming type.
Definition: samplereader.h:200
tResult SetStreamError(const tResult &oError)
Forward an error to the corresponding stream.
Definition: samplereader.h:323
tResult EndStreaming() override
Sample Stream disconnected.
Definition: samplereader.h:288
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> m_fnAcceptTypeCallback
Callback to reject type changes.
Definition: samplereader.h:183
virtual tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
This will read the stream items until a sample is reached and return it.
Definition: samplereader.h:412
void GetLastType(ucom::ant::iobject_ptr< const IStreamType > &pType)
Returns the last stream type that was read; falls back to the reader's initial stream type.
Definition: samplereader.h:345
virtual tResult AcceptType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Accept or reject a new stream type - see also AcceptType and IsCompatible implementations.
Definition: samplereader.h:241
cSampleReader(ISampleStreamAccess::tMode eAccessMode)
Default CTOR which defines the access method.
Definition: samplereader.h:220
tResult ProcessStreamItem(const IStreamItem &oStreamItem)
This will process the stream items.
Definition: samplereader.h:452
tResult Push(tTimeStamp tmTimeofActivation)
internal Push operation to implement pushread mode
Definition: samplereader.h:371
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
Definition: samplereader.h:316
tResult BeginStreaming(ISampleStream &oSampleStream) override
BeginStreaming will open the given Sample Stream for Reading while a connection is establishing.
Definition: samplereader.h:262
Default implementation of a StreamItem as container of the Sample Stream's queue.
tResult GetSample(ucom::ant::iobject_ptr< const ISample > &pSample) const
Retrieves the sample of the StreamItem if GetType is tType::Sample.
tType GetType() const
Retrieves the type of the sample item.
tResult CopyTo(IStreamItem &oDest) const
Copy its content to the oDest container.
Sample Data getter for an easy use of samples with samplebuffer set to the type T.
Definition: sample.h:167
tResult Reset(const ucom::ant::iobject_ptr< const ISample > &pSample)
Resets the sample data reference to a new sample reference.
Definition: sample.h:285
A Possible Sample Reader of a Trigger Function! Sample reader with a internal queue,...
Definition: samplereader.h:796
INTERNAL_QUEUE m_oInternalQueue
A internal sample queue.
Definition: samplereader.h:799
std::conditional< STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader >::type base_class
base class
Definition: samplereader.h:802
Helper template can be used to implement ISampleStreamer.
Definition: samplereader.h:41
void ResetPin()
Resets the pin reference to nullptr.
Definition: samplereader.h:116
sample_streamer(const char *strName, const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
CTOR initializer.
Definition: samplereader.h:67
sample_streamer(const sample_streamer &)=delete
deleted copy CTOR
ucom::object_ptr< PINTYPE > m_poPin
pin reference reading/writing from
Definition: samplereader.h:50
void SetName(const char *strName)
Sets the name of the streamer.
Definition: samplereader.h:84
sample_streamer & operator=(sample_streamer &&)=delete
deleted move operator
tResult SetType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Sets the StreamType of the streamer.
Definition: samplereader.h:140
sample_streamer(sample_streamer &&)=delete
deleted move CTOR
ucom::object_ptr< const IStreamType > m_pStreamType
stream type of the streamer
Definition: samplereader.h:48
void ResetPin(ucom::ant::object_ptr< PINTYPE > &pPin)
Resets the pin reference. This is only used internally.
Definition: samplereader.h:95
adtf_util::cString m_strName
name of the streamer (used i.e. to create the pins name)
Definition: samplereader.h:46
sample_streamer & operator=(const sample_streamer &)=delete
deleted copy operator
tResult GetName(base::ant::IString &&strName) const
Gets the name of the streamer.
Definition: samplereader.h:129
Interface for sample readers that read from sample streams via input pins.
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample)=0
Reads the next available sample from the associated sample stream.
virtual void SetName(const char *strName)=0
Sets the name of the streamer.
virtual tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)=0
Sets the initial stream type of a streamer.
std::set< ISampleReaderQueue * > m_oExternalQueues
A set of other registered buffers.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default sample reader reads the incoming stream of an IInPin.
Definition: samplereader.h:967
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the last available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult RequestSamples(ucom::ant::iobject_ptr< hollow::IStreamingRequest > &pRequest, uint32_t nSubStreamId, const base::ant::IProperties *pRequestProperties=nullptr)
RequestSamples of the given Substream to be generated and/or transmitted.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
void SetAcceptTypeCallback(const std::function< tResult(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
tResult BeginStreaming(ISampleStream &oSampleStream) override
Begin streaming on the given sample stream.
Reads and stores Samples within the given queue implementation INTERNAL_QUEUE.
A reader that does not read anything.
tResult BeginStreaming(ant::ISampleStream &pStream) override
Begin streaming on the given sample stream.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the last available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< flash::IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
~cNullReader() override
Destructor.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
virtual tResult Reset(const iobject_ptr< T > &i_oOther)=0
Reset this object_ptr<> with the content of another iobject_ptr<>
virtual T * Get() const =0
Get raw pointer to shared object.
Base object pointer to realize binary compatible reference counting in interface methods.
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:158
#define LOG_DUMP(...)
Log a message with LogLevel Dump (only in Debug mode)
Definition: log.h:119
tResult make_sample_reader(cSampleReader &oReader, const char *strNameOfReader, const ucom::iobject_ptr< const IStreamType > &pStreamType)
Initializes a cSampleReader with given name and StreamType param[in] oReader Reader to initialize para...
Definition: samplereader.h:932
const ISampleInStream & operator>>(const ISampleInStream &oStreamReader, IStreamItem &oItem)
Streaming Operator>> to read a sample from the readers queue.
sample_reader< ant::cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create an internal sample queue with u...
sample_reader< ant::time_limited_sample_reader_queue< TIME_RANGE >, STORELASTSAMPLE > time_limited_sample_reader
The time_limited_sample_reader will create a sample reader which will create an internal sample queue ...
size_limited_sample_reader< 1 > cSingleSampleReader
The cSingleSampleReader will create a sample reader which will create an internal sample queue with on...
sample_reader< ant::size_limited_sample_reader_queue< MAX_SIZE >, STORELASTSAMPLE > size_limited_sample_reader
The size_limited_sample_reader will create a sample reader which will create an internal sample queue ...
Namespace for entire ADTF SDK.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_RUN_FUNCTION(_fcName_)
Helper Macro to define Run function for adtf::base::ant::runnable<>
Definition: runnable.h:196
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define RETURN_IF_FAILED(s)
returns if the expression returns a failed tResult or throws an exception.

Copyright © Audi Electronics Venture GmbH. All rights reserved. (Generated on Thu Jun 9 2022 by doxygen 1.9.1)