ADTF  3.18.2
Source Code for UDP/TCP Receiver/Sender From/To Non-ADTF Application Plugin
Location
./src/examples/src/remote/foreign_application/
Build Environment
To see how to set up the build environment have a look at ADTF CMake Environment
This example shows:
  • how to implement a Streaming Source
  • how to implement a Streaming Sink
  • how to implement an UDP and a TCP socket (IPv4 and IPv6) with asio library communication
  • how to communicate with a small non-ADTF application
Header (UDP Sender)
#pragma once
#include <sink_to_non_adtf.h>
#include "udp_socket_wrapper.h"
class cUdpSinkToNonADTFSystem : public cSinkToNonADTFSystem
{
public:
ADTF_CLASS_ID_NAME(cUdpSinkToNonADTFSystem,
"demo_foreign_application_udp_sender.streaming_sink.adtf.cid",
"UDP Sender To Non ADTF Application");
cUdpSinkToNonADTFSystem();
tResult StartStreaming() override;
private:
void Send(const void* pData, size_t nDataSize) override;
IUdpSocket& GetSocket();
};
#define ADTF_CLASS_ID_NAME(_class, _strcid, _strclabel)
Common macro to enable correct treatment of class identifier AND Class Name by IClassInfo.
Definition: class_id.h:33
Property Variable template for the given T.
Helper class that wraps a streaming::ant::IBindingClient.
Definition: graph_object.h:67
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:163
Implementation (UDP Sender)
#include "udp_sink_to_non_adtf.h"
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
using namespace asio::ip;
cUdpSinkToNonADTFSystem::cUdpSinkToNonADTFSystem()
{
m_strRemoteHost.SetDescription("If set, send data to this host. This is required if the sink is not connected to a source.\n"
"If this property is empty, packets will be sent to the last endpoint that data was received from.\n"
"If set to 'localhost' the IPv4 localhost address '127.0.0.1' is used.\n"
"IPv4 Example: '127.0.0.1'\n"
"IPv6 Example: '::1'\n");
RegisterPropertyVariable("remote_host", m_strRemoteHost);
m_nRemotePort.SetDescription("This is only taken into account when remote_host is set as well and when this sink is not connected to a source via the socket interface binding.");
RegisterPropertyVariable("port", m_nRemotePort);
m_oUdpSocketClient = CreateInterfaceClient<IUdpSocket>("socket");
SetDescription("socket", "Interface client to use a shared socket for communication. The connected socket provides the local host and port to bound as source.");
// sets a short description for the component
SetDescription("Use this Streaming Sink to transmit sample data to a Non-ADTF Application using the UDP protocol");
// set help link to jump to documentation from ADTF Configuration Editor
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_non_adtf_application_sender_receiver.html");
}
tResult cUdpSinkToNonADTFSystem::StartStreaming()
{
RETURN_IF_FAILED(cSampleStreamingSink::StartStreaming());
if (!m_oUdpSocketClient.IsValid())
{
// we are not connected to a source, so just create our own socket instance
auto pSocket = make_object_ptr<cUdpSocketWrapper>();
RETURN_IF_FAILED(pSocket->Open("", 0, "", 0, 0));
m_pFallbackSocket = pSocket;
}
if (m_strRemoteHost->IsNotEmpty())
{
RETURN_IF_FAILED(GetSocket().SetRemote(m_strRemoteHost->GetPtr(), m_nRemotePort));
}
}
void cUdpSinkToNonADTFSystem::Send(const void* pData, size_t nDataSize)
{
const auto oResult = GetSocket().Send(pData, nDataSize);
if (IS_FAILED(oResult))
{
LOG_RESULT(oResult);
}
}
IUdpSocket& cUdpSinkToNonADTFSystem::GetSocket()
{
return m_pFallbackSocket ? *m_pFallbackSocket : m_oUdpSocketClient.Get();
}
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
Namespace for the ADTF Base SDK.
Namespace for the ADTF Streaming SDK.
Namespace for the ADTF uCOM3 SDK.
alias namespace for the A_UTILS Library.
Header (UDP Receiver)
#pragma once
#include <adtf_base.h>
#include <adtf_filtersdk.h>
#include "udp_socket_wrapper.h"
class cUdpSourceFromNonADTFSystem : public adtf::filter::cSampleStreamingSource
{
public:
ADTF_CLASS_ID_NAME(cUdpSourceFromNonADTFSystem,
"demo_foreign_application_udp_receiver.streaming_source.adtf.cid",
"UDP Receiver From Non ADTF Application");
cUdpSourceFromNonADTFSystem();
tResult Construct() override;
tResult Init() override;
tResult StartStreaming() override;
tResult StopStreaming() override;
private:
void SocketThread();
void PackToSample(const void* pData, size_t nBytes);
adtf::base::property_variable<bool> m_bDeserializeViaMediaDescription = false;
adtf::base::property_variable<bool> m_bMarkSamplesAsDeserialized = true;
adtf::base::property_variable<size_t> m_nReceiveBufferSize = 9000;
adtf::base::property_variable<size_t> m_nSocketReceiveBufferSize = 0;
// Usually, every device source will need the reference clock.
// Either to stamp samples using this clock on arrival, or to convert external timestamps.
adtf::streaming::ISampleWriter* m_pOutputWriter = nullptr;
// A device source usually a runners for the device context, which is under real-time constraints as it's under risk of having device side buffers flow over.
// The consumer should connect a Thread Invoker or similar decoupling components to the output of the source in order to preserve real-time properties.
// This runner is providing our real-time handling of the socket thread.
ddl::codec::CodecFactory m_oCodecFactory;
};
#define REQUIRE_INTERFACE(_interface)
Macro usable with ADTF_CLASS_DEPENDENCIES() to require mandatory interfaces.
#define ADTF_CLASS_DEPENDENCIES(...)
Add interface ids (string literals,.
Base class for ADTF sample streaming sources.
tResult StopStreaming() override
Stop your threads and timers in this method.
tResult StartStreaming() override
Start your threads and timers in this method.
tResult Init() override
Initializes the source.
This class is used to provide a fallback kernel thread/timer for a runner of a streaming service.
Kernel interface for thread, timer and signal handling.
Definition: kernel_intf.h:388
The IReferenceClock interface provides the reference time source for the filter graph.
virtual tResult Construct()
Construct transission while changing from tStreamingState::State_Shutdown to tStreamState::State_Cons...
Interface for sample writers that write to sample streams via output pins.
Copyright © Audi Electronics Venture GmbH.
Implementation (UDP Receiver)
#include "udp_source_from_non_adtf.h"
#include <functional>
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
using namespace adtf::system;
cUdpSourceFromNonADTFSystem::cUdpSourceFromNonADTFSystem()
{
m_strInterface.SetDescription("if set, bind the socket to this local interface");
RegisterPropertyVariable("interface", m_strInterface);
m_nListeningPort.SetDescription("The UDP port to bind to and listen on.");
RegisterPropertyVariable("port", m_nListeningPort);
m_strMulticastGroup.SetDescription("if set, join the given multicast group");
RegisterPropertyVariable("multicast_group", m_strMulticastGroup);
m_strDDLStructName.SetDescription("If set, the stream type will contain the media description of the given struct, retrieved from the Media Description Service.");
RegisterPropertyVariable("ddl_struct_name", m_strDDLStructName);
m_bDeserializeViaMediaDescription.SetDescription("If activated, incoming data will be deserialized as defined "
"by the media description of the struct selected via ddl_struct_name before writing it into output samples.");
RegisterPropertyVariable("deserialize_via_media_description", m_bDeserializeViaMediaDescription);
m_bMarkSamplesAsDeserialized.SetDescription("In case that 'deserialize_via_media_description' is disabled, this will force the output stream type to have the deserialized flag set.");
RegisterPropertyVariable("mark_samples_as_deserialized", m_bMarkSamplesAsDeserialized);
m_nReceiveBufferSize.SetDescription("Maximum payload size in bytes this filter can accept. Choose according to configured MTU. Values up to 64kB are plausible from UDP side.");
m_nReceiveBufferSize.SetValidRange(1, 0xFFFF);
RegisterPropertyVariable("receive_buffer_size", m_nReceiveBufferSize);
m_nSocketReceiveBufferSize.SetDescription("Sockets receive buffer size in bytes (min: 4098, Default: 0 - uses system default)");
RegisterPropertyVariable("socket_receive_buffer_size", m_nSocketReceiveBufferSize);
m_pOutputWriter = CreateOutputPin("output");
SetDescription("output", "Provides the sample data received from a Non-ADTF instance over UDP");
m_pSocket = adtf::ucom::make_object_ptr<cUdpSocketWrapper>(
std::bind(&cUdpSourceFromNonADTFSystem::PackToSample, this, std::placeholders::_1, std::placeholders::_2)
);
CreateInterfaceServer<IUdpSocket>("socket", ucom_object_ptr_cast<IUdpSocket>(m_pSocket));
SetDescription("socket", "Interface server to provide a shared socket for communication");
// For session compatibility reasons we use this fallback helper for the case where no active runner is connected to the runner.
// In your own implementations use a simple CreateRunner(...) instead!
// We are using the callback based signature so the device context thread is free-running.
// This means WE have to deal with thread synchronization for this thread!
m_oSocketThread = adtf::filter::cRunnerFallback(this, "socket_thread", cThreadTriggerHint(), [this](){ SocketThread(); });
SetDescription("socket_thread",
"Connect a Thread Runner that will provide the context for receiving all data from the socket. "
"If this is not connected, the source will create a thread on its own.");
// sets a short description for the component
SetDescription("Use this Streaming Source to receive sample data from a Non-ADTF Application using the UDP protocol");
// set help link to jump to documentation from ADTF Configuration Editor
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_non_adtf_application_sender_receiver.html");
}
tResult cUdpSourceFromNonADTFSystem::Construct()
{
RETURN_IF_FAILED(cSampleStreamingSource::Construct());
if (!m_strDDLStructName->empty())
{
object_ptr<IStreamType> pType = make_object_ptr<adtf::mediadescription::stream_type_default<>>(
m_strDDLStructName->c_str(), m_bDeserializeViaMediaDescription || m_bMarkSamplesAsDeserialized ?
m_pOutputWriter->ChangeType(pType);
if (m_bDeserializeViaMediaDescription)
{
m_oCodecFactory = ddl::codec::CodecFactory(std::get<0>(oDescription).c_str(),
std::get<1>(oDescription).c_str());
}
}
else
{
if (m_bDeserializeViaMediaDescription)
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Deserialization requires a ddl struct name to be set.");
}
}
}
tResult cUdpSourceFromNonADTFSystem::Init()
{
RETURN_IF_FAILED(cSampleStreamingSource::Init());
RETURN_IF_FAILED(m_pSocket->Open(*m_strInterface, *m_nListeningPort, *m_strMulticastGroup, *m_nReceiveBufferSize,
*m_nSocketReceiveBufferSize));
}
tResult cUdpSourceFromNonADTFSystem::StartStreaming()
{
// In StartStreaming() we enable our filter to stream data.
// This means foremost starting all threads which we control ourselves.
// Additionallly, we may now enable blocking operations on thread runner ports.
// The implementation of this method needs to be threadsafe with regard to any already running trigger processing.
// Base class FIRST for init / start.
RETURN_IF_FAILED(cSampleStreamingSource::StartStreaming());
RETURN_IF_FAILED(m_pSocket->Start());
// Used with cRunnerFallback only. Normally, a runner will start running without any further action.
RETURN_IF_FAILED(m_oSocketThread.Activate());
}
tResult cUdpSourceFromNonADTFSystem::StopStreaming()
{
// In StopStreaming() we must ensure that all streaming can seize.
// This means we must both stop all threads we have started ourselves.
// We also must unblock all connected runners, and ensure that they can not block again!
// The implementation of this method needs to be threadsafe with regard to any trigger processing.
RETURN_IF_FAILED(m_pSocket->Stop());
// Used with cRunnerFallback only. Normally, a runner thread will be joined by the attached runner.
m_oSocketThread.Deactivate();
// Base class LAST for shutdown / stop.
return cSampleStreamingSource::StopStreaming();
}
void cUdpSourceFromNonADTFSystem::SocketThread()
{
// There is still a chance of pending invocations to
const tResult nResult = m_pSocket->Run();
if (IS_FAILED(nResult))
{
LOG_RESULT(nResult);
}
}
void cUdpSourceFromNonADTFSystem::PackToSample(const void* pData, size_t nBytes)
{
// Get the applicable time stamps as ealy as possible!
// Usually, you would want to prefer to use a hardware provided timestamp if *any* exists.
// This timestamp is not going to be accurate as this method is potentially already called several milliseconds delayed.
// If you need precise timestamps, then ASIO and UDP are not the way to go. PCAP will be a much better fit, but requires aditional drivers.
const auto tmNow = m_pClock->GetStreamTimeNs();
object_ptr<ISample> pSample;
THROW_IF_FAILED(alloc_sample(pSample, tmNow));
if (m_bDeserializeViaMediaDescription)
{
const auto oDecoder = m_oCodecFactory.makeDecoderFor(pData, nBytes, ddl::tDataRepresentation::Serialized);
THROW_IF_FAILED(alloc_sample(pSample, tmNow));
(TO_ADTF_RESULT(oDecoder.isValid()));
object_ptr_locked<ISampleBuffer> pBuffer;
THROW_IF_FAILED(pSample->WriteLock(pBuffer, oDecoder.getBufferSize(ddl::tDataRepresentation::Deserialized)));
auto oCodec =
oDecoder.makeCodecFor(pBuffer->GetPtr(), pBuffer->GetSize(), ddl::tDataRepresentation::Deserialized);
THROW_IF_FAILED(TO_ADTF_RESULT(ddl::codec::transform(oDecoder, oCodec)));
}
else
{
object_ptr_locked<ISampleBuffer> pBuffer;
THROW_IF_FAILED(pSample->WriteLock(pBuffer, nBytes));
THROW_IF_FAILED(pBuffer->Write(adtf_memory_buffer<const void>(pData, nBytes)));
}
m_pOutputWriter->Write(pSample);
// We are not returning from this thread, and we have opted *against* forwarding triggers when we return from the runner function.
// That means we are responsible ourselves for gennerating triggers.
m_pOutputWriter->ManualTrigger(tmNow);
}
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
virtual tResult GetObject(iobject_ptr< IObject > &pObject, const char *strNameOID) const =0
Get registered object from object registry.
std::tuple< std::string, std::string, adtf_ddl::tDataRepresentation > get_media_description_from_stream_type(const adtf::streaming::IStreamType &oStreamType)
Extract the media description information from stream type.
tResult alloc_sample(ucom::ant::iobject_ptr< ucom::ant::IObject > &pSampleObject, const char *strSampleCID)
Helper Function to get a Sample Instance through the adtf::ucom::ant::IRuntime.
Namespace for the ADTF System SDK.
a_util::result::Result transform(const DECODER &decoder, ENCODER &encoder)
Copies all elements from a decoder to a codec.
adtf::ucom::IRuntime * _runtime
Global Runtime Pointer to reference to the current runtime.
#define THROW_IF_FAILED(s)
throws if the expression returns a failed tResult
Header (UDP Socket Wrapper)
#pragma once
#include <asio.hpp>
#include <optional>
#include <array>
#include <adtf_base.h>
#include <adtf_utils.h>
#include <adtf_filtersdk.h>
#include <adtf_systemsdk.h>
#ifdef _WIN32
#undef GetObject
#endif
#if defined(__GLIBC__) && (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12))
#define OS_HAS_RECVMMSG 1
#endif
class IUdpSocket: public adtf::ucom::ant::IObject
{
public:
ADTF_IID(IUdpSocket, "udp_socket.demo.adtf");
public:
virtual tResult SetRemote(const std::string& strRemoteAddress, uint16_t nRemotePort) = 0;
virtual tResult Send(const void* pData, size_t nDataSize) = 0;
protected:
~IUdpSocket() = default;
};
class cUdpSocketWrapper: public adtf::ucom::object<IUdpSocket>
{
public:
cUdpSocketWrapper(
// Avoid polling schemes, maintain async behavior as much as you can.
std::function<void(const void*, size_t)> fnOnReceive = {}
);
~cUdpSocketWrapper() override;
tResult Open(const std::string& strInterface,
uint16_t nLocalPort,
const std::string& strMulticastGroup,
size_t nReceiveBufferSize,
size_t nSocketReceiveBufferSize);
tResult SetRemote(const std::string& strRemoteAddress, uint16_t nRemotePort) override;
tResult Send(const void* pData, size_t nDataSize) override;
// Prepare to restart a the io_context.
// Only allowed to call when not currently running.
tResult Start();
// Run the io_context for this socket on the current thread.
// Will not return until Stop() is called explicitly.
// Unsafe to call if PrepareForRun() is still in progress.
tResult Run();
// Interrupt the io_context running on a different thread.
// You'll still need to wait until Run() returns.
// You'll need to call PrepareForRun() again before restarting.
tResult Stop();
private:
#if defined(ASIO_HAS_IOCP) || defined(ASIO_HAS_IO_URING_AS_DEFAULT) || defined(OS_HAS_RECVMMSG)
// IOCP and uring backends are safe to use with enqueued read ops, proper scaling and order of execution is
// preserved.
// For the other backends, direct use of deep async queues is unsafe and doesn't scale from asio side, packets are
// received out-of-order. Linux has recvmmsg as a synchronous, but still high performance alternative.
static constexpr size_t m_nQueueDepth = 64;
#else
static constexpr size_t m_nQueueDepth = 1;
#endif
struct tPacketHandler
{
tPacketHandler(size_t nHandlerId, size_t nBufferSize): nHandlerId(nHandlerId), oBuffer(nBufferSize)
{
}
size_t nHandlerId;
std::vector<std::byte> oBuffer;
asio::mutable_registered_buffer oRegisteredBuffer;
asio::ip::udp::endpoint oEndpoint;
};
#if defined(OS_HAS_RECVMMSG)
void ScheduleReadMany() noexcept;
void ReadMany(const std::error_code& nError // Result of operation
) noexcept;
#endif
void ScheduleReadOne(const std::shared_ptr<tPacketHandler>& oBuffer) noexcept;
void OnReceiveOne(const std::error_code& nError, // Result of operation.
std::size_t nBytesTransferred, // Number of bytes received.
const std::shared_ptr<tPacketHandler>& oBuffer) noexcept;
asio::io_context m_oIoContext;
asio::ip::udp::socket m_oUdpSocket;
// Statically configured endpoint for sending.
std::optional<asio::ip::udp::endpoint> m_oUdpRemoteEndpoint;
std::optional<asio::ip::udp::endpoint> m_oLastReceivedUdpRemoteEndpoint;
std::unique_ptr<asio::buffer_registration<std::vector<asio::mutable_buffer>>> m_pAsioBufferRegistration;
// Callback passed externally in constructor.
const std::function<void(const void*, size_t)> m_fnOnReceive;
std::array<std::shared_ptr<tPacketHandler>, m_nQueueDepth> m_oHandlers;
bool m_bRunning = false;
std::mutex m_oRunningMutex;
std::condition_variable m_oRunningCv;
};
Copyright © Audi Electronics Venture GmbH.
Base class for every interface type within the uCOM.
Definition: object_intf.h:31
ADTF_IID(IObject, "object.ant.ucom.adtf.iid")
Marks the IObject to be castable with the ucom_cast()
Use this template if you want to implement an ucom::ant::IObject based Interface and/or subclass an e...
Definition: object.h:379
Implementation (UDP Socket Wrapper)
#include "udp_socket_wrapper.h"
#include <asio_helpers.h>
#include <iostream>
#include <cinttypes>
cUdpSocketWrapper::cUdpSocketWrapper(std::function<void(const void*, size_t)> fnOnReceive):
m_oUdpSocket(m_oIoContext), m_fnOnReceive(std::move(fnOnReceive))
{
// Start in stopped state.
m_oIoContext.stop();
}
cUdpSocketWrapper::~cUdpSocketWrapper() = default;
tResult cUdpSocketWrapper::Open(const std::string& strInterface,
uint16_t nLocalPort,
const std::string& strMulticastGroup,
size_t nReceiveBufferSize,
size_t nSocketReceiveBufferSize)
{
if (m_oUdpSocket.is_open())
{
RETURN_ERROR_DESC(ERR_INVALID_STATE, "Can't reconfigure socket - socket is already open.");
}
const auto oBindAddress = resolve_address(strInterface);
asio::ip::udp::socket::protocol_type protocol = oBindAddress.is_v4() ? asio::ip::udp::socket::protocol_type::v4() : asio::ip::udp::socket::protocol_type::v6();
RETURN_IF_THROWS(m_oUdpSocket.open(protocol));
RETURN_IF_THROWS(m_oUdpSocket.set_option(asio::ip::udp::socket::reuse_address(true)));
// 0 means ... do not touch and use systems default
if (nSocketReceiveBufferSize != 0)
{
try
{
asio::ip::udp::socket::receive_buffer_size oRcvBufferSizeOptionGet;
m_oUdpSocket.get_option(oRcvBufferSizeOptionGet);
m_oUdpSocket.set_option(asio::ip::udp::socket::receive_buffer_size(static_cast<int>(nSocketReceiveBufferSize)));
LOG_INFO("Sockets buffersize set from %d to %zu bytes.", oRcvBufferSizeOptionGet.value(),
nSocketReceiveBufferSize);
}
catch (const asio::error_code&)
{
return adtf::base::current_exception_to_result();
}
}
if (strMulticastGroup.empty())
{
RETURN_IF_THROWS(m_oUdpSocket.bind(asio::ip::udp::endpoint(oBindAddress, nLocalPort)));
}
else
{
const auto oMulticastAddress = resolve_address(strMulticastGroup);
if (!oMulticastAddress.is_multicast())
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "%s is not a multicast address.", strMulticastGroup.c_str());
}
if (!oBindAddress.is_unspecified() &&
oMulticastAddress.is_v4() != oBindAddress.is_v4())
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Both the multicast address and the bind interface address need to be of the same ip version.");
}
#ifdef _WIN32
RETURN_IF_THROWS(m_oUdpSocket.bind(asio::ip::udp::endpoint(oBindAddress, nLocalPort)));
#else
RETURN_IF_THROWS(m_oUdpSocket.bind(asio::ip::udp::endpoint(oMulticastAddress, nLocalPort)));
#endif
if (oMulticastAddress.is_v4())
{
RETURN_IF_THROWS(m_oUdpSocket.set_option(asio::ip::multicast::join_group(oMulticastAddress.to_v4(), oBindAddress.to_v4())));
}
else
{
RETURN_IF_THROWS(m_oUdpSocket.set_option(asio::ip::multicast::join_group(oMulticastAddress.to_v6(), oBindAddress.to_v6().scope_id())));
}
}
// Set up receive operations if used with callback.
if (m_fnOnReceive)
{
std::vector<asio::mutable_buffer> oOriginalBuffers;
for (size_t i = 0; i < m_nQueueDepth; ++i)
{
m_oHandlers[i] = std::make_shared<tPacketHandler>(i, nReceiveBufferSize);
oOriginalBuffers.emplace_back(m_oHandlers[i]->oBuffer.data(), m_oHandlers[i]->oBuffer.size());
}
m_pAsioBufferRegistration = std::make_unique<asio::buffer_registration<std::vector<asio::mutable_buffer>>>(
asio::register_buffers(m_oIoContext, oOriginalBuffers));
for (size_t i = 0; i < m_nQueueDepth; ++i)
{
m_oHandlers[i]->oRegisteredBuffer = (*m_pAsioBufferRegistration)[i];
}
#if defined(OS_HAS_RECVMMSG) && !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
// The reactive backend is not preserving the order for queued async operations.
// So instead keep only a single async wait in flight, but poll in bulk instead.
ScheduleReadMany();
#else
for (auto& oBuffer : m_oHandlers)
{
ScheduleReadOne(oBuffer);
}
#endif
}
}
tResult cUdpSocketWrapper::SetRemote(const std::string& strRemoteAddress, uint16_t nRemotePort)
{
RETURN_IF_THROWS(m_oUdpRemoteEndpoint = asio::ip::udp::endpoint(resolve_address(strRemoteAddress), nRemotePort));
}
tResult cUdpSocketWrapper::Send(const void* pData, size_t nDataSize)
{
asio::ip::udp::endpoint oDestination;
if (m_oUdpRemoteEndpoint)
{
oDestination = *m_oUdpRemoteEndpoint;
}
else if (m_oLastReceivedUdpRemoteEndpoint)
{
oDestination = *m_oLastReceivedUdpRemoteEndpoint;
}
else
{
RETURN_ERROR_DESC(ERR_INVALID_STATE, "Unable to send data via UDP, remote endpoint is not yet known.");
}
size_t nDataSent = 0;
RETURN_IF_THROWS(nDataSent = m_oUdpSocket.send_to(asio::buffer(pData, nDataSize), oDestination));
if (static_cast<size_t>(nDataSent) != nDataSize)
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Unable to send data via UDP, data size is too large: %" PRIu64, nDataSize);
}
}
tResult cUdpSocketWrapper::Start()
{
std::unique_lock oLock(m_oRunningMutex);
if (m_bRunning)
{
RETURN_ERROR(ERR_INVALID_STATE);
}
// Reset to runnable if previously stopped.
if (m_oIoContext.stopped())
{
RETURN_IF_THROWS(m_oIoContext.restart());
}
m_bRunning = true;
m_oRunningCv.notify_all();
}
tResult cUdpSocketWrapper::Run()
{
std::unique_lock oLock(m_oRunningMutex);
if (m_oRunningCv.wait_for(oLock, std::chrono::milliseconds(100), [this]() { return m_bRunning; }))
{
oLock.unlock();
m_oIoContext.run();
oLock.lock();
m_bRunning = false;
m_oRunningCv.notify_all();
}
else
{
RETURN_ERROR(ERR_NOT_READY);
}
}
tResult cUdpSocketWrapper::Stop()
{
{
std::unique_lock oLock(m_oRunningMutex);
m_oIoContext.stop();
m_oRunningCv.wait(oLock, [this]() -> bool { return !m_bRunning; });
}
}
#if defined(OS_HAS_RECVMMSG)
void cUdpSocketWrapper::ScheduleReadMany() noexcept
{
m_oUdpSocket.async_wait(asio::ip::udp::socket::wait_read, std::bind(&cUdpSocketWrapper::ReadMany, this, std::placeholders::_1));
}
void cUdpSocketWrapper::ReadMany(const std::error_code& nError) noexcept
{
if (!nError)
{
// Pull bulk messages in a non-blocking, tight loop.
do {
// What is received may be IPv4 or IPv6 address.
std::array<sockaddr_storage, m_nQueueDepth> oAddresses = {0};
std::array<iovec, m_nQueueDepth> oBuffers = {0};
std::array<mmsghdr, m_nQueueDepth> oMessages = {0};
for (size_t i = 0; i < m_nQueueDepth; ++i)
{
// Sender address.
oMessages[i].msg_hdr.msg_name = &oAddresses[i];
oMessages[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage);
// Scatter fragments.
oBuffers[i].iov_base = m_oHandlers[i]->oBuffer.data();
oBuffers[i].iov_len = m_oHandlers[i]->oBuffer.size();
oMessages[i].msg_hdr.msg_iov = &oBuffers[i];
oMessages[i].msg_hdr.msg_iovlen = 1;
// Received number of bytes.
oMessages[i].msg_len = 0;
}
// recvmmsg is missing in ASIO, but absolutely necessary to avoid excessive syscall overhead...
const auto nReceived =
::recvmmsg(m_oUdpSocket.native_handle(), oMessages.data(), oMessages.size(), MSG_DONTWAIT, nullptr);
if (nReceived > 0)
{
for (int i = 0; i < nReceived; ++i)
{
const auto& oMessage = oMessages[i];
if (m_fnOnReceive && oMessage.msg_hdr.msg_iovlen > 0)
{
m_fnOnReceive(oMessage.msg_hdr.msg_iov[0].iov_base, oMessage.msg_len);
}
}
const auto& oLastMessage = oMessages[nReceived - 1];
const auto pAddress = static_cast<const sockaddr_storage*>(oLastMessage.msg_hdr.msg_name);
if (pAddress)
{
if (pAddress->ss_family == AF_INET)
{
const auto pIPV4 = reinterpret_cast<const sockaddr_in*>(pAddress);
m_oLastReceivedUdpRemoteEndpoint = {asio::ip::address_v4(pIPV4->sin_addr.s_addr),
pIPV4->sin_port};
}
else if (pAddress->ss_family == AF_INET6)
{
const auto pIPV6 = reinterpret_cast<const sockaddr_in6*>(pAddress);
m_oLastReceivedUdpRemoteEndpoint = {
asio::ip::address_v6(
reinterpret_cast<const asio::ip::address_v6::bytes_type&>(pIPV6->sin6_addr.s6_addr),
pIPV6->sin6_scope_id),
pIPV6->sin6_port};
}
}
}
else
{
// Return control back to ASIO.
break;
}
} while(true);
}
if (nError != asio::error::operation_aborted)
{
ScheduleReadMany();
}
}
#endif
void cUdpSocketWrapper::ScheduleReadOne(const std::shared_ptr<tPacketHandler>& oBuffer) noexcept
{
m_oUdpSocket.async_receive_from(
oBuffer->oRegisteredBuffer, oBuffer->oEndpoint,
std::bind(&cUdpSocketWrapper::OnReceiveOne, this, std::placeholders::_1, std::placeholders::_2, oBuffer)
);
}
void cUdpSocketWrapper::OnReceiveOne(
const std::error_code& nError,
std::size_t nBytesTransferred,
const std::shared_ptr<tPacketHandler>& oBuffer) noexcept
{
if (!nError)
{
m_oLastReceivedUdpRemoteEndpoint = oBuffer->oEndpoint;
if (m_fnOnReceive)
{
m_fnOnReceive(oBuffer->oBuffer.data(), nBytesTransferred);
}
}
// Keep recycling our buffers.
if (nError != asio::error::operation_aborted)
{
ScheduleReadOne(oBuffer);
}
}
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
#define RETURN_IF_THROWS(s)
if the expression throws an exception, returns a tResult containing the exception information.
Header (TCP Sender)
#pragma once
#include <sink_to_non_adtf.h>
#include "tcp_socket_wrapper.h"
class cTcpSinkToNonADTFSystem : public cSinkToNonADTFSystem
{
public:
ADTF_CLASS_ID_NAME(cTcpSinkToNonADTFSystem,
"demo_foreign_application_tcp_sender.streaming_sink.adtf.cid",
"TCP Sender To Non ADTF Application");
cTcpSinkToNonADTFSystem();
tResult StartStreaming() override;
protected:
void Send(const void* pData, size_t nDataSize) override;
private:
ITCPSocket& GetSocket();
adtf::base::property_variable<bool> m_bEnableAutomaticReconnection = false;
};
Implementation (TCP Sender)
#include "tcp_sink_to_non_adtf.h"
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
cTcpSinkToNonADTFSystem::cTcpSinkToNonADTFSystem()
{
m_strRemoteHost.SetDescription("This is only taken into account when the sink is not connected to a source via the socket interface binding.\n"
"If set to 'localhost' the IPv4 localhost address '127.0.0.1' is used.\n"
"IPv4 Example: '127.0.0.1'\n"
"IPv6 Example: '::1'\n");
RegisterPropertyVariable("remote_host", m_strRemoteHost);
m_nRemotePort.SetDescription("This is only taken into account when the sink is not connected to a source via the socket interface binding.");
RegisterPropertyVariable("remote_port", m_nRemotePort);
m_bNoTCPDelay.SetDescription("If activated, the TCP_NODELAY option will be set for the socket.");
RegisterPropertyVariable("no_tcp_delay", m_bNoTCPDelay);
m_bEnableAutomaticReconnection.SetDescription("If enabled, connection loss is not treated as an error, but the source tries to reconnect."
" This is only taken into account when the sink is not connected to a source via the socket interface binding.");
RegisterPropertyVariable("enable_automatic_reconnection", m_bEnableAutomaticReconnection);
m_oSocketClient = CreateInterfaceClient<ITCPSocket>("socket");
SetDescription("socket", "Interface client to use a shared socket for communication");
// sets a short description for the component
SetDescription("Use this Streaming Sink to transmit sample data to a Non-ADTF Application using the TCP protocol");
// set help link to jump to documentation from ADTF Configuration Editor
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_non_adtf_application_sender_receiver.html");
}
tResult cTcpSinkToNonADTFSystem::StartStreaming()
{
RETURN_IF_FAILED(cSampleStreamingSink::StartStreaming());
if (!m_oSocketClient.IsValid())
{
// we are not connected to a source, so just create our own socket instance
auto pSocket = make_object_ptr<cTCPSocketWrapper>();
if (m_bNoTCPDelay)
{
pSocket->EnableTcpNoDelay();
}
RETURN_IF_FAILED_DESC(pSocket->Connect(*m_strRemoteHost, m_nRemotePort, m_bEnableAutomaticReconnection),
"Unable to connect to %s:%d", m_strRemoteHost->GetPtr(), *m_nRemotePort);
m_pFallbackSocket = pSocket;
}
else
{
if (m_bNoTCPDelay)
{
GetSocket().EnableTcpNoDelay();
}
}
}
void cTcpSinkToNonADTFSystem::Send(const void* pData, size_t nDataSize)
{
const auto oResult = GetSocket().Send(pData, nDataSize);
if (!m_bEnableAutomaticReconnection || oResult != ERR_RETRY)
{
THROW_IF_FAILED(oResult);
}
}
ITCPSocket& cTcpSinkToNonADTFSystem::GetSocket()
{
return m_pFallbackSocket ? *m_pFallbackSocket : m_oSocketClient.Get();
}
#define RETURN_IF_FAILED_DESC(s,...)
returns if the expression returns a failed tResult and ammends the error message.
Header (TCP Receiver)
#pragma once
#include "tcp_socket_wrapper.h"
class cTcpSourceFromNonADTFSystem : public adtf::filter::cSampleStreamingSource
{
public:
ADTF_CLASS_ID_NAME(cTcpSourceFromNonADTFSystem,
"demo_foreign_application_tcp_receiver.streaming_source.adtf.cid",
"TCP Receiver From Non ADTF Application");
cTcpSourceFromNonADTFSystem();
tResult Construct() override;
tResult Init() override;
tResult StartStreaming() override;
tResult StopStreaming() override;
private:
void ReadThread();
size_t ReadFromSocket(void* pBuffer, size_t nBufferSize, bool bFillBuffer);
private:
adtf::base::property_variable<bool> m_bDeserializeViaMediaDescription = false;
adtf::base::property_variable<bool> m_bMarkSamplesAsDeserialized = true;
adtf::base::property_variable<bool> m_bEnableAutomaticReconnection = false;
adtf::streaming::ISampleWriter* m_pOutputWriter = nullptr;
std::vector<uint8_t> m_oReadBuffer;
ddl::codec::CodecFactory m_oCodecFactory;
bool m_bReadDirectlyIntoSample = true;
};
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Implementation (TCP Receiver)
#include "tcp_source_from_non_adtf.h"
#include <asio_helpers.h>
constexpr const tTimeStamp g_tmSocketTimeOut = 50000;
constexpr const size_t g_nDefaultReadBufferSize = 0xFFFF;
using namespace adtf::util;
using namespace adtf::ucom;
using namespace adtf::base;
using namespace adtf::streaming;
using namespace adtf::system;
cTcpSourceFromNonADTFSystem::cTcpSourceFromNonADTFSystem()
{
m_strRemoteHost.SetDescription("The hostname or ip address that the source should connect to.");
RegisterPropertyVariable("remote_host", m_strRemoteHost);
m_nRemotePort.SetDescription("The port on the remote host that the source should connect to.");
RegisterPropertyVariable("remote_port", m_nRemotePort);
m_nFixedPacketSize.SetDescription("If non-zero, the source will wait to read this amount of bytes before putting them into a Sample.");
RegisterPropertyVariable("fixed_packet_size", m_nFixedPacketSize);
m_strDDLStructName.SetDescription("If set, the stream type will contain the media description of the given struct, retrieved from the Media Description Service.");
RegisterPropertyVariable("ddl_struct_name", m_strDDLStructName);
m_bDeserializeViaMediaDescription.SetDescription("If activated, incoming data will be deserialized as defined "
"by the media description of the struct selected via ddl_struct_name before writing it into output samples.");
RegisterPropertyVariable("deserialize_via_media_description", m_bDeserializeViaMediaDescription);
m_bMarkSamplesAsDeserialized.SetDescription("In case that 'deserialize_via_media_description' is disabled, this will force the output stream type to have the deserialized flag set.");
RegisterPropertyVariable("mark_samples_as_deserialized", m_bMarkSamplesAsDeserialized);
m_bEnableAutomaticReconnection.SetDescription("If enabled, connection loss is not treated as an error, but the source tries to reconnect.");
RegisterPropertyVariable("enable_automatic_reconnection", m_bEnableAutomaticReconnection);
m_pOutputWriter = CreateOutputPin("output");
SetDescription("output", "Provides the sample data received from a Non-ADTF instance over TCP");
m_pSocket = make_object_ptr<cTCPSocketWrapper>();
CreateInterfaceServer<ITCPSocket>("socket", ucom_object_ptr_cast<ITCPSocket>(m_pSocket));
SetDescription("socket", "Interface server to provide a shared socket for communication");
// For session compatibility reasons we use this fallback helper for the case where no active runner is connected to the runner.
// In your own implementations use a simple CreateRunner(...) instead!
m_oReadThread = adtf::filter::cRunnerFallback(this, "receive_data", cThreadTriggerHint(), [this](){ ReadThread(); });
SetDescription("receive_data",
"Connect a Thread Runner that will provide the context for receiving all data. "
"If this is not connected, the source will create a thread on its own.");
// sets a short description for the component
SetDescription("Use this Streaming Source to receive sample data from a Non-ADTF Application using the TCP protocol");
// set help link to jump to documentation from ADTF Configuration Editor
SetHelpLink("$(ADTF_DIR)/doc/adtf_html/page_demo_non_adtf_application_sender_receiver.html");
}
tResult cTcpSourceFromNonADTFSystem::Construct()
{
RETURN_IF_FAILED(cSampleStreamingSource::Construct());
if (m_strDDLStructName->IsNotEmpty())
{
object_ptr<IStreamType> pType = make_object_ptr<adtf::mediadescription::stream_type_default<>>(
m_strDDLStructName->GetPtr(), m_bDeserializeViaMediaDescription || m_bMarkSamplesAsDeserialized ?
m_pOutputWriter->ChangeType(pType);
if (m_bDeserializeViaMediaDescription)
{
const auto oDescription = adtf::mediadescription::get_media_description_from_stream_type(*pType.Get());
m_oCodecFactory = ddl::codec::CodecFactory(std::get<0>(oDescription).c_str(),
std::get<1>(oDescription).c_str());
}
if (*m_nFixedPacketSize == 0)
{
if (m_bDeserializeViaMediaDescription)
{
m_nFixedPacketSize = m_oCodecFactory.getStaticBufferSize(ddl::tDataRepresentation::Serialized);
}
else
{
m_nFixedPacketSize = m_oCodecFactory.getStaticBufferSize(ddl::tDataRepresentation::Deserialized);
}
}
}
else
{
if (m_bDeserializeViaMediaDescription)
{
RETURN_ERROR_DESC(ERR_INVALID_ARG, "Deserialization requires a ddl struct name to be set.");
}
}
}
tResult cTcpSourceFromNonADTFSystem::Init()
{
RETURN_IF_FAILED(cSampleStreamingSource::Init());
m_bReadDirectlyIntoSample = m_nFixedPacketSize > 0 && !m_bDeserializeViaMediaDescription;
if (!m_bReadDirectlyIntoSample)
{
if (m_nFixedPacketSize > 0)
{
m_oReadBuffer.resize(m_nFixedPacketSize);
}
else
{
m_oReadBuffer.resize(g_nDefaultReadBufferSize);
}
}
}
tResult cTcpSourceFromNonADTFSystem::StartStreaming()
{
RETURN_IF_FAILED(cSampleStreamingSource::StartStreaming());
RETURN_IF_FAILED_DESC(m_pSocket->Connect(*m_strRemoteHost, m_nRemotePort, m_bEnableAutomaticReconnection),
"Unable to connect to %s:%d", m_strRemoteHost->GetPtr(), *m_nRemotePort);
RETURN_IF_FAILED(m_oReadThread.Activate());
}
tResult cTcpSourceFromNonADTFSystem::StopStreaming()
{
m_oReadThread.Deactivate();
m_pSocket->Close();
return cSampleStreamingSource::StopStreaming();
}
void cTcpSourceFromNonADTFSystem::ReadThread()
{
try
{
object_ptr<ISample> pSample;
if (m_bReadDirectlyIntoSample)
{
pSample = ReadIntoSample();
}
else
{
if (m_bDeserializeViaMediaDescription)
{
pSample = ReadAndDeserialize();
}
else
{
pSample = ReadIntoBuffer();
}
}
if (pSample)
{
m_pOutputWriter->Write(pSample);
m_pOutputWriter->ManualTrigger(get_sample_time(pSample));
}
}
catch (...)
{
m_pOutputWriter->SetStreamError(current_exception_to_result());
}
}
adtf::ucom::object_ptr<adtf::streaming::ISample> cTcpSourceFromNonADTFSystem::ReadIntoSample()
{
object_ptr<adtf::streaming::flash::ISample> pSample;
object_ptr_locked<ISampleBuffer> pBuffer;
THROW_IF_FAILED(pSample->WriteLock(pBuffer, m_nFixedPacketSize));
const auto nBytesRead = ReadFromSocket(pBuffer->GetPtr(), pBuffer->GetSize(), true);
if (nBytesRead == 0)
{
return nullptr;
}
pSample->SetTime(m_pClock->GetStreamTimeNs());
return pSample;
}
adtf::ucom::object_ptr<adtf::streaming::ant::ISample> cTcpSourceFromNonADTFSystem::ReadIntoBuffer()
{
const auto nBytesRead = ReadFromSocket(m_oReadBuffer.data(), m_oReadBuffer.size(), false);
if (nBytesRead == 0)
{
return nullptr;
}
object_ptr<adtf::streaming::flash::ISample> pSample;
THROW_IF_FAILED(alloc_sample(pSample, m_pClock->GetStreamTimeNs()));
object_ptr_locked<ISampleBuffer> pBuffer;
THROW_IF_FAILED(pSample->WriteLock(pBuffer, nBytesRead));
THROW_IF_FAILED(pBuffer->Write(adtf_memory_buffer<const void>(m_oReadBuffer.data(), nBytesRead)));
return pSample;
}
adtf::ucom::object_ptr<adtf::streaming::ant::ISample> cTcpSourceFromNonADTFSystem::ReadAndDeserialize()
{
const auto nBytesRead = ReadFromSocket(m_oReadBuffer.data(), m_oReadBuffer.size(), true);
if (nBytesRead == 0)
{
return nullptr;
}
object_ptr<adtf::streaming::flash::ISample> pSample;
THROW_IF_FAILED(alloc_sample(pSample, m_pClock->GetStreamTimeNs()));
const auto oDecoder = m_oCodecFactory.makeDecoderFor(m_oReadBuffer.data(), nBytesRead, ddl::tDataRepresentation::Serialized);
THROW_IF_FAILED(TO_ADTF_RESULT(oDecoder.isValid()));
object_ptr_locked<ISampleBuffer> pBuffer;
THROW_IF_FAILED(pSample->WriteLock(pBuffer, oDecoder.getBufferSize(ddl::tDataRepresentation::Deserialized)));
auto oCodec = oDecoder.makeCodecFor(pBuffer->GetPtr(), pBuffer->GetSize(), ddl::tDataRepresentation::Deserialized);
THROW_IF_FAILED(TO_ADTF_RESULT(ddl::codec::transform(oDecoder, oCodec)));
return pSample;
}
size_t cTcpSourceFromNonADTFSystem::ReadFromSocket(void* pBuffer, size_t nBufferSize, bool bFillBuffer)
{
auto pBufferWritePosition = static_cast<uint8_t*>(pBuffer);
size_t nOverallBytesRead = 0;
do
{
size_t nLastBytesRead = 0;
auto oResult = m_pSocket->Read(pBufferWritePosition, static_cast<int>(nBufferSize), &nLastBytesRead);
if (IS_FAILED(oResult))
{
if (oResult == ERR_TIMEOUT)
{
if (nOverallBytesRead == 0)
{
return 0;
}
continue;
}
if (oResult == ERR_RETRY)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 0;
}
throw oResult;
}
nOverallBytesRead += nLastBytesRead;
pBufferWritePosition += nLastBytesRead;
nBufferSize -= nLastBytesRead;
}
while (bFillBuffer && nBufferSize > 0);
return nOverallBytesRead;
}
base::flash::tNanoSeconds get_sample_time(const ucom::ant::iobject_ptr< const ant::ISample > &pSample)
Returns the sample time stamp with nanosecond precision.
Header (TCP Socket Wrapper)
#pragma once
#include <asio.hpp>
#include <mutex>
#ifdef WIN32
#undef GetObject
#endif
class ITCPSocket: public adtf::ucom::ant::IObject
{
public:
ADTF_IID(ITCPSocket, "tcp_socket.demo.adtf");
public:
virtual void EnableTcpNoDelay() = 0;
virtual tResult Send(const void* pData, size_t nDataSize) = 0;
protected:
~ITCPSocket() = default;
};
class cTCPSocketWrapper: public adtf::ucom::object<ITCPSocket>
{
public:
cTCPSocketWrapper();
void EnableTcpNoDelay() override;
tResult Connect(const char* strRemoteHost, uint16_t nRemotePort, bool bReconnect);
tResult Send(const void* pData, size_t nDataSize) override;
tResult Read(void* pDestination, size_t nBytes, size_t* pBytesRead);
void Close();
private:
tResult Connect();
std::string m_strRemoteHost;
uint16_t m_nRemotePort = 0;
bool m_bReconnect = false;
std::mutex m_oConnectMutex;
asio::io_context m_oIoContext;
asio::ip::tcp::socket m_oTcpSocket;
bool m_bTcpNoDelay = false;
};
Copyright © Audi Electronics Venture GmbH.
Implementation (TCP Socket Wrapper)
#include "tcp_socket_wrapper.h"
#include <asio_helpers.h>
constexpr std::chrono::milliseconds SOCKET_READ_TIMEOUT{50};
cTCPSocketWrapper::cTCPSocketWrapper():
m_oTcpSocket(m_oIoContext)
{
}
tResult cTCPSocketWrapper::Connect(const char* strRemoteHost, uint16_t nRemotePort, bool bReconnect)
{
m_strRemoteHost = strRemoteHost;
m_nRemotePort = nRemotePort;
m_bReconnect = bReconnect;
return Connect();
}
tResult cTCPSocketWrapper::Connect()
{
std::lock_guard<std::mutex> oGuard(m_oConnectMutex);
m_oTcpSocket.close();
try
{
m_oTcpSocket.connect(asio::ip::tcp::endpoint(resolve_address(m_strRemoteHost), m_nRemotePort));
}
catch (...)
{
if (m_bReconnect)
{
}
}
if (m_bTcpNoDelay)
{
RETURN_IF_THROWS(m_oTcpSocket.set_option(asio::ip::tcp::no_delay(true)));
}
}
void cTCPSocketWrapper::EnableTcpNoDelay()
{
m_bTcpNoDelay = true;
}
bool is_connection_error(const asio::system_error& oError)
{
return oError.code() == asio::error::not_connected ||
oError.code() == asio::error::connection_aborted ||
oError.code() == asio::error::connection_refused ||
oError.code() == asio::error::connection_reset ||
oError.code() == asio::error::eof ||
oError.code() == asio::error::broken_pipe;
}
tResult cTCPSocketWrapper::Send(const void* pData, size_t nDataSize)
{
auto pPosition = static_cast<const uint8_t*>(pData);
size_t nDataLeft = nDataSize;
while (nDataLeft)
{
size_t nDataSent = 0;
try
{
nDataSent = m_oTcpSocket.send(asio::buffer(pPosition, nDataSize));
}
catch (const asio::system_error& oError)
{
if (is_connection_error(oError) &&
m_bReconnect)
{
if (IS_OK(Connect()))
{
pPosition = static_cast<const uint8_t*>(pData);
nDataLeft = nDataSize;
continue;
}
RETURN_ERROR(ERR_RETRY);
}
}
pPosition += nDataSent;
nDataLeft -= static_cast<size_t>(nDataSent);
}
}
tResult cTCPSocketWrapper::Read(void* pDestination, size_t nBytes, size_t* pBytesRead)
{
if (!wait_for_data<>(m_oTcpSocket, SOCKET_READ_TIMEOUT))
{
RETURN_ERROR(ERR_TIMEOUT);
}
try
{
*pBytesRead = m_oTcpSocket.receive(asio::buffer(pDestination, nBytes));
}
catch (const asio::system_error& oError)
{
if (is_connection_error(oError) &&
m_bReconnect)
{
Connect();
RETURN_ERROR(ERR_RETRY);
}
}
}
void cTCPSocketWrapper::Close()
{
m_oTcpSocket.close();
}
#define RETURN_CURRENT_EXCEPTION()
returns the current exception as a tResult, use it in a catch block.
Demo executable Non-ADTF Application
#include <a_utils.h>
#include <iostream>
#include <csignal>
#include <vector>
#include <asio.hpp>
namespace udp
{
struct async_context
{
async_context(const std::shared_ptr<asio::ip::udp::socket>& socket): socket(socket)
{
}
std::shared_ptr<asio::ip::udp::socket> socket;
std::array<uint8_t, 9000> buffer;
asio::ip::udp::endpoint endpoint;
};
void schedule_read(const std::error_code& nError, const std::shared_ptr<async_context>& oContext);
void schedule_echo(const std::error_code& nError, size_t nBytesRead, const std::shared_ptr<async_context>& context)
{
if (!nError)
{
context->socket->async_send_to(asio::buffer(context->buffer.data(), nBytesRead), context->endpoint,
std::bind(&schedule_read, std::placeholders::_1, context));
}
else
{
schedule_read(nError, context);
}
}
void schedule_read(const std::error_code& nError, const std::shared_ptr<async_context>& oContext)
{
if (nError)
{
std::cerr << nError.message();
}
if (nError != asio::error::operation_aborted)
{
oContext->socket->async_receive_from(asio::buffer(oContext->buffer.data(), oContext->buffer.size()),
oContext->endpoint,
std::bind(&schedule_echo, std::placeholders::_1, std::placeholders::_2, oContext));
}
}
void server(asio::io_context& oIoContext, uint16_t nPort, bool bIpv6, const std::string& strMulticastGroup)
{
std::cout << "Creating UDP server " << (bIpv6 ? "v6" : "v4") << " at port " << nPort << " mc: " << strMulticastGroup
<< std::endl;
std::shared_ptr<asio::ip::udp::socket> oUdpSocket = std::make_shared<asio::ip::udp::socket>(
oIoContext, asio::ip::udp::endpoint(bIpv6 ? asio::ip::udp::v6() : asio::ip::udp::v4(), nPort));
if (!strMulticastGroup.empty())
{
oUdpSocket->set_option(asio::ip::multicast::join_group(asio::ip::make_address(strMulticastGroup)));
}
// Just make sure this echo app isn't going to responsible for loosing packets...
for (size_t nBufferCount = 0; nBufferCount < 64; ++nBufferCount)
{
auto oContext = std::make_shared<udp::async_context>(oUdpSocket);
schedule_read(std::error_code(), oContext);
}
}
}
namespace tcp
{
struct async_context
{
async_context(const std::shared_ptr<asio::ip::tcp::socket>& socket) : socket(socket)
{
}
std::shared_ptr<asio::ip::tcp::socket> socket;
std::array<uint8_t, 9000> buffer;
};
void schedule_read(const std::error_code& nError, const std::shared_ptr<async_context>& oContext);
void on_accept(const std::error_code& nError,
const std::shared_ptr<async_context>& context,
const std::shared_ptr<asio::ip::tcp::acceptor>& acceptor)
{
if (context)
{
schedule_read(nError, context);
}
if (nError != asio::error::operation_aborted)
{
auto nextSocket = std::make_shared<asio::ip::tcp::socket>(acceptor->get_executor());
auto nextContext = std::make_shared<async_context>(nextSocket);
acceptor->async_accept(*nextSocket,
std::bind(&on_accept, std::placeholders::_1, nextContext, acceptor));
}
}
void schedule_echo(const std::error_code& nError, size_t nBytesWritten, asio::const_buffer oCurrentTxBuffer, const std::shared_ptr<async_context>& oContext)
{
if (!nError)
{
oCurrentTxBuffer += nBytesWritten;
if (oCurrentTxBuffer.size() > 0)
{
oContext->socket->async_write_some(oCurrentTxBuffer,
std::bind(&schedule_echo, std::placeholders::_1, std::placeholders::_2, oCurrentTxBuffer, oContext));
}
else
{
schedule_read(nError, oContext);
}
}
else
{
schedule_read(nError, oContext);
}
}
void on_read(const std::error_code& nError, size_t nBytesRead, const std::shared_ptr<async_context>& oContext)
{
if (nError != asio::error::operation_aborted)
{
schedule_echo(nError, 0, asio::buffer(oContext->buffer.data(), nBytesRead), oContext);
}
}
void schedule_read(const std::error_code& nError, const std::shared_ptr<async_context>& oContext)
{
if (nError)
{
std::cerr << nError.message();
}
if (nError != asio::error::operation_aborted)
{
oContext->socket->async_read_some(asio::buffer(oContext->buffer.data(), oContext->buffer.size()),
std::bind(&on_read, std::placeholders::_1, std::placeholders::_2, oContext));
}
}
void server(asio::io_context& oIoContext, uint16_t nPort, bool bIpv6)
{
std::cout << "Creating TCP server at " << (bIpv6 ? "v6" : "v4") << " port " << nPort << std::endl;
{
auto oTcpAcceptor = std::make_shared<asio::ip::tcp::acceptor>(
oIoContext, bIpv6 ? asio::ip::tcp::endpoint(asio::ip::tcp::v6(), nPort) :
asio::ip::tcp::endpoint(asio::ip::tcp::v4(), nPort));
oTcpAcceptor->set_option(asio::ip::tcp::acceptor::reuse_address(true));
on_accept(std::error_code(), nullptr, oTcpAcceptor);
}
}
} // namespace tcp
namespace
{
asio::io_context oIoContext;
void signal_handler(int nSignalId)
{
oIoContext.stop();
std::signal(nSignalId, SIG_DFL);
}
} // namespace
void run_threaded(asio::io_context& context, size_t count)
{
std::list<std::thread> threads;
for (size_t i = 0; i < count; ++i)
{
threads.emplace_back([&]() { context.run(); });
}
while (!threads.empty())
{
threads.front().join();
threads.pop_front();
}
}
int main(int nArgn, const char** pArgv)
{
try
{
adtf::util::cCommandLine arguments(nArgn, pArgv);
std::signal(SIGINT, &signal_handler);
tUInt16 nPort = static_cast<uint16_t>(arguments.GetProperty("p", "54321").AsInt32());
tBool bTcp = arguments.GetFlag("tcp");
tBool bIpv6 = arguments.GetFlag("ipv6");
std::string strMulticastGroup = arguments.GetProperty("m").GetPtr();
if (bTcp)
{
tcp::server(oIoContext, nPort, bIpv6);
}
else
{
udp::server(oIoContext, nPort, bIpv6, strMulticastGroup);
}
// Stick to one thread for now - otherwise we'd loose order of events for UDP.
// Even though this app can be made to scale up arbitrarily.
run_threaded(oIoContext, 1);
return 0;
}
catch (const asio::system_error& oError)
{
if (oError.code() == asio::error::bad_descriptor)
{
std::cout << "server closed\n";
return 0;
}
std::cerr << "exception: " << oError.what() << "\n";
return 1;
}
}
Copyright © Audi Electronics Venture GmbH.
uint16_t tUInt16
type definition for unsigned integer values (16bit) (platform and compiler independent type).
bool tBool
The tBool defines the type for the Values tTrue and tFalse (platform and compiler dependent).