ADTF  3.13.2
Demo ADTF IPC Integration
Description
Implements a library that wraps an ADTF System and the IPC Sample Streaming Sinks and Sources and an example application that sends and recieves data to communicate from/to IPC components within ADTF.
Dependencies
Location
./src/examples/src/remote/adtf_ipc_integration/
Build Environment
To see how to set up the build environment have a look at ADTF CMake Environment
this implementation shows:
  • how to integrate the IPC mechanism used by ADTF into your own applications
Remarks
  • The library provides the adtf::ipc::devil::cStreamingServiceFactory class that wraps an ADTF system and provides a simple interface to send and receive data via the ADTF IPC mechanisms
  • By instantiating a factory, an ADTF System will be started and all required ADTF plugins will be loaded
  • After that you can create sinks and sources that manage the communication with other ADTF systems
  • The application will send some strings to itself via a host-only sink and a host-only source
Header (Library)
#pragma once
#include <a_utils.h>
#include <adtf_streaming3.h>
#include <adtf_systemsdk.h>
#include <unordered_map>
#include <functional>
namespace adtf
{
namespace ipc
{
namespace devil
{
struct tStreamType
{
std::string strMetaTypeName;
std::unordered_map<std::string, std::string> oProperties;
};
inline bool operator==(const tStreamType& sFirst, const tStreamType& sSecond)
{
return sFirst.strMetaTypeName == sSecond.strMetaTypeName &&
sFirst.oProperties == sSecond.oProperties;
}
struct tStreamHandler
{
std::function<void(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>&)> fnStreamTypeHandler;
std::function<void(const ucom::ant::iobject_ptr<const streaming::ant::ISample>&)> fnSampleHandler;
};
class cSource final
{
public:
cSource() = default;
~cSource();
cSource(const cSource&) = delete;
cSource(cSource&&) = default;
cSource& operator=(const cSource&) = delete;
cSource& operator=(cSource&&) = default;
private:
friend class cStreamingServiceFactory;
cSource(const ucom::ant::iobject_ptr<streaming::ant::ISampleStreamingSource>& pSource,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers);
ucom::ant::object_ptr<streaming::ant::ISampleStreamingSource> m_pSource;
struct tStreamObjects
{
std::unique_ptr<streaming::ant::cDynamicSampleReader> pReader;
ucom::ant::object_ptr<streaming::ant::ISampleStream> pSampleStream;
};
std::vector<tStreamObjects> m_oObjectsOfStreams;
};
class cSink final
{
public:
cSink() = default;
~cSink();
cSink(const cSink&) = delete;
cSink(cSink&&) = default;
cSink& operator=(const cSink&) = delete;
cSink& operator=(cSink&&) = default;
void SendStreamType(const std::string& strStreamName,
const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType);
void Send(const std::string& strStreamName, const void* pData, size_t nDataSize);
private:
friend class cStreamingServiceFactory;
cSink(const ucom::ant::iobject_ptr<streaming::ant::ISampleStreamingSink>& pSink,
const std::vector<std::string>& oStreams);
ucom::ant::object_ptr<streaming::ant::ISampleStreamingSink> m_pSink;
ucom::ant::object_ptr<services::IReferenceClock> m_pClock;
struct tStreamObjects
{
ucom::ant::object_ptr<streaming::ant::IInPin> pSinkPin;
std::unique_ptr<streaming::ant::cSampleWriter> pWriter;
ucom::ant::object_ptr<streaming::ant::ISampleStream> pSampleStream;
};
std::unordered_map<std::string, tStreamObjects> m_oObjectsOfStreams;
};
class cStreamingServiceFactory
{
public:
cStreamingServiceFactory(const std::string& strAdtfPluginFolder, tBool bLogging = tFalse);
virtual ~cStreamingServiceFactory();
void LoadService(const std::string& strFileName,
const std::string& strClassId,
const std::string& strObjectId);
void LoadPlugin(const std::string& strFileName);
cSource CreateSource(const std::string& strClassId,
const std::string& strName,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers,
const std::unordered_map<std::string, std::string>& oProperties);
cSink CreateSink(const std::string& strClassId,
const std::string& strName,
const std::vector<std::string>& oStreams,
const std::unordered_map<std::string, std::string>& oProperties);
public:
util::cFilename m_strAdtfPluginFolder;
system::ant::cADTFSystem m_oADTFSystem;
};
tStreamType get_stream_type(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType);
ucom::ant::object_ptr<streaming::ant::IStreamType> create_stream_type(const tStreamType& sStreamType);
void get_sample_data(const ucom::ant::iobject_ptr<const streaming::ant::ISample>& pSample,
std::function<void(const void*, size_t)> fnCallback);
}
using devil::cStreamingServiceFactory;
using devil::cSource;
using devil::cSink;
using devil::tStreamType;
using devil::get_stream_type;
using devil::create_stream_type;
using devil::get_sample_data;
}
}
Implementation (Library)
#include <adtf_ipc_integration/adtf_ipc_integration.h>
namespace adtf
{
namespace ipc
{
namespace devil
{
#define CHECKED(__exp)\
{\
tResult __result = (__exp);\
if (IS_FAILED(__result))\
{\
throw std::runtime_error(util::to_string(__result).GetPtr());\
}\
}
tVoid null_logger(const util::log::tLogEntry& /* sEntry */)
{
}
cStreamingServiceFactory::cStreamingServiceFactory(const std::string& strAdtfPluginFolder, tBool bLogging):
m_strAdtfPluginFolder(strAdtfPluginFolder.c_str())
{
m_strAdtfPluginFolder.AppendTrailingSlash();
if (!bLogging)
{
util::log::set_logger(&null_logger);
}
m_oADTFSystem.EnableLogging(tTrue);
m_oADTFSystem.SetLoggingParameter(bLogging ? adtf::util::log::All : adtf::util::log::None,
adtf::util::log::None,
adtf::util::log::None,
0);
CHECKED(m_oADTFSystem.Launch(adtf::util::cCommandLine()));
LoadService("adtf_clock.adtfplugin", CID_ADTF_REFERENCE_CLOCK, "clock");
LoadService("adtf_kernel.adtfplugin", CID_ADTF_KERNEL, "kernel");
LoadPlugin("default_core_objects.adtfplugin");
LoadPlugin("adtf_ipc_host_only.adtfplugin");
LoadPlugin("adtf_ipc_tcp.adtfplugin");
LoadPlugin("adtf_ipc_udp.adtfplugin");
#ifndef _MSC_VER
LoadPlugin("adtf_ipc_sctp.adtfplugin");
#endif
CHECKED(_runtime->SetRunLevel(base::tADTFRunLevel::RL_System));
}
cStreamingServiceFactory::~cStreamingServiceFactory()
{
}
void set_properties(const ucom::ant::iobject_ptr<ucom::ant::IObject>& pObject,
const std::unordered_map<std::string, std::string>& oProperties)
{
if (!oProperties.empty())
{
auto pConfiguration = ucom::ant::ucom_cast<base::IConfiguration*>(pObject.Get());
for (const auto& oProperty: oProperties)
{
base::set_property_by_path<util::cString>(*pConfiguration, oProperty.first.c_str(), oProperty.second.c_str());
}
}
}
cSource cStreamingServiceFactory::CreateSource(const std::string& strClassId,
const std::string& strName,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers,
const std::unordered_map<std::string, std::string>& oProperties)
{
ucom::ant::object_ptr<streaming::ant::ISampleStreamingSource> pSource;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pSource));
pSource->SetName(strName.c_str());
set_properties(pSource, oProperties);
return cSource(pSource, oStreamHandlers);
}
cSink cStreamingServiceFactory::CreateSink(const std::string& strClassId,
const std::string& strName,
const std::vector<std::string>& oStreams,
const std::unordered_map<std::string, std::string>& oProperties)
{
ucom::ant::object_ptr<streaming::ant::ISampleStreamingSink> pSink;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pSink));
pSink->SetName(strName.c_str());
set_properties(pSink, oProperties);
return cSink(pSink, oStreams);
}
tVoid cStreamingServiceFactory::LoadService(const std::string& strFileName,
const std::string& strClassId,
const std::string& strObjectId)
{
LoadPlugin(strFileName);
ucom::ant::object_ptr<ucom::ant::IObject> pService;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pService));
CHECKED(_runtime->RegisterObject(pService, strObjectId.c_str(), base::tADTFRunLevel::RL_System));
}
tVoid cStreamingServiceFactory::LoadPlugin(const std::string& strFileName)
{
ucom::ant::object_ptr<ucom::ant::IPluginInfo> pPlugin;
util::cFilename strFileNameHelper(strFileName.c_str());
if (strFileNameHelper.IsRelative())
{
strFileNameHelper = m_strAdtfPluginFolder + strFileNameHelper;
}
CHECKED(_runtime->RegisterPlugin(strFileNameHelper, ucom::ant::IRuntime::RL_Internal, pPlugin));
}
namespace
{
class cTriggerPipeItemHelper:
public ucom::catwo::object<streaming::ant::trigger_pipe_item<streaming::ant::ITriggerPipeItem>>
{
public:
cTriggerPipeItemHelper(base::IRunnable::tRunFunction fnTriggerHandler):
m_oRunnable(fnTriggerHandler)
{
SetRunnable(m_oRunnable);
}
private:
base::ant::runnable<base::IRunnable::RUN_TRIGGER> m_oRunnable;
};
}
cSource::cSource(const ucom::ant::iobject_ptr<adtf::streaming::ant::ISampleStreamingSource>& pSource,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers):
m_pSource(pSource)
{
CHECKED(pSource->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Constructed));
ucom::ant::object_ptr<streaming::ant::IStreamType> pType = ucom::ant::make_object_ptr<streaming::ant::cStreamType>(streaming::ant::stream_meta_type_anonymous());
for (auto& oStreamHandler: oStreamHandlers)
{
ucom::ant::object_ptr<streaming::ant::IOutPin> pPin;
CHECKED(pSource->RequestPin(oStreamHandler.first.c_str(), pType, pPin));
tStreamObjects sStreamObjects;
sStreamObjects.pReader.reset(new streaming::ant::cDynamicSampleReader);
sStreamObjects.pReader->SetName(oStreamHandler.first.c_str());
if (oStreamHandler.second.fnStreamTypeHandler)
{
sStreamObjects.pReader->SetAcceptTypeCallback([oStreamHandler](const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType) -> tResult
{
try
{
oStreamHandler.second.fnStreamTypeHandler(pStreamType);
}
catch (const std::exception& oError)
{
RETURN_ERROR_DESC(ERR_INVALID_TYPE, oError.what());
}
});
}
sStreamObjects.pSampleStream = ucom::ant::make_object_ptr<streaming::ant::cSampleStream>();
CHECKED(streaming::testing::connect_pin(pPin, sStreamObjects.pSampleStream, tTrue));
CHECKED(sStreamObjects.pReader->BeginStreaming(*sStreamObjects.pSampleStream));
streaming::ant::cDynamicSampleReader* pReaderPointer = sStreamObjects.pReader.get();
adtf::ucom::ant::make_object_ptr<adtf::streaming::ant::cRunner>("test", [oStreamHandler, pReaderPointer](tTimeStamp /* tmTrigger */) -> tResult
{
ucom::ant::object_ptr<const streaming::ant::ISample> pSample;
while (IS_OK(pReaderPointer->GetNextSample(pSample)))
{
try
{
if (oStreamHandler.second.fnSampleHandler)
{
oStreamHandler.second.fnSampleHandler(pSample);
}
}
catch (const std::exception& oError)
{
RETURN_ERROR_DESC(ERR_FAILED, oError.what());
}
}
});
sStreamObjects.pSampleStream->RegisterSubItem(pTriggerHandler,
m_oObjectsOfStreams.push_back(std::move(sStreamObjects));
}
CHECKED(m_pSource->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Streaming));
}
cSource::~cSource()
{
if (m_pSource)
{
for (auto& sStreamObjects: m_oObjectsOfStreams)
{
sStreamObjects.pReader->EndStreaming();
}
m_pSource->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
}
cSink::cSink(const ucom::ant::iobject_ptr<adtf::streaming::ant::ISampleStreamingSink>& pSink,
const std::vector<std::string>& oStreams):
m_pSink(pSink)
{
CHECKED(_runtime->GetObject(m_pClock));
CHECKED(pSink->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Constructed));
ucom::ant::object_ptr<streaming::ant::IStreamType> pType = ucom::ant::make_object_ptr<streaming::ant::cStreamType>(streaming::ant::stream_meta_type_anonymous());
for (auto& strStream: oStreams)
{
tStreamObjects sStreamObjects;
CHECKED(pSink->RequestPin(strStream.c_str(), pType, sStreamObjects.pSinkPin));
sStreamObjects.pWriter.reset(new streaming::ant::cSampleWriter);
sStreamObjects.pWriter->SetName(strStream.c_str());
sStreamObjects.pSampleStream = ucom::ant::make_object_ptr<streaming::ant::cSampleStream>();
CHECKED(streaming::testing::connect_pin(sStreamObjects.pSinkPin, sStreamObjects.pSampleStream, tTrue));
CHECKED(sStreamObjects.pWriter->BeginStreaming(*sStreamObjects.pSampleStream));
m_oObjectsOfStreams.emplace(strStream, std::move(sStreamObjects));
}
CHECKED(pSink->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Streaming));
}
cSink::~cSink()
{
if (m_pSink)
{
m_pSink->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Initialized);
for (auto& sStreamObjects: m_oObjectsOfStreams)
{
sStreamObjects.second.pSinkPin->Disconnect();
sStreamObjects.second.pWriter->EndStreaming();
}
m_pSink->SetState(streaming::ant::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
}
void cSink::SendStreamType(const std::string& strStreamName,
const ucom::ant::iobject_ptr<const adtf::streaming::ant::IStreamType>& pStreamType)
{
auto oStreamObjects = m_oObjectsOfStreams.find(strStreamName);
if (oStreamObjects == m_oObjectsOfStreams.end())
{
throw std::runtime_error("no such stream: " + strStreamName);
}
CHECKED(oStreamObjects->second.pWriter->ChangeType(pStreamType));
}
void cSink::Send(const std::string& strStreamName, const void* pData, tSize nDataSize)
{
auto oStreamObjects = m_oObjectsOfStreams.find(strStreamName);
if (oStreamObjects == m_oObjectsOfStreams.end())
{
throw std::runtime_error("no such stream: " + strStreamName);
}
auto tmNow = m_pClock->GetStreamTimeNs();
ucom::ant::object_ptr<streaming::ant::ISample> pSample;
CHECKED(streaming::alloc_sample(pSample, tmNow));
{
ucom::ant::object_ptr_locked<streaming::ant::ISampleBuffer> pBuffer;
CHECKED(pSample->WriteLock(pBuffer, nDataSize));
CHECKED(pBuffer->Write(base::adtf_memory_buffer<const tVoid>(pData, nDataSize)));
}
CHECKED(oStreamObjects->second.pWriter->Transmit(pSample));
CHECKED(oStreamObjects->second.pSampleStream->Run(adtf::base::duration_cast<tTimeStamp>(tmNow),
nullptr,
0));
}
class cPropertyDumper: public base::ant::cPropertiesHelper
{
public:
cPropertyDumper(tStreamType& sStreamType, const std::string& strPrefix = std::string()):
m_sStreamType(sStreamType),
m_strPrefix(strPrefix)
{
}
virtual tResult SetProperty(const base::IProperty& oProperty)
{
std::string strName;
RETURN_IF_FAILED(oProperty.GetName(base::adtf_string<std::string>(&strName)));
std::string strValue;
RETURN_IF_FAILED(oProperty.GetValue()->ToString(base::adtf_string<std::string>(&strValue)));
m_sStreamType.oProperties.emplace(m_strPrefix + strName, strValue);
ucom::ant::object_ptr<const base::ant::IProperties> pSubProperties;
if (IS_OK(oProperty.GetProperties(pSubProperties)))
{
cPropertyDumper oSubDumper(m_sStreamType, m_strPrefix + strName + "/");
pSubProperties->Get(oSubDumper);
}
}
private:
tStreamType& m_sStreamType;
std::string m_strPrefix;
};
tStreamType get_stream_type(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType)
{
tStreamType sType;
CHECKED(pStreamType->GetMetaTypeName(base::adtf_string<std::string>(&sType.strMetaTypeName)));
ucom::ant::object_ptr<const base::IProperties> pProperties;
CHECKED(pStreamType->GetConfig(pProperties))
cPropertyDumper oDumper(sType);
CHECKED(pProperties->Get(oDumper));
return sType;
}
ucom::ant::object_ptr<streaming::ant::IStreamType> create_stream_type(const tStreamType& sStreamType)
{
ucom::ant::object_ptr<streaming::ant::IStreamType> pNewType = ucom::ant::make_object_ptr<adtf::streaming::cCamelionStreamType>(sStreamType.strMetaTypeName.c_str());
ucom::ant::object_ptr<base::ant::IProperties> pProperties;
CHECKED(pNewType->GetConfig(pProperties));
for (auto& oProperty: sStreamType.oProperties)
{
CHECKED(base::set_property<util::cString>(*pProperties,
oProperty.first.c_str(),
oProperty.second.c_str()));
}
return pNewType;
}
void get_sample_data(const ucom::ant::iobject_ptr<const streaming::ant::ISample>& pSample,
std::function<void (const void*, size_t)> fnCallback)
{
ucom::ant::object_ptr_shared_locked<const streaming::ant::ISampleBuffer> pBuffer;
CHECKED(pSample->Lock(pBuffer));
fnCallback(pBuffer->GetPtr(), pBuffer->GetSize());
}
}
}
}
Implementation (Application)
#include <adtf_ipc_integration/adtf_ipc_integration.h>
#include <iostream>
#include <csignal>
#include <atomic>
#include <chrono>
void print_usage(const std::string& strExecutableName)
{
std::cout << strExecutableName << " [-logging] <ADTF_PLUGIN_DIRECTORY>\n" ;
}
void print_exception(const std::exception& oError, int nLevel = 0)
{
std::cerr << std::string(nLevel, ' ') << "exception: " << oError.what() << '\n';
try
{
std::rethrow_if_nested(oError);
}
catch (const std::exception& oNestedException)
{
print_exception(oNestedException, nLevel + 1);
}
catch (...)
{
}
}
std::atomic_bool g_bKeepSending(true);
void send_data(adtf::ipc::cSink& oSink)
{
oSink.SendStreamType("test_stream", adtf::ipc::create_stream_type({"test/string", {{"test_property", "test_value"}}}));
std::string strMessage("Hello World");
while (g_bKeepSending)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
oSink.Send("test_stream", strMessage.data(), strMessage.size());
}
}
void signal_handler(int nSignalId)
{
g_bKeepSending = false;
std::signal(nSignalId, SIG_DFL);
}
int main(int argn, const char** argv)
try
{
adtf::util::cCommandLine oArguments(argn, argv);
if (oArguments.GetFlag("h") || oArguments.GetFlag("help"))
{
print_usage(argv[0]);
return 0;
}
auto strPluginDirectory = oArguments.GetValue(1);
if (strPluginDirectory.IsEmpty())
{
throw std::runtime_error("missing ADTF plugin directory specification (see -h)");
}
adtf::ipc::cStreamingServiceFactory oFactory(strPluginDirectory.GetPtr(),
oArguments.GetFlag("logging"));
auto oSource = oFactory.CreateSource("host_only_receiver.streaming_source.adtf.cid", "source1",
{
{
"test_stream",
{
{
std::cout << "type received: " << adtf::ipc::get_stream_type(pType).strMetaTypeName << "\n";
},
{
adtf::ipc::get_sample_data(pSample, [](const void* /* pData */, size_t nDataSize)
{
std::cout << "sample received: " << nDataSize << " bytes\n";
});
}
}
}
},
{
{"source/communication_end_point_name", "test"}
});
auto oSink = oFactory.CreateSink("host_only_sender.streaming_sink.adtf.cid", "sink1",
{
"test_stream"
},
{
{"sink/communication_end_point_name", "test"}
});
std::signal(SIGINT, &signal_handler);
send_data(oSink);
return 0;
}
catch (const std::exception& oError)
{
print_exception(oError);
return 1;
}

Copyright © Audi Electronics Venture GmbH. All rights reserved. (Generated on Tue Nov 16 2021 by doxygen 1.8.14)