ADTF  3.18.2
Demo ADTF IPC Integration
Description
Implements a library that wraps an ADTF System together with the IPC Sample Streaming Sinks and Sources, and an example application that sends and receives data in order to communicate with IPC components within ADTF.
Dependencies
Location
./src/examples/src/remote/adtf_ipc_integration/
Build Environment
To see how to set up the build environment have a look at ADTF CMake Environment
This implementation shows:
  • how to integrate the IPC mechanism used by ADTF into your own applications
Remarks
  • The library provides the adtf::ipc::devil::cStreamingServiceFactory class that wraps an ADTF system and provides a simple interface to send and receive data via the ADTF IPC mechanisms
  • By instantiating a factory, an ADTF System will be started and all required ADTF plugins will be loaded
  • After that you can create sinks and sources that manage the communication with other ADTF systems
  • The application will send some strings to itself via a host-only sink and a host-only source
Header (Library)
#pragma once
#include <a_utils.h>
#include <adtf_streaming3.h>
#include <adtf_systemsdk.h>
#include <unordered_map>
#include <unordered_set>
#include <functional>
#include <string>
namespace adtf
{
namespace ipc
{
namespace devil
{
/**
 * Plain description of a stream type that only uses standard library types.
 * See get_stream_type() and create_stream_type() for the conversion from and
 * to ADTF stream type objects.
 */
struct tStreamType
{
    /// Name of the stream meta type.
    std::string strMetaTypeName;

    /// Properties of the stream type as name/value string pairs.
    std::unordered_map<std::string, std::string> oProperties;
};
/**
 * Compares two stream type descriptions.
 * @param sFirst  left hand operand
 * @param sSecond right hand operand
 * @return true if the meta type names and the complete property maps are equal.
 */
inline bool operator==(const tStreamType& sFirst, const tStreamType& sSecond)
{
    if (sFirst.strMetaTypeName != sSecond.strMetaTypeName)
    {
        return false;
    }

    return sFirst.oProperties == sSecond.oProperties;
}
/**
 * Interface that is passed to a synchronous stream type handler
 * (tStreamHandler::fnSyncStreamTypeHandler) so that the handler can request
 * or cancel individual substreams.
 */
class IStreamingRequests
{
public:
    /// Virtual destructor, so that implementations can safely be destroyed via this interface.
    virtual ~IStreamingRequests() = default;

    /**
     * Requests the samples of a substream.
     * @param nSubstreamId id of the substream
     */
    virtual void RequestSubstream(uint32_t nSubstreamId) = 0;

    /**
     * Cancels a request for a substream.
     * @param nSubstreamId id of the substream
     */
    virtual void CancelSubstream(uint32_t nSubstreamId) = 0;
};
/**
 * Callbacks for one stream of a cSource. The cSource constructor only
 * installs or invokes a callback that is set, so each of them may be left empty.
 *
 * The member order is part of the interface: the struct is initialized
 * positionally (aggregate initialization) by users.
 */
struct tStreamHandler
{
    /// Installed as the accept-type callback of the stream's reader.
    std::function<void(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>&)>
        fnStreamTypeHandler;

    /// Called for every sample read from the stream.
    std::function<void(const ucom::ant::iobject_ptr<const streaming::ant::ISample>&)>
        fnSampleHandler;

    /// Installed as the synchronous type update callback; receives the interface for substream requests.
    std::function<void(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>&, IStreamingRequests&)>
        fnSyncStreamTypeHandler;
};
/**
 * Move-only wrapper around an ADTF sample streaming source.
 * Instances that hold a source are created by cStreamingServiceFactory::CreateSource().
 */
class cSource final
{
public:
    /// Creates an empty instance that holds no streaming source.
    cSource() = default;
    /// Ends streaming of all readers and shuts the source down (if one is held).
    ~cSource();

    // moving is allowed ...
    cSource(cSource&&) = default;
    cSource& operator=(cSource&&) = default;

    // ... copying is not
    cSource(const cSource&) = delete;
    cSource& operator=(const cSource&) = delete;

private:
    friend class cStreamingServiceFactory;

    /**
     * @param pSource         the streaming source to wrap
     * @param oStreamHandlers handlers for the streams, keyed by stream name
     */
    cSource(const ucom::ant::iobject_ptr<streaming::ant::ISampleStreamingSource>& pSource,
            const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers);

    /// The wrapped streaming source.
    ucom::ant::object_ptr<streaming::ant::ISampleStreamingSource> m_pSource;

    /// Everything that belongs to a single stream of the source.
    struct tStreamObjects : public IStreamingRequests
    {
        std::unique_ptr<streaming::cDynamicSampleReader> pReader;
        ucom::ant::object_ptr<streaming::ISampleStream> pSampleStream;
        /// Currently held substream requests, keyed by substream id.
        std::unordered_map<uint32_t, ucom::object_ptr<streaming::IStreamingRequest>> oRequests;

        void RequestSubstream(uint32_t nSubstreamId) override;
        void CancelSubstream(uint32_t nSubstreamId) override;
    };

