ADTF
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
samplereader.h
Go to the documentation of this file.
1
7#pragma once
9#include "streamtype_intf.h"
10#include "sample_intf.h"
11#include "sample.h"
12#include "streamtype.h"
14#include "streammetatypeplain.h"
15#include "pin.h"
16#include "sampleinstream_intf.h"
17#include "samplestream_intf.h"
18#include "streamitem_intf.h"
21
22#include <adtf_utils.h>
25
26namespace adtf
27{
28namespace streaming
29{
30namespace ant
31{
32
39template <typename INTERFACE, typename PINTYPE>
40class sample_streamer : public INTERFACE
41{
42 public:
43 typedef PINTYPE pin_type;
44protected:
46 adtf_util::cString m_strName;
51
60
61public:
63 sample_streamer() = default;
67 sample_streamer(const char* strName,
68 const ucom::ant::iobject_ptr<const IStreamType>& pStreamType) : m_strName(strName), m_pStreamType(pStreamType)
69 {
70 }
71
73 {
74 ResetPin();
75 }
76
77public:
78
// Sets the name of the streamer by assigning strName to m_strName.
84 void SetName(const char* strName)
85 {
86 m_strName = strName;
87 }
88
96 {
97 if (m_poPin)
98 {
99 ResetPin();
100 }
101 if (pPin)
102 {
103 pPin->RegisterStreamer(*this);
104 m_poPin = pPin;
105 }
106 else
107 {
108 ResetPin();
109 }
110 }
111
// Resets the pin reference to nullptr.
// If a pin is currently set, this streamer first unregisters itself from that
// pin; when no pin is set the call does nothing.
116 void ResetPin()
117 {
118 if (m_poPin)
119 {
120 m_poPin->UnregisterStreamer(*this);
121 m_poPin = nullptr;
122 }
123 }
124
130 {
131 return strName.Set(m_strName);
132 }
133
141 {
142 if (pStreamType.Get())
143 {
145 }
146 else
147 {
148 m_pStreamType = nullptr;
149 }
151 }
152
153public: //implements ISampleStreamer
154 tResult GetType(ucom::ant::iobject_ptr<const IStreamType>& pStreamType) const override
155 {
156 RETURN_IF_FAILED(pStreamType.Reset(m_pStreamType));
158 }
159
// Default EndStreaming: performs no work and returns ERR_NOERROR via RETURN_NOERROR.
160 tResult EndStreaming() override { RETURN_NOERROR; }
161};
162
171class cSampleReader : public sample_streamer<ISampleReader, cInPin>,
172 public ISampleReaderQueue,
173 public base::ant::runnable<base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink>
174{
175private:
179 std::mutex m_oReadLock;
185 std::function<tResult(tResult oStreamError)> m_fnStreamErrorCallback;
186
191
196 bool m_bValidType = true;
201
204
205public:
214
225
242
243public:
244 tResult BeginStreaming(ISampleStream& oSampleStream) override
245 {
246 EndStreaming();
247
248 // async mode is not working yet.
250 {
251 RETURN_ERROR_DESC(ERR_NOT_SUPPORTED, "Currently only PushRead mode is supported.");
252 }
253
254 {
255 std::lock_guard<std::mutex> _sync(m_oReadLock);
256
258 //@TODO : Something for TYPE Checking !!!
259 RETURN_IF_FAILED(oSampleStream.Open(m_strName,
262 pSink,
264 0));
265 }
266
268 }
269
271 {
272 {
273 std::lock_guard<std::mutex> _sync(m_oReadLock);
274 //this will automatically close
275 m_pInStream.Reset();
276 Clear();
277 }
279 }
280
// Stores a copy of fnAcceptTypeCallback in m_fnAcceptTypeCallback.
// The callback takes the received stream type and returns a tResult.
288 void SetAcceptTypeCallback(const std::function<
289 tResult(const ucom::iobject_ptr<const IStreamType>& pStreamType)> & fnAcceptTypeCallback )
290 {
291 m_fnAcceptTypeCallback = fnAcceptTypeCallback;
292 }
293
// Stores a copy of fnStreamErrorCallback in m_fnStreamErrorCallback.
// HandleStreamError returns the result of this callback in one of its branches.
298 void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback)
299 {
300 m_fnStreamErrorCallback = fnStreamErrorCallback;
301 }
302
303public:
306 {
307 std::lock_guard<std::mutex> _sync(m_oReadLock);
308 if (m_pInStream)
309 {
310 //report the error !!
311 return m_pInStream->SetStreamError(oError);
312 }
313 RETURN_ERROR(ERR_NOT_CONNECTED);
314 }
315
320 {
321 pSampleInStream.Reset(m_pInStream);
322 }
323
328 {
330 {
332 }
333 else
334 {
335 pType.Reset(m_pStreamType);
336 }
337 }
338
347 {
348 return ReadNextSample(pSample);
349 }
350
351protected:
353 tResult Push(tTimeStamp tmTimeofActivation)
354 {
355 //implementing Push
356 //this will be done sync
357 #ifdef _PIPES_DEBUG_LOG
358 LOG_DUMP("Run Push Event in Reader");
359 #endif
360 if (!m_pInStream)
361 {
362 RETURN_ERROR(ERR_NOT_CONNECTED);
363 }
364
365 while (true)
366 {
367 cStreamItem oItem;
368 {
369 //very short LOCK !!
370 std::lock_guard<std::mutex> _sync(m_oReadLock);
371 if (IS_FAILED(m_pInStream->Read(oItem, tTimeStamp(0))))
372 {
373 break;
374 }
375 }
376
377 RETURN_IF_FAILED(Push(oItem, tmTimeofActivation));
378 }
379
380
382 }
383
384protected:
395 {
396 if (!m_pInStream)
397 {
398 return ERR_NOT_CONNECTED;
399 }
400
401 while (true)
402 {
403 cStreamItem oItem;
405 {
406 RETURN_IF_FAILED(m_pInStream->Read(oItem, tTimeStamp(0)));
407 }
408 else
409 {
410 RETURN_IF_FAILED(Pop(oItem));
411 }
412
413 if (IS_OK(ProcessStreamItem(oItem)))
414 {
415 if (oItem.GetType() == IStreamItem::tType::Sample)
416 {
417 return oItem.GetSample(pSample);
418 }
419 }
420 }
421 }
422
435 {
436 switch (oStreamItem.GetType())
437 {
439 {
441 RETURN_IF_FAILED(oStreamItem.GetStreamType(pHelper));
442 tResult nError = AcceptType(pHelper);
443 if (IS_FAILED(nError))
444 {
445 m_bValidType = false;
446 RETURN_IF_FAILED(HandleStreamError(ADTF_BASE_COMPOSED_RESULT(nError, "Stream type is not accepted by reader.")));
447 }
448 else
449 {
450 m_bValidType = true;
451 }
452 break;
453 }
454 default:
455 {
456 if (!m_bValidType)
457 {
458 RETURN_ERROR(ERR_INVALID_TYPE);
459 }
460 break;
461 }
462 }
463
465 }
466
467public:
// Resets the reader state: releases the last read stream type reference and
// then empties the queue through Clear().
468 void Reset() override
469 {
470 m_pLastReadStreamType.Reset();
471 Clear();
472 }
473
474public:
478
479protected:
480 tResult HandleStreamError(tResult nError)
481 {
483 {
484 return m_fnStreamErrorCallback(nError);
485 }
486 else
487 {
488 return SetStreamError(nError);
489 }
490 }
491};
492
493
498{
499private:
501 std::set<ISampleReaderQueue*> m_lstExternalQueues;
502
503public:
505
506 tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
507 {
508 for (auto pQueue : m_lstExternalQueues)
509 {
510 RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
511 }
512
514 };
515
// Calls Clear() on every queue currently held in m_lstExternalQueues.
516 void Clear() override
517 {
518 for (auto pQueue: m_lstExternalQueues)
519 {
520 pQueue->Clear();
521 }
522 }
523
// Always fails with ERR_EMPTY; the stream item parameter is unused.
524 tResult Pop(IStreamItem& /* oStreamItem */) override
525 {
526 RETURN_ERROR(ERR_EMPTY);
527 }
528
// Runs the reader's push-read operation, cSampleReader::Push, with an
// activation time of 0 and returns its result.
529 tResult ReadAllAvailableItems()
530 {
531 return cSampleReader::Push(0);
532 }
533
534public:
// Adds pExternalBuffer to the set of external queues (a std::set, so adding the
// same pointer twice has no further effect).
// The pointer is stored without a null check, and Push()/Clear() dereference
// every stored pointer, so nullptr must not be registered.
535 void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
536 {
537 m_lstExternalQueues.insert(pExternalBuffer);
538 }
539
// Removes pExternalBuffer from the set of external queues; erasing a pointer
// that was never registered leaves the set unchanged.
540 void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
541 {
542 m_lstExternalQueues.erase(pExternalBuffer);
543 }
544
545};
546
551{
552protected:
555
556public:
557
559
569 {
571 while (IS_OK(ReadNextSample(pDummy)));
572
574 {
575 RETURN_ERROR(ERR_EMPTY);
576 }
577
578 return pSample.Reset(m_pLastValidSample);
579 }
580
// Forgets the stored last valid sample, then performs the base class reset
// (cSampleReader::Reset).
581 void Reset() override
582 {
583 m_pLastValidSample = nullptr;
584 cSampleReader::Reset();
585 }
586
587protected:
588
598
599};
600
605 public ISampleReaderQueue,
606 private adtf_util::lock_free_queue<cStreamItem>
607{
609 typedef adtf_util::lock_free_queue<cStreamItem> base_type;
610
611public: // implements ISampleReaderQueue
// Forwards oStreamItem to the Push of the underlying lock_free_queue (base_type)
// and returns its result. The timestamp parameter is unused.
612 tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
613 {
614 return base_type::Push(oStreamItem);
615 }
616
// Empties the queue by popping into a scratch item until the underlying Pop
// no longer succeeds.
617 void Clear() override
618 {
619 cStreamItem oItem;
620 while (IS_OK(base_type::Pop(&oItem)));
621 }
622
// Pops the next item from the underlying lock_free_queue into a local item and
// copies it to oStreamItem.
// Returns ERR_EMPTY unchanged when the underlying Pop reports it, any other
// failure through RETURN_IF_FAILED, otherwise the result of CopyTo.
623 tResult Pop(IStreamItem& oStreamItem) override
624 {
625 cStreamItem oQueueItem;
626 auto oResult = base_type::Pop(&oQueueItem);
627 // this is a performance optimization in order to prevent the annotation
628 // in the RETURN_IF_FAILED macro below.
629 if (oResult == ERR_EMPTY)
630 {
631 return oResult;
632 }
633 RETURN_IF_FAILED(oResult);
634 return oQueueItem.CopyTo(oStreamItem);
635 }
636
637};
638
639
641 public ISampleReaderQueue
642{
643 private:
644 std::mutex m_oQueueMutex;
645 std::deque<cStreamItem> m_oItems;
646 cStreamItem m_oLastType;
647 size_t m_nSampleCount = 0;
648
649 public:
650 tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
651 {
652 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
653 m_oItems.emplace_back(oStreamItem);
654 if (oStreamItem.GetType() == IStreamItem::tType::Sample)
655 {
656 ++m_nSampleCount;
657 }
658 CheckQueue(m_oItems);
660 }
661
// While holding m_oQueueMutex: removes all queued items, replaces the
// remembered last stream type item with a default cStreamItem, and sets the
// sample counter back to 0.
662 void Clear() override
663 {
664 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
665 m_oItems.clear();
666 m_oLastType = cStreamItem();
667 m_nSampleCount = 0;
668 }
669
671 {
672 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
673 if (m_oLastType.GetType() == IStreamItem::tType::StreamType)
674 {
675 RETURN_IF_FAILED(m_oLastType.CopyTo(oItem));
676 m_oLastType = cStreamItem();
677 }
678 else
679 {
680 if (m_oItems.empty())
681 {
682 RETURN_ERROR(ERR_EMPTY);
683 }
684
685 RETURN_IF_FAILED(m_oItems.front().CopyTo(oItem));
686 if (m_oItems.front().GetType() == IStreamItem::tType::Sample)
687 {
688 --m_nSampleCount;
689 }
690 m_oItems.pop_front();
691 }
692
694 }
695
696 protected:
697 virtual void CheckQueue(const std::deque<cStreamItem>& oItems) = 0;
// Removes the front item of m_oItems.
// A stream type item is copied into m_oLastType before it is removed; for any
// other item the sample counter is decremented.
// Calls front() unconditionally, so the queue must not be empty, and does not
// lock m_oQueueMutex itself. In the visible code it is only called from the
// CheckQueue overrides, which Push invokes while holding the mutex.
// NOTE(review): the counter is decremented for every non-StreamType item,
// whereas Push/Pop only count tType::Sample items - confirm that no other
// item types can be queued.
698 void PopFront()
699 {
700 if (m_oItems.front().GetType() == IStreamItem::tType::StreamType)
701 {
702 m_oLastType = m_oItems.front();
703 }
704 else
705 {
706 --m_nSampleCount;
707 }
708 m_oItems.pop_front();
709 }
710
// Returns the current value of the sample counter (m_nSampleCount).
// Does not lock m_oQueueMutex itself.
711 size_t GetSampleCount()
712 {
713 return m_nSampleCount;
714 }
715};
716
717template<size_t MaxSize>
720{
721 protected:
// Trims the queue from the front: keeps calling PopFront() while the queue is
// not empty and either the sample count exceeds MaxSize or the front item is a
// stream type item.
722 void CheckQueue(const std::deque<cStreamItem>& oItems) override
723 {
724 while (!oItems.empty() &&
725 (GetSampleCount() > MaxSize ||
726 oItems.front().GetType() == IStreamItem::tType::StreamType))
727 {
728 PopFront();
729 }
730 }
731};
732
733template<tTimeStamp TimeRange>
736{
737 protected:
738 void CheckQueue(const std::deque<cStreamItem>& oItems) override
739 {
740 if (oItems.back().GetType() == IStreamItem::tType::Sample)
741 {
743 if (IS_OK(oItems.back().GetSample(pLastSample)))
744 {
745 tTimeStamp nLastTime = pLastSample->GetTime();
746 while (oItems.size() > 1)
747 {
748 auto& oItem = oItems.front();
749 if (oItem.GetType() == IStreamItem::tType::Sample)
750 {
752 if (IS_OK(oItem.GetSample(pSample)))
753 {
754 if (nLastTime - pSample->GetTime() < TimeRange)
755 {
756 break;
757 }
758 }
759
760 }
761
762 PopFront();
763 }
764 }
765 }
766 }
767};
768
774template<typename INTERNAL_QUEUE,
775 bool STORE_LAST_SAMPLE = true,
777class sample_reader : public std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type
778{
779protected:
781 INTERNAL_QUEUE m_oInternalQueue;
782
784 typedef typename std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type base_class;
785public:
786
// Default constructor: passes ACCESS_MODE on to the selected base class
// (cLastSampleReader or cSampleReader, depending on STORE_LAST_SAMPLE).
787 sample_reader(): base_class(ACCESS_MODE)
788 {
789 }
790
// Forwards the item and timestamp to the internal queue and returns its result.
791 tResult Push(const IStreamItem& oItem, tTimeStamp tsTime) override
792 {
793 return m_oInternalQueue.Push(oItem, tsTime);
794 };
795
// Clears the internal queue.
796 void Clear() override
797 {
798 m_oInternalQueue.Clear();
799 };
800
// Pops the next item from the internal queue into oItem and returns the
// queue's result.
801 tResult Pop(IStreamItem& oItem) override
802 {
803 return m_oInternalQueue.Pop(oItem);
804 }
805};
806
814
823template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
825
834template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
836
842
852{
854 pSample.Reset(pEmpty);
855 oSampleReader.GetNextSample(pSample);
856 return oSampleReader;
857}
858
867template<typename DATATYPE>
869{
871 if (IS_OK(oSampleReader.GetNextSample(pSample)))
872 {
873 oSampleData.Reset(pSample);
874 }
875 else
876 {
877 oSampleData.Reset();
878 }
879 return oSampleReader;
880}
881
890{
891 oSampleReader.GetLastType(pType);
892 return oSampleReader;
893}
894
// Manipulator overload: calls pStreamfunc with the reader and returns the
// reference it yields. pStreamfunc is called without a null check.
902inline cSampleReader& operator>>(cSampleReader& oSampleReader, cSampleReader& (*pStreamfunc)(cSampleReader&))
903{
904 return pStreamfunc(oSampleReader);
905}
906
915 const char* strNameOfReader,
916 const ucom::iobject_ptr<const IStreamType>& pStreamType)
917{
918 oReader.SetName(strNameOfReader);
919 return oReader.SetType(pStreamType);
920}
921
927class cExternelQueueSampleReader : public cExternalQueueSampleReader
928{
929public:
930 using cExternalQueueSampleReader::cExternalQueueSampleReader;
931
932 ADTF3_DEPRECATED("The class 'cExternelQueueSampleReader' is deprecated. Please use 'cExternalQueueSampleReader' instead.")
933 cExternelQueueSampleReader()
935 {
936
937 }
938};
939
940} //namespace ant
941
942namespace flash
943{
944
948class cSampleReader: public ISampleReader
949{
950 public:
951 cSampleReader(ant::ISampleReaderQueue& oQueue,
953 bool bStoreLastSample);
954
955 ~cSampleReader() override;
956
957 void SetName(const char* strName) override;
958 tResult GetName(base::ant::IString&& strName) override;
959
962
963 tResult BeginStreaming(ISampleStream& oSampleStream) override;
965
967
971 void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
972
976 void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
978
981
983 void Reset();
984
986
991 uint32_t nSubStreamId,
992 const base::ant::IProperties* pRequestProperties = nullptr);
993 void SetSynchronousTypeUpdateCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)>& fnRequestTypeUpdateCallback);
994
995 protected:
996 class cImplementation;
997 std::unique_ptr<cImplementation> m_pImplementation;
998
999};
1000
1008template<typename INTERNAL_QUEUE,
1009 bool STORE_LAST_SAMPLE = true,
1011class sample_reader : public cSampleReader
1012{
1013 protected:
1014 INTERNAL_QUEUE m_oQueue;
1015
1016 public:
1017 sample_reader(): cSampleReader(m_oQueue, ACCESS_MODE, STORE_LAST_SAMPLE)
1018 {}
1019};
1020
1021
1022
1024{
1025 public:
// Adds pExternalBuffer to the set of external queues (a std::set, so adding the
// same pointer twice has no further effect).
// The pointer is stored without a null check, and Push()/Clear() dereference
// every stored pointer, so nullptr must not be registered.
1026 void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1027 {
1028 m_oExternalQueues.insert(pExternalBuffer);
1029 }
1030
// Removes pExternalBuffer from the set of external queues; erasing a pointer
// that was never registered leaves the set unchanged.
1031 void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1032 {
1033 m_oExternalQueues.erase(pExternalBuffer);
1034 }
1035
1036 public:
1037 tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
1038 {
1039 for (auto pQueue : m_oExternalQueues)
1040 {
1041 RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
1042 }
1043
1045 };
1046
// Calls Clear() on every queue currently held in m_oExternalQueues.
1047 void Clear() override
1048 {
1049 for (auto pQueue: m_oExternalQueues)
1050 {
1051 pQueue->Clear();
1052 }
1053 }
1054
// Always fails with ERR_EMPTY; the stream item parameter is unused.
1055 tResult Pop(IStreamItem& /* oStreamItem */) override
1056 {
1057 RETURN_ERROR(ERR_EMPTY);
1058 }
1059
1060 protected:
1062 std::set<ISampleReaderQueue*> m_oExternalQueues;
1063};
1064
1066{
1067 protected:
1068 virtual ~IExternalReaderQueues() = default;
1069
1070 public:
1071 virtual void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1072 virtual void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1073};
1074
// Sample reader that uses cExternalQueuesWrapper as its queue (m_oQueue),
// instantiated with STORE_LAST_SAMPLE = false, and that implements
// IExternalReaderQueues by delegating to that wrapper.
1075class cExternalQueueSampleReader : public sample_reader<cExternalQueuesWrapper, false>,
1076 public IExternalReaderQueues
1077{
1078 public:
// Registers pExternalBuffer with the wrapped queue set.
1079 void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1080 {
1081 m_oQueue.RegisterExternalQueue(pExternalBuffer);
1082 }
1083
// Removes pExternalBuffer from the wrapped queue set.
1084 void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1085 {
1086 m_oQueue.UnregisterExternalQueue(pExternalBuffer);
1087 }
1088};
1089
1095
1096
1101template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
1103
1108template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
1110
1116
// Initializes oReader: sets its name to strNameOfReader, then sets its initial
// stream type to pStreamType.
// Returns the result of SetType.
1117inline tResult make_sample_reader(ISampleReader& oReader,
1118 const char* strNameOfReader,
1119 const ucom::iobject_ptr<const IStreamType>& pStreamType)
1120{
1121 oReader.SetName(strNameOfReader);
1122 return oReader.SetType(pStreamType);
1123}
1124
1125inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::ISample>& pSample)
1126{
1128 pSample.Reset(pEmpty);
1129 oSampleReader.GetNextSample(pSample);
1130 return oSampleReader;
1131}
1132
1133template<typename DATATYPE>
1134ISampleReader& operator>>(ISampleReader& oSampleReader, sample_data<DATATYPE>& oSampleData)
1135{
1137 if (IS_OK(oSampleReader.GetNextSample(pSample)))
1138 {
1139 oSampleData.Reset(pSample);
1140 }
1141 else
1142 {
1143 oSampleData.Reset();
1144 }
1145 return oSampleReader;
1146}
1147
// Streaming overload that asks the reader for its last stream type and stores
// it in pType. The tResult of GetLastType is discarded, so a failure is not
// visible to the caller. Returns the reader to allow chaining.
1148inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::IStreamType>& pType)
1149{
1150 oSampleReader.GetLastType(pType);
1151 return oSampleReader;
1152}
1153
// Manipulator overload: calls pStreamfunc with the reader and returns the
// reference it yields. pStreamfunc is called without a null check.
1154inline ISampleReader& operator>>(ISampleReader& oSampleReader, ISampleReader& (*pStreamfunc)(ISampleReader&))
1155{
1156 return pStreamfunc(oSampleReader);
1157}
1158
1159}
1160
1161namespace kiwi
1162{
1163
1168{
1169public:
1174
1178 ~cNullReader() override;
1179
1180 void SetName(const char* strName) override;
1187
1193
1194 void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
1195 void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
1196
1197protected:
1198 class cImplementation;
1199 std::unique_ptr<cImplementation> m_pImplementation;
1200};
1201
1202}
1203
1207
1216
1217using flash::make_sample_reader;
1218
1219using kiwi::cNullReader;
1220
1221
1222} //namespace streaming
1223} //namespace adtf
#define ADTF3_DEPRECATED(_depr_message_)
Mark a function or variable as deprecated.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_BASE_COMPOSED_RESULT(_result,...)
for internal use
A_UTILS_NS::cResult tResult
For backwards compatibility and to bring latest version into scope.
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
Definition of a property set container interface.
The IString interface provides methods for getting and setting strings through abstract interfaces.
Definition string_intf.h:28
Runnable helper implementation template.
Definition runnable.h:58
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
Interface to create a sample reader buffer.
Access definition for ISampleStream::Open.
@ PushRead
PushRead Operation mode for the ISampleStream::Open. See PushRead Mode - Synchronous Data Pipes.
@ Push
Push Operation without intermediate reader queue, this provides the lowest latency....
@ AsyncQueue
Asynchronous operation mode for ISampleStream::Open (not implemented yet). See Asynchronous Mode - Asyn...
Interface of the SampleStream.
virtual tResult Open(const char *strName, adtf::ucom::ant::iobject_ptr< ISampleInStream > &pInStream, const adtf::ucom::ant::iobject_ptr< const IStreamType > &pInitialAcceptedStreamType, IPushReadEventSink *&pPushEventSink, ISampleStreamAccess::tMode ui32Mode, size_t szQueueSize)=0
Opens The SampleStream for reading access.
The IStreamItem interface is the base type for all object which are passed through a stream.
virtual tType GetType() const =0
Retrieves the type of the sample item.
virtual tResult GetStreamType(ucom::ant::iobject_ptr< const IStreamType > &pStreamType) const =0
Retrieves the StreamType of the StreamItem if GetType is tType::StreamType.
@ StreamType
The item is an IStreamType. Mind: all StreamType changes will be queued too!
@ Sample
The item is a queue item that contains an ISample.
Implementation of a ISampleReaderQueue with dynamic growing sample queue.
adtf_util::lock_free_queue< cStreamItem > base_type
base type of cDynamicSampleReaderQueue
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Sample reader which allows the registration of external queue.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
std::set< ISampleReaderQueue * > m_lstExternalQueues
A set of other registered buffer.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Sample reader which allows the registration of external queue.
Sample reader which always provides the last successful received sample.
tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample) override
Collect the next sample by overriding the ReadNextSample.
ucom::object_ptr< const ISample > m_pLastValidSample
Last Sample Reference.
tResult GetLastSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Returns the last Sample received.
tResult Pop(IStreamItem &oItem)
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default Sample Reader will read the incoming stream of an IInPin.
ISampleStreamAccess::tMode m_eAccessMode
current access method
void SetAcceptTypeCallback(const std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
sample_streamer< ISampleReader, cInPin > base_type
base type of cSampleReader
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Gets the next Sample within internal queue which not has been read.
void GetSampleInStream(ucom::ant::iobject_ptr< ISampleInStream > &pSampleInStream)
Returns the connected ISampleInStream.
std::mutex m_oReadLock
Read synchronization.
ucom::object_ptr< ISampleInStream > m_pInStream
Reader where to read.
std::function< tResult(tResult oStreamError)> m_fnStreamErrorCallback
Callback to react on stream errors.
bool m_bValidType
States whether the current type is a valid type or not; this is set to false if AcceptType fails.
base::ant::runnable< base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink > runnable_type
internal base type.
ucom::object_ptr< const IStreamType > m_pLastReadStreamType
Last accepted incoming type.
tResult SetStreamError(const tResult &oError)
Forward an error to the corresponding stream.
tResult EndStreaming() override
Sample Stream disconnected.
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> m_fnAcceptTypeCallback
Callback to reject type changes.
virtual tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
This will read the stream items until a sample is reached and return it.
void GetLastType(ucom::ant::iobject_ptr< const IStreamType > &pType)
Returns the last stream type that was read from the sample stream.
virtual tResult AcceptType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Accept or reject a new stream type - see also AcceptType and IsCompatible implementations.
cSampleReader(ISampleStreamAccess::tMode eAccessMode)
Default CTOR which defines the access method.
tResult ProcessStreamItem(const IStreamItem &oStreamItem)
This will process the stream items.
tResult Push(tTimeStamp tmTimeofActivation)
internal Push operation to implement pushread mode
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
tResult BeginStreaming(ISampleStream &oSampleStream) override
BeginStreaming will open the given Sample Stream for Reading while a connection is establishing.
Default implementation of a StreamItem as a container for the Sample Stream's queue.
tResult GetSample(ucom::ant::iobject_ptr< const ISample > &pSample) const
Retrieves the sample of the StreamItem if GetType is tType::Sample.
tType GetType() const
Retrieves the type of the sample item.
tResult CopyTo(IStreamItem &oDest) const
Copy its content to the oDest container.
Sample Data getter for an easy use of samples with samplebuffer set to the type T.
Definition sample_data.h:33
A Possible Sample Reader of a Trigger Function!
std::conditional< STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader >::type base_class
base class
void ResetPin()
Resets the pin reference to nullptr.
sample_streamer(const char *strName, const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
CTOR initializer.
sample_streamer(const sample_streamer &)=delete
deleted copy CTOR
sample_streamer & operator=(const sample_streamer &)=delete
deleted copy operator
sample_streamer & operator=(sample_streamer &&)=delete
deleted move operator
void SetName(const char *strName)
Sets the name of the streamer.
tResult SetType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Sets the StreamType of the streamer.
sample_streamer(sample_streamer &&)=delete
deleted move CTOR
void ResetPin(ucom::ant::object_ptr< PINTYPE > &pPin)
Resets the pin reference. This is only used internally.
tResult GetName(base::ant::IString &&strName) const
Gets the name of the streamer.
Interface for sample reads that read from sample streams via input pins.
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample)=0
Reads the next available sample from the associated sample stream.
virtual tResult GetLastType(ucom::ant::iobject_ptr< const IStreamType > &pType)=0
Returns the last stream type that was read from the sample stream.
virtual void SetName(const char *strName)=0
Sets the name of the streamer.
virtual tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)=0
Sets the initial stream type of a streamer.
std::set< ISampleReaderQueue * > m_oExternalQueues
A set of other registered buffer.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default Sample Reader will read the incoming stream of an IInPin.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult RequestSamples(ucom::ant::iobject_ptr< hollow::IStreamingRequest > &pRequest, uint32_t nSubStreamId, const base::ant::IProperties *pRequestProperties=nullptr)
RequestSamples of the given Substream to be generated and/or transmitted.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
void SetAcceptTypeCallback(const std::function< tResult(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
tResult BeginStreaming(ISampleStream &oSampleStream) override
Begin streaming on the given sample stream.
Reads and stores Samples within the given queue implementation INTERNAL_QUEUE.
A reader that does not read anything.
tResult BeginStreaming(ant::ISampleStream &pStream) override
Begin streaming on the given sample stream.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< flash::IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
~cNullReader() override
Destructor.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
virtual tResult Reset(const iobject_ptr< T > &i_oOther)=0
Reset this object_ptr<> with the content of another iobject_ptr<>
Base object pointer to realize binary compatible reference counting in interface methods.
Namespace for all functionality of the ADTF Streaming SDK provided since v3.0.
sample_reader< time_limited_sample_reader_queue< TIME_RANGE >, STORELASTSAMPLE > time_limited_sample_reader
The time_limited_sample_reader will create a sample reader which will create an internal sample queue ...
tResult make_sample_reader(cSampleReader &oReader, const char *strNameOfReader, const ucom::iobject_ptr< const IStreamType > &pStreamType)
Initializes a cSampleReader with the given name and StreamType. param[in] oReader Reader to initialize para...
size_limited_sample_reader< 1 > cSingleSampleReader
The cSingleSampleReader will create a sample reader which will create an internal sample queue with on...
sample_reader< size_limited_sample_reader_queue< MAX_SIZE >, STORELASTSAMPLE > size_limited_sample_reader
The size_limited_sample_reader will create a sample reader which will create an internal sample queue ...
const ISampleInStream & operator>>(const ISampleInStream &oStreamReader, IStreamItem &oItem)
Streaming Operator>> to read a sample from the readers queue.
sample_reader< cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create an internal sample queue with u...
Namespace for all functionality of the ADTF Streaming SDK provided since v3.5.
sample_reader< ant::cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create an internal sample queue with u...
sample_reader< ant::time_limited_sample_reader_queue< TIME_RANGE >, STORELASTSAMPLE > time_limited_sample_reader
The time_limited_sample_reader will create a sample reader which will create an internal sample queue ...
size_limited_sample_reader< 1 > cSingleSampleReader
The cSingleSampleReader will create a sample reader which will create an internal sample queue with on...
sample_reader< ant::size_limited_sample_reader_queue< MAX_SIZE >, STORELASTSAMPLE > size_limited_sample_reader
The size_limited_sample_reader will create a sample reader which will create an internal sample queue ...
Namespace for all functionality of the ADTF Streaming SDK provided since v3.10.
Namespace for the ADTF Streaming SDK.
ant::iobject_ptr< T > iobject_ptr
Alias always bringing the latest version of ant::iobject_ptr into scope.
object_ptr< Implementation > make_object_ptr(Args &&... args)
Alias always bringing the latest version of ant::make_object_ptr() into scope.
Namespace for entire ADTF SDK.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_RUN_FUNCTION(_fcName_)
Helper Macro to define Run function for runnable<>
Definition runnable.h:196
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.