ADTF  3.18.2
Source Code for TimeStamp Synchronizer Plugin
Location
./src/examples/src/adtf/filters/standard_filters/synchronizer_filter/
Namespace for entire ADTF SDK.
Build Environment
To see how to set up the build environment have a look at ADTF CMake Environment
this implementation shows:
  • how to manually implement a Filter
  • how to implement a filter with dynamic Pins.
  • how to synchronize multiple data streams according to their tTimeStamp
Header
#pragma once
#include <deque>
#include <unordered_map>
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
using namespace adtf::filter;
// we use the cDynamicFilter base class, as it export the IDynamicDataBinding interface for us
class cDemoSynchronizerFilter : public cFilter, private adtf::ucom::IEventSink
{
public:
// give our filter a name and class id
ADTF_CLASS_ID_NAME(cDemoSynchronizerFilter,
"demo_timestamp_synchronizer.filter.adtf.cid",
"TimeStamp Synchronizer");
// ensure that our dependencies will be available withhin the runtime
public:
cDemoSynchronizerFilter();
// implement the method that is called when a new input pin is required
tResult RequestDynamicInputPin(const char* strName,
// implement the method that is called when a new output pin is required
tResult RequestDynamicOutputPin(const char* strName,
// stateful filters need to be aware of runlevel changes and prepare for clean reinitialization
tResult Init(tInitStage eStage) override;
tResult Shutdown(tInitStage eStage) override;
// stateful filters need to handle clock reset events and discard any temporal state
tResult HandleEvent(const adtf::ucom::IEventSource& oSource, const void* pvEventData) override;
private:
// we use an cExternalQueueSampleReader in order to grab the
// stream items and put them in the queue. This class is a proxy between the sample streams
// and the queue within the filter.
struct tStream: public adtf::streaming::ISampleReaderQueue
{
tStream(cDemoSynchronizerFilter& oFilter,
std::string_view strName,
bool bSynchronousProcessing):
m_oFilter(oFilter)
{
pReader = oFilter.CreateInputPinWithCallback<cExternalQueueSampleReader>(std::string(strName).c_str(), pType, [this, bSynchronousProcessing](tNanoSeconds /*tmTrigger*/) -> tResult
{
// just in case this was not a PushRead synchronous connection.
pReader->ReadAllAvailableItems();
// an empty stream item signals a trigger
m_oFilter.InsertQueueItem(adtf::streaming::cStreamItem(), *this);
if (bSynchronousProcessing)
{
m_oFilter.CheckQueue();
}
}, false);
pReader->RegisterExternalQueue(this);
pWriter = m_oFilter.CreateOutputPin((std::string(strName) + "_out").c_str(), pType);
}
tStream(const tStream&) = delete;
tStream(tStream&&) = delete;
tResult Push(const adtf::streaming::IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
{
m_oFilter.InsertQueueItem(oStreamItem, *this);
}
void Clear() override
{
}
tResult Pop(adtf::streaming::IStreamItem& /* oStreamItem */) override
{
RETURN_ERROR(ERR_NOT_SUPPORTED);
}
cSampleWriter* pWriter = nullptr;
cExternalQueueSampleReader* pReader = nullptr;
// this will store the time stamp from the last sample so that we can use it for types
// or triggers, which do not have a timestamp of their own. In our context of types and
// triggers only ordering is relevant.
tNanoSeconds tmLastSampleTimeStamp;
cDemoSynchronizerFilter& m_oFilter;
};
using tStreams = std::unordered_map<std::string, tStream>;
tStreams m_oStreams;
struct tQueueItem
{
tQueueItem(tNanoSeconds tmTimeStamp,
const adtf::streaming::IStreamItem& oStreamItem,
tStream& oStream):
tmTimeStamp(tmTimeStamp),
oStreamItem(oStreamItem),
pStream(&oStream)
{
}
tNanoSeconds tmTimeStamp;
tStream* pStream;
};
tStreams::iterator GetOrCreateInputStream(const std::string& strName,
// add a new stream item, and if synchronous processing is configured, flush the queue
void InsertQueueItem(adtf::streaming::cStreamItem&& oStreamItem, tStream& oStream);
// Check if the queue exheeds the threshold and flush to output.
void CheckQueue();
// remove all pending elements from the queue
// discard samples and triggers, but maintain coherency with regard to stream types
void Clear();
// forward a single item
void ForwardItem(tQueueItem& sItem);
std::mutex m_oQueueMutex;
std::deque<tQueueItem> m_oQueue;
adtf::base::property_variable<int64_t> m_nQueueTransferStartTimeout = 20000;
adtf::base::property_variable<int64_t> m_nQueueTransferEndTimeout = 10000;
adtf::base::property_variable<bool> m_bSynchronousQueueProcessing = false;
};
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
#define REQUIRE_INTERFACE(_interface)
Macro usable with ADTF_CLASS_DEPENDENCIES() to require mandatory interfaces.
#define ADTF_CLASS_DEPENDENCIES(...)
Add interface ids (string literals,.
#define ADTF_CLASS_ID_NAME(_class, _strcid, _strclabel)
Common macro to enable correct treatment of class identifier AND Class Name by IClassInfo.
Definition: class_id.h:33
Property Variable template for the given T.
The IReferenceClock interface provides the reference time source for the filter graph.
Interface to create a sample reader buffer.
The IStreamItem interface is the base type for all object which are passed through a stream.
Default implementation of an StreamItem as container of the Sample Stream s Queue.
The class IEventSink provides a generic event listener interface for Internal Events.
The class IEventSource provides a generic event provider interface.
Base object pointer to realize binary compatible reference counting in interface methods.
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:163
Copyright © Audi Electronics Venture GmbH.
Namespace for the ADTF Base SDK.
Namespace for the ADTF Filter SDK.
Namespace for the ADTF Streaming SDK.
Namespace for the ADTF uCOM3 SDK.
alias namespace for the A_UTILS Library.
Implementation
#include "synchronizer_filter.h"
// this macro creates the plugin DLL or shared object entry methods
// and provides the filter class via its class factory
ADTF_PLUGIN("TimeStamp Synchronizer Plugin",
cDemoSynchronizerFilter);
cDemoSynchronizerFilter::cDemoSynchronizerFilter()
{
// obtain the clock object by its primary interface, IReferenceClock
// the clock object also implements the IEventSource interface.
m_pClockEventSource = ucom_object_ptr_cast<adtf::ucom::IEventSource>(pClock);
if (!m_pClockEventSource)
{
THROW_ERROR_DESC(ERR_POINTER, "Reference clock missing or not implementing IEventSource");
}
m_nQueueTransferStartTimeout.SetDescription("Reference timespan AFTER which all Samples within the queue will be forwarded when the time difference between the first and last Sample exceeds the property value.");
RegisterPropertyVariable("queue_transfer_start_timeout", m_nQueueTransferStartTimeout);
m_nQueueTransferEndTimeout.SetDescription("Reference timespan UNTIL which all Samples within the queue will be forwarded when the time difference between the first and last Sample will be below the property value.");
RegisterPropertyVariable("queue_transfer_end_timeout", m_nQueueTransferEndTimeout);
m_bSynchronousQueueProcessing.SetDescription("if activated, this will force the queue to be forwarded synchronously when a Trigger is received on one of the Input Pins and the queue forwarding conditions are met.");
RegisterPropertyVariable("synchronous_queue_processing", m_bSynchronousQueueProcessing);
CreateRunner("process_queue", [&](tNanoSeconds) -> tResult
{
// double check we have all available items in our queue
// if connections are not synchronous and do not provide triggers
for (auto& sStream: m_oStreams)
{
sStream.second.pReader->ReadAllAvailableItems();
}
// check if we need to forward some items
CheckQueue();
});
SetDescription("process_queue", "Runner to periodically trigger the function which checks the queue.");
// sets a short description for the component
SetDescription("Use this filter to sort incoming samples according to their timestamp.");
// set help link to jump to documentation from ADTF Configuration Editor
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_synchronizer_filter.html");
}
// this method is called whenever a new input pin is requested.
tResult cDemoSynchronizerFilter::RequestDynamicInputPin(const char* strName,
{
auto itStream = GetOrCreateInputStream(strName, pType);
// the initial stream-type will be visible only *now* and will not be repeated as a stream item
// so it must be forwarded at this point
itStream->second.pWriter->ChangeType(pType);
}
// this method is called whenever a new output pin is requested.
tResult cDemoSynchronizerFilter::RequestDynamicOutputPin(const char* strName,
{
std::string strStreamName(strName);
const std::string_view strOutSuffix = "_out";
if (strStreamName.rfind(strOutSuffix) != strStreamName.size() - strOutSuffix.size())
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Invalid output pin name '%s'. Output pin names have to end with '_out'.", strName);
}
const auto strInputName = strStreamName.substr(0, strStreamName.size() - strOutSuffix.size());
GetOrCreateInputStream(strInputName, pType);
}
tResult cDemoSynchronizerFilter::Init(tInitStage eStage)
{
RETURN_IF_FAILED(cFilter::Init(eStage));
if (eStage == tInitStage::StagePostConnect)
{
// Via this interface, the clock informs about clock reset events (e.g. by track navigation backwards)
THROW_IF_FAILED(m_pClockEventSource->RegisterEventSink(*this));
}
}
tResult cDemoSynchronizerFilter::Shutdown(tInitStage eStage)
{
if (eStage == tInitStage::StagePostConnect)
{
// Via this interface, the clock informs about clock reset events (e.g. by track navigation backwards)
// We don't care about clock resets while we don't hhave any state
THROW_IF_FAILED(m_pClockEventSource->UnregisterEventSink(*this));
// drop all still queued state when going from RL5 to RL4
// filters are still connected in this state and stream-type negotiation still happens
// no more samples and triggers though
Clear();
}
RETURN_IF_FAILED(cFilter::Shutdown(eStage));
}
tResult cDemoSynchronizerFilter::HandleEvent(const IEventSource& oSource, const void* pvEventData)
{
if (&oSource == m_pClockEventSource.Get())
{
const auto pEvent = static_cast<const adtf::services::IReferenceClock::tReferenceClockEvent*>(pvEventData);
{
// discard all triggers and samples belonging to the future.
Clear();
}
}
}
cDemoSynchronizerFilter::tStreams::iterator cDemoSynchronizerFilter::GetOrCreateInputStream(const std::string& strName,
{
return m_oStreams.try_emplace(strName, *this, strName, pType, m_bSynchronousQueueProcessing).first;
}
void cDemoSynchronizerFilter::InsertQueueItem(adtf::streaming::cStreamItem&& oStreamItem,
tStream& oStream)
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
// stream types and trigger get the timestamp of the last sample
auto tmTimeStamp = oStream.tmLastSampleTimeStamp;
if (oStreamItem.GetType() == adtf::streaming::IStreamItem::tType::Sample)
{
if (IS_OK(oStreamItem.GetSample(pSample)))
{
tmTimeStamp = get_sample_time(pSample);
oStream.tmLastSampleTimeStamp = tmTimeStamp;
}
}
// we always insert new items sorted
m_oQueue.emplace(
std::upper_bound(
m_oQueue.begin(),
m_oQueue.end(),
tmTimeStamp,
[](tNanoSeconds tmTime, tQueueItem& sItem)
{
return tmTime < sItem.tmTimeStamp;
}
),
tmTimeStamp, std::move(oStreamItem), oStream
);
}
void cDemoSynchronizerFilter::CheckQueue()
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
if (m_oQueue.empty())
{
return;
}
// if the queue has not reached the transfer start timeout we do nothing
if (m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp < duration_cast<tNanoSeconds>(static_cast<tTimeStamp>(m_nQueueTransferStartTimeout)))
{
return;
}
// otherwise forward all items older than the transfer end timeout
while (!m_oQueue.empty() &&
m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp >= duration_cast<tNanoSeconds>(static_cast<tTimeStamp>(m_nQueueTransferEndTimeout)))
{
ForwardItem(m_oQueue.front());
m_oQueue.pop_front();
}
}
void cDemoSynchronizerFilter::Clear()
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
// all items are now pending removal from the queue
while (!m_oQueue.empty())
{
auto& oFront = m_oQueue.front();
// streamtype items still need to be forwarded as stream-type handling is stateful accross both clock resets and runlevel changes
// you may optimize and restrict yourself to forwarding only the *final* stream-type *per reader*
if (oFront.oStreamItem.GetType() == adtf::streaming::ant::IStreamItem::tType::StreamType)
{
ForwardItem(oFront);
}
m_oQueue.pop_front();
}
}
void cDemoSynchronizerFilter::ForwardItem(cDemoSynchronizerFilter::tQueueItem& sItem)
{
switch (sItem.oStreamItem.GetType())
{
{
if (IS_OK(sItem.oStreamItem.GetSample(pSample)))
{
sItem.pStream->pWriter->Write(pSample);
}
break;
}
{
if (IS_OK(sItem.oStreamItem.GetStreamType(pType)))
{
sItem.pStream->pWriter->ChangeType(pType);
}
break;
}
default:
{
sItem.pStream->pWriter->ManualTrigger(sItem.tmTimeStamp);
break;
}
}
}
#define ADTF_PLUGIN(__plugin_identifier,...)
The ADTF Plugin Macro will add the code of a adtf::ucom::ant::IPlugin implementation.
Definition: adtf_plugin.h:22
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
@ EVENT_TimeResetEnd
Event describes the Time reset after a new time was set.
@ StreamType
item is a IStreamType. Mind: All StreamType changes will be queue too !!
@ Sample
item is a queue item contains a ISample
virtual tResult GetObject(iobject_ptr< IObject > &pObject, const char *strNameOID) const =0
Get registered object from object registry.
base::flash::tNanoSeconds get_sample_time(const ucom::ant::iobject_ptr< const ant::ISample > &pSample)
Returns the sample time stamp with nanosecond precision.
adtf::ucom::IRuntime * _runtime
Global Runtime Pointer to reference to the current runtime.
This structure describes the functionality for the adtf::ucom::ant::IEventSource Communication Expans...
#define THROW_ERROR_DESC(_code,...)
throws a tResult exception
#define THROW_IF_FAILED(s)
throws if the expression returns a failed tResult