    /// One entry per stream; the constructor hands out references to the entries to reader callbacks.
    std::list<tStreamObjects> m_oObjectsOfStreams;
};
/**
 * Move-only wrapper around an ADTF sample streaming sink.
 * Instances that hold a sink are created by cStreamingServiceFactory::CreateSink().
 */
class cSink final
{
public:
    /// Creates an empty instance that holds no streaming sink.
    cSink() = default;
    /// Disconnects all pins, ends streaming of all writers and shuts the sink down (if one is held).
    ~cSink();

    // moving is allowed ...
    cSink(cSink&&) = default;
    cSink& operator=(cSink&&) = default;

    // ... copying is not
    cSink(const cSink&) = delete;
    cSink& operator=(const cSink&) = delete;

    /**
     * Changes the stream type of a stream.
     * @param strStreamName name of the stream
     * @param pStreamType   the new stream type
     * @throws std::runtime_error if the stream does not exist or the type change fails.
     */
    void SendStreamType(const std::string& strStreamName,
                        const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType);

    /**
     * Sends a block of data as one sample on a stream.
     * @param strStreamName name of the stream
     * @param pData         pointer to the data
     * @param nDataSize     size of the data in bytes
     * @throws std::runtime_error if the stream does not exist or sending fails.
     */
    void Send(const std::string& strStreamName, const void* pData, size_t nDataSize);

private:
    friend class cStreamingServiceFactory;

    /**
     * @param pSink              the streaming sink to wrap
     * @param oStreams           names of the streams to create
     * @param fnEnableSubstream  called with stream name and substream id by the writer's first callback
     * @param fnDisableSubstream called with stream name and substream id by the writer's second callback
     */
    cSink(const ucom::ant::iobject_ptr<streaming::ant::ISampleStreamingSink>& pSink,
          const std::vector<std::string>& oStreams,
          std::function<void(const std::string&, uint32_t)> fnEnableSubstream = {},
          std::function<void(const std::string&, uint32_t)> fnDisableSubstream = {});

    /// The wrapped streaming sink.
    ucom::ant::object_ptr<streaming::ant::ISampleStreamingSink> m_pSink;
    /// Clock used to timestamp the samples.
    ucom::ant::object_ptr<services::IReferenceClock> m_pClock;

    /// Everything that belongs to a single stream of the sink.
    struct tStreamObjects
    {
        ucom::ant::object_ptr<streaming::IInPin> pSinkPin;
        std::unique_ptr<streaming::requestable_writer<>> pWriter;
        ucom::ant::object_ptr<streaming::ISampleStream> pSampleStream;
    };

    /// Stream objects keyed by stream name.
    std::unordered_map<std::string, tStreamObjects> m_oObjectsOfStreams;
};
/**
 * Owns an ADTF system and creates streaming sources and sinks within it.
 */
class cStreamingServiceFactory
{
public:
    /**
     * Launches the ADTF system and loads the default services and IPC plugins.
     * @param strAdtfPluginFolder folder that relative plugin file names are resolved against
     * @param bLogging            whether log output should be enabled
     * @throws std::runtime_error if launching or loading fails.
     */
    cStreamingServiceFactory(const std::string& strAdtfPluginFolder, bool bLogging = false);

    /// Sets the runtime to the shutdown run level.
    virtual ~cStreamingServiceFactory();

    /**
     * Loads a plugin, creates an instance of one of its classes and registers it as a service.
     * @param strFileName file name of the plugin
     * @param strClassId  class id of the service
     * @param strObjectId object id the service is registered with
     */
    void LoadService(const std::string& strFileName,
                     const std::string& strClassId,
                     const std::string& strObjectId);

    /**
     * Registers a plugin at the runtime.
     * @param strFileName file name of the plugin, relative names are resolved against the plugin folder
     */
    void LoadPlugin(const std::string& strFileName);

    /**
     * Creates and starts a streaming source.
     * @param strClassId      class id of the streaming source
     * @param strName         name of the new source
     * @param oStreamHandlers handlers for the streams, keyed by stream name
     * @param oProperties     properties to set at the source, keyed by property path
     */
    cSource CreateSource(const std::string& strClassId,
                         const std::string& strName,
                         const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers,
                         const std::unordered_map<std::string, std::string>& oProperties);

    /**
     * Creates and starts a streaming sink.
     * @param strClassId         class id of the streaming sink
     * @param strName            name of the new sink
     * @param oStreams           names of the streams to create
     * @param oProperties        properties to set at the sink, keyed by property path
     * @param fnEnableSubstream  optional, forwarded to the sink
     * @param fnDisableSubstream optional, forwarded to the sink
     */
    cSink CreateSink(const std::string& strClassId,
                     const std::string& strName,
                     const std::vector<std::string>& oStreams,
                     const std::unordered_map<std::string, std::string>& oProperties,
                     std::function<void(const std::string&, uint32_t)> fnEnableSubstream = {},
                     std::function<void(const std::string&, uint32_t)> fnDisableSubstream = {});

    // NOTE(review): the data members are publicly accessible - confirm whether that is intended.
public:
    /// Plugin folder including a trailing slash.
    util::cFilename m_strAdtfPluginFolder;
    /// The wrapped ADTF system.
    system::ant::cADTFSystem m_oADTFSystem;
};
/**
 * Functor that can be used as tStreamHandler::fnSyncStreamTypeHandler.
 * It requests every substream of a stream type whose name is part of a given set.
 */
class cRequestor
{
public:
    /**
     * @param oSubstreams names of the substreams that should be requested
     */
    cRequestor(std::unordered_set<std::string> oSubstreams):
        m_oSubstreams(std::move(oSubstreams))
    {
    }

