ADTF  3.18.4
Source Code for TimeStamp Synchronizer Plugin
Build Environment
To see how to set up the build environment, have a look at the ADTF CMake Environment.
This implementation shows:
  • how to manually implement a filter
  • how to implement a filter with dynamic pins
  • how to synchronize multiple data streams according to their tTimeStamp
#pragma once
#include <deque>
#include <unordered_map>
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
using namespace adtf::filter;
// we use the cDynamicFilter base class, as it exports the IDynamicDataBinding interface for us
class cDemoSynchronizerFilter : public cFilter, private adtf::ucom::IEventSink
// give our filter a name and class id
"TimeStamp Synchronizer");
// ensure that our dependencies will be available within the runtime
// implement the method that is called when a new input pin is required
tResult RequestDynamicInputPin(const char* strName,
// implement the method that is called when a new output pin is required
tResult RequestDynamicOutputPin(const char* strName,
// stateful filters need to be aware of runlevel changes and prepare for clean reinitialization
tResult Init(tInitStage eStage) override;
tResult Shutdown(tInitStage eStage) override;
// stateful filters need to handle clock reset events and discard any temporal state
tResult HandleEvent(const adtf::ucom::IEventSource& oSource, const void* pvEventData) override;
// we use a cExternalQueueSampleReader in order to grab the
// stream items and put them in the queue. This class is a proxy between the sample streams
// and the queue within the filter.
struct tStream: public adtf::streaming::ISampleReaderQueue
tStream(cDemoSynchronizerFilter& oFilter,
std::string_view strName,
bool bSynchronousProcessing):
pReader = oFilter.CreateInputPinWithCallback<cExternalQueueSampleReader>(std::string(strName).c_str(), pType, [this, bSynchronousProcessing](tNanoSeconds /*tmTrigger*/) -> tResult
// just in case this was not a PushRead synchronous connection.
// an empty stream item signals a trigger
m_oFilter.InsertQueueItem(adtf::streaming::cStreamItem(), *this);
if (bSynchronousProcessing)
}, false);
pWriter = m_oFilter.CreateOutputPin((std::string(strName) + "_out").c_str(), pType);
tStream(const tStream&) = delete;
tStream(tStream&&) = delete;
tResult Push(const adtf::streaming::IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
m_oFilter.InsertQueueItem(oStreamItem, *this);
void Clear() override
tResult Pop(adtf::streaming::IStreamItem& /* oStreamItem */) override
cSampleWriter* pWriter = nullptr;
cExternalQueueSampleReader* pReader = nullptr;
// this will store the time stamp from the last sample so that we can use it for types
// or triggers, which do not have a timestamp of their own. In our context of types and
// triggers only ordering is relevant.
tNanoSeconds tmLastSampleTimeStamp;
cDemoSynchronizerFilter& m_oFilter;
using tStreams = std::unordered_map<std::string, tStream>;
tStreams m_oStreams;
struct tQueueItem
tQueueItem(tNanoSeconds tmTimeStamp,
const adtf::streaming::IStreamItem& oStreamItem,
tStream& oStream):
tNanoSeconds tmTimeStamp;
tStream* pStream;
tStreams::iterator GetOrCreateInputStream(const std::string& strName,
// add a new stream item, and if synchronous processing is configured, flush the queue
void InsertQueueItem(adtf::streaming::cStreamItem&& oStreamItem, tStream& oStream);
// Check if the queue exceeds the threshold and flush to output.
void CheckQueue();
// remove all pending elements from the queue
// discard samples and triggers, but maintain coherency with regard to stream types
void Clear();
// forward a single item
void ForwardItem(tQueueItem& sItem);
std::mutex m_oQueueMutex;
std::deque<tQueueItem> m_oQueue;
adtf::base::property_variable<int64_t> m_nQueueTransferStartTimeout = 20000;
adtf::base::property_variable<int64_t> m_nQueueTransferEndTimeout = 10000;
adtf::base::property_variable<bool> m_bSynchronousQueueProcessing = false;
#include "synchronizer_filter.h"
// this macro creates the plugin DLL or shared object entry methods
// and provides the filter class via its class factory
ADTF_PLUGIN("TimeStamp Synchronizer Plugin",
// obtain the clock object by its primary interface, IReferenceClock
// the clock object also implements the IEventSource interface.
m_pClockEventSource = ucom_object_ptr_cast<adtf::ucom::IEventSource>(pClock);
if (!m_pClockEventSource)
THROW_ERROR_DESC(ERR_POINTER, "Reference clock missing or not implementing IEventSource");
m_nQueueTransferStartTimeout.SetDescription("Reference timespan AFTER which all Samples within the queue will be forwarded when the time difference between the first and last Sample exceeds the property value.");
RegisterPropertyVariable("queue_transfer_start_timeout", m_nQueueTransferStartTimeout);
m_nQueueTransferEndTimeout.SetDescription("Reference timespan UNTIL which all Samples within the queue will be forwarded when the time difference between the first and last Sample will be below the property value.");
RegisterPropertyVariable("queue_transfer_end_timeout", m_nQueueTransferEndTimeout);
m_bSynchronousQueueProcessing.SetDescription("if activated, this will force the queue to be forwarded synchronously when a Trigger is received on one of the Input Pins and the queue forwarding conditions are met.");
RegisterPropertyVariable("synchronous_queue_processing", m_bSynchronousQueueProcessing);
CreateRunner("process_queue", [&](tNanoSeconds) -> tResult
// double check we have all available items in our queue
// if connections are not synchronous and do not provide triggers
for (auto& sStream: m_oStreams)
// check if we need to forward some items
SetDescription("process_queue", "Runner to periodically trigger the function which checks the queue.");
// sets a short description for the component
SetDescription("Use this filter to sort incoming samples according to their timestamp.");
// set help link to jump to documentation from ADTF Configuration Editor
// this method is called whenever a new input pin is requested.
tResult cDemoSynchronizerFilter::RequestDynamicInputPin(const char* strName,
auto itStream = GetOrCreateInputStream(strName, pType);
// the initial stream-type will be visible only *now* and will not be repeated as a stream item
// so it must be forwarded at this point
// this method is called whenever a new output pin is requested.
tResult cDemoSynchronizerFilter::RequestDynamicOutputPin(const char* strName,
std::string strStreamName(strName);
const std::string_view strOutSuffix = "_out";
if (strStreamName.rfind(strOutSuffix) != strStreamName.size() - strOutSuffix.size())
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Invalid output pin name '%s'. Output pin names have to end with '_out'.", strName);
const auto strInputName = strStreamName.substr(0, strStreamName.size() - strOutSuffix.size());
GetOrCreateInputStream(strInputName, pType);
tResult cDemoSynchronizerFilter::Init(tInitStage eStage)
if (eStage == tInitStage::StagePostConnect)
// Via this interface, the clock informs about clock reset events (e.g. by track navigation backwards)
tResult cDemoSynchronizerFilter::Shutdown(tInitStage eStage)
if (eStage == tInitStage::StagePostConnect)
// Via this interface, the clock informs about clock reset events (e.g. by track navigation backwards)
// We don't care about clock resets while we don't have any state
// drop all still queued state when going from RL5 to RL4
// filters are still connected in this state and stream-type negotiation still happens
// no more samples and triggers though
tResult cDemoSynchronizerFilter::HandleEvent(const IEventSource& oSource, const void* pvEventData)
if (&oSource == m_pClockEventSource.Get())
const auto pEvent = static_cast<const adtf::services::IReferenceClock::tReferenceClockEvent*>(pvEventData);
// discard all triggers and samples belonging to the future.
cDemoSynchronizerFilter::tStreams::iterator cDemoSynchronizerFilter::GetOrCreateInputStream(const std::string& strName,
return m_oStreams.try_emplace(strName, *this, strName, pType, m_bSynchronousQueueProcessing).first;
void cDemoSynchronizerFilter::InsertQueueItem(adtf::streaming::cStreamItem&& oStreamItem,
tStream& oStream)
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
// stream types and triggers get the timestamp of the last sample
auto tmTimeStamp = oStream.tmLastSampleTimeStamp;
if (oStreamItem.GetType() == adtf::streaming::IStreamItem::tType::Sample)
if (IS_OK(oStreamItem.GetSample(pSample)))
tmTimeStamp = get_sample_time(pSample);
oStream.tmLastSampleTimeStamp = tmTimeStamp;
// we always insert new items sorted
[](tNanoSeconds tmTime, tQueueItem& sItem)
return tmTime < sItem.tmTimeStamp;
tmTimeStamp, std::move(oStreamItem), oStream
void cDemoSynchronizerFilter::CheckQueue()
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
if (m_oQueue.empty())
// if the queue has not reached the transfer start timeout we do nothing
if (m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp < duration_cast<tNanoSeconds>(static_cast<tTimeStamp>(m_nQueueTransferStartTimeout)))
// otherwise forward all items older than the transfer end timeout
while (!m_oQueue.empty() &&
m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp >= duration_cast<tNanoSeconds>(static_cast<tTimeStamp>(m_nQueueTransferEndTimeout)))
void cDemoSynchronizerFilter::Clear()
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
// all items are now pending removal from the queue
while (!m_oQueue.empty())
auto& oFront = m_oQueue.front();
// stream-type items still need to be forwarded, as stream-type handling is stateful across both clock resets and runlevel changes
// you may optimize and restrict yourself to forwarding only the *final* stream-type *per reader*
if (oFront.oStreamItem.GetType() == adtf::streaming::ant::IStreamItem::tType::StreamType)
void cDemoSynchronizerFilter::ForwardItem(cDemoSynchronizerFilter::tQueueItem& sItem)
switch (sItem.oStreamItem.GetType())
if (IS_OK(sItem.oStreamItem.GetSample(pSample)))
if (IS_OK(sItem.oStreamItem.GetStreamType(pType)))
