adtf_file_library  0.13.1
CSV Reader

Description

Example reader which shows how to implement a adtf_file::Reader to import a DDL based stream from the content of a *.csv file.

Usage

adtfdat_tool --plugin csv_reader.adtffileplugin --create <adtfdat> --input <csv> --stream <name> --readerid csv <csv> [--property stream_name=<value> --property separator=<value> --property timestamp_column_index=<value> --property timestamp_factor=<value> --property timestamp_offset=<value> --property ddl_data_type=<value>]

Source

#pragma once
#include <memory>
#include <fstream>
#include <string>
#include <chrono>
#include <ddl/ddl.h>
class CsvReader : public adtf_file::Reader
{
public:
CsvReader();
std::string getReaderIdentifier() const override;
void open(const std::string& file_name,
std::shared_ptr<adtf_file::SampleFactory> sample_factory,
std::shared_ptr<adtf_file::StreamTypeFactory> stream_type_factory) override;
std::vector<adtf_file::Stream> getStreams() const override;
private:
std::ifstream _csv_file;
ddl::codec::CodecFactory _codec_factory;
std::vector<ddl::codec::CodecIndex> _codec_indices;
std::string _separator;
std::vector<adtf_file::Stream> _streams;
size_t _timestamp_column_index = 0;
double _timestamp_factor = 1.0;
std::chrono::nanoseconds _timestamp_offset;
std::chrono::nanoseconds _last_sample_timestamp;
std::shared_ptr<adtf_file::SampleFactory> _sample_factory;
};
class to create or read a file item. This file item is either a sample, streamtype or trigger.
Definition: reader.h:156
Definition: reader.h:173
virtual std::string getReaderIdentifier() const =0
Get the Reader Identifier of the Reader.
virtual std::vector< Stream > getStreams() const
Get the Streams.
Definition: reader.h:237
virtual FileItem getNextItem()=0
virtual void open(const std::string &filename, std::shared_ptr< SampleFactory > sample_factory, std::shared_ptr< StreamTypeFactory > stream_type_factory)=0
opens a file by the given filename. The given factories must be used to create samples and streamtype...
#include "csv_reader.h"
#include <sstream>
#include <cerrno>
#include <string.h>
#include <cctype>
static adtf_file::PluginInitializer initializer([] {
adtf_file::getObjects().push_back(
});
namespace {
void checkCompatibility(const std::string& filename)
{
if (!(filename.rfind("csv") == filename.size() - 3))
{
throw std::runtime_error("not a csv file.");
}
}
void check_error_bits(const std::ifstream& stream)
{
if (stream.fail() ||
stream.bad())
{
throw std::runtime_error(std::string("unable to read from file: ") + strerror(errno));
}
}
void rtrim(std::string& s) {
s.erase(std::find_if(s.rbegin(), s.rend(), [](unsigned char ch) {
return !std::isspace(ch);
}).base(), s.end());
}
std::vector<std::string> split_line(std::string line, const std::string& separator)
{
rtrim(line);
std::vector<std::string> columns;
auto separator_position = std::string::npos;
while ((separator_position = line.find(separator)) != std::string::npos)
{
columns.push_back(line.substr(0, separator_position));
line = line.substr(separator_position + 1);
}
if (!line.empty())
{
columns.push_back(line);
}
return columns;
}
std::string build_ddl(const std::vector<std::string>& columns,
const std::string& stream_name,
const std::string& ddl_data_type,
size_t timestamp_column_index)
{
auto data_type = ddl::PredefinedDataTypes::getInstance().getPredefinedType(ddl_data_type);
if (!data_type)
{
throw std::runtime_error("unable to resolve data type: " + ddl_data_type);
}
auto type_byte_size = data_type->getBitSize() / 8;
if (data_type->getBitSize() % 8)
{
++type_byte_size;
}
// I hate to do it manually, but the ddl:: implementation is sooo cumbersome,
// that it is barely usable (especially the memory handling)
std::ostringstream struct_definition;
struct_definition << "<struct name=\"" << stream_name << "\" alignment=\"1\" version=\"1\">";
size_t byte_position = 0;
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
if (column_index != timestamp_column_index)
{
struct_definition << "<element name=\"" << columns[column_index] << "\" alignment=\"1\" type=\"" << ddl_data_type << "\" bytepos=\"" << byte_position << "\" arraysize=\"1\" byteorder=\"LE\"/>";
byte_position += type_byte_size;
}
}
struct_definition << "</struct>";
return struct_definition.str();
}
}
CsvReader::CsvReader()
{
// use the default CTOR to create the set the default configuration of the reader
setConfiguration({ {"stream_name", { "csv", "string" }},
{ "separator", {";", "string"} },
{ "timestamp_column_index", {"0", "uint64"} },
{ "timestamp_factor", {"1.0", "double"} },
{ "timestamp_offset", {"0", "int64"} },
{ "ddl_data_type", {"tFloat64", "string"} } });
}
std::string CsvReader::getReaderIdentifier() const
{
// this must be a "unique" identifier for your reader
return "csv";
}
void CsvReader::open(const std::string& file_name,
std::shared_ptr<adtf_file::SampleFactory> sample_factory,
std::shared_ptr<adtf_file::StreamTypeFactory> stream_type_factory)
{
_sample_factory = sample_factory;
checkCompatibility(file_name);
try
{
_csv_file.open(file_name);
check_error_bits(_csv_file);
}
catch (...)
{
std::throw_with_nested(std::runtime_error("unable to open input csv file '" + file_name + "'"));
}
const auto& config = getConfiguration();
auto stream_name = adtf_file::getPropertyValue<std::string>(config, "stream_name");
_separator = adtf_file::getPropertyValue<std::string>(config, "separator");
auto ddl_data_type = adtf_file::getPropertyValue<std::string>(config, "ddl_data_type");
_timestamp_column_index = adtf_file::getPropertyValue<uint64_t>(config, "timestamp_column_index");
_timestamp_factor = adtf_file::getPropertyValue<double>(config, "timestamp_factor");
_timestamp_offset = std::chrono::microseconds(adtf_file::getPropertyValue<int64_t>(config, "timestamp_offset"));
std::string first_line;
std::getline(_csv_file, first_line);
if (first_line.empty())
{
throw std::runtime_error("unable to read header line from '" + file_name + "'");
}
auto columns = split_line(first_line, _separator);
auto struct_definition = build_ddl(columns, stream_name, ddl_data_type, _timestamp_column_index);
_codec_factory = ddl::codec::CodecFactory(stream_name.c_str(), struct_definition.c_str());
_codec_indices = ddl::codec::getCodecIndices(_codec_factory);
A_UTIL_THROW_IF_FAILED(_codec_factory.isValid(), "unable to create codec factory for '" + file_name + "'");
auto type = stream_type_factory->build();
auto property_type = std::dynamic_pointer_cast<adtf_file::PropertyStreamType>(type);
property_type->setMetaType("adtf/default");
property_type->setProperty("md_definitions", "cString", struct_definition);
property_type->setProperty("md_struct", "cString", stream_name);
property_type->setProperty("md_data_serialized", "tBool", "false");
_streams.push_back({ 0,
stream_name,
0,
std::chrono::seconds(0),
std::chrono::seconds(0),
adtfdat_processing::createAdtfDefaultStreamType(stream_name, struct_definition, false) });
}
std::vector<adtf_file::Stream> CsvReader::getStreams() const
{
return _streams;
}
adtf_file::FileItem CsvReader::getNextItem()
{
std::string line;
if (!std::getline(_csv_file, line))
{
if (_csv_file.eof())
{
}
check_error_bits(_csv_file);
}
auto items = split_line(line, _separator);
auto sample = _sample_factory->build();
auto read_sample = std::dynamic_pointer_cast<adtf_file::ReadSample>(sample);
{
auto codec = _codec_factory.makeStaticCodecFor(read_sample->beginBufferWrite(_codec_factory.getStaticBufferSize()),
_codec_factory.getStaticBufferSize());
size_t current_codec_index_to_use = 0;
for (size_t item_index = 0; item_index < items.size(); ++item_index)
{
auto value = std::stod(items[item_index]);
if (item_index == _timestamp_column_index)
{
_last_sample_timestamp = std::chrono::microseconds(static_cast<int64_t>(value * _timestamp_factor)) + _timestamp_offset;
read_sample->setTimeStamp(_last_sample_timestamp);
}
else
{
if (current_codec_index_to_use < _codec_indices.size()) {
try
{
codec.setElementValue(_codec_indices[current_codec_index_to_use++], value);
}
catch (...)
{
std::throw_with_nested(std::runtime_error("unable to update element with index " + std::to_string(current_codec_index_to_use)));
}
}
}
}
read_sample->endBufferWrite();
}
return {0, _last_sample_timestamp, sample};
}
Plugin initializer class to use within a ADTF File Library plugin.
Definition: object.h:104
Default Reader factory implementation for readers using a standard default CTOR.
Definition: reader.h:339
#define A_UTIL_THROW_IF_FAILED(__exp, __msg)
helper macro to throw an exception on failed a_util::result::Result
Definition: ddl_helpers.h:27
Objects & getObjects()
Get the objects singleton of the library.
std::shared_ptr< adtf_file::StreamType > createAdtfDefaultStreamType(const std::string &struct_name, const std::string &struct_definition, bool is_serialized=false)
Definition: ddl_helpers.h:74
utils5ext::exceptions::EndOfFile EndOfFile
Exception to indicate the end of file was reached.
Definition: indexedfile_types.h:114

Copyright © CARIAD SE.
Generated on Fri Apr 19 2024 by doxygen 1.9.1
GIT Commit Hash: 82d535f82776c20b12fc60740bdae991b62444a7