    /**
     * Lists the substreams of the stream type and requests the selected ones.
     * @param pType     the stream type
     * @param oRequests interface used to place the requests
     */
    void operator()(const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pType,
                    IStreamingRequests& oRequests)
    {
        const auto fnRequestIfSelected = [this, &oRequests](const char* strSubstreamName, uint32_t nSubstreamId)
        {
            const bool bSelected = m_oSubstreams.count(strSubstreamName) != 0;
            if (bSelected)
            {
                oRequests.RequestSubstream(nSubstreamId);
            }
        };

        streaming::stream_meta_type_substreams::ListSubStreams(*pType.Get(), fnRequestIfSelected);
    }

private:
    /// Names of the substreams to request.
    const std::unordered_set<std::string> m_oSubstreams;
};
/**
 * Converts an ADTF stream type into a plain tStreamType description.
 * @param pStreamType the stream type to convert
 * @return the meta type name and the properties of the stream type
 */
tStreamType get_stream_type(
    const ucom::ant::iobject_ptr<const streaming::ant::IStreamType>& pStreamType);

/**
 * Creates an ADTF stream type from a plain tStreamType description.
 * @param sStreamType meta type name and properties of the new stream type
 * @return the new stream type object
 */
ucom::ant::object_ptr<streaming::ant::IStreamType> create_stream_type(
    const tStreamType& sStreamType);

/**
 * Locks the buffer of a sample and passes its data to a callback.
 * @param pSample    the sample
 * @param fnCallback called with the data pointer and the size of the sample buffer
 */
void get_sample_data(
    const ucom::ant::iobject_ptr<const streaming::ant::ISample>& pSample,
    std::function<void(const void*, size_t)> fnCallback);
}
// Make the current API version (namespace devil) available directly in adtf::ipc.
using devil::cStreamingServiceFactory;
using devil::IStreamingRequests;
using devil::cSource;
using devil::cSink;
using devil::tStreamType;
// tStreamHandler is part of the CreateSource() signature, so it is exported like the other public types.
using devil::tStreamHandler;
using devil::cRequestor;
using devil::get_stream_type;
using devil::create_stream_type;
using devil::get_sample_data;
}
}
Copyright © Audi Electronics Venture GmbH.
tBool operator==(const tErrorCode &lhs, const tErrorCode &rhs)
Compare two POD error code types for equality.
Namespace for entire ADTF SDK.
static void ListSubStreams(const ant::IStreamType &oStreamType, std::function< void(const char *)> fnCallback)
List all available Substreams.
Implementation (Library)
#include <adtf_ipc_integration/adtf_ipc_integration.h>
#ifdef GetObject
#undef GetObject
#endif
namespace adtf
{
namespace ipc
{
namespace devil
{
// Evaluates an expression that yields a tResult and throws a std::runtime_error
// with the textual form of the result if it signals a failure.
// The macro expands to a braced block, so it also works without a trailing semicolon.
#define CHECKED(expression) \
{ \
    tResult oCheckedResult = (expression); \
    if (IS_FAILED(oCheckedResult)) \
    { \
        throw std::runtime_error(util::to_string(oCheckedResult).GetPtr()); \
    } \
}
/// Logger that discards every log entry; it is installed when logging is disabled.
void null_logger(const util::log::tLogEntry&)
{
    // nothing to do, the entry is dropped
}
/**
 * Launches the ADTF system, loads the clock and kernel services as well as the
 * IPC plugins and finally switches the runtime to run level RL_System.
 * @param strAdtfPluginFolder folder that relative plugin file names are resolved against
 * @param bLogging            if false, all log entries are discarded
 * @throws std::runtime_error if any of the checked ADTF calls fails.
 */
cStreamingServiceFactory::cStreamingServiceFactory(const std::string& strAdtfPluginFolder, bool bLogging):
    m_strAdtfPluginFolder(strAdtfPluginFolder.c_str())
{
    m_strAdtfPluginFolder.AppendTrailingSlash();

    if (!bLogging)
    {
        util::log::set_logger(&null_logger);
    }

    // logging of the system is always enabled, only the first level parameter depends on bLogging
    const auto nFirstLogLevel = bLogging ? adtf::util::log::All : adtf::util::log::None;
    m_oADTFSystem.EnableLogging(true);
    m_oADTFSystem.SetLoggingParameter(nFirstLogLevel,
                                      adtf::util::log::None,
                                      adtf::util::log::None,
                                      0);

    CHECKED(m_oADTFSystem.Launch(adtf::util::cCommandLine()));

    // services first ...
    LoadService("adtf_clock.adtfplugin", CID_ADTF_REFERENCE_CLOCK, "clock");
    LoadService("adtf_kernel.adtfplugin", CID_ADTF_KERNEL, "kernel");

    // ... then the plain plugins
    for (const char* strPluginFile : {"default_core_objects.adtfplugin",
                                      "adtf_ipc_host_only.adtfplugin",
                                      "adtf_ipc_tcp.adtfplugin",
                                      "adtf_ipc_udp.adtfplugin"})
    {
        LoadPlugin(strPluginFile);
    }
#ifndef _MSC_VER
    // the SCTP plugin is not loaded when building with MSVC
    LoadPlugin("adtf_ipc_sctp.adtfplugin");
#endif

    CHECKED(_runtime->SetRunLevel(base::tADTFRunLevel::RL_System));
}
/**
 * Switches the runtime to the shutdown run level.
 */
cStreamingServiceFactory::~cStreamingServiceFactory()
{
    // the result is not evaluated here
    static_cast<void>(_runtime->SetRunLevel(ucom::IRuntime::RL_Shutdown));
}
/**
 * Sets string properties at the configuration of an object.
 * @param pObject     the object to configure
 * @param oProperties property values keyed by property path; if empty, the object is not touched
 * @throws std::runtime_error if properties are given but the object does not
 *         provide the base::IConfiguration interface.
 */
void set_properties(const ucom::iobject_ptr<ucom::IObject>& pObject,
                    const std::unordered_map<std::string, std::string>& oProperties)
{
    if (oProperties.empty())
    {
        return;
    }

    auto pConfiguration = ucom::ucom_cast<base::IConfiguration*>(pObject.Get());
    if (!pConfiguration)
    {
        // the cast result was dereferenced unconditionally before
        throw std::runtime_error("unable to set properties: the object does not provide a configuration");
    }

    for (const auto& oProperty : oProperties)
    {
        // NOTE(review): a result of set_property_by_path is not evaluated - confirm whether
        // failures should be reported (create_stream_type checks its set_property calls).
        base::set_property_by_path<util::cString>(*pConfiguration, oProperty.first.c_str(), oProperty.second.c_str());
    }
}
/**
 * Creates an instance of a streaming source class, names and configures it
 * and wraps it in a cSource.
 * @param strClassId      class id of the streaming source
 * @param strName         name of the new source
 * @param oStreamHandlers handlers for the streams, keyed by stream name
 * @param oProperties     properties to set at the source, keyed by property path
 * @throws std::runtime_error if the instance cannot be created.
 */
cSource cStreamingServiceFactory::CreateSource(const std::string& strClassId,
                                               const std::string& strName,
                                               const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers,
                                               const std::unordered_map<std::string, std::string>& oProperties)
{
    ucom::object_ptr<streaming::ISampleStreamingSource> pNewSource;
    CHECKED(_runtime->CreateInstance(strClassId.c_str(), pNewSource));

    // name and configure the source before it is handed to the wrapper
    pNewSource->SetName(strName.c_str());
    set_properties(pNewSource, oProperties);

    return cSource(pNewSource, oStreamHandlers);
}
/**
 * Creates an instance of a streaming sink class, names and configures it
 * and wraps it in a cSink.
 * @param strClassId         class id of the streaming sink
 * @param strName            name of the new sink
 * @param oStreams           names of the streams to create
 * @param oProperties        properties to set at the sink, keyed by property path
 * @param fnEnableSubstream  forwarded to the cSink
 * @param fnDisableSubstream forwarded to the cSink
 * @throws std::runtime_error if the instance cannot be created.
 */
cSink cStreamingServiceFactory::CreateSink(const std::string& strClassId,
                                           const std::string& strName,
                                           const std::vector<std::string>& oStreams,
                                           const std::unordered_map<std::string, std::string>& oProperties,
                                           std::function<void(const std::string&, uint32_t)> fnEnableSubstream,
                                           std::function<void(const std::string&, uint32_t)> fnDisableSubstream)
{
    ucom::object_ptr<streaming::ISampleStreamingSink> pNewSink;
    CHECKED(_runtime->CreateInstance(strClassId.c_str(), pNewSink));

    // name and configure the sink before it is handed to the wrapper
    pNewSink->SetName(strName.c_str());
    set_properties(pNewSink, oProperties);

    return cSink(pNewSink, oStreams, fnEnableSubstream, fnDisableSubstream);
}
void cStreamingServiceFactory::LoadService(const std::string& strFileName,
const std::string& strClassId,
const std::string& strObjectId)
{
LoadPlugin(strFileName);
ucom::object_ptr<ucom::IObject> pService;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pService));
CHECKED(_runtime->RegisterObject(pService, strObjectId.c_str(), base::tADTFRunLevel::RL_System));
}
void cStreamingServiceFactory::LoadPlugin(const std::string& strFileName)
{
ucom::object_ptr<ucom::IPluginInfo> pPlugin;
util::cFilename strFileNameHelper(strFileName.c_str());
if (strFileNameHelper.IsRelative())
{
strFileNameHelper = m_strAdtfPluginFolder + strFileNameHelper;
}
CHECKED(_runtime->RegisterPlugin(strFileNameHelper, ucom::IRuntime::RL_Internal, pPlugin));
}
namespace
{
/**
 * Trigger pipe item whose runnable executes a given function.
 */
class cTriggerPipeItemHelper:
    public ucom::catwo::object<streaming::trigger_pipe_item<streaming::ITriggerPipeItem>>
{
public:
    /**
     * @param fnTriggerHandler the function the runnable of this item is created from
     */
    cTriggerPipeItemHelper(base::IRunnable::tRunFunction fnTriggerHandler):
        m_oRunnable(fnTriggerHandler)
    {
        SetRunnable(m_oRunnable);
    }

private:
    /// The runnable that is set at this pipe item.
    base::runnable<base::IRunnable::RUN_TRIGGER> m_oRunnable;
};
} // namespace
// Wraps a streaming source: sets it to State_Constructed, creates one reader per
// entry of oStreamHandlers (the key is used as pin and reader name), connects
// each reader via its own sample stream and finally sets the source to State_Streaming.
// Every CHECKED call throws a std::runtime_error on failure.
//
// NOTE(review): the trigger handler part at the end of the loop body looks damaged in
// this listing (see the notes below) - compare with the original source before editing.
cSource::cSource(const ucom::iobject_ptr<adtf::streaming::ISampleStreamingSource>& pSource,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers):
m_pSource(pSource)
{
CHECKED(pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Constructed));
// all pins are requested with an anonymous stream type
ucom::object_ptr<streaming::IStreamType> pType = ucom::make_object_ptr<streaming::cStreamType>(streaming::stream_meta_type_anonymous());
for (auto& oStreamHandler : oStreamHandlers)
{
ucom::object_ptr<streaming::IOutPin> pPin;
CHECKED(pSource->RequestPin(oStreamHandler.first.c_str(), pType, pPin));
// the callbacks below keep a reference to this list element
tStreamObjects& sStreamObjects = m_oObjectsOfStreams.emplace_back();
sStreamObjects.pReader.reset(new streaming::cDynamicSampleReader);
sStreamObjects.pReader->SetName(oStreamHandler.first.c_str());
// each callback is only installed if the corresponding handler is set
if (oStreamHandler.second.fnSyncStreamTypeHandler)
{
sStreamObjects.pReader->SetSynchronousTypeUpdateCallback([oStreamHandler, &sStreamObjects](const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType) -> tResult
{
// all held substream requests are dropped before the handler is called
sStreamObjects.oRequests.clear();
RETURN_IF_THROWS(oStreamHandler.second.fnSyncStreamTypeHandler(pStreamType, sStreamObjects));
// NOTE(review): no return statement for the success case is visible here - presumably lost in this listing.
});
}
if (oStreamHandler.second.fnStreamTypeHandler)
{
sStreamObjects.pReader->SetAcceptTypeCallback([oStreamHandler](const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType) -> tResult
{
RETURN_IF_THROWS(oStreamHandler.second.fnStreamTypeHandler(pStreamType));
// NOTE(review): no return statement for the success case is visible here - presumably lost in this listing.
});
}
// each stream gets its own sample stream that connects the pin with the reader
sStreamObjects.pSampleStream = ucom::make_object_ptr<streaming::cSampleStream>();
CHECKED(streaming::testing::connect_pin(pPin, sStreamObjects.pSampleStream, true));
CHECKED(sStreamObjects.pReader->BeginStreaming(*sStreamObjects.pSampleStream));
// raw pointer for the capture of the lambda below
streaming::cDynamicSampleReader* pReaderPointer = sStreamObjects.pReader.get();
// NOTE(review): the created object is not assigned to anything, while pTriggerHandler is
// used further down without a visible declaration - presumably the declaration was lost.
adtf::ucom::make_object_ptr<adtf::streaming::cRunner>("test", [oStreamHandler, pReaderPointer](tTimeStamp /* tmTrigger */) -> tResult {
ucom::object_ptr<const streaming::ISample> pSample;
// reads samples until GetNextSample fails and passes each one to the sample handler (if set)
while (IS_OK(pReaderPointer->GetNextSample(pSample)))
{
if (oStreamHandler.second.fnSampleHandler)
{
RETURN_IF_THROWS(oStreamHandler.second.fnSampleHandler(pSample));
}
}
// NOTE(review): no return statement for the success case is visible here - presumably lost in this listing.
});
// NOTE(review): the following statement is truncated in this listing, its remaining arguments are unknown.
sStreamObjects.pSampleStream->RegisterSubItem(pTriggerHandler,
}
CHECKED(m_pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Streaming));
}
/**
 * Drops all substream requests, ends streaming of all readers and sets the
 * source to State_Shutdown. Does nothing if no source is held.
 */
