#pragma once
#include <memory>
#include <fstream>
#include <ddl/codec/codec_factory.h>
namespace {
/**
 * Maps the numeric values of one enum type to the names of its elements.
 *
 * The lookup table is built once in the constructor from the element names and values
 * reported by the given type info and never changes afterwards. Instances can neither be
 * copied nor moved.
 */
class EnumDecoder
{
public:
    /// Key type of the lookup table (enum element value widened to 64 bit unsigned).
    using index_t = uint64_t;
    /// Mapped type of the lookup table (enum element name).
    using value_t = std::string;

    /**
     * Builds the value-to-name table for the given enum type.
     *
     * Declared explicit so that a raw type info pointer is never converted to a decoder
     * implicitly.
     *
     * @param type_info Type description to read the enum elements from. It is dereferenced
     *                  without a check, so it must not be null.
     */
    explicit EnumDecoder(const ::ddl::codec::CodecTypeInfo* type_info)
        : _invalid(), _table(generate(type_info))
    {
    }

    EnumDecoder(const EnumDecoder&) = delete;
    EnumDecoder(EnumDecoder&&) noexcept = delete;
    // Assignment is already impossible because of the const members; spelled out for clarity.
    EnumDecoder& operator=(const EnumDecoder&) = delete;
    EnumDecoder& operator=(EnumDecoder&&) = delete;

    /**
     * Returns the element name registered for @p value.
     *
     * @tparam N Any type that can be converted to index_t with static_cast.
     * @param value Raw enum value to translate.
     * @return The element name, or an empty string if the value is not part of the table.
     */
    template<class N>
    const value_t& lookup(const N& value) const noexcept
    {
        if (auto it = _table.find(static_cast<index_t>(value)); it != _table.end())
        {
            return it->second;
        }
        return _invalid;
    }

private:
    using lookup_t = std::unordered_map<index_t, value_t>;

    /**
     * Creates the lookup table from all enum elements of @p type_info.
     *
     * If several element names share the same value, the first one encountered is kept
     * (try_emplace does not overwrite existing entries).
     */
    static lookup_t generate(const ::ddl::codec::CodecTypeInfo* type_info)
    {
        lookup_t tmp;
        for (const auto& name : type_info->getEnumElementNames())
        {
            auto value = type_info->getEnumElementValue(name);
            tmp.try_emplace(value.asUInt64(), name);
        }
        return tmp;
    }

    /// Empty string returned by lookup() for unknown values.
    const value_t _invalid;
    /// Enum value -> element name.
    const lookup_t _table;
};
}
// NOTE(review): the class head that should precede this brace (class name and base class) is
// not visible in this excerpt; the members below belong to CsvExporter.
{
public:
CsvExporter();
~CsvExporter();
// NOTE(review): the signature that belongs to the following body is missing from this
// excerpt. The body returns the literal "csv".
{
return "csv";
}
private:
// Writes a single sample as one CSV row.
void process(
const std::shared_ptr<const adtf_file::WriteSample>& sample);
// Recreates _indices (one entry per name in _columns) from the given codec factory.
void rebuildIndex(const ddl::codec::CodecFactory& codec_factory, bool data_is_serialized);
// Output stream of the CSV file; opened in open().
std::ofstream _csv_file;
// Cell separator, read from the "separator" configuration property in open().
std::string _separator;
// Precision applied to values, read from "decimal_places".
uint64_t _value_precision = 4;
// Precision applied to the timestamp, read from "timestamp_decimals"; 0 selects integer output.
uint64_t _timestamp_precision = 0;
// Value the sample timestamp is divided by, read from "timestamp_base_ns".
int64_t _timestamp_divisor = 1;
// Names of the exported columns (without the leading timestamp column).
std::vector<std::string> _columns;
// Per-column decoding state built by rebuildIndex().
struct ElementContext {
// Index used to read the element's value from a sample buffer.
ddl::codec::LeafCodecIndex value_decoder;
// Set only for enum typed elements; translates the raw value to its name.
std::shared_ptr<EnumDecoder> enum_decoder;
};
// One entry per column, in the same order as _columns.
std::vector<ElementContext> _indices;
};
Class to create or read a file item. A file item is either a sample, a stream type, or a trigger.
Definition: reader.h:156
Class to create and describe a stream within an adtf_file::Reader. Each stream has an identifier strea...
Definition: reader.h:134
virtual bool isCompatible(const adtf_file::Stream &stream) const =0
virtual void process(const adtf_file::FileItem &item)=0
virtual std::string getProcessorIdentifier() const =0
Definition: processor.h:65
void open(const std::vector< adtf_file::Stream > &streams, const std::string &destination_url) override
Definition: processor.h:76
#include "csv_exporter.h"
#include <iomanip>
#include <string>
#include <string_view>
#include <variant>
});
/**
 * Splits @p text at every occurrence of @p separator and returns the non-empty pieces.
 *
 * Empty pieces (leading, trailing or consecutive separators) are dropped.
 *
 * @param text      Text to split.
 * @param separator Delimiter to split at. If it is empty, no splitting takes place and the
 *                  whole text is returned as a single piece.
 * @return The pieces in their original order; empty if @p text contains no non-empty piece.
 */
