ADTF  3.18.2
Streaming Architecture
All about the Filter Graph

ADTF enables you to develop concrete (automotive) functionality and complex applications that transform, record and visualize data. To stay flexible, configurable and reusable, you use the ADTF Configuration Editor to create a Filter Graph. The Filter Graph is the central concept of ADTF, and this page is all about it.

The Filter Graph lets you design an application from small processing parts (Filters) that each fulfill a particular task. It is recommended to design and implement Filters so that they can be reused in more than one Filter Graph configuration.

To clarify some terms in advance, consider the following list:

Time

The term time is used for a continuously rising value with a clearly defined time base. Time is not a value!

Timestamp

The term timestamp describes a moment in Time. It is a static value with a static offset to its time base.

type tTimeStamp

The type tTimeStamp is usually a timestamp in microseconds (if not, the declaration at the point of use needs to state this explicitly).

Data

Usually, data is any kind of user-defined signal, struct or memory.

Streaming Data

Streaming Data is a continuous data flow in which each data item is related to time by a timestamp.
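
As a rough illustration of these terms (this is not ADTF API), a timestamp can be modelled as a fixed count of microseconds relative to a time base, and streaming data as a sequence of values that each carry such a timestamp. The names `TimeStampUs`, `TimedValue`, `is_time_ordered` and `to_nanoseconds` are made up for this sketch, and the ordering check is an illustration only; the text above does not state that ADTF enforces it.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a microsecond timestamp (not the ADTF tTimeStamp type).
using TimeStampUs = std::chrono::duration<std::int64_t, std::micro>;

// One element of streaming data: a value plus the moment it belongs to.
struct TimedValue
{
    TimeStampUs timestamp;
    double value;
};

// Returns true if the timestamps never decrease along the stream.
inline bool is_time_ordered(const std::vector<TimedValue>& stream)
{
    for (std::size_t i = 1; i < stream.size(); ++i)
    {
        if (stream[i].timestamp < stream[i - 1].timestamp)
        {
            return false;
        }
    }
    return true;
}

// Converts a microsecond timestamp to a nanosecond count.
inline std::int64_t to_nanoseconds(TimeStampUs timestamp)
{
    return std::chrono::duration_cast<std::chrono::nanoseconds>(timestamp).count();
}
```

Using `std::chrono::duration` keeps the unit in the type, so a microsecond timestamp cannot be mixed up silently with a nanosecond trigger time.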

Filter Graph

To create and setup a Filter Graph you have different building blocks and components. These components can be connected to each other.


Filter

The Filters are processing units that will provide the functionality of:

  • receiving data via so called In Pins (Input) and Sample Readers
  • processing data, manipulating or calculating them
  • containing control algorithms, kalman filters
  • visualizing data
  • sending processed data via so called Out Pins (Output) and Sample Writers

Interface API: streaming::IFilter
Base Class API: filter::cFilter
Delivered Binaries: Delivered Filters
Detailed Description: Filter


Sample Stream

The Sample Stream will forward the data of exactly one Out Pin to one or more In Pins of other Filters. It will always act as mediator between these Pins.
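
The mediator role can be pictured with a small standalone sketch. It assumes a simplified model in which every connected reader gets its own copy of each sample; the types `SketchSample`, `SketchReaderQueue` and `SketchSampleStream` are hypothetical and are not part of the ADTF SDK.

```cpp
#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical minimal sample: a timestamp and a payload.
struct SketchSample
{
    std::int64_t time_ns;
    std::uint64_t value;
};

// Queue that stands in for the reader behind one In Pin.
using SketchReaderQueue = std::deque<SketchSample>;

// Mediator between exactly one writer and any number of reader queues.
class SketchSampleStream
{
public:
    // Registers a new reader queue and returns it to the caller.
    std::shared_ptr<SketchReaderQueue> ConnectReader()
    {
        auto queue = std::make_shared<SketchReaderQueue>();
        m_readers.push_back(queue);
        return queue;
    }

    // Called by the single writer: every connected reader gets its own copy.
    void Write(const SketchSample& sample)
    {
        for (auto& reader : m_readers)
        {
            reader->push_back(sample);
        }
    }

private:
    std::vector<std::shared_ptr<SketchReaderQueue>> m_readers;
};
```

The writer never sees the readers directly, which is the point of the mediator: In Pins can be connected or removed without touching the sending Filter.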

Interface API: streaming::ISampleStream
Base Class API: streaming::cSampleStream
Delivered Binaries: Delivered Sample Streams
Detailed Description: Sample Stream


Streaming Source

The Streaming Source is a system entry point for Samples and Data Triggers. This data comes, for example, from hardware. Usually you will implement any device link as a Streaming Source.

Interface API: streaming::ISampleStreamingSource
Base Class API: filter::cSampleStreamingSource
Delivered Binaries: Delivered Streaming Sources
Detailed Description: Streaming Source


Streaming Sink

The Streaming Sink is a system exit point for Samples and Triggers. Usually you will implement any device link as a Streaming Sink.

Interface API: streaming::ISampleStreamingSink
Base Class API: filter::cSampleStreamingSink
Delivered Binaries: Delivered Streaming Sinks
Detailed Description: Streaming Sink


Active Runner

The Active Runner is either a Timer or Thread. It will always be the start point of a Trigger Pipe.

Interface API: streaming::IActiveRunner
Delivered Binaries: Delivered Active Runners
Detailed Description: Active Runner


Binding Proxy

The Binding Proxy will connect formal ADTF Client/Server interfaces between Filters. It will always act as mediator between these clients and servers.

Interface API: streaming::IBindingProxy
Delivered Binaries: Delivered Binding Proxies
Detailed Description: Binding Proxy


Named Graph Objects

Each component that can be added to the Filter Graph uses the basic interface INamedGraphObject.

Filter

Filters are small processing units.
For getting started on programming your own Filter see: My First Filter.

Some predefined utility Filters are part of the ADTF delivery, see a summary here.

The Filter is able to:

  • receive data via In Pins and their corresponding Sample Readers
  • send data via Out Pins and their corresponding Sample Writers
  • process data in the context of Runners (also called trigger contexts, runnables, runnable functions, callables)

Filters can be used to:

  • decode streaming data from streaming sources, e.g. CAN raw data, MOST or FlexRay cycles
  • pre-process incoming data and prepare it for an algorithm implementation
  • re-calculate and merge incoming data by a fancy algorithm implementation
  • implement a loop controller
  • receive incoming data and show, log or display it in a human readable way
  • ... and many things more.

You can set up a Filter with Properties to adjust its concrete behaviour or its initial state. A simple Filter State Machine is used to create, initialize and start a Filter step by step.

The Filter itself is designed to separate the data transmissions and runtime behaviour from each other. This gives users of a specific Filter the flexibility of Trigger configuration while using the Filter in a specific Filter Graph and connecting the so called Runners. Users can decide while configuring and using if the Filter runs by

  • a Data Trigger ... the functionality runs on an incoming data event
  • or a Time Trigger ... the functionality runs on an incoming time event.
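
The separation of processing logic from triggering can be sketched without any ADTF types. In this simplified model a Runner is just a callable, and the same filter logic is driven once by a simulated timer and once by data events. All names (`SketchRunner`, `SketchCounterFilter`, `run_timer`, `run_on_data`) are hypothetical.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical runner: an entry point that a trigger source can call.
using SketchRunner = std::function<void(std::int64_t /*trigger time in ns*/)>;

// Filter logic that does not know which kind of trigger drives it.
class SketchCounterFilter
{
public:
    // The returned runner refers to this filter, which must outlive it.
    SketchRunner GetRunner()
    {
        return [this](std::int64_t time_ns) { Process(time_ns); };
    }

    const std::vector<std::int64_t>& TriggerTimes() const { return m_trigger_times; }

private:
    void Process(std::int64_t time_ns) { m_trigger_times.push_back(time_ns); }

    std::vector<std::int64_t> m_trigger_times;
};

// Time Trigger: calls the runner once per cycle.
inline void run_timer(const SketchRunner& runner, std::int64_t cycle_ns, int cycles)
{
    for (int i = 0; i < cycles; ++i)
    {
        runner(i * cycle_ns);
    }
}

// Data Trigger: calls the runner once per incoming data event.
inline void run_on_data(const SketchRunner& runner, const std::vector<std::int64_t>& arrival_times_ns)
{
    for (auto time_ns : arrival_times_ns)
    {
        runner(time_ns);
    }
}
```

Because the filter only exposes a runner, choosing between `run_timer` and `run_on_data` is a decision made outside the filter, which mirrors the configuration step in the Filter Graph.
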
Note
Within the ADTF Streaming SDK a Filter Base class implementation does exist.
We recommend to use the convenience API in the ADTF Filter SDK.

Use the Filter SDK as follows:

//...
class cMyFilter : public adtf::filter::cFilter
{
public:
//this will define an ID of the filter that can be used to create the filter
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
//the CTOR of the Filter
cMyFilter()
{
}
};

Usually the Filter will be packed into a Plugin. The Plugin mechanism of ADTF allows you to extend the framework's functionality and enables it to load, for example, Filters into the ADTF Runtime. To create a Plugin for the above Filter cMyFilter, use the following code:

//...
//this will create a plugin entry point for you if this is a shared object/dynamic library created by the adtf_add_filter() CMake macro
ADTF_PLUGIN("MyFilter Example Plugin", cMyFilter);

Out Pins and Sample Writers/Pin Writers

The data a Filter can create and write out are Samples. The formal description and classification of those Samples and its content is done by a Stream Type. Within the Filter you need to create Out Pins to identify a connection point to this filter, where data can be written out. Write Samples and set Stream Types onto a connected stream with the help of a Sample Writer (interface streaming::ISampleWriter).

Have a look at the documentation of the CreateOutputPin function to add an Out Pin to the Filter:

class cMyFilter: public adtf::filter::cFilter
{
public:
//this will define an ID of the filter that can be used to create the filter
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
cMyFilter()
{
//give the output pin a name
//store the created writer in a member variable
m_pWriter = CreateOutputPin("OutPin_1");
}
private:
adtf::streaming::ISampleWriter* m_pWriter = nullptr;
};

Runners and Triggers

Each Filter needs at least one function that reacts on Trigger events while the Filter is processing data in the running state. Within ADTF Filters these events are called Triggers. To "catch" a Trigger you should define a so called Runner. This Runner is an entry point to connect ...

The above Filter with one Out Pin and one Runner can be created by the following code:

#include <chrono>
#include <cinttypes>
class cMyFilter: public adtf::filter::cFilter
{
public:
//this will define an ID of the filter that can be used to create the filter
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
cMyFilter()
{
//give the output pin a name
m_pWriter = CreateOutputPin("OutPin_1");
//Create a runner and give a hint to connect a timer with a cycle time of 100ms
//as long as there is no connection within the Filter Graph to this Runner, no Trigger event is received!
using namespace std::chrono_literals;
m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
}
//implement the Process method to react on the Trigger
//the default Filter implementation will register this method to the Runner
tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
adtf::streaming::IRunner* pRunner) override
{
//you may check for the runner
if (m_pRunner == pRunner)
{
LOG_INFO("Time of triggering %" PRIi64, tmTimeOfTriggering.nCount);
//here: processing data means incrementing a counter
m_nCounter++;
//assign the counter value to output_sample_data, which creates a sample for you,
//and write the sample to the sample writer
*m_pWriter << adtf::streaming::output_sample_data(tmTimeOfTriggering, m_nCounter);
}
else
{
RETURN_ERROR_DESC(ERR_INVALID_ADDRESS, "Process called with unknown Runner");
}
return {};
}
private:
adtf::streaming::ISampleWriter* m_pWriter = nullptr;
adtf::streaming::IRunner* m_pRunner = nullptr;
uint64_t m_nCounter = 0;
};

Sample

Data are packed into so called Samples. These Samples are containers for any kind of data and refer to streaming data as well as some important meta information.

Interface API: streaming::ISample

Samples have:

Note
There is no classifier on the Sample that reveals what kind of data is being sent. Classification is done via the Stream Type, which is also part of the stream.
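
To make the container idea concrete, the following standalone sketch models a sample as a timestamp plus an untyped byte buffer that always copies its input. It is a simplification under the assumption that a sample holds exactly one buffer; `SketchBufferSample` and its methods are hypothetical and do not mirror the streaming::ISample interface.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical sample: a timestamp plus an untyped byte buffer.
// Nothing in the sample itself says what the bytes mean.
class SketchBufferSample
{
public:
    // Copies `size` bytes from `data` into the sample's own buffer.
    void Set(std::int64_t time_ns, const void* data, std::size_t size)
    {
        m_time_ns = time_ns;
        const auto* bytes = static_cast<const std::uint8_t*>(data);
        m_buffer.assign(bytes, bytes + size);
    }

    std::int64_t GetTime() const { return m_time_ns; }
    std::size_t GetSize() const { return m_buffer.size(); }

    // Reads the buffer back as T; the caller must know the right type.
    template <typename T>
    bool Read(T& out) const
    {
        static_assert(std::is_trivially_copyable<T>::value, "T must be trivially copyable");
        if (m_buffer.size() != sizeof(T))
        {
            return false;
        }
        std::memcpy(&out, m_buffer.data(), sizeof(T));
        return true;
    }

private:
    std::int64_t m_time_ns = 0;
    std::vector<std::uint8_t> m_buffer;
};
```

The size check in `Read` is the only protection this sketch has against a wrong type, which shows why a separate classification of the stream content is needed.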

Examples how to create and write Samples

Samples are memory buffers. You should always copy your data into these sample buffers.

The following examples of writing data to a sample are recommended:

Use of output_sample_data<T> template
using namespace adtf::streaming;
//Using output sample data
m_nCounter = 124;
//assign the counter value to a newly created sample with the help of output_sample_data
//and write the sample to the sample writer
*m_pWriter << output_sample_data<uint64_t>(tmTimeOfTriggering, m_nCounter);
Use of output_sample_data<T> template and set the Sample Time
using namespace adtf::streaming;
// Using output sample data as stack variable
m_nCounter = 124;
output_sample_data<uint64_t> oSampleData(tmTimeOfTriggering, m_nCounter);
//write the sample to the sample writer by releasing oSampleData
*m_pWriter << oSampleData.Release();
Set a memory buffer (and so copy the content) to the sample
//allocate a sample and set the buffer
using namespace adtf::streaming;
using namespace adtf::ucom;
m_nCounter = 124;
//create a sample with time
object_ptr<flash::ISample> pSample;
RETURN_IF_FAILED(alloc_sample(pSample, tmTimeOfTriggering));
//this will copy the memory buffer to the sample's content; the time can be changed here (but tmTimeOfTriggering is usually the best choice)
RETURN_IF_FAILED(pSample->Set(tmTimeOfTriggering, &m_nCounter, sizeof(m_nCounter)));
*m_pWriter << ucom_object_ptr_cast<ISample>(pSample);
Write the content of a memory buffer to the samples buffer
//allocate a sample and set the buffer
using namespace adtf::streaming;
using namespace adtf::ucom;
m_nCounter = 124;
//create a sample
object_ptr<ISample> pSample;
//you should set the time immediately while creating the sample
RETURN_IF_FAILED(alloc_sample(pSample, tmTimeOfTriggering));
//to write data you must lock the samples content
//BUT CAUTION: You need also to unlock before writing the data to the Sample Writer !
{
//we automatically unlock if we exit this context
object_ptr_locked<ISampleBuffer> oBuffer;
RETURN_IF_FAILED(pSample->WriteLock(oBuffer, sizeof(m_nCounter)));
//wrapup the counter memory and copy it to the samples buffer
//it is always a copy!
RETURN_IF_FAILED(oBuffer->Write(adtf::base::adtf_memory<uint64_t>(&m_nCounter)));
}
*m_pWriter << pSample;
Write a string content to the samples buffer
//allocate a sample and set the buffer
using namespace adtf::streaming;
using namespace adtf::ucom;
std::string strWriteToSampleString("This is a cString I want to write!");
//create a sample
object_ptr<ISample> pSample;
//you should set the time immediately while creating the sample
RETURN_IF_FAILED(alloc_sample(pSample, tmTimeOfTriggering));
//to write data you must lock the samples content
//BUT CAUTION: You need also to unlock before writing the data to the Sample Writer !
{
//we automatically unlock if we exit this context
object_ptr_locked<ISampleBuffer> oBuffer;
RETURN_IF_FAILED(pSample->WriteLock(oBuffer, strWriteToSampleString.size()));
//wrapup the string with a adtf_memory and so copy it to the samples buffer
//it is always a copy while writing!
RETURN_IF_FAILED(oBuffer->Write(adtf::base::adtf_memory<std::string>(&strWriteToSampleString)));
}
*m_pWriter << pSample;
Type based writing to a Pin Writer
//...
cMyFilterTypeBased()
{
//give the output pin a name
//create a pin writer to write the sample content directly to the writer!
m_pWriter = CreateOutputPin<adtf::filter::pin_writer<uint64_t>>("OutPin_1");
//Create a runner and give a hint to connect a timer with a cycle time of 100ms
//as long as there is no connection within the Filter Graph to this Runner, no Trigger event is received!
using namespace std::chrono_literals;
m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
}
//implement the Process method to react on the Trigger
//the default Filter implementation will register this method to the Runner
tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
adtf::streaming::IRunner* /*pRunner*/) override
{
//write the sample to the sample writer
//always use the tmTimeOfTriggering, if you have it!
RETURN_IF_FAILED(m_pWriter->Write(tmTimeOfTriggering, m_nCounter++));
return {};
}
private:
adtf::filter::pin_writer<uint64_t>* m_pWriter = nullptr;
adtf::streaming::IRunner* m_pRunner = nullptr;
uint64_t m_nCounter = 0;
//...

Additional Sample Information

It is possible to add additional information to a sample. See the ISample::GetSampleInfo method to learn how to add these additional values to the sample instance:

Remarks
Do not use this to add data! Using it with more than 2 values is expensive!
Set an additional Sample Info value
using namespace adtf::streaming;
//Using output sample data
double m_fValueToWrite;
m_fValueToWrite = 124;
//assign the value to a newly created sample!
output_sample_data<double> oSampleData(tmTimeOfTriggering, m_fValueToWrite);
//set a counter if necessary ... use case: Frame Counter from a camera
RETURN_IF_FAILED(oSampleData.SetSampleInfo(ISampleInfo::SAI_Counter, ++m_nFrameCounter));
//write the sample to the sample writer
*m_pWriter << oSampleData.Release();
Get an additional Sample Info value
using namespace adtf::streaming;
//read additional info from the sample
m_nFrameCounter = get_sample_info(*pSample, ISampleInfo::SAI_Counter);

Referenced Sample

It is possible to reference data instead of copying it, but usually the Filter receiving the Samples will queue them. You therefore have to make sure that the referenced data lives as long as the Sample lives within the Filter Graph. If you really need this, have a look at cReferenceSample.

Note
So please keep in mind: If you are using referenced sample buffers, your whole application must be able to deal with that!
Usually this does not work for Filters you did not program by yourself.
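
The difference between copying and referencing can be shown with two tiny hypothetical types. This sketch does not show how cReferenceSample works; it only demonstrates why a queued reference depends on the producer's memory while a copy does not.

```cpp
#include <cstdint>

// Sample that owns a copy of the value.
struct SketchCopiedSample
{
    explicit SketchCopiedSample(const std::uint64_t& source) : value(source) {}
    std::uint64_t Get() const { return value; }
    std::uint64_t value;
};

// Sample that only points at the producer's memory.
// The producer must keep that memory alive and unchanged while the sample is queued.
struct SketchReferencedSample
{
    explicit SketchReferencedSample(const std::uint64_t& source) : pointer(&source) {}
    std::uint64_t Get() const { return *pointer; }
    const std::uint64_t* pointer;
};
```

If the producer overwrites its variable after creating both samples, the copied sample still returns the old value, while the referenced sample returns the new one. A receiver that queues the referenced sample would therefore observe data that changed after it was sent.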

Stream Type

A Stream Type has:

  • a Stream Meta Type name, which can be e.g. "adtf/video", "adtf/default", "adtf/audio" or "adtf/plaintype".
  • Properties to describe the content of Samples.

A Stream Meta Type defines which properties are set within the Stream Type.

  • For "adtf/video" these will be e.g. "width", "height", "bitsperpixel", "pixelformat".
  • For "adtf/default" this will be a structural description of structured data with "md_struct" and "md_definitions".
  • see also the Overview of writing and reading samples

See Chapter: Stream Meta Types for more information on existing Stream Types.
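
A Stream Type can be pictured as a meta type name plus a property set. The sketch below is a hypothetical model with string-valued properties; `SketchStreamType` and `has_properties` are not ADTF API, and the real property types and compatibility rules are defined by the Stream Meta Types chapter.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stream type: a meta type name plus string properties.
struct SketchStreamType
{
    std::string meta_type_name;
    std::map<std::string, std::string> properties;
};

// Checks that every property name expected for the meta type is present.
inline bool has_properties(const SketchStreamType& type,
                           const std::vector<std::string>& expected_names)
{
    for (const auto& name : expected_names)
    {
        if (type.properties.count(name) == 0)
        {
            return false;
        }
    }
    return true;
}
```

A receiver could use such a check before interpreting sample buffers, for example by requiring "width" and "height" on a stream that claims to be "adtf/video".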

In Pins and Sample Readers/Pin Readers

The incoming Samples can be read via In Pins and Sample Readers. In Pins identify a connection point to the Filter. The incoming Stream contains Samples and Stream Types. Sample Readers (interface streaming::ISampleReader) are queues to buffer the incoming data.
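
Because a reader is a queue, one trigger may find several pending items. The following standalone sketch shows the drain-until-empty pattern with a hypothetical `SketchReader`; it does not use the streaming::ISampleReader interface.

```cpp
#include <cstdint>
#include <deque>

// Hypothetical reader: buffers incoming values until the filter is triggered.
class SketchReader
{
public:
    void Push(std::uint64_t value) { m_queue.push_back(value); }

    // Returns false when the queue is empty, otherwise pops the oldest value.
    bool GetNext(std::uint64_t& value)
    {
        if (m_queue.empty())
        {
            return false;
        }
        value = m_queue.front();
        m_queue.pop_front();
        return true;
    }

private:
    std::deque<std::uint64_t> m_queue;
};

// One trigger may have to handle several queued values, so drain the queue.
inline std::uint64_t sum_all_pending(SketchReader& reader)
{
    std::uint64_t sum = 0;
    std::uint64_t value = 0;
    while (reader.GetNext(value))
    {
        sum += value;
    }
    return sum;
}
```

Reading only one item per trigger would let the queue grow whenever data arrives faster than triggers do, which is why the loop runs until `GetNext` reports an empty queue.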

Usually you will create one Reader per In Pin or one In Pin per Reader. The simplest way to create an In Pin is to call the CreateInputPin function within the CTOR.

Example to create an In Pin in the Filter's CTOR
#include <chrono>
#include <cinttypes>
class cMyFilter : public adtf::filter::cFilter
{
public:
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
cMyFilter()
{
//create the pins AND the readers
//we store them in member variables
//we use the second and third parameter with "false" to switch off the Inner Trigger Pipe (see chapter "Trigger Pipe")
m_pReader1 = CreateInputPin("InPin_1", false, false);
//Create a runner and give a hint to connect a timer with a cycle time of 100ms
using namespace std::chrono_literals;
m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), true);
}
//implement the process method to react on the trigger
tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
adtf::streaming::IRunner* pRunner) override
{
//you may check for the runner
if (m_pRunner == pRunner)
{
// example to read from "InPin_1"
adtf::ucom::object_ptr<const adtf::streaming::ISample> pSample;
//it is important to read until the queue is empty
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//get the content of the Sample
//we expect the counter that was written in the sample writer examples (see "Out Pins and Sample Writers/Pin Writers")!
//we use the sample_data helper template to retrieve the sample buffers content
adtf::streaming::sample_data<uint64_t> oSampleCounter(pSample);
auto nTime = oSampleCounter.GetTime();
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, *oSampleCounter, nTime);
}
}
else
{
RETURN_ERROR_DESC(ERR_INVALID_ADDRESS, "Process called with unknown runner");
}
return {};
}
private:
adtf::streaming::IRunner* m_pRunner = nullptr;
adtf::streaming::ISampleReader* m_pReader1 = nullptr;
};

Examples for Reading Samples

The following examples of reading data from Samples are recommended:

Reading Samples with GetNextSample
// example to read from "InPin_1"
adtf::ucom::object_ptr<const adtf::streaming::ISample> pSample;
//this will return an empty sample and error if the queue is empty
//it is important to read until the queue is empty
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//get the content of the Sample
//we expect the counter that was written in the sample writer examples before!
//we use the sample_data helper to retrieve the sample buffers content
adtf::streaming::sample_data<uint64_t> oSampleCounter(pSample);
auto nTime = oSampleCounter.GetTime();
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, *oSampleCounter, nTime);
}
Reading Samples with streaming operator
bool bEndOfStream = false;
while (!bEndOfStream)
{
//read from "InPin_1"
adtf::ucom::object_ptr<const adtf::streaming::ISample> pSample;
//read with streaming operator
*m_pReader1 >> pSample;
//this will return a valid sample if data were received
//this will return an empty sample if queue is empty
//it is important to read until the queue is empty
if (pSample)
{
//get the content of the Sample
//we expect the counter that was written in the sample writer examples before!
//we use the sample_data helper to retrieve the sample buffers content
adtf::streaming::sample_data<uint64_t> oSampleCounter(pSample);
auto nTime = oSampleCounter.GetTime();
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, *oSampleCounter, nTime);
}
else
{
bEndOfStream = true;
}
}
Access Sample Buffer memory with static_cast
using namespace adtf::ucom;
using namespace adtf::streaming;
object_ptr<const ISample> pSample;
//this will return an empty sample and error if the queue is empty
//it is important to read until the queue is empty
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//enter the sample buffer with a read lock
object_ptr_shared_locked<const ISampleBuffer> oBuffer;
RETURN_IF_FAILED(pSample->Lock(oBuffer));
auto nTime = pSample->GetTime();
//this is also possible with structured data
auto pValue = static_cast<const uint64_t*>(oBuffer->GetPtr());
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, *pValue, nTime);
}
Read Sample Buffers content with adtf_memory for trivial types
using namespace adtf::ucom;
using namespace adtf::streaming;
using namespace adtf::base;
object_ptr<const ISample> pSample;
//this will return an empty sample and error if the queue is empty
//it is important to read until the queue is empty
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//enter the sample buffer with a read lock
object_ptr_shared_locked<const ISampleBuffer> oBuffer;
RETURN_IF_FAILED(pSample->Lock(oBuffer));
auto nTime = pSample->GetTime();
//Read a trivial type based value
//this is also possible with structured data that are trivially copyable
uint64_t nValue = {};
RETURN_IF_FAILED(oBuffer->Read(adtf_memory<uint64_t>(&nValue)));
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, nValue, nTime);
}
Read Sample Buffers content with adtf_memory for string types
using namespace adtf::ucom;
using namespace adtf::streaming;
using namespace adtf::base;
object_ptr<const ISample> pSample;
//this will return an empty sample and error if the queue is empty
//it is important to read until the queue is empty
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//enter the sample buffer with a read lock
object_ptr_shared_locked<const ISampleBuffer> oBuffer;
RETURN_IF_FAILED(pSample->Lock(oBuffer));
auto nTime = pSample->GetTime();
//Read a string value
//this will also work for std::string, std::vector and std::array
std::string strValue = {};
RETURN_IF_FAILED(oBuffer->Read(adtf_memory<std::string>(&strValue)));
LOG_INFO("Received data on \"InPin_1\": %s at time %" PRIi64, strValue.c_str(), nTime);
}

Reading Stream Types

All examples above use untyped (anonymous) data! The Filter implementation receives data while the code always expects tUInt64 values. To make sure the Filter only accepts the expected sample content, the data must be classified by a Stream Type. The Stream Type itself is part of the stream. While receiving, you will always get a Stream Type first, then Samples, and a Stream Type again whenever it changes. For more detailed information see also Stream Type and Stream Meta Type.
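
The ordering "Stream Type first, then Samples, then a Stream Type again on change" can be modelled as a flat sequence of tagged items. The sketch below is a deliberate simplification: it silently skips samples that follow a non-accepted type, which is an assumption of this sketch and not the documented ADTF behaviour. The names `SketchStreamItem` and `read_accepted` are hypothetical.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stream item: either a type announcement or a data sample.
struct SketchStreamItem
{
    bool is_type;           // true: type announcement, false: sample
    std::string meta_type;  // only meaningful if is_type is true
    std::uint64_t value;    // only meaningful if is_type is false
};

// Collects only the samples that arrive while an accepted type is active.
inline std::vector<std::uint64_t> read_accepted(const std::vector<SketchStreamItem>& stream,
                                                const std::string& accepted_meta_type)
{
    std::vector<std::uint64_t> values;
    bool type_accepted = false; // no type seen yet, so nothing is accepted
    for (const auto& item : stream)
    {
        if (item.is_type)
        {
            // A new type replaces the previous one for all following samples.
            type_accepted = (item.meta_type == accepted_meta_type);
        }
        else if (type_accepted)
        {
            values.push_back(item.value);
        }
    }
    return values;
}
```

The reader keeps the most recent type as state, so the interpretation of each sample depends on the type item that preceded it in the stream.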

Reading Samples by Data Trigger

The examples above used only Runners which should be connected within the Filter Graph with an active Timer Runner (see Active Runner). But we can also use a Data Trigger in our code, so we will be called after an incoming data event on a specified pin.

#include <cinttypes>
class cMyFilter : public adtf::filter::cFilter
{
public:
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
cMyFilter()
{
using namespace adtf::streaming;
using namespace adtf::filter;
//create the pins AND the readers
//we store them in member variables
//only streams with adtf::streaming::stream_type_plain<uint64_t>() ... otherwise a stream error is set
//we still use the third and fourth parameter with "false" to switch off the Inner Trigger Pipe (see chapter "Trigger Pipe")
auto pCreatedReader1 = CreateInputPin("InPin_1",
stream_type_plain<uint64_t>(),
false,
false);
//We setup our own AcceptType function
//This receives a type if there is one within the incoming stream
//How this works: ... each GetNextSample() call will also receive the Stream Types, if so, this function will be called
// usually it will only accept compatible types
//REMARK: to use SetAcceptTypeCallback the type "adtf::filter::cPinReader" is used instead of "adtf::streaming::ISampleReader"
pCreatedReader1->SetAcceptTypeCallback(
[this](const adtf::ucom::iobject_ptr<const IStreamType>& pStreamIncomingType) -> tResult
{
//this is the same functionality implemented if you do not set up a own AcceptTypeCallback
//see the chapter for "Stream Type and Stream Meta Type" what it means to be compatible
return is_compatible(pStreamIncomingType, stream_type_plain<uint64_t>());
});
//now we initialize the member
m_pReader1 = pCreatedReader1;
//Create a runner and connect it to the data trigger of the "InPin_1"
m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
}
//implement the process method to react on the data trigger
tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
adtf::streaming::IRunner* pRunner) override
{
//you may check for the runner
if (m_pRunner == pRunner)
{
// example to read from "InPin_1"
adtf::ucom::object_ptr<const adtf::streaming::ISample> pSample;
//this will return an empty sample and error if the queue is empty
//it is still important to read until the queue is empty, since ADTF 3 may send many samples but trigger only once
while (IS_OK(m_pReader1->GetNextSample(pSample)))
{
//get the content of the Sample
//we expect the counter that was written in the sample writer examples before!
//we use the sample_data helper to retrieve the sample buffers content
adtf::streaming::sample_data<uint64_t> oSampleCounter(pSample);
auto nTime = oSampleCounter.GetTime();
LOG_INFO("Received data on \"InPin_1\": %" PRIu64 " at time %" PRIi64, *oSampleCounter, nTime);
}
}
else
{
RETURN_ERROR_DESC(ERR_INVALID_ADDRESS, "Process called with unknown runner");
}
return {};
}
private:
adtf::streaming::IRunner* m_pRunner = nullptr;
adtf::filter::cPinReader* m_pReader1 = nullptr;
};

As you can see in the code example, the Process() implementation does not change. This is one of the benefits of ADTF 3: Data-Triggered or Time-Triggered behaviour can still be configured at a very late stage.

Data Trigger with ProcessInput
For your convenience you can forward each Data Trigger to a single method, the ProcessInput method. A single AcceptType method is possible as well.
class cMyFilter : public adtf::filter::cFilter
{
public:
ADTF_CLASS_ID_NAME(cMyFilter, "my_filter.filter.example.cid", "My Filter");
cMyFilter()
{
//create the pins AND the readers
//we store them in member variables
//only streams with adtf::streaming::stream_type_plain<uint64_t>() ... otherwise a stream error is set
m_pReader1 = CreateInputPin("InPin_1",
adtf::streaming::stream_type_plain<uint64_t>(),
true, //create a data in trigger and "connect" the ProcessInput to it (see the Inner Trigger Pipe in chapter "Trigger Pipe")
true); //each data trigger is also forwarded to the output (if any)
//this is the same call as above
//it will also lead into a synchronized ProcessInput/AcceptType call
m_pReader2 = CreateInputPin("InPin_2",
adtf::streaming::stream_type_plain<uint64_t>());
}
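//override the AcceptType method to react on incoming stream types
//Note: You can also call the default implementation
// \c RETURN_IF_FAILED(cFilter::AcceptType(pReader, pType));
//at the beginning for a basic type check, and perform additional checks afterwards or standalone as follows
tResult AcceptType(adtf::streaming::ISampleReader* pReader,
const adtf::ucom::iobject_ptr<const adtf::streaming::IStreamType>& pType) override
{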
if (pReader == m_pReader1)
{
//do something accepting incoming type on "InPin_1"
}
else if (pReader == m_pReader2)
{
//do something accepting incoming type on "InPin_2"
}
return {};
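}
//override the ProcessInput method to react on the data trigger
tResult ProcessInput(adtf::base::tNanoSeconds tmTrigger,
adtf::streaming::ISampleReader* pReader) override
{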
if (pReader == m_pReader1)
{
//this event was triggered on incoming samples of "InPin_1"
}
else if (pReader == m_pReader2)
{
//this event was triggered on incoming samples of "InPin_2"
}
return {};
}
private:
adtf::streaming::IRunner* m_pRunner = nullptr;
adtf::streaming::ISampleReader* m_pReader1 = nullptr;
adtf::streaming::ISampleReader* m_pReader2 = nullptr;
};
The good thing about this single point of processing is:
  • It is easy to use.
  • It is already synchronized
Each ProcessInput and each AcceptType call locks the Filter, so no calls run in parallel within the Filter and no race conditions can appear.
But you should also consider CreateInputPinWithCallback before starting to implement huge if-else-if or switch-case statements.
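
The idea of replacing a long if-else chain with one callback per pin, while keeping all calls serialized, can be sketched in plain C++. This is not the signature or behaviour of CreateInputPinWithCallback; `SketchInputDispatcher` is a hypothetical class that only illustrates the pattern.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical dispatcher: one callback per input pin, all behind one lock.
class SketchInputDispatcher
{
public:
    using Callback = std::function<void(std::uint64_t)>;

    void Register(const std::string& pin_name, Callback callback)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_callbacks[pin_name] = std::move(callback);
    }

    // Returns false if no callback is registered for the pin.
    bool OnInput(const std::string& pin_name, std::uint64_t value)
    {
        std::lock_guard<std::mutex> lock(m_mutex); // serializes all callbacks
        auto it = m_callbacks.find(pin_name);
        if (it == m_callbacks.end())
        {
            return false;
        }
        it->second(value);
        return true;
    }

private:
    std::mutex m_mutex;
    std::map<std::string, Callback> m_callbacks;
};
```

Since the callback runs while the lock is held, a callback in this sketch must not call `Register` or `OnInput` on the same dispatcher, otherwise it would deadlock.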

Properties

Properties make it possible to adjust some capabilities of the Filter without recompiling. When using the Filter within the Filter Graph you can adjust its behaviour.
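
Conceptually, a property is a named value with a default that a configuration can override at runtime. The standalone sketch below models this with a hypothetical `SketchProperties` class restricted to `uint32_t` values; it is not the adtf::base::property_variable implementation.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical property store: named values that can be changed without recompiling.
class SketchProperties
{
public:
    // Registers a property with its default value.
    void Register(const std::string& name, std::uint32_t default_value)
    {
        m_values[name] = default_value;
    }

    // Overrides a registered property; returns false for unknown names.
    bool Set(const std::string& name, std::uint32_t value)
    {
        auto it = m_values.find(name);
        if (it == m_values.end())
        {
            return false;
        }
        it->second = value;
        return true;
    }

    // Returns the current value, or `fallback` for unknown names.
    std::uint32_t Get(const std::string& name, std::uint32_t fallback = 0) const
    {
        auto it = m_values.find(name);
        return it == m_values.end() ? fallback : it->second;
    }

private:
    std::map<std::string, std::uint32_t> m_values;
};
```

Rejecting unknown names in `Set` means a typo in a configuration is reported instead of silently creating a new property.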

Example for a property
class cMyFilter : public adtf::filter::cFilter
{
public:
ADTF_CLASS_ID_NAME(cMyFilter, "my_constant_data_generator.filter.example.cid", "My Constant Data Generator");
cMyFilter()
{
using namespace adtf::filter;
using namespace adtf::streaming;
//give the output pin a name
m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
//Create a runner and give a hint to connect a timer with a cycle time of 100ms
//as long as there is no connection within the Filter Graph to this Runner, no Trigger event is received!
using namespace std::chrono_literals;
m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
//register the property value with a name
RegisterPropertyVariable("constant", m_nConstantProperty);
}
//implement the Process method to react on the Trigger
//the default Filter implementation will register this method to the Runner
tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
adtf::streaming::IRunner* /*pRunner*/) override
{
//the property value can be accessed via assignment
uint32_t nCurrentValueToWrite = m_nConstantProperty;
//we write the value into the writer for the pin
RETURN_IF_FAILED(m_pWriter->Write(tmTimeOfTriggering, nCurrentValueToWrite));
RETURN_NOERROR;
}
private:
adtf::streaming::IRunner* m_pRunner = nullptr;
adtf::filter::pin_writer<uint32_t>* m_pWriter = nullptr;
//define a property variable that contains the current value of your property
adtf::base::property_variable<uint32_t> m_nConstantProperty = 0;
};
For a full description see ADTF Base SDK.

Filter State Machine

The following state machine is provided by the Filter Base and implementation.

Streaming Service

Streaming Services - Sources and Sinks - are components to define system entry points and system exit points. Usually these implementations are the first or last item of a Data Pipe or Trigger Pipe.

Note
Please note that as of ADTF 3.8 and later, we are no longer limiting the use of Streaming Services to Streaming Graphs, but you can use these components in a Filter Graph as well. You are encouraged to use Filter Graphs in favour of Streaming Graphs as they allow for greater flexibility and reusability.

The main difference between Streaming Services and Filters is:
Streaming Services are allowed to create threads and timers internally without a Runner! The reason is that they usually deal with devices and real hardware.

We recommend using system::kernel_thread or system::kernel_timer for thread and timer handling!

Streaming Source

The Streaming Source is a system entry point for Samples and Data Triggers coming, for example, from hardware. Usually you will implement any device link as a Streaming Source.

Streaming Sources can be used to

  • read streaming data, e.g. video streams from cameras or CAN messages from CAN bus devices
  • read from hard disk to provide file-based simulation data
  • receive samples via a network or interprocess connection from a distributed system like ROS (Robot Operating System) or FEP (Functional Engineering Platform).
Coding Examples:
Use the CMake macro adtf_add_streaming_service to create a plugin containing a Streaming Source.
The following requirements exist for Streaming Sources:
  • Only Out Pins are allowed!
  • Sources are allowed to create threads and timers, if it is not possible to use a Runner
  • Make sure to create Data Triggers (streaming::trigger, ManualTrigger) after finishing data transmission.

Streaming Sink

The Streaming Sink is an ADTF system exit point for Samples and Triggers. Usually you will implement any device link as a Streaming Sink.

Streaming Sinks can be used to

  • write streaming data like raw CAN messages to a CAN-Bus device
  • write FlexRay cycles or Automotive Ethernet PDUs to devices
  • access the hard disk for file-based data logging and high performance recording
Coding Examples:
Use the CMake macro adtf_add_streaming_service to create a plugin containing a Streaming Sink.
The following requirements exist for Streaming Sinks:
  • Only In Pins are allowed
  • Sinks are allowed to create their own threads and timers, if it is not possible to use a Runner.
  • Do not block an incoming Trigger within your implementation. As long as you block incoming Triggers you block the Trigger Pipe.
  • You are advised to describe your runtime behaviour within your documentation.

Streaming Service State Machine (within Filter Graph)

Data Pipe

The Data Pipe within a Filter Graph is part of the connection between Sample Writers and Sample Readers. A typical Data Pipe in ADTF starts at the Sample Writer and the Out Pin of a sending Filter, runs over one Sample Stream, and ends at one or more In Pins and their corresponding Sample Readers.

Substreams

Using substreams is one way to reduce the number of Pins and connections within the Filter Graph.
See Chapter: Substreams for more details and how to use them.

Trigger Pipe

The Trigger Pipe within ADTF is a connection-based trigger path through the Filter Graph. Usually it starts with an Active Runner that triggers the connected items, such as the Runner of a Filter.

The above example of a Trigger Pipe shows the Active Timer Runner as a Trigger Pipe Source. Trigger Pipe Sources are the very beginning of a Trigger Pipe. Each connected item can be discovered with the help of the ITriggerPipeSource interface. The connected items of the Trigger Pipe are called Trigger Pipe Items. They are also sources for the subsequent items in the Trigger Pipe. In the end, the collection of Trigger Pipes in your Filter Graph shows you a call graph and its critical paths.

Note
The Trigger event as seen in the picture above is not really an object, it is a Run(RUN_TRIGGER) call on Trigger Pipe Items.

Following components can be Trigger Pipe Sources only:

Following components can be Trigger Pipe Items:

Sample Stream

ADTF provides a default implementation of ISampleStream. A Sample Stream is a mediator between Pins and forwards Samples and Data Triggers.

Different operation modes are intended within the current Streaming API design, but only two modes have been realized so far. The delivered default Sample Stream can be used within the Filter Graph by operating in Push mode.

PushRead Mode - Synchronous Data Pipes

With the Streaming API the ISampleStreamAccess::PushRead was defined.

While writing data onto a Stream the Sample Stream will have different states:

Sample Stream - write()


After data is written onto a Sample Stream, the Samples are queued within the writer's queue. No Trigger event has been emitted yet!

Note
Keep in mind: At least one Stream Type will be written onto the stream before any kind of data is transmitted.

Sample Stream - flush()

After flushing the Sample Writer the Samples will be queued in all Sample Reader queues. There is only a temporary reader queue within the Sample Stream to lock the content for a short time.


Flushing the writer queue will temporarily forward the samples to the Reader Queues.


Still within the flush() call, the Samples (and Stream Types) will be read into the Sample Reader queue of Sample Reader 1 by a Push event. A Sample Reader using the PushRead mode MUST read the Samples immediately when the Push event occurs, otherwise the Samples are lost for this Reader.


The Samples (and Stream Types) will be read into the Sample Reader queue of the Sample Reader 2 by a Push event. This is now the end state of the queues after flushing.


Many different kinds of Sample Readers and Pin Readers are available: have a look at the ADTF Filter SDK! If you use the Filter implementation, you do not need to call flush separately!

Sample Stream - trigger()


The trigger call creates a Data Trigger on the Trigger Pipe. This results in the following call sequence:

  • OutPin_1->Run(RUN_TRIGGER)
  • Sample_Stream->Run(RUN_TRIGGER)
  • InPin_1->Run(RUN_TRIGGER)
  • InPin_2->Run(RUN_TRIGGER)
Remarks
The "old" ADTF 2 Pin::Transmit call is now separated into a Writer::Write, Writer::Flush and Writer::Trigger call!
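The separation of write, flush and trigger described above can be modelled in a few lines of plain C++. The following is only a toy sketch: tToyReader and cToyStream are hypothetical names that do not exist in the ADTF SDK, and Stream Types, locking and queue limits of the real Sample Stream are ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical reader with its own queue and a trigger counter.
struct tToyReader
{
    std::deque<uint32_t> oQueue;
    int nTriggerCount = 0;
};

// Hypothetical stream connecting one writer to several readers.
class cToyStream
{
public:
    void Connect(tToyReader& oReader) { m_oReaders.push_back(&oReader); }

    // write(): samples only enter the writer queue, nothing is delivered yet.
    void Write(uint32_t nSample) { m_oWriterQueue.push_back(nSample); }

    // flush(): every queued sample is pushed into every reader queue.
    void Flush()
    {
        for (tToyReader* pReader : m_oReaders)
        {
            pReader->oQueue.insert(pReader->oQueue.end(),
                                   m_oWriterQueue.begin(), m_oWriterQueue.end());
        }
        m_oWriterQueue.clear();
    }

    // trigger(): all readers are notified synchronously, one after another.
    void Trigger()
    {
        for (tToyReader* pReader : m_oReaders)
        {
            ++pReader->nTriggerCount;
        }
    }

    std::size_t PendingInWriter() const { return m_oWriterQueue.size(); }

private:
    std::deque<uint32_t> m_oWriterQueue;
    std::vector<tToyReader*> m_oReaders;
};
```

In this model a reader sees no data after Write alone, sees data but no notification after Flush, and is only notified by Trigger.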

Asynchronous Mode - Asynchronous Data Pipes

Note
This mode is not yet implemented in ADTF 3

The concept so far: there is no Push event. The reader queues within the Sample Stream (not within the Sample Readers) are used and each Sample Reader reads directly from there.

Push Mode - Queue Free Data Pipes

Note
detailed description in progress

Active Runner

Active Runners are the only self-active objects of the Filter Graph. They are the very beginning of a Trigger Pipe.
By default ADTF delivers 2 different kinds of Active Runner that you may use in your Filter Graph:

  • A Timer Runner will periodically activate a Trigger Pipe with timing constraints. It will send a Time Trigger. (see also Timer Runner)
  • A Thread Runner will cyclically activate a Trigger Pipe with NO timing constraints (or only single shot). It will send a Thread Trigger. (see also Thread Runner)

See also Delivered Active Runners for base components and Qt5 Key Event Runner Plugin as an example to implement own Active Runner.

Inner Trigger Pipe of a Filter

Usually, a Filter works in the context of a RUN_TRIGGER call that is caught by a Runner. This Runner is part of the configured Trigger Pipe within the Filter Graph.

To describe the internal runtime behaviour of the Filter, you should use Inner Pipes. Inner Pipes are inside the Filter and describe how the Filter forwards the context of a Trigger to its Out Pins. Without an Inner Pipe, the Filter is the end point of a Trigger Pipe and might be the beginning of another one that is "hidden".

Hidden Trigger Pipes are trigger paths whose relation to another trigger path cannot be discovered. They appear if you create ManualTriggers within a Filter implementation.

The ADTF Filter SDK helps you to create valid Filters with well-formed Inner Pipes. The following code examples show what happens to the Inner Pipe of each particular Filter or Streaming Service.

Example 1: Simple Inner Pipe

cMyFilter1()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a runner with timer hint
    using namespace std::chrono_literals;
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    uint32_t nSampleValue = 123;
    //we write the value into a pin
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    RETURN_NOERROR;
}
Trigger Call Sequence
  • (1) incoming RUN_TRIGGER call on "1" will call and wait for "Runner_1"
  • (2) "Runner_1" will call and wait for Process()
  • (3) after (2) returned "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (4) only after (4) returned, (3) will return, (1) will return

Example 2: No Inner Pipe to the Outputs

cMyFilter2()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a runner with timer hint
    //BUT the Runner will NOT forward the Trigger to the Out Pins!
    using namespace std::chrono_literals;
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), false);
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    uint32_t nSampleValue = 123;
    //we write the value
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    RETURN_NOERROR;
}
Trigger Call Sequence
  • incoming Trigger on (1) will call and wait on "Runner_1"
  • "Runner_1" will call and wait on (2) Process()
  • after (2) returned (1) returns
Remarks
There is NO trigger forwarded within this Filter to the second Trigger Pipe on "OutPin_1". It will NOT flush and NOT trigger the Sample Writer.
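The difference between Example 1 and Example 2 can be sketched with a small stand-alone model. The class cToyRunner below is hypothetical and not part of the ADTF SDK; it only assumes that a Runner calls Process() synchronously and, if an Inner Pipe is configured, continues at the Out Pin before it returns.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a Runner with an optional Inner Pipe; NOT the ADTF IRunner.
class cToyRunner
{
public:
    cToyRunner(std::function<void()> fnProcess, bool bForwardToOutPins)
        : m_fnProcess(std::move(fnProcess)), m_bForwardToOutPins(bForwardToOutPins)
    {
        assert(m_fnProcess && "a Process callback is required");
    }

    // Handle one incoming trigger synchronously and log each step in call order.
    std::vector<std::string> Run() const
    {
        std::vector<std::string> oLog;
        oLog.push_back("Runner_1 called");
        m_fnProcess();
        oLog.push_back("Process() returned");
        if (m_bForwardToOutPins)
        {
            // Inner Pipe present: the trigger continues at the Out Pin.
            oLog.push_back("OutPin_1 flushed and triggered");
        }
        oLog.push_back("Runner_1 returned");
        return oLog;
    }

private:
    std::function<void()> m_fnProcess;
    bool m_bForwardToOutPins;
};
```

With bForwardToOutPins set to true the log contains the Out Pin step between Process() and the return of the Runner (as in Example 1); with false the Runner is the end point of the pipe (as in Example 2).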

Example 3: Inner Pipe and second hidden Trigger Pipe

cMyFilter3()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a runner with timer hint
    //BUT the Runner will NOT forward the Trigger to the Out Pins!
    using namespace std::chrono_literals;
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), false);
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    using namespace adtf::streaming;
    uint32_t nSampleValue = 123;
    //we write the value, then flush and trigger manually
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    *m_pWriter << flush << trigger;
    RETURN_NOERROR;
}
Trigger Call Sequence
  • (A1) incoming Trigger will call and wait for "Runner_1"
  • (A2) "Runner_1" will call and wait for Process()
Remarks
(A2) uses a ManualTrigger call, so it triggers a second, hidden Trigger Pipe (B1) on "OutPin_1" and waits until (B2) returns! This is not really wrong, but this hidden dependency is difficult to maintain. Additionally, you must document this behaviour for the users!

Example 4: Inner Pipe with Data Trigger

cMyFilter4()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a reader (with no default data trigger, but with a trigger forward to the output)
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), false, true);
    //create a runner with data trigger on "InPin_1"
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    using namespace adtf::streaming;
    //read until the end of all queued samples
    adtf::ucom::object_ptr<const ISample> pSample;
    while (IS_OK(m_pReader1->GetNextSample(pSample)))
    {
        uint32_t nSampleValue = sample_data<uint32_t>(pSample);
        m_pWriter->Write(get_sample_time(pSample), nSampleValue);
    }
    RETURN_NOERROR;
}
Trigger Call Sequence
  • (1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" on (2)
  • (3) "Runner_1" will call and wait for Process()
  • (4) after (3) returned "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (5) only after (5) returned, (4) will return, (2) will return and last (1) will return

Example 5: Inner Pipe with only 1 Data Trigger caught

cMyFilter5()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a reader (with no default data trigger, but with a trigger forward to the output)
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), false, true);
    //create a second reader (with no default data trigger and with no trigger forward)
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), false, false);
    //create a runner with data trigger hint on "InPin_1"
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    using namespace adtf::streaming;
    //read the last sample of each reader
    adtf::ucom::object_ptr<const ISample> oLastSample1;
    adtf::ucom::object_ptr<const ISample> oLastSample2;
    RETURN_IF_FAILED(m_pReader1->GetLastSample(oLastSample1));
    RETURN_IF_FAILED(m_pReader2->GetLastSample(oLastSample2));
    //calculate
    uint32_t nSampleValue = sample_data<uint32_t>(oLastSample1) + sample_data<uint32_t>(oLastSample2);
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    RETURN_NOERROR;
}
Trigger Call Sequence
  • (A1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" on (A2)
  • (A3) "Runner_1" will call and wait for Process()
  • (A4) after (A3) returned "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
  • (B1) The second incoming Trigger Pipe immediately returns, the Pipe ends here!
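The data flow of Example 5 (one triggering input, one input that is only sampled) can be sketched independently of ADTF. cToyAdder and its members are hypothetical names; the sketch assumes that the second input merely stores its latest value and that the calculation fails while no such value exists, comparable to a failing GetLastSample call.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical two-input filter; NOT an ADTF class.
class cToyAdder
{
public:
    // "InPin_2" has no data trigger: the value is only remembered.
    void OnInPin2(uint32_t nValue)
    {
        m_nLast2 = nValue;
        m_bHas2 = true;
    }

    // "InPin_1" carries the data trigger: it runs the calculation.
    // Returns false (and writes nothing) while "InPin_2" has never received data.
    bool OnInPin1(uint32_t nValue)
    {
        if (!m_bHas2)
        {
            return false;
        }
        m_oOutput.push_back(nValue + m_nLast2);
        return true;
    }

    const std::vector<uint32_t>& Output() const { return m_oOutput; }

private:
    uint32_t m_nLast2 = 0;
    bool m_bHas2 = false;
    std::vector<uint32_t> m_oOutput;
};
```

Data arriving on the second input never produces output by itself; only the next trigger on the first input picks up the most recent value.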

Example 6: Inner Pipe using ProcessInput

cMyFilter6()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a reader (and add a runner for that, and forward the trigger)
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), true, true);
    //create a second reader (with no default data trigger and no trigger forward to the output)
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), false, false);
}

tResult ProcessInput(adtf::streaming::ISampleReader* pReader,
                     const adtf::ucom::iobject_ptr<const adtf::streaming::ISample>& pSample) override
{
    //we work only in context of "InPin_1" triggers
    using namespace adtf::streaming;
    if (pSample.Get())
    {
        adtf::ucom::object_ptr<const ISample> oLastSample2;
        RETURN_IF_FAILED(m_pReader2->GetLastSample(oLastSample2));
        //calculate
        uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastSample2);
        m_pWriter->Write(get_sample_time(pSample), nSampleValue);
    }
    else
    {
        RETURN_ERROR(ERR_INVALID_ADDRESS);
    }
    RETURN_NOERROR;
}
Trigger Call Sequence
This is exactly the same as Example 5, but the Process() call is additionally forwarded to a ProcessInput() call.
  • (A1) incoming Data Trigger on "InPin_1" will call and wait for "InPin_1_trigger" on (A2)
  • (A3) "InPin_1_trigger" will call and wait for Process()
  • (A4) after (A3) returned "InPin_1_trigger" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
  • (B1) The second incoming Trigger Pipe immediately returns, the Pipe ends here!

Example 7: Inner Pipe using ProcessInput and 2 Data Triggers

cMyFilter7()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    //create a reader (with a default data trigger and with a trigger forward to the output)
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), true, true);
    //create a second reader (also with a default data trigger and a trigger forward to the output)
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), true, true);
}

tResult ProcessInput(adtf::streaming::ISampleReader* pReader,
                     const adtf::ucom::iobject_ptr<const adtf::streaming::ISample>& pSample) override
{
    //we work only in context of "InPin_1" and "InPin_2" triggers
    using namespace adtf::streaming;
    if (pSample.Get())
    {
        adtf::ucom::object_ptr<const ISample> oLastOtherSample;
        if (pReader == m_pReader1)
        {
            RETURN_IF_FAILED(m_pReader2->GetLastSample(oLastOtherSample));
            uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastOtherSample);
            m_pWriter->Write(get_sample_time(pSample), nSampleValue);
        }
        else if (pReader == m_pReader2)
        {
            RETURN_IF_FAILED(m_pReader1->GetLastSample(oLastOtherSample));
            uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastOtherSample);
            m_pWriter->Write(get_sample_time(pSample), nSampleValue);
        }
        else
        {
            RETURN_ERROR(ERR_INVALID_ADDRESS);
        }
    }
    else
    {
        RETURN_ERROR(ERR_INVALID_ADDRESS);
    }
    RETURN_NOERROR;
}
Trigger Call Sequence on A
  • (A1) incoming Data Trigger on "InPin_1" will call and wait for "InPin_1_trigger" on (A2)
  • (A3) "InPin_1_trigger" will call and wait for Process() and thus for ProcessInput()
  • As long as (B3) is currently working, this pipe is blocked
  • (A4) after (A3) returned "InPin_1_trigger" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
Trigger Call Sequence on B
  • (B1) incoming Data Trigger on "InPin_2" will call and wait for "InPin_2_trigger" on (B2)
  • (B3) "InPin_2_trigger" will call and wait for Process() and thus for ProcessInput()
  • As long as (A3) is currently working, this pipe is blocked
  • (B4) after (B3) returned "InPin_2_trigger" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (B5) only after (B5) returned, (B4) will return, (B2) will return and last (B1) will return

Example 8: Inner Pipe with one Runner and 2 Data Triggers

cMyFilter8()
{
    using namespace adtf::filter;
    using namespace adtf::streaming;
    //create a writer
    m_pWriter = CreateOutputPin<pin_writer<tUInt32>>("OutPin_1", stream_type_plain<tUInt32>());
    //create a reader (with no default data trigger and no forward to the output)
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<tUInt32>(), false, false);
    //create a second reader (also with no default data trigger and no forward to the output)
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<tUInt32>(), false, false);
    //create a runner with a vector of data triggers on "InPin_1" and "InPin_2"
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint({"InPin_1", "InPin_2"}), true);
}

tResult Process(adtf::base::tNanoSeconds tmTimeOfTriggering,
                adtf::streaming::IRunner* /*pRunner*/) override
{
    using namespace adtf::streaming;
    adtf::ucom::object_ptr<const ISample> pSample;
    //read until the end of all queued samples on the first reader
    while (IS_OK(m_pReader1->GetNextSample(pSample)))
    {
        tUInt32 nSampleValue = sample_data<tUInt32>(pSample);
        m_pWriter->Write(get_sample_time(pSample), nSampleValue);
    }
    //read until the end of all queued samples on the second reader
    while (IS_OK(m_pReader2->GetNextSample(pSample)))
    {
        tUInt32 nSampleValue = sample_data<tUInt32>(pSample);
        m_pWriter->Write(get_sample_time(pSample), nSampleValue);
    }
    //when this returns, the trigger will be forwarded to the output trigger
    RETURN_NOERROR;
}
Trigger Call Sequence on A
  • (A1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" (A2) (without a "InPin_1_trigger" call in between)
  • (A3) "Runner_1" will call and wait for Process() to return
  • By using CreateRunner() the Filter's Process() call is automatically synchronized against (B3) with a mutex
  • After (A3) returned (A4) and (A5) can be performed.
  • (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
Trigger Call Sequence on B
  • (B1) incoming Data Trigger on "InPin_2" will also directly call and wait on "Runner_1" on (B2)
  • (B3) "Runner_1" will call and wait for Process()
  • (B3) is synchronized and protected against parallel (A3) call.
  • (B4) after (B3) returned from Process(), "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
  • (B5) only after (B5) returned, (B4) will return, (B2) will return and last (B1) will return

ManualTrigger call

It is possible to call ManualTrigger after writing Samples and Stream Types onto the Sample Writer. But you should avoid that.
The reasons for this are:

  • Each ManualTrigger call might be the beginning of a new Trigger Pipe within the Filter Graph!
  • Each ManualTrigger may double, or at least increase, the number of Trigger calls in the system if Inner Pipes are already configured!

However, ManualTrigger must be called by a Streaming Source, which is usually the beginning of a Trigger Pipe with Data Triggers.

The design goal for a good Filter, and thus a good Filter Graph, is to avoid the ManualTrigger call. Only Filters with a well-formed Inner Pipe are reconfigurable in their runtime behaviour.
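The doubling effect mentioned above can be made concrete with a small counting model. tToyOutPin and CountOutPinTriggers are hypothetical names and not part of the ADTF SDK; the sketch only assumes that a forwarding Runner triggers the Out Pin once per incoming trigger and that a manual trigger inside Process() adds one more call.

```cpp
#include <cassert>

// Hypothetical counter for RUN_TRIGGER calls arriving at "OutPin_1".
struct tToyOutPin
{
    int nTriggerCalls = 0;
    void Trigger() { ++nTriggerCalls; }
};

// Simulate nIncomingTriggers calls of a Runner.
// bInnerPipe:     the Runner forwards its trigger to the Out Pin after Process().
// bManualTrigger: Process() additionally triggers the Out Pin itself.
inline int CountOutPinTriggers(int nIncomingTriggers, bool bInnerPipe, bool bManualTrigger)
{
    assert(nIncomingTriggers >= 0);
    tToyOutPin oOutPin;
    for (int nCall = 0; nCall < nIncomingTriggers; ++nCall)
    {
        if (bManualTrigger)
        {
            oOutPin.Trigger(); // issued inside Process()
        }
        if (bInnerPipe)
        {
            oOutPin.Trigger(); // forwarded by the Runner
        }
    }
    return oOutPin.nTriggerCalls;
}
```

Under these assumptions, 10 incoming triggers lead to 10 calls at the Out Pin with either mechanism alone, but to 20 calls when both are combined.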

Client/Server Connections

The interface binding items of a Filter are BindingObjects.
A Filter can define common functionality that others use through an interface agreement.
  • To publish such an interface you can define an IBindingServer
  • To subscribe to and use such an interface you can define an IBindingClient
When these items are interconnected, the client can obtain the server's interface.
Note
Do not use interface binding to add samples to the Data Pipe or to trigger the Runner Pipe. Otherwise you could get undefined behaviour!

Binding Proxy

The Binding Proxy will connect formal ADTF Client/Server interfaces between Filters. It always acts as mediator between these clients and servers.

Have a look at the Demo Interface Binding Plugin example, to see how to program and use it.

Sub Graph

Sub Graphs are Filter Graph instances that are added to another Filter Graph.

Note
These Sub Graphs are treated like Filters!

One specific characteristic: when the state of a Filter Graph changes, the state of its Sub Graphs (subgraph) is set before that of all other objects. See States of the Filter Graph for more details.

Filter Graph Ports

Since a Filter Graph can be added as a Sub Graph, it must clearly define its data binding, interface binding and runtime behaviour towards the Filter Graph it is embedded in.

The data access points of a Filter Graph are Data In Ports and Data Out Ports.
  • Data Out Ports are treated similarly to Out Pins of the Sub Graph and define outgoing streaming data.
  • Data In Ports are treated similarly to In Pins of the Sub Graph and define incoming streaming data.
The runtime behaviour of a Filter Graph can be configured with the help of Runner Ports.
  • Runner Ports are treated similarly to Runners and forward the Trigger Pipe.
The interface binding items of a Filter Graph are Client Ports and Server Ports.
  • To forward a Server Object you can add a Server Port.
  • To forward a Client Object you can add a Client Port.

States of the Filter Graph

The Filter Graph states can be set with the same interface function as for the Filter, the SetState(tFilterGraphState) method.

Usually the Filter Graph is managed together with the Streaming Graph by the Session Manager. It lives only during the runlevels RL_FilterGraph and RL_Running. Have a look at Chapter: Initialization Order to understand the states of the Filter Graph.

Streaming Graph as a Top-Level Graph

Note
Please note that as of ADTF 3.8 and later, we are no longer advocating the use of Streaming Graphs but encourage you to use a Filter Graph instead. They allow for greater flexibility and reusability and can now provide all the required functionality (Streaming Services, etc.).

The ADTF Streaming Graph is a Top-Level Graph to create well defined ADTF system borders and interconnections with the "outside" world. You may add

  • Streaming Sources where samples are produced and data is read from devices or process borders
  • Streaming Sinks where samples and data are sent to devices or process borders
  • Exactly one Filter Graph (IFilterGraph) where samples are manipulated, functions are calculated and data is transformed.
Following Named Graph Objects are accepted:
If a Streaming Service is added, it will immediately be set to adtf::streaming::ant::IStreamingService::tStreamingState::State_Constructed.
An added Filter Graph will be set to IFilterGraph::tFilterGraphState::State_Constructed.

Stream Port - to embed a Filter Graph

The Streaming Graph is able to automatically connect an embedded Filter Graph. To do so, the Filter Graph has to define Sample Streaming Ports. These Sample Streaming Ports are the data access points of the Filter Graph. Stream Ports define a Port ID (IStreamingPort::GetPortID). The Port ID is used to connect to the Stream Port of the Streaming Graph that has the same name.

Note
Example: If you create a Stream Port with the name "VIDEO" within the Streaming Graph and define a Sample Streaming Port with the same Port ID in the Filter Graph to embed, there will be an automatic connection between them.
It is possible to add more than one Sample Streaming Port with the same Port ID. If you add 3 Sample Streaming Ports you will get 3 connections!
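The name-based matching can be pictured with a short stand-alone sketch. MatchPorts is a hypothetical helper and not an ADTF function; the real matching is performed by the Streaming Graph itself. The sketch only assumes that every Sample Streaming Port whose Port ID equals the name of a Stream Port yields one connection.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// One connection: (name of the Stream Port, index of the Sample Streaming Port).
using tToyConnection = std::pair<std::string, std::size_t>;

// Hypothetical helper: creates one connection per Sample Streaming Port
// whose Port ID equals the name of a Stream Port.
inline std::vector<tToyConnection> MatchPorts(
    const std::vector<std::string>& oStreamPortNames,
    const std::vector<std::string>& oSampleStreamingPortIds)
{
    std::vector<tToyConnection> oConnections;
    for (const std::string& strName : oStreamPortNames)
    {
        for (std::size_t nIdx = 0; nIdx < oSampleStreamingPortIds.size(); ++nIdx)
        {
            if (oSampleStreamingPortIds[nIdx] == strName)
            {
                oConnections.emplace_back(strName, nIdx);
            }
        }
    }
    return oConnections;
}
```

For a Stream Port "VIDEO" and three Sample Streaming Ports with the Port ID "VIDEO", this model produces three connections; ports with other IDs stay unconnected.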

Sample Streaming Ports in the Filter Graph

The streaming access points of a Filter Graph are Sample Streaming Ports.

Init Sequence

The Streaming Graph initializes in a well-defined order from "left" to "right". This offers the opportunity to discover connections from left to right.

For more detail see Reaching Runlevel Streaming Graph and Reaching Runlevel Filter Graph.

Start Streaming Sequence

The Streaming Graph starts streaming in the order from "right" to "left". This prevents any processing of Samples while a Filter or Streaming Sink is not yet in running state. This means: Streaming Sources always come last.

For more detail see Reaching Runlevel Running.
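The two orderings can be summarized in a tiny model. tToyGraph, InitOrder and StartOrder are hypothetical names for illustration only; the sketch assumes the components are listed from "left" (sources) to "right" (sinks).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Components listed from "left" (sources) to "right" (sinks).
using tToyGraph = std::vector<std::string>;

// Initialization walks the graph from left to right.
inline tToyGraph InitOrder(const tToyGraph& oLeftToRight)
{
    return oLeftToRight;
}

// Streaming is started from right to left, so every consumer is already
// running before its producer emits the first sample.
inline tToyGraph StartOrder(const tToyGraph& oLeftToRight)
{
    return tToyGraph(oLeftToRight.rbegin(), oLeftToRight.rend());
}
```

For a graph consisting of a Streaming Source, a Filter Graph and a Streaming Sink, the source is initialized first but started last.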

Connections within the Filter Graph

Connections are used to build up a graph; synchronous connections are part of a Trigger Pipe. Connections have a category (left, middle or right) and some constraints on which item can be connected to which. Consider the following table:

Source | Dest | Sync / Async
Client Port (IFilterGraphInterfaceClientPort) | Filter Binding Client (IBindingClient) | does not matter
Data InPort (IFilterGraphDataInPort) | Sample Stream (ISampleStream) | always sync
Sample Stream InPort (ISampleStreamInPort) | Sample Stream (ISampleStream) | always sync
Sample Stream InPort (ISampleStreamInPort) | Filter InPin (IInPin) | sync or async
Filter Binding Server (IBindingServer) | Binding Proxy (IBindingProxy) | does not matter
Filter OutPin (IOutPin) | Sample Stream (ISampleStream) | sync or async
Filter OutPin (IOutPin) | Sample Stream OutPort (ISampleStreamOutPort) | sync or async
Filter InPin (IInPin) | Filter Runner (IRunner) | sync (defines a DataTrigger)
Filter Runner (IRunner) | Filter OutPin (IOutPin) | sync (defines an internal DataTrigger on the OutPin)
Active Runner (IActiveRunner) | Filter Runner (IRunner) | always sync (defines the TimeTrigger or ThreadTrigger)
Active Runner (IActiveRunner) | Sample Stream (ISampleStream) | always sync
Filter Runner (IRunner) | Filter Runner (IRunner) | sync (forwards the Trigger)
Sample Stream (ISampleStream) | Filter InPin (IInPin) | sync or async
Sample Stream (ISampleStream) | Sample Stream (ISampleStream) | sync or async
Sample Stream (ISampleStream) | Sample Stream OutPort (ISampleStreamOutPort) | sync or async
Sample Stream (ISampleStream) | Data OutPort (IFilterGraphDataOutPort) | always sync
Binding Proxy (IBindingProxy) | Filter BindingClient (IBindingClient) | does not matter
Binding Proxy (IBindingProxy) | Server Port (IFilterGraphInterfaceServerPort) | does not matter
Runner Port (IFilterGraphRunnerPort) | Filter Runner (IRunner) | always sync
Note
Note that the Substream Selector, as a special adaptation of the Sample Stream, also implements ISampleStream and can therefore replace any occurrence of a Sample Stream within the overview.
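The connection rules of the overview lend themselves to a simple lookup. The following sketch encodes the rows of the table in plain C++; eItem, eMode, ConnectionTable and FindConnection are hypothetical names chosen for this illustration and do not exist in the ADTF SDK, which enforces these rules internally.

```cpp
#include <cassert>
#include <map>
#include <utility>

// Hypothetical item kinds, one per interface named in the table.
enum class eItem { ClientPort, DataInPort, StreamInPort, BindingServer, OutPin, InPin,
                   Runner, ActiveRunner, SampleStream, BindingProxy, RunnerPort,
                   BindingClient, StreamOutPort, DataOutPort, ServerPort };

enum class eMode { DoesNotMatter, AlwaysSync, Sync, SyncOrAsync };

using tConnection = std::pair<eItem, eItem>; // (source, destination)

// One entry per row of the table above.
inline const std::map<tConnection, eMode>& ConnectionTable()
{
    static const std::map<tConnection, eMode> oTable = {
        {{eItem::ClientPort, eItem::BindingClient}, eMode::DoesNotMatter},
        {{eItem::DataInPort, eItem::SampleStream}, eMode::AlwaysSync},
        {{eItem::StreamInPort, eItem::SampleStream}, eMode::AlwaysSync},
        {{eItem::StreamInPort, eItem::InPin}, eMode::SyncOrAsync},
        {{eItem::BindingServer, eItem::BindingProxy}, eMode::DoesNotMatter},
        {{eItem::OutPin, eItem::SampleStream}, eMode::SyncOrAsync},
        {{eItem::OutPin, eItem::StreamOutPort}, eMode::SyncOrAsync},
        {{eItem::InPin, eItem::Runner}, eMode::Sync},
        {{eItem::Runner, eItem::OutPin}, eMode::Sync},
        {{eItem::ActiveRunner, eItem::Runner}, eMode::AlwaysSync},
        {{eItem::ActiveRunner, eItem::SampleStream}, eMode::AlwaysSync},
        {{eItem::Runner, eItem::Runner}, eMode::Sync},
        {{eItem::SampleStream, eItem::InPin}, eMode::SyncOrAsync},
        {{eItem::SampleStream, eItem::SampleStream}, eMode::SyncOrAsync},
        {{eItem::SampleStream, eItem::StreamOutPort}, eMode::SyncOrAsync},
        {{eItem::SampleStream, eItem::DataOutPort}, eMode::AlwaysSync},
        {{eItem::BindingProxy, eItem::BindingClient}, eMode::DoesNotMatter},
        {{eItem::BindingProxy, eItem::ServerPort}, eMode::DoesNotMatter},
        {{eItem::RunnerPort, eItem::Runner}, eMode::AlwaysSync},
    };
    return oTable;
}

// Returns the mode of an allowed connection, or nullptr if the pair is not listed.
inline const eMode* FindConnection(eItem eSource, eItem eDest)
{
    const auto& oTable = ConnectionTable();
    const auto itEntry = oTable.find({eSource, eDest});
    return itEntry == oTable.end() ? nullptr : &itEntry->second;
}
```

A direct connection from an Out Pin to an In Pin is not listed, so FindConnection returns nullptr for it; a Sample Stream always has to sit in between.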

Additional Information

Dependency resolving

If a new Filter, Streaming Source or Streaming Sink should support automatic dependency resolving, while building sessions within the ADTF Configuration Editor, follow the instructions for Setup dependencies between components.