cSource::~cSource()
{
    if (!m_pSource)
    {
        return;
    }

    for (auto& sObjectsOfStream : m_oObjectsOfStreams)
    {
        // release the requests before the reader stops streaming
        sObjectsOfStream.oRequests.clear();
        sObjectsOfStream.pReader->EndStreaming();
    }

    m_pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
void cSource::tStreamObjects::RequestSubstream(uint32_t nSubstreamId)
{
ucom::object_ptr<streaming::IStreamingRequest> pRequest;
CHECKED(pReader->RequestSamples(pRequest, nSubstreamId));
oRequests.emplace(nSubstreamId, pRequest);
}
/**
 * Removes the stored request object of a substream (if there is one).
 * @param nSubstreamId id of the substream
 */
void cSource::tStreamObjects::CancelSubstream(uint32_t nSubstreamId)
{
    // erasing an id without a stored request is a no-op
    oRequests.erase(nSubstreamId);
}
/**
 * Wraps a streaming sink: fetches the reference clock, sets the sink to
 * State_Constructed, creates one writer per stream name (connected to its own
 * pin via its own sample stream) and finally sets the sink to State_Streaming.
 * @param pSink              the streaming sink to wrap
 * @param oStreams           names of the streams, used as pin and writer names
 * @param fnEnableSubstream  optional; called with stream name and substream id from the first writer callback
 * @param fnDisableSubstream optional; called with stream name and substream id from the second writer callback
 * @throws std::runtime_error if any of the checked ADTF calls fails.
 */
cSink::cSink(const ucom::iobject_ptr<adtf::streaming::ISampleStreamingSink>& pSink,
             const std::vector<std::string>& oStreams,
             std::function<void(const std::string&, uint32_t)> fnEnableSubstream,
             std::function<void(const std::string&, uint32_t)> fnDisableSubstream):
    m_pSink(pSink)
{
    CHECKED(_runtime->GetObject(m_pClock));
    CHECKED(pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Constructed));

    // all pins are requested with an anonymous stream type
    ucom::object_ptr<streaming::IStreamType> pType = ucom::make_object_ptr<streaming::cStreamType>(streaming::stream_meta_type_anonymous());

    for (auto& strStream : oStreams)
    {
        tStreamObjects sStreamObjects;
        CHECKED(pSink->RequestPin(strStream.c_str(), pType, sStreamObjects.pSinkPin));

        sStreamObjects.pWriter.reset(new streaming::requestable_writer<>);
        sStreamObjects.pWriter->SetName(strStream.c_str());

        sStreamObjects.pSampleStream = ucom::make_object_ptr<streaming::cSampleStream>();
        CHECKED(streaming::testing::connect_pin(sStreamObjects.pSinkPin, sStreamObjects.pSampleStream, true));
        CHECKED(sStreamObjects.pWriter->BeginStreaming(*sStreamObjects.pSampleStream));

        // Both user callbacks default to empty functions, calling an empty
        // std::function throws std::bad_function_call, so they are only
        // forwarded to when they are set.
        sStreamObjects.pWriter->SetCallbacks(
            [strStream, fnEnableSubstream](uint32_t nSubStreamId, const base::IProperties*)
            {
                if (fnEnableSubstream)
                {
                    fnEnableSubstream(strStream, nSubStreamId);
                }
            },
            [strStream, fnDisableSubstream](uint32_t nSubStreamId)
            {
                if (fnDisableSubstream)
                {
                    fnDisableSubstream(strStream, nSubStreamId);
                }
            });

        m_oObjectsOfStreams.emplace(strStream, std::move(sStreamObjects));
    }

    CHECKED(pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Streaming));
}
/**
 * Sets the sink to State_Initialized, disconnects all pins, ends streaming of
 * all writers and sets the sink to State_Shutdown.
 * Does nothing if no sink is held.
 */