static std::vector<std::string> explode(std::string_view text, std::string_view separator)
{
    std::vector<std::string> result;

    // An empty separator matches at position 0 without consuming any input, so the loop
    // below would never make progress. Treat it as "nothing to split at".
    if (separator.empty())
    {
        if (!text.empty())
        {
            result.emplace_back(text);
        }
        return result;
    }

    while (!text.empty())
    {
        const auto pos = text.find(separator);
        if (pos == text.npos)
        {
            // No further separator: the remainder is the last piece.
            result.emplace_back(text);
            break;
        }

        const auto prefix = text.substr(0, pos);
        if (!prefix.empty())
        {
            result.emplace_back(prefix);
        }
        text.remove_prefix(pos + separator.size());
    }
    return result;
}
// Sets the configuration properties of the exporter. Each entry appears to be
// {property name, {default value, type name}}; the values are read back in open().
CsvExporter::CsvExporter()
{
setConfiguration({
// Precision used when writing values.
{"decimal_places", {"4", "uint32"}},
// Cell separator; also used to split the "columns" property.
{"separator", {";", "string"}},
// Divisor applied to the sample timestamp before it is written.
{"timestamp_base_ns", {"1", "uint64"}},
// Precision used when writing the timestamp; 0 writes it as an integer.
{"timestamp_decimals", {"0", "uint32"}},
// Separator-delimited list of element names to export; empty by default.
{"columns", {"", "string"}},
});
}
// Nothing to release by hand; all members clean up after themselves.
CsvExporter::~CsvExporter() = default;
// NOTE(review): the function signature that belongs to this body is not visible in this
// excerpt, and the try block appears to be missing the statements that could throw.
// As shown, the body returns true; the catch-all handler returns false.
{
try
{
return true;
}
catch (...)
{
return false;
}
}
// Opens the output file, reads the configuration, builds the column index and writes the
// CSV header line ("timestamp" followed by one separator-prefixed column name per column).
// Throws a std::runtime_error nested around the original exception if the file cannot be
// opened.
void CsvExporter::open(
const adtf_file::Stream& stream,
const std::string& destination_url)
{
// Make every failing stream operation throw, including the open() below and later writes.
_csv_file.exceptions(std::ofstream::failbit | std::ofstream::badbit);
try
{
_csv_file.open(destination_url);
}
catch (...)
{
std::throw_with_nested(std::runtime_error("unable to open output csv file '" +
destination_url + "' for stream " + stream.name));
}
const auto config = getConfiguration();
_separator = adtf_file::getPropertyValue<std::string>(config, "separator");
_value_precision = adtf_file::getPropertyValue<uint64_t>(config, "decimal_places");
_timestamp_precision = adtf_file::getPropertyValue<uint64_t>(config, "timestamp_decimals");
_timestamp_divisor = adtf_file::getPropertyValue<int64_t>(config, "timestamp_base_ns");
// The "columns" property is a list of element names delimited by the cell separator.
auto fields = adtf_file::getPropertyValue<std::string>(config, "columns");
_columns = explode(fields, _separator);
// NOTE(review): codec_factory is default-constructed and data_is_serialized stays false as
// shown; presumably both are meant to be derived from the stream's type — confirm against
// the complete source.
ddl::codec::CodecFactory codec_factory;
bool data_is_serialized = false;
// Without an explicit column list, every element reported to the callback becomes a column.
if (_columns.empty()) {
// NOTE(review): the call that takes the following two arguments is missing from this
// excerpt; the lambda appends the full name of each element it receives to _columns.
codec_factory.getElements(),
[&](const decltype(codec_factory)::Element& element)
{
_columns.push_back(element.getFullName());
}
);
}
rebuildIndex(codec_factory, data_is_serialized);
// Header line.
_csv_file << "timestamp";
for (const auto& column : _columns)
{
_csv_file << config.at("separator").value << column;
}
_csv_file << "\n";
}
// NOTE(review): the function signature that belongs to this body is not visible in this
// excerpt; the body dispatches on the dynamic type of item.stream_item.
{
// Samples are forwarded to the sample overload of process().
if (
auto sample = std::dynamic_pointer_cast<const adtf_file::WriteSample>(item.
stream_item))
{
process(sample);
}
// A stream type item causes the column index to be rebuilt.
else if (
auto stream_type = std::dynamic_pointer_cast<const adtf_file::StreamType>(item.
stream_item))
{
// NOTE(review): codec_factory is default-constructed and data_is_serialized stays false as
// shown; presumably both are meant to be derived from stream_type — confirm against the
// complete source.
ddl::codec::CodecFactory codec_factory;
bool data_is_serialized = false;
rebuildIndex(codec_factory, data_is_serialized);
}
}
/**
 * Appends one CSV row for @p sample to the output file.
 *
 * The row starts with the sample timestamp divided by _timestamp_divisor, followed by one
 * separator-prefixed cell for every entry of _indices, and ends with a newline. Enum typed
 * elements are written by name when the value is known to the decoder, all other values
 * are written as numbers.
 *
 * @param sample Sample whose timestamp and buffer are written.
 * @throws std::ios_base::failure if writing fails (stream exceptions are enabled in open()).
 */
