#include "synchronizer_filter.h"
cDemoSynchronizerFilter);
cDemoSynchronizerFilter::cDemoSynchronizerFilter()
{
m_pClockEventSource = ucom_object_ptr_cast<adtf::ucom::IEventSource>(pClock);
if (!m_pClockEventSource)
{
THROW_ERROR_DESC(ERR_POINTER,
"Reference clock missing or not implementing IEventSource");
}
m_nQueueTransferStartTimeout.SetDescription("Reference timespan AFTER which all Samples within the queue will be forwarded when the time difference between the first and last Sample exceeds the property value.");
RegisterPropertyVariable("queue_transfer_start_timeout", m_nQueueTransferStartTimeout);
m_nQueueTransferEndTimeout.SetDescription("Reference timespan UNTIL which all Samples within the queue will be forwarded when the time difference between the first and last Sample will be below the property value.");
RegisterPropertyVariable("queue_transfer_end_timeout", m_nQueueTransferEndTimeout);
m_bSynchronousQueueProcessing.SetDescription("if activated, this will force the queue to be forwarded synchronously when a Trigger is received on one of the Input Pins and the queue forwarding conditions are met.");
RegisterPropertyVariable("synchronous_queue_processing", m_bSynchronousQueueProcessing);
CreateRunner(
"process_queue", [&](tNanoSeconds) ->
tResult
{
for (auto& sStream: m_oStreams)
{
sStream.second.pReader->ReadAllAvailableItems();
}
CheckQueue();
});
SetDescription("process_queue", "Runner to periodically trigger the function which checks the queue.");
SetDescription("Use this filter to sort incoming samples according to their timestamp.");
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_synchronizer_filter.html");
}
tResult cDemoSynchronizerFilter::RequestDynamicInputPin(
const char* strName,
{
auto itStream = GetOrCreateInputStream(strName, pType);
itStream->second.pWriter->ChangeType(pType);
}
tResult cDemoSynchronizerFilter::RequestDynamicOutputPin(
const char* strName,
{
std::string strStreamName(strName);
const std::string_view strOutSuffix = "_out";
if (strStreamName.rfind(strOutSuffix) != strStreamName.size() - strOutSuffix.size())
{
RETURN_ERROR_DESC(ERR_INVALID_ARG,
"Invalid output pin name '%s'. Output pin names have to end with '_out'.", strName);
}
const auto strInputName = strStreamName.substr(0, strStreamName.size() - strOutSuffix.size());
GetOrCreateInputStream(strInputName, pType);
}
tResult cDemoSynchronizerFilter::Init(tInitStage eStage)
{
if (eStage == tInitStage::StagePostConnect)
{
}
}
tResult cDemoSynchronizerFilter::Shutdown(tInitStage eStage)
{
if (eStage == tInitStage::StagePostConnect)
{
Clear();
}
}
tResult cDemoSynchronizerFilter::HandleEvent(
const IEventSource& oSource,
const void* pvEventData)
{
if (&oSource == m_pClockEventSource.Get())
{
{
Clear();
}
}
}
cDemoSynchronizerFilter::tStreams::iterator cDemoSynchronizerFilter::GetOrCreateInputStream(const std::string& strName,
{
return m_oStreams.try_emplace(strName, *this, strName, pType, m_bSynchronousQueueProcessing).first;
}
tStream& oStream)
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
auto tmTimeStamp = oStream.tmLastSampleTimeStamp;
{
if (
IS_OK(oStreamItem.GetSample(pSample)))
{
oStream.tmLastSampleTimeStamp = tmTimeStamp;
}
}
m_oQueue.emplace(
std::upper_bound(
m_oQueue.begin(),
m_oQueue.end(),
tmTimeStamp,
[](tNanoSeconds tmTime, tQueueItem& sItem)
{
return tmTime < sItem.tmTimeStamp;
}
),
tmTimeStamp, std::move(oStreamItem), oStream
);
}
void cDemoSynchronizerFilter::CheckQueue()
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
if (m_oQueue.empty())
{
return;
}
if (m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp < duration_cast<tNanoSeconds>(
static_cast<tTimeStamp>(m_nQueueTransferStartTimeout)))
{
return;
}
while (!m_oQueue.empty() &&
m_oQueue.back().tmTimeStamp - m_oQueue.front().tmTimeStamp >= duration_cast<tNanoSeconds>(
static_cast<tTimeStamp>(m_nQueueTransferEndTimeout)))
{
ForwardItem(m_oQueue.front());
m_oQueue.pop_front();
}
}
void cDemoSynchronizerFilter::Clear()
{
std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
while (!m_oQueue.empty())
{
auto& oFront = m_oQueue.front();
{
ForwardItem(oFront);
}
m_oQueue.pop_front();
}
}
void cDemoSynchronizerFilter::ForwardItem(cDemoSynchronizerFilter::tQueueItem& sItem)
{
switch (sItem.oStreamItem.GetType())
{
{
if (
IS_OK(sItem.oStreamItem.GetSample(pSample)))
{
sItem.pStream->pWriter->Write(pSample);
}
break;
}
{
if (
IS_OK(sItem.oStreamItem.GetStreamType(pType)))
{
sItem.pStream->pWriter->ChangeType(pType);
}
break;
}
default:
{
sItem.pStream->pWriter->ManualTrigger(sItem.tmTimeStamp);
break;
}
}
}
#define ADTF_PLUGIN(__plugin_identifier,...)
The ADTF Plugin Macro will add the code of a adtf::ucom::ant::IPlugin implementation.
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define IS_OK(s)
Check if result is OK.
@ EVENT_TimeResetEnd
Event describes the Time reset after a new time was set.
@ StreamType
item is a IStreamType. Mind: All StreamType changes will be queue too !!
@ Sample
item is a queue item contains a ISample
virtual tResult GetObject(iobject_ptr< IObject > &pObject, const char *strNameOID) const =0
Get registered object from object registry.
base::flash::tNanoSeconds get_sample_time(const ucom::ant::iobject_ptr< const ant::ISample > &pSample)
Returns the sample time stamp with nanosecond precision.
adtf::ucom::IRuntime * _runtime
Global Runtime Pointer to reference to the current runtime.
This structure describes the functionality for the adtf::ucom::ant::IEventSource Communication Expans...
#define THROW_ERROR_DESC(_code,...)
throws a tResult exception
#define THROW_IF_FAILED(s)
throws if the expression returns a failed tResult