cSink::~cSink()
{
    if (!m_pSink)
    {
        return;
    }

    m_pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Initialized);

    for (auto& oNameAndObjects : m_oObjectsOfStreams)
    {
        auto& sObjectsOfStream = oNameAndObjects.second;
        sObjectsOfStream.pSinkPin->Disconnect();
        sObjectsOfStream.pWriter->EndStreaming();
    }

    m_pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
/**
 * Changes the stream type of the writer of a stream.
 * @param strStreamName name of the stream
 * @param pStreamType   the new stream type
 * @throws std::runtime_error if there is no stream with the given name or
 *         the writer fails to change the type.
 */
void cSink::SendStreamType(const std::string& strStreamName,
                           const ucom::iobject_ptr<const adtf::streaming::IStreamType>& pStreamType)
{
    const auto itStream = m_oObjectsOfStreams.find(strStreamName);
    const bool bStreamKnown = itStream != m_oObjectsOfStreams.end();
    if (!bStreamKnown)
    {
        throw std::runtime_error("no such stream: " + strStreamName);
    }

    CHECKED(itStream->second.pWriter->ChangeType(pStreamType));
}
/**
 * Copies the data into a new sample stamped with the current stream time,
 * transmits it via the writer of the stream and runs a trigger on the sample stream.
 * @param strStreamName name of the stream
 * @param pData         pointer to the data
 * @param nDataSize     size of the data in bytes
 * @throws std::runtime_error if there is no stream with the given name or
 *         one of the checked ADTF calls fails.
 */