void CsvExporter::process(const std::shared_ptr<const adtf_file::WriteSample>& sample)
{
    // buffer.first / buffer.second are handed to the value getters below.
    auto buffer = sample->beginBufferRead();

    // NOTE(review): _timestamp_divisor is not checked against zero here; it is assumed that
    // the "timestamp_base_ns" property is never configured as 0 — confirm.
    if (_timestamp_precision)
    {
        // Fractional output requested: divide in floating point.
        _csv_file << std::fixed << std::setprecision(static_cast<int>(_timestamp_precision))
                  << (sample->getTimeStamp().count() / static_cast<double>(_timestamp_divisor));
    }
    else
    {
        // No decimals requested: plain integer division.
        _csv_file << (sample->getTimeStamp().count() / _timestamp_divisor);
    }

    // The format flags are sticky, so this applies to every floating point value of the row.
    _csv_file << std::fixed << std::setprecision(static_cast<int>(_value_precision));

    using variant_t = std::variant<bool, int8_t, uint8_t, int16_t, uint16_t, int32_t, uint32_t, int64_t, uint64_t, float, double>;

    for (const auto& index : _indices)
    {
        _csv_file << _separator;

        if (index.enum_decoder)
        {
            auto raw = ddl::codec::LeafValueGetter<EnumDecoder::index_t>::getValue(buffer.first, buffer.second, index.value_decoder.getLayout());
            const auto& text = index.enum_decoder->lookup(raw);
            if (!text.empty())
            {
                _csv_file << text;
                continue;
            }
            // Value without a name: fall through and write it as a number.
        }

        const auto value = ddl::codec::LeafValueGetter<variant_t>::getValue(buffer.first, buffer.second, index.value_decoder.getLayout());
        std::visit(
            [this](auto&& field) {
                using field_t = std::decay_t<decltype(field)>;
                if constexpr (std::is_integral_v<field_t>)
                {
                    // Widen to 64 bit so that the 8 bit types are written as numbers rather
                    // than as characters (and bool as 0/1).
                    if constexpr (std::is_signed_v<field_t>)
                    {
                        _csv_file << static_cast<int64_t>(field);
                    }
                    else
                    {
                        _csv_file << static_cast<uint64_t>(field);
                    }
                }
                else
                {
                    _csv_file << static_cast<double>(field);
                }
            },
            value
        );
    }
    _csv_file << "\n";
}
/**
 * Rebuilds _indices so that it holds one decoding context per entry of _columns.
 *
 * Every column name is resolved through @p codec_factory. Columns whose type is an enum
 * additionally get an EnumDecoder; columns that share the same type info share one decoder.
 *
 * @param codec_factory      Factory used to resolve the column names.
 * @param data_is_serialized Selects the serialized or the deserialized data representation
 *                           for the created leaf indices.
 * @throws std::runtime_error (with the original exception nested) if a column name cannot
 *         be resolved or refers to an element that has children.
 */
