ADTF  3.18.2
Source Code for Demo Legacy UDP Receiver and Sender Plugin
Location
./src/examples/src/adtf/streaming_services/adtf2_legacy/legacy_udp_services/
Namespace for entire ADTF SDK.
Build Environment
To see how to set up the build environment have a look at ADTF CMake Environment
this implementation shows:
Remarks
Header (Receiver)
#include "stdafx.h"
#include "demo_legacy_udp_receiver.h"
cDemoUDPReceiver::cDemoUDPReceiver()
{
//SetPropertyInt("port", 3000);
// sets a short description for the component
set_description(*this, "Use this legacy Streaming Source to receive sample data from another ADTF System using UDP protocol.");
}
cDemoUDPReceiver::~cDemoUDPReceiver()
{
}
tResult cDemoUDPReceiver::Init(tInitStage eStage)
{
RETURN_IF_FAILED(cLegacyStreamingSource::Init(eStage));
if (eStage == tInitStage::StageFirst)
{
object_ptr<IStreamType> pType = make_object_ptr<cStreamType>(stream_meta_type_anonymous());
RETURN_IF_FAILED(m_oOutputPin.Create("output", pType));
RETURN_IF_FAILED(RegisterPin(m_oOutputPin));
}
else if (eStage == tInitStage::StageNormal)
{
RETURN_IF_FAILED(m_oSocket.Open(GetPropertyInt("port")));
m_oSocket.SetTimeout(10000);
}
}
tResult cDemoUDPReceiver::Start()
{
RETURN_IF_FAILED(cLegacyStreamingSource::Start());
m_oThread = kernel_thread_looper(OIGetInstanceName() + "::recieve_thread",
&cDemoUDPReceiver::ThreadLoopFunc,
this);
if (!m_oThread.Joinable())
{
RETURN_ERROR_DESC(ERR_UNEXPECTED, "Unable to create kernel thread");
}
}
tResult cDemoUDPReceiver::Stop()
{
m_oThread = kernel_thread_looper();
return cLegacyStreamingSource::Stop();
}
tResult cDemoUDPReceiver::Shutdown(tInitStage eStage)
{
if (eStage == tInitStage::StageNormal)
{
m_oSocket.Close();
}
return cLegacyStreamingSource::Shutdown(eStage);
}
void cDemoUDPReceiver::ThreadLoopFunc()
{
tResult nResult = TransmitPacket();
if (IS_FAILED(nResult))
{
m_oOutputPin.SetStreamError(nResult);
}
}
tResult cDemoUDPReceiver::TransmitPacket()
{
uint8_t aBuffer[0xFFFF];
int nBytes = 0xFFFF;
if (IS_OK(m_oSocket.Read(aBuffer, nBytes, &nBytes)))
{
object_ptr<ISample> pSample;
{
object_ptr_locked<ISampleBuffer> pBuffer;
RETURN_IF_FAILED(pSample->WriteLock(pBuffer, nBytes));
RETURN_IF_FAILED(pBuffer->Write(adtf_memory_buffer<const uint8_t>(aBuffer, nBytes)));
}
RETURN_IF_FAILED(m_oOutputPin.Transmit(object_ptr<const ISample>(pSample)));
}
}
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
tResult alloc_sample(ucom::ant::iobject_ptr< ucom::ant::IObject > &pSampleObject, const char *strSampleCID)
Helper Function to get a Sample Instance through the adtf::ucom::ant::IRuntime.
void set_description(base::ant::IConfiguration &oConfig, const char *strDescription)
Sets description information.
Implementation (Receiver)
#include "stdafx.h"
#include "demo_legacy_udp_receiver.h"
cDemoUDPReceiver::cDemoUDPReceiver()
{
//SetPropertyInt("port", 3000);
// sets a short description for the component
set_description(*this, "Use this legacy Streaming Source to receive sample data from another ADTF System using UDP protocol.");
}
cDemoUDPReceiver::~cDemoUDPReceiver()
{
}
tResult cDemoUDPReceiver::Init(tInitStage eStage)
{
RETURN_IF_FAILED(cLegacyStreamingSource::Init(eStage));
if (eStage == tInitStage::StageFirst)
{
object_ptr<IStreamType> pType = make_object_ptr<cStreamType>(stream_meta_type_anonymous());
RETURN_IF_FAILED(m_oOutputPin.Create("output", pType));
RETURN_IF_FAILED(RegisterPin(m_oOutputPin));
}
else if (eStage == tInitStage::StageNormal)
{
RETURN_IF_FAILED(m_oSocket.Open(GetPropertyInt("port")));
m_oSocket.SetTimeout(10000);
}
}
tResult cDemoUDPReceiver::Start()
{
RETURN_IF_FAILED(cLegacyStreamingSource::Start());
m_oThread = kernel_thread_looper(OIGetInstanceName() + "::recieve_thread",
&cDemoUDPReceiver::ThreadLoopFunc,
this);
if (!m_oThread.Joinable())
{
RETURN_ERROR_DESC(ERR_UNEXPECTED, "Unable to create kernel thread");
}
}
tResult cDemoUDPReceiver::Stop()
{
m_oThread = kernel_thread_looper();
return cLegacyStreamingSource::Stop();
}
tResult cDemoUDPReceiver::Shutdown(tInitStage eStage)
{
if (eStage == tInitStage::StageNormal)
{
m_oSocket.Close();
}
return cLegacyStreamingSource::Shutdown(eStage);
}
void cDemoUDPReceiver::ThreadLoopFunc()
{
tResult nResult = TransmitPacket();
if (IS_FAILED(nResult))
{
m_oOutputPin.SetStreamError(nResult);
}
}
tResult cDemoUDPReceiver::TransmitPacket()
{
uint8_t aBuffer[0xFFFF];
int nBytes = 0xFFFF;
if (IS_OK(m_oSocket.Read(aBuffer, nBytes, &nBytes)))
{
object_ptr<ISample> pSample;
{
object_ptr_locked<ISampleBuffer> pBuffer;
RETURN_IF_FAILED(pSample->WriteLock(pBuffer, nBytes));
RETURN_IF_FAILED(pBuffer->Write(adtf_memory_buffer<const uint8_t>(aBuffer, nBytes)));
}
RETURN_IF_FAILED(m_oOutputPin.Transmit(object_ptr<const ISample>(pSample)));
}
}
  • Header (Sender)
    #include "stdafx.h"
    #include "demo_legacy_udp_sender.h"
    cDemoUDPSender::cDemoUDPSender()
    {
    // sets a short description for the component
    set_description(*this, "Use this legacy Streaming Sink to transmit sample data from another ADTF System using UDP protocol.");
    }
    cDemoUDPSender::~cDemoUDPSender()
    {
    }
    tResult cDemoUDPSender::Init(tInitStage eStage)
    {
    RETURN_IF_FAILED(cLegacyStreamingSink::Init(eStage));
    if (eStage == tInitStage::StageFirst)
    {
    //SetPropertyStr("host", "");
    //SetPropertyInt("port", 3000);
    object_ptr<IStreamType> pType = make_object_ptr<cStreamType>(stream_meta_type_anonymous());
    RETURN_IF_FAILED(m_oInputPin.Create("Input", pType));
    RETURN_IF_FAILED(RegisterPin(m_oInputPin));
    }
    else if (eStage == tInitStage::StageNormal)
    {
    RETURN_IF_FAILED(m_oSocket.Open());
    //m_nTargetIP = cSocket::AddressToUInt32(GetPropertyStr("host"));
    //m_nTargetPort = GetPropertyInt("port");
    }
    }
    tResult cDemoUDPSender::Start()
    {
    return cLegacyStreamingSink::Start();
    }
    tResult cDemoUDPSender::Stop()
    {
    return cLegacyStreamingSink::Stop();
    }
    tResult cDemoUDPSender::Shutdown(tInitStage eStage)
    {
    if (eStage == tInitStage::StageNormal)
    {
    m_oSocket.Close();
    }
    return cLegacyStreamingSink::Shutdown(eStage);
    }
    tResult cDemoUDPSender::OnSampleReceived(cLegacyInputPin* /* pPin */, const adtf::ucom::iobject_ptr<const ISample>& pSample)
    {
    __sample_read_lock(pSample, void, pData);
    return m_oSocket.Write(m_nTargetIP, m_nTargetPort, pData, static_cast<int>(pDataBuffer->GetSize()));
    }
    tResult cDemoUDPSender::OnTypeChanged(cLegacyInputPin* pPin, const adtf::ucom::iobject_ptr<const IStreamType>& pType)
    {
    return cLegacyStreamingSink::OnTypeChanged(pPin, pType);
    }
    Base object pointer to realize binary compatible reference counting in interface methods.
    #define __sample_read_lock(__sample, __type, __variable)
    Compatibility macro to gain read access to a sample.
    Implementation (Sender)
    #include "stdafx.h"
    #include "demo_legacy_udp_sender.h"
    cDemoUDPSender::cDemoUDPSender()
    {
    // sets a short description for the component
    set_description(*this, "Use this legacy Streaming Sink to transmit sample data from another ADTF System using UDP protocol.");
    }
    cDemoUDPSender::~cDemoUDPSender()
    {
    }
    tResult cDemoUDPSender::Init(tInitStage eStage)
    {
    RETURN_IF_FAILED(cLegacyStreamingSink::Init(eStage));
    if (eStage == tInitStage::StageFirst)
    {
    //SetPropertyStr("host", "");
    //SetPropertyInt("port", 3000);
    object_ptr<IStreamType> pType = make_object_ptr<cStreamType>(stream_meta_type_anonymous());
    RETURN_IF_FAILED(m_oInputPin.Create("Input", pType));
    RETURN_IF_FAILED(RegisterPin(m_oInputPin));
    }
    else if (eStage == tInitStage::StageNormal)
    {
    RETURN_IF_FAILED(m_oSocket.Open());
    //m_nTargetIP = cSocket::AddressToUInt32(GetPropertyStr("host"));
    //m_nTargetPort = GetPropertyInt("port");
    }
    }
    tResult cDemoUDPSender::Start()
    {
    return cLegacyStreamingSink::Start();
    }
    tResult cDemoUDPSender::Stop()
    {
    return cLegacyStreamingSink::Stop();
    }
    tResult cDemoUDPSender::Shutdown(tInitStage eStage)
    {
    if (eStage == tInitStage::StageNormal)
    {
    m_oSocket.Close();
    }
    return cLegacyStreamingSink::Shutdown(eStage);
    }
    tResult cDemoUDPSender::OnSampleReceived(cLegacyInputPin* /* pPin */, const adtf::ucom::iobject_ptr<const ISample>& pSample)
    {
    __sample_read_lock(pSample, void, pData);
    return m_oSocket.Write(m_nTargetIP, m_nTargetPort, pData, static_cast<int>(pDataBuffer->GetSize()));
    }
    tResult cDemoUDPSender::OnTypeChanged(cLegacyInputPin* pPin, const adtf::ucom::iobject_ptr<const IStreamType>& pType)
    {
    return cLegacyStreamingSink::OnTypeChanged(pPin, pType);
    }