void cSink::Send(const std::string& strStreamName, const void* pData, size_t nDataSize)
{
    const auto itStream = m_oObjectsOfStreams.find(strStreamName);
    if (itStream == m_oObjectsOfStreams.end())
    {
        throw std::runtime_error("no such stream: " + strStreamName);
    }
    auto& sObjectsOfStream = itStream->second;

    auto tmNow = m_pClock->GetStreamTimeNs();

    ucom::object_ptr<streaming::ISample> pSample;
    CHECKED(streaming::alloc_sample(pSample, tmNow));

    {
        // the locked buffer pointer only lives within this scope
        ucom::object_ptr_locked<streaming::ISampleBuffer> pBuffer;
        CHECKED(pSample->WriteLock(pBuffer, nDataSize));
        CHECKED(pBuffer->Write(base::adtf_memory_buffer<const void>(pData, nDataSize)));
    }

    CHECKED(sObjectsOfStream.pWriter->Transmit(pSample));
    CHECKED(sObjectsOfStream.pSampleStream->Run(adtf::base::duration_cast<tTimeStamp>(tmNow),
                                                base::IRunnable::RUN_TRIGGER,
                                                nullptr,
                                                0));
}
/**
 * Properties implementation that stores every property it receives as a
 * name/value string pair in a tStreamType. Sub properties are stored
 * recursively, their names are prefixed with the parent name and a '/'.
 */