void CsvExporter::rebuildIndex(const ddl::codec::CodecFactory& codec_factory, bool data_is_serialized)
{
    using decoder_ptr = std::shared_ptr<EnumDecoder>;

    const auto representation = data_is_serialized ? ddl::DataRepresentation::serialized
                                                   : ddl::DataRepresentation::deserialized;

    // One decoder per distinct enum type, shared between all columns of that type.
    std::unordered_map<const ddl::codec::CodecTypeInfo*, decoder_ptr> decoder_cache;

    _indices.clear();
    for (const auto& column_name : _columns)
    {
        ddl::codec::CodecIndex codec_index;
        try
        {
            auto element = codec_factory.getElement(column_name);
            if (element.hasChildren())
            {
                throw std::runtime_error("Expected scalar, got complex type instead");
            }
            codec_index = element.getIndex();
        }
        catch (...)
        {
            std::throw_with_nested(std::runtime_error("column name '" +
                column_name + "' is not a valid member for the current stream type"));
        }

        decoder_ptr decoder;
        auto type_info = codec_index.getLayout().type_info;
        if (type_info->isEnum())
        {
            const auto cached = decoder_cache.find(type_info);
            if (cached != decoder_cache.end())
            {
                decoder = cached->second;
            }
            else
            {
                decoder = std::make_shared<EnumDecoder>(type_info);
                decoder_cache.try_emplace(type_info, decoder);
            }
        }

        _indices.push_back(ElementContext{
            ddl::codec::LeafCodecIndex(codec_index, representation),
            std::move(decoder)
        });
    }
}
std::shared_ptr< const StreamItem > stream_item
Definition: reader.h:164
Plugin initializer class to use within an ADTF File Library plugin.
Definition: object.h:104
Definition: processor.h:112
Objects & getObjects()
Get the objects singleton of the library.
void for_each_leaf_element(ElementsType &elements, const element_callback< ElementsType > &callback)
Iterates ALL leaf elements within ALL array elements.
Definition: ddl_helpers.h:147
std::tuple< ddl::codec::CodecFactory, bool > createDDLCodecFactoryFromStreamType(const std::shared_ptr< const adtf_file::StreamType > &stream_type)
Definition: ddl_helpers.h:90