#include <adtf_ipc_integration/adtf_ipc_integration.h>
#ifdef GetObject
#undef GetObject
#endif
{
namespace ipc
{
namespace devil
{
#define CHECKED(__exp) \
{ \
tResult __result = (__exp); \
if (IS_FAILED(__result)) \
{ \
throw std::runtime_error(util::to_string(__result).GetPtr()); \
} \
}
void null_logger(const util::log::tLogEntry& )
{
}
cStreamingServiceFactory::cStreamingServiceFactory(const std::string& strAdtfPluginFolder, bool bLogging):
m_strAdtfPluginFolder(strAdtfPluginFolder.c_str())
{
m_strAdtfPluginFolder.AppendTrailingSlash();
if (!bLogging)
{
util::log::set_logger(&null_logger);
}
m_oADTFSystem.EnableLogging(true);
m_oADTFSystem.SetLoggingParameter(bLogging ? adtf::util::log::All : adtf::util::log::None,
adtf::util::log::None,
adtf::util::log::None,
0);
CHECKED(m_oADTFSystem.Launch(adtf::util::cCommandLine()));
LoadPlugin("default_core_objects.adtfplugin");
LoadPlugin("adtf_ipc_host_only.adtfplugin");
LoadPlugin("adtf_ipc_tcp.adtfplugin");
LoadPlugin("adtf_ipc_udp.adtfplugin");
#ifndef _MSC_VER
LoadPlugin("adtf_ipc_sctp.adtfplugin");
#endif
CHECKED(_runtime->SetRunLevel(base::tADTFRunLevel::RL_System));
}
cStreamingServiceFactory::~cStreamingServiceFactory()
{
_runtime->SetRunLevel(ucom::IRuntime::RL_Shutdown);
_runtime->UnregisterAllPlugins(ucom::IRuntime::RL_Internal);
}
void set_properties(const ucom::iobject_ptr<ucom::IObject>& pObject,
const std::unordered_map<std::string, std::string>& oProperties)
{
if (!oProperties.empty())
{
auto pConfiguration = ucom::ucom_cast<base::IConfiguration*>(pObject.Get());
for (const auto& oProperty : oProperties)
{
base::set_property_by_path<util::cString>(*pConfiguration, oProperty.first.c_str(), oProperty.second.c_str());
}
}
}
cSource cStreamingServiceFactory::CreateSource(const std::string& strClassId,
const std::string& strName,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers,
const std::unordered_map<std::string, std::string>& oProperties)
{
ucom::object_ptr<streaming::ISampleStreamingSource> pSource;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pSource));
pSource->SetName(strName.c_str());
set_properties(pSource, oProperties);
return cSource(pSource, oStreamHandlers);
}
cSink cStreamingServiceFactory::CreateSink(const std::string& strClassId,
const std::string& strName,
const std::vector<std::string>& oStreams,
const std::unordered_map<std::string, std::string>& oProperties,
std::function<void(const std::string&, uint32_t)> fnEnableSubstream,
std::function<void(const std::string&, uint32_t)> fnDisableSubstream)
{
ucom::object_ptr<streaming::ISampleStreamingSink> pSink;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pSink));
pSink->SetName(strName.c_str());
set_properties(pSink, oProperties);
return cSink(pSink, oStreams, fnEnableSubstream, fnDisableSubstream);
}
void cStreamingServiceFactory::LoadService(const std::string& strFileName,
const std::string& strClassId,
const std::string& strObjectId)
{
LoadPlugin(strFileName);
ucom::object_ptr<ucom::IObject> pService;
CHECKED(_runtime->CreateInstance(strClassId.c_str(), pService));
CHECKED(_runtime->RegisterObject(pService, strObjectId.c_str(), base::tADTFRunLevel::RL_System));
}
void cStreamingServiceFactory::LoadPlugin(const std::string& strFileName)
{
ucom::object_ptr<ucom::IPluginInfo> pPlugin;
util::cFilename strFileNameHelper(strFileName.c_str());
if (strFileNameHelper.IsRelative())
{
strFileNameHelper = m_strAdtfPluginFolder + strFileNameHelper;
}
CHECKED(_runtime->RegisterPlugin(strFileNameHelper, ucom::IRuntime::RL_Internal, pPlugin));
}
namespace
{
class cTriggerPipeItemHelper :
public ucom::catwo::object<streaming::trigger_pipe_item<streaming::ITriggerPipeItem>>
{
public:
cTriggerPipeItemHelper(base::IRunnable::tRunFunction fnTriggerHandler):
m_oRunnable(fnTriggerHandler)
{
SetRunnable(m_oRunnable);
}
private:
base::runnable<base::IRunnable::RUN_TRIGGER> m_oRunnable;
};
}
cSource::cSource(const ucom::iobject_ptr<adtf::streaming::ISampleStreamingSource>& pSource,
const std::unordered_map<std::string, tStreamHandler>& oStreamHandlers):
m_pSource(pSource)
{
CHECKED(pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Constructed));
ucom::object_ptr<streaming::IStreamType> pType = ucom::make_object_ptr<streaming::cStreamType>(streaming::stream_meta_type_anonymous());
for (auto& oStreamHandler : oStreamHandlers)
{
ucom::object_ptr<streaming::IOutPin> pPin;
CHECKED(pSource->RequestPin(oStreamHandler.first.c_str(), pType, pPin));
tStreamObjects& sStreamObjects = m_oObjectsOfStreams.emplace_back();
sStreamObjects.pReader.reset(new streaming::cDynamicSampleReader);
sStreamObjects.pReader->SetName(oStreamHandler.first.c_str());
if (oStreamHandler.second.fnSyncStreamTypeHandler)
{
sStreamObjects.pReader->SetSynchronousTypeUpdateCallback([oStreamHandler, &sStreamObjects](
const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType) ->
tResult
{
sStreamObjects.oRequests.clear();
RETURN_IF_THROWS(oStreamHandler.second.fnSyncStreamTypeHandler(pStreamType, sStreamObjects));
});
}
if (oStreamHandler.second.fnStreamTypeHandler)
{
sStreamObjects.pReader->SetAcceptTypeCallback([oStreamHandler](
const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType) ->
tResult
{
});
}
sStreamObjects.pSampleStream = ucom::make_object_ptr<streaming::cSampleStream>();
CHECKED(streaming::testing::connect_pin(pPin, sStreamObjects.pSampleStream, true));
CHECKED(sStreamObjects.pReader->BeginStreaming(*sStreamObjects.pSampleStream));
streaming::cDynamicSampleReader* pReaderPointer = sStreamObjects.pReader.get();
ucom::object_ptr<const streaming::ISample> pSample;
while (
IS_OK(pReaderPointer->GetNextSample(pSample)))
{
if (oStreamHandler.second.fnSampleHandler)
{
}
}
});
sStreamObjects.pSampleStream->RegisterSubItem(pTriggerHandler,
}
CHECKED(m_pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Streaming));
}
cSource::~cSource()
{
if (m_pSource)
{
for (auto& sStreamObjects : m_oObjectsOfStreams)
{
sStreamObjects.oRequests.clear();
sStreamObjects.pReader->EndStreaming();
}
m_pSource->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
}
void cSource::tStreamObjects::RequestSubstream(uint32_t nSubstreamId)
{
ucom::object_ptr<streaming::IStreamingRequest> pRequest;
CHECKED(pReader->RequestSamples(pRequest, nSubstreamId));
oRequests.emplace(nSubstreamId, pRequest);
}
void cSource::tStreamObjects::CancelSubstream(uint32_t nSubstreamId)
{
oRequests.erase(nSubstreamId);
}
cSink::cSink(const ucom::iobject_ptr<adtf::streaming::ISampleStreamingSink>& pSink,
const std::vector<std::string>& oStreams,
std::function<void(const std::string&, uint32_t)> fnEnableSubstream,
std::function<void(const std::string&, uint32_t)> fnDisableSubstream):
m_pSink(pSink)
{
CHECKED(_runtime->GetObject(m_pClock));
CHECKED(pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Constructed));
ucom::object_ptr<streaming::IStreamType> pType = ucom::make_object_ptr<streaming::cStreamType>(streaming::stream_meta_type_anonymous());
for (auto& strStream : oStreams)
{
tStreamObjects sStreamObjects;
CHECKED(pSink->RequestPin(strStream.c_str(), pType, sStreamObjects.pSinkPin));
sStreamObjects.pWriter.reset(new streaming::requestable_writer<>);
sStreamObjects.pWriter->SetName(strStream.c_str());
sStreamObjects.pSampleStream = ucom::make_object_ptr<streaming::cSampleStream>();
CHECKED(streaming::testing::connect_pin(sStreamObjects.pSinkPin, sStreamObjects.pSampleStream, true));
CHECKED(sStreamObjects.pWriter->BeginStreaming(*sStreamObjects.pSampleStream));
sStreamObjects.pWriter->SetCallbacks([strStream, fnEnableSubstream](uint32_t nSubStreamId, const base::IProperties*)
{
fnEnableSubstream(strStream, nSubStreamId);
},
[strStream, fnDisableSubstream](uint32_t nSubStreamId)
{
fnDisableSubstream(strStream, nSubStreamId);
});
m_oObjectsOfStreams.emplace(strStream, std::move(sStreamObjects));
}
CHECKED(pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Streaming));
}
cSink::~cSink()
{
if (m_pSink)
{
m_pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Initialized);
for (auto& sStreamObjects : m_oObjectsOfStreams)
{
sStreamObjects.second.pSinkPin->Disconnect();
sStreamObjects.second.pWriter->EndStreaming();
}
m_pSink->SetState(streaming::ISampleStreamingSource::tStreamingState::State_Shutdown);
}
}
void cSink::SendStreamType(const std::string& strStreamName,
const ucom::iobject_ptr<const adtf::streaming::IStreamType>& pStreamType)
{
auto oStreamObjects = m_oObjectsOfStreams.find(strStreamName);
if (oStreamObjects == m_oObjectsOfStreams.end())
{
throw std::runtime_error("no such stream: " + strStreamName);
}
CHECKED(oStreamObjects->second.pWriter->ChangeType(pStreamType));
}
void cSink::Send(const std::string& strStreamName, const void* pData, size_t nDataSize)
{
auto oStreamObjects = m_oObjectsOfStreams.find(strStreamName);
if (oStreamObjects == m_oObjectsOfStreams.end())
{
throw std::runtime_error("no such stream: " + strStreamName);
}
auto tmNow = m_pClock->GetStreamTimeNs();
ucom::object_ptr<streaming::ISample> pSample;
CHECKED(streaming::alloc_sample(pSample, tmNow));
{
ucom::object_ptr_locked<streaming::ISampleBuffer> pBuffer;
CHECKED(pSample->WriteLock(pBuffer, nDataSize));
CHECKED(pBuffer->Write(base::adtf_memory_buffer<const void>(pData, nDataSize)));
}
CHECKED(oStreamObjects->second.pWriter->Transmit(pSample));
const auto pSampleStream = ucom::ucom_cast<base::flash::IRunnable*>(oStreamObjects->second.pSampleStream.Get());
if (!pSampleStream)
{
throw std::runtime_error("Cannot cast adtf::streaming::ISampleStream to adtf::base::flash::IRunnable");
}
CHECKED(pSampleStream->Run(tmNow ,
base::IRunnable::RUN_TRIGGER,
nullptr,
0));
}
class cPropertyDumper : public base::cPropertiesHelper
{
public:
cPropertyDumper(tStreamType& sStreamType, const std::string& strPrefix = std::string()):
m_sStreamType(sStreamType),
m_strPrefix(strPrefix)
{
}
virtual tResult SetProperty(
const base::IProperty& oProperty)
{
std::string strName;
std::string strValue;
RETURN_IF_FAILED(oProperty.GetValue()->ToString(base::adtf_string<std::string>(&strValue)));
m_sStreamType.oProperties.emplace(m_strPrefix + strName, strValue);
ucom::object_ptr<const base::IProperties> pSubProperties;
if (
IS_OK(oProperty.GetProperties(pSubProperties)))
{
cPropertyDumper oSubDumper(m_sStreamType, m_strPrefix + strName + "/");
pSubProperties->Get(oSubDumper);
}
}
private:
tStreamType& m_sStreamType;
std::string m_strPrefix;
};
tStreamType get_stream_type(const ucom::iobject_ptr<const streaming::IStreamType>& pStreamType)
{
tStreamType sType;
CHECKED(pStreamType->GetMetaTypeName(base::adtf_string<std::string>(&sType.strMetaTypeName)));
ucom::object_ptr<const base::IProperties> pProperties;
CHECKED(pStreamType->GetConfig(pProperties))
cPropertyDumper oDumper(sType);
CHECKED(pProperties->Get(oDumper));
return sType;
}
ucom::object_ptr<streaming::IStreamType> create_stream_type(const tStreamType& sStreamType)
{
ucom::object_ptr<streaming::IStreamType> pNewType = ucom::make_object_ptr<adtf::streaming::cCamelionStreamType>(sStreamType.strMetaTypeName.c_str());
ucom::object_ptr<base::IProperties> pProperties;
CHECKED(pNewType->GetConfig(pProperties));
for (auto& oProperty : sStreamType.oProperties)
{
CHECKED(base::set_property<util::cString>(*pProperties,
oProperty.first.c_str(),
oProperty.second.c_str()));
}
return pNewType;
}
void get_sample_data(const ucom::iobject_ptr<const streaming::ISample>& pSample,
std::function<void(const void*, size_t)> fnCallback)
{
ucom::object_ptr_shared_locked<const streaming::ISampleBuffer> pBuffer;
CHECKED(pSample->Lock(pBuffer));
fnCallback(pBuffer->GetPtr(), pBuffer->GetSize());
}
}
}
}
tInt64 tTimeStamp
type definition for a time value.
A_UTILS_NS::cResult tResult
For backwards compatibility and to bring latest version into scope.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define IS_OK(s)
Check if result is OK.
Copyright © Audi Electronics Venture GmbH.
#define CID_ADTF_KERNEL
Object ID of the default kernel service.
object_ptr< Implementation > make_object_ptr(Args &&... args)
Alias always bringing the latest version of ant::make_object_ptr() into scope.
#define CID_ADTF_REFERENCE_CLOCK
ClassID for the ADTF Default Reference Clock.
#define RETURN_IF_THROWS(s)
if the expression throws an exception, returns a tResult containing the exception information.