class cPropertyDumper : public base::cPropertiesHelper
{
public:
    /**
     * @param sStreamType the description that receives the properties
     * @param strPrefix   prefix that is prepended to every property name
     */
    cPropertyDumper(tStreamType& sStreamType, const std::string& strPrefix = std::string()):
        m_sStreamType(sStreamType),
        m_strPrefix(strPrefix)
    {
    }

    /**
     * Stores name and string value of the property and descends into its sub properties.
     * @param oProperty the property to store
     * @return an error if name or value cannot be retrieved, ERR_NOERROR otherwise.
     */
    virtual tResult SetProperty(const base::IProperty& oProperty)
    {
        std::string strName;
        RETURN_IF_FAILED(oProperty.GetName(base::adtf_string<std::string>(&strName)));

        std::string strValue;
        RETURN_IF_FAILED(oProperty.GetValue()->ToString(base::adtf_string<std::string>(&strValue)));

        m_sStreamType.oProperties.emplace(m_strPrefix + strName, strValue);

        ucom::object_ptr<const base::IProperties> pSubProperties;
        if (IS_OK(oProperty.GetProperties(pSubProperties)))
        {
            cPropertyDumper oSubDumper(m_sStreamType, m_strPrefix + strName + "/");
            // the result of dumping the sub properties is not evaluated
            pSubProperties->Get(oSubDumper);
        }

        // the success path had no return statement, which is undefined
        // behaviour for a function that returns a value
        RETURN_NOERROR;
    }

private:
    /// The description that receives the properties.
    tStreamType& m_sStreamType;
    /// Prefix for all property names stored by this instance.
    std::string m_strPrefix;
};
/**
 * Converts an ADTF stream type into a plain tStreamType description.
 * @param pStreamType the stream type to convert
 * @return the meta type name and all properties of the stream type configuration
 * @throws std::runtime_error if name or configuration cannot be retrieved.
 */
tStreamType get_stream_type(const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType)
{
    tStreamType sDescription;
    CHECKED(pStreamType->GetMetaTypeName(base::adtf_string<std::string>(&sDescription.strMetaTypeName)));

    ucom::object_ptr<const base::IProperties> pConfig;
    CHECKED(pStreamType->GetConfig(pConfig));

    // the dumper writes every property into sDescription.oProperties
    cPropertyDumper oPropertyDumper(sDescription);
    CHECKED(pConfig->Get(oPropertyDumper));

    return sDescription;
}
/**
 * Creates an ADTF stream type from a plain tStreamType description.
 * @param sStreamType meta type name and properties of the new stream type
 * @return the new stream type with all properties set as strings
 * @throws std::runtime_error if the configuration cannot be retrieved or a
 *         property cannot be set.
 */
ucom::object_ptr<streaming::IStreamType> create_stream_type(const tStreamType& sStreamType)
{
    const char* strMetaTypeName = sStreamType.strMetaTypeName.c_str();
    ucom::object_ptr<streaming::IStreamType> pCreatedType = ucom::make_object_ptr<adtf::streaming::cCamelionStreamType>(strMetaTypeName);

    ucom::object_ptr<base::IProperties> pConfig;
    CHECKED(pCreatedType->GetConfig(pConfig));

    for (auto& oNameAndValue : sStreamType.oProperties)
    {
        CHECKED(base::set_property<util::cString>(*pConfig,
                                                  oNameAndValue.first.c_str(),
                                                  oNameAndValue.second.c_str()));
    }

    return pCreatedType;
}
/**
 * Locks the buffer of a sample and passes its data to a callback.
 * The lock is held by a local object, so the data pointer must not be used
 * after the callback has returned.
 * @param pSample    the sample
 * @param fnCallback called with the data pointer and the size of the buffer
 * @throws std::runtime_error if the buffer cannot be locked.
 */
void get_sample_data(const ucom::iobject_ptr<const streaming::ISample>& pSample,
                     std::function<void(const void*, size_t)> fnCallback)
{
    ucom::object_ptr_shared_locked<const streaming::ISampleBuffer> pLockedBuffer;
    CHECKED(pSample->Lock(pLockedBuffer));

    fnCallback(pLockedBuffer->GetPtr(), pLockedBuffer->GetSize());
}
} // namespace devil
} // namespace ipc
} // namespace adtf
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
Copyright © Audi Electronics Venture GmbH.
virtual tResult CreateInstance(const char *strCID, iobject_ptr< IObject > &pObject, const tChar *strNameOfObject="") const =0
Creates a new instance of an object.
virtual tResult UnregisterAllPlugins(int8_t nRunLevel, uint32_t ui32Flags=0)=0
Unregister all plugins from the plugin registry.
virtual tResult SetRunLevel(int8_t nRunLevel, bool bWait=true)=0
Set run level.
virtual tResult RegisterObject(const iobject_ptr< IObject > &pObject, const char *strNameOID, int8_t nRunLevel, uint32_t ui32Flags=0)=0
Register object at object registry.
virtual tResult RegisterPlugin(const char *strUrl, int8_t nRunLevel, iobject_ptr< IPluginInfo > &pPluginInfo, uint32_t ui32Flags=0)=0
Register plugin at the plugin registry.
virtual tResult GetObject(iobject_ptr< IObject > &pObject, const char *strNameOID) const =0
Get registered object from object registry.
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:163
#define CID_ADTF_KERNEL
Object ID of the default kernel service.
Definition: kernel_intf.h:13
tVoid set_logger(logger oLogger)
Sets the currently used logger.
@ RL_Internal
Internal run level; not intended for users.
Definition: adtf_runtime.h:42
sample_reader< cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create a internal sample queue with u...
Definition: samplereader.h:813
tResult alloc_sample(ucom::ant::iobject_ptr< ucom::ant::IObject > &pSampleObject, const char *strSampleCID)
Helper Function to get a Sample Instance through the adtf::ucom::ant::IRuntime.
adtf::ucom::IRuntime * _runtime
Global Runtime Pointer to reference to the current runtime.
#define CID_ADTF_REFERENCE_CLOCK
ClassID for the ADTF Default Reference Clock.
#define RETURN_IF_THROWS(s)
if the expression throws an exception, returns a tResult containing the exception information.
Implementation (Application)
#include <adtf_ipc_integration/adtf_ipc_integration.h>
#include <iostream>
#include <csignal>
#include <atomic>
#include <chrono>
/**
 * Prints the command line synopsis to standard output.
 * @param strExecutableName name of the executable, printed in front of the arguments
 */
void print_usage(const std::string& strExecutableName)
{
    const std::string strSynopsis = strExecutableName + " [-logging] <ADTF_PLUGIN_DIRECTORY>\n";
    std::cout << strSynopsis;
}
/**
 * Prints an exception and, recursively, all exceptions nested in it to
 * standard error. Every nesting level is indented by one more space.
 * @param oError the exception to print
 * @param nLevel nesting level, determines the indentation
 */
void print_exception(const std::exception& oError, int nLevel = 0)
{
    const std::string strIndentation(nLevel, ' ');
    std::cerr << strIndentation << "exception: " << oError.what() << '\n';

    try
    {
        // throws the nested exception, if there is one
        std::rethrow_if_nested(oError);
    }
    catch (const std::exception& oCause)
    {
        print_exception(oCause, nLevel + 1);
    }
    catch (...)
    {
        // nested exceptions of other types are not printed
    }
}
/// Set to false by the signal handler in order to end the send loop.
std::atomic<bool> g_bKeepSending{true};
/**
 * Sends a stream type on "test_stream" and then, every 500 milliseconds, the
 * text "Hello World" until g_bKeepSending is false.
 * @param oSink the sink used for sending
 */
void send_data(adtf::ipc::cSink& oSink)
{
    oSink.SendStreamType("test_stream", adtf::ipc::create_stream_type({"test/string", {{"test_property", "test_value"}}}));

    const std::string strMessage("Hello World");
    const std::chrono::milliseconds tmInterval(500);

    // the flag is checked before each wait, so a sample is still sent after a wait that was already running
    while (g_bKeepSending.load())
    {
        std::this_thread::sleep_for(tmInterval);
        oSink.Send("test_stream", strMessage.data(), strMessage.size());
    }
}
/**
 * Signal handler that ends the send loop and restores the default handling
 * of the signal.
 * @param nSignalId the signal that was raised
 */
void signal_handler(int nSignalId)
{
    g_bKeepSending.store(false);

    // from now on the signal gets its default treatment again
    std::signal(nSignalId, SIG_DFL);
}
int main(int argn, const char** argv)
try
{
adtf::util::cCommandLine oArguments(argn, argv);
if (oArguments.GetFlag("h") || oArguments.GetFlag("help"))
{
print_usage(argv[0]);
return 0;
}
auto strPluginDirectory = oArguments.GetValue(1);
if (strPluginDirectory.IsEmpty())
{
throw std::runtime_error("missing ADTF plugin directory specification (see -h)");
}
adtf::ipc::cStreamingServiceFactory oFactory(strPluginDirectory.GetPtr(),
oArguments.GetFlag("logging"));
auto oSource = oFactory.CreateSource("host_only_receiver.streaming_source.adtf.cid", "source1",
{
{
"test_stream",
{
{
std::cout << "type received: " << adtf::ipc::get_stream_type(pType).strMetaTypeName << "\n";
},
{
adtf::ipc::get_sample_data(pSample, [](const void* /* pData */, size_t nDataSize)
{
std::cout << "sample received: " << nDataSize << " bytes\n";
});
}
}
}
},
{
{"source/communication_end_point_name", "test"}
});
auto oSink = oFactory.CreateSink("host_only_sender.streaming_sink.adtf.cid", "sink1",
{
"test_stream"
},
{
{"sink/communication_end_point_name", "test"}
});
std::signal(SIGINT, &signal_handler);
send_data(oSink);
return 0;
}
catch (const std::exception& oError)
{
print_exception(oError);
return 1;
}
Base object pointer to realize binary compatible reference counting in interface methods.