#include "csv_reader.h"
#include <sstream>
#include <cerrno>
#include <string.h>
#include <cctype>
});
namespace {
/**
 * Verifies that @p filename ends with the characters "csv".
 *
 * The length is compared before the suffix is inspected, so names shorter than
 * the suffix are rejected instead of wrapping the unsigned subtraction (which
 * previously let a two character name pass the check).
 *
 * @param filename name or path of the file that is about to be opened.
 * @throws std::runtime_error if @p filename does not end with "csv".
 */
void checkCompatibility(const std::string& filename)
{
    const std::string expected_suffix("csv");
    const bool has_suffix =
        filename.size() >= expected_suffix.size() &&
        filename.compare(filename.size() - expected_suffix.size(),
                         expected_suffix.size(),
                         expected_suffix) == 0;
    if (!has_suffix)
    {
        throw std::runtime_error("not a csv file.");
    }
}
/**
 * Throws if @p stream is in a failed state.
 *
 * @param stream input file stream to inspect.
 * @throws std::runtime_error when failbit or badbit is set; the message carries
 *         the text strerror() yields for the current errno.
 */
void check_error_bits(const std::ifstream& stream)
{
    // fail() is true for failbit as well as badbit, so it covers both error states.
    if (!stream.fail())
    {
        return;
    }

    std::string message("unable to read from file: ");
    message += strerror(errno);
    throw std::runtime_error(message);
}
/**
 * Removes trailing whitespace (as classified by std::isspace) from @p s in place.
 *
 * @param s string to shorten; leading and inner whitespace is left untouched.
 */
void rtrim(std::string& s)
{
    // Walk backwards to the position right after the last non-whitespace character.
    std::string::size_type keep_length = s.size();
    while (keep_length > 0 &&
           std::isspace(static_cast<unsigned char>(s[keep_length - 1])))
    {
        --keep_length;
    }
    s.erase(keep_length);
}
/**
 * Splits one line of the csv file into its fields.
 *
 * Trailing whitespace is removed with rtrim() before splitting. Empty fields
 * between two separators are kept; an empty remainder after the last separator
 * is dropped, so "a;b;" yields two fields.
 *
 * @param line      the line to split (taken by value because it is trimmed).
 * @param separator the separator text; may be longer than one character.
 * @return the fields in the order they appear in @p line.
 * @throws std::runtime_error if @p separator is empty.
 */
std::vector<std::string> split_line(std::string line, const std::string& separator)
{
    // An empty separator matches at every position and cannot delimit anything.
    if (separator.empty())
    {
        throw std::runtime_error("csv separator must not be empty");
    }

    rtrim(line);

    std::vector<std::string> columns;
    std::string::size_type field_begin = 0;
    for (auto separator_position = line.find(separator, field_begin);
         separator_position != std::string::npos;
         separator_position = line.find(separator, field_begin))
    {
        columns.push_back(line.substr(field_begin, separator_position - field_begin));
        // Skip the whole separator, not just its first character.
        field_begin = separator_position + separator.size();
    }

    if (field_begin < line.size())
    {
        columns.push_back(line.substr(field_begin));
    }
    return columns;
}
/**
 * Replaces the characters that must not appear verbatim inside a double-quoted
 * XML attribute value by their predefined entities.
 */
std::string escape_xml_attribute(const std::string& raw)
{
    std::string escaped;
    escaped.reserve(raw.size());
    for (const char ch : raw)
    {
        switch (ch)
        {
        case '&':
            escaped += "&amp;";
            break;
        case '<':
            escaped += "&lt;";
            break;
        case '>':
            escaped += "&gt;";
            break;
        case '"':
            escaped += "&quot;";
            break;
        default:
            escaped += ch;
            break;
        }
    }
    return escaped;
}

/**
 * Builds the DDL struct description for one csv row.
 *
 * Every column except the one at @p timestamp_column_index becomes one element
 * of type @p ddl_data_type. Elements are laid out back to back, each occupying
 * the size of the data type rounded up to whole bytes. Stream and column names
 * come from user input and are escaped so that characters such as '"' or '&'
 * cannot break the generated XML.
 *
 * @param columns                column names taken from the header line.
 * @param stream_name            name given to the generated struct.
 * @param ddl_data_type          name of the predefined DDL data type used for all elements.
 * @param timestamp_column_index index of the column that is left out of the struct.
 * @return the struct description as an XML string.
 * @throws std::runtime_error if @p ddl_data_type is not a predefined data type.
 */
std::string build_ddl(const std::vector<std::string>& columns,
                      const std::string& stream_name,
                      const std::string& ddl_data_type,
                      size_t timestamp_column_index)
{
    auto data_type = ddl::PredefinedDataTypes::getInstance().getPredefinedType(ddl_data_type);
    if (!data_type)
    {
        throw std::runtime_error("unable to resolve data type: " + ddl_data_type);
    }

    // Round the bit size up to the next whole byte.
    auto type_byte_size = data_type->getBitSize() / 8;
    if (data_type->getBitSize() % 8)
    {
        ++type_byte_size;
    }

    std::ostringstream struct_definition;
    struct_definition << "<struct name=\"" << escape_xml_attribute(stream_name)
                      << "\" alignment=\"1\" version=\"1\">";

    size_t byte_position = 0;
    for (size_t column_index = 0; column_index < columns.size(); ++column_index)
    {
        // The timestamp column is carried as sample time, not as payload.
        if (column_index == timestamp_column_index)
        {
            continue;
        }

        struct_definition << "<element name=\"" << escape_xml_attribute(columns[column_index])
                          << "\" alignment=\"1\" type=\"" << ddl_data_type
                          << "\" bytepos=\"" << byte_position
                          << "\" arraysize=\"1\" byteorder=\"LE\"/>";
        byte_position += type_byte_size;
    }

    struct_definition << "</struct>";
    return struct_definition.str();
}
}
/**
 * Sets the initial configuration of the reader.
 *
 * Each entry is written as { property name, { value, type name } }. The same
 * property names are read back in open() through getConfiguration().
 */
CsvReader::CsvReader()
{
// stream_name: used in open() as the struct name and as the name of the stream.
setConfiguration({ {"stream_name", { "csv", "string" }},
// separator: text that delimits the fields of a line.
{ "separator", {";", "string"} },
// timestamp_column_index: column whose value becomes the sample time stamp.
{ "timestamp_column_index", {"0", "uint64"} },
// timestamp_factor: multiplied with the parsed time stamp value.
{ "timestamp_factor", {"1.0", "double"} },
// timestamp_offset: added to the scaled time stamp; open() treats it as microseconds.
{ "timestamp_offset", {"0", "int64"} },
// ddl_data_type: data type name used for every non time stamp column.
{ "ddl_data_type", {"tFloat64", "string"} } });
}
/**
 * @return the fixed identifier string "csv" of this reader.
 */
std::string CsvReader::getReaderIdentifier() const
{
return "csv";
}
/**
 * Opens the csv file, reads its header line and prepares the output stream.
 *
 * The column names of the first line are turned into a DDL struct description
 * by build_ddl(); the codec factory and the stream type properties are created
 * from that description.
 *
 * @param file_name           path of the csv file; validated by checkCompatibility().
 * @param sample_factory      factory stored in _sample_factory.
 * @param stream_type_factory factory used here to build the stream type object.
 * @throws std::runtime_error if the name check fails, the header line is empty
 *         or the data type cannot be resolved. A failure while opening the file
 *         is rethrown nested inside a std::runtime_error that names the file.
 */
void CsvReader::open(const std::string& file_name,
std::shared_ptr<adtf_file::SampleFactory> sample_factory,
std::shared_ptr<adtf_file::StreamTypeFactory> stream_type_factory)
{
_sample_factory = sample_factory;
// Reject unsupported file names before the file is opened.
checkCompatibility(file_name);
try
{
_csv_file.open(file_name);
// Throws if the stream is in a failed state after open().
check_error_bits(_csv_file);
}
catch (...)
{
// Keep the original error as nested exception and add the file name.
std::throw_with_nested(std::runtime_error("unable to open input csv file '" + file_name + "'"));
}
// Read all settings from the current configuration.
const auto& config = getConfiguration();
auto stream_name = adtf_file::getPropertyValue<std::string>(config, "stream_name");
_separator = adtf_file::getPropertyValue<std::string>(config, "separator");
auto ddl_data_type = adtf_file::getPropertyValue<std::string>(config, "ddl_data_type");
_timestamp_column_index = adtf_file::getPropertyValue<uint64_t>(config, "timestamp_column_index");
_timestamp_factor = adtf_file::getPropertyValue<double>(config, "timestamp_factor");
// The offset value is interpreted as a number of microseconds.
_timestamp_offset = std::chrono::microseconds(adtf_file::getPropertyValue<int64_t>(config, "timestamp_offset"));
// The first line of the file is the header that names the columns.
std::string first_line;
std::getline(_csv_file, first_line);
// The result of getline() is not checked; only an empty string is treated as an error.
if (first_line.empty())
{
throw std::runtime_error("unable to read header line from '" + file_name + "'");
}
auto columns = split_line(first_line, _separator);
auto struct_definition = build_ddl(columns, stream_name, ddl_data_type, _timestamp_column_index);
// The codec factory and its element indices are kept as members for later sample creation.
_codec_factory = ddl::codec::CodecFactory(stream_name.c_str(), struct_definition.c_str());
_codec_indices = ddl::codec::getCodecIndices(_codec_factory);
auto type = stream_type_factory->build();
// NOTE(review): property_type is dereferenced below without checking that the cast succeeded.
auto property_type = std::dynamic_pointer_cast<adtf_file::PropertyStreamType>(type);
property_type->setMetaType("adtf/default");
property_type->setProperty("md_definitions", "cString", struct_definition);
property_type->setProperty("md_struct", "cString", stream_name);
property_type->setProperty("md_data_serialized", "tBool", "false");
// NOTE(review): the initializer list below is not closed in this view; the end of
// this statement appears to be missing. Confirm against the original file.
_streams.push_back({ 0,
stream_name,
0,
std::chrono::seconds(0),
std::chrono::seconds(0),
}
/**
 * @return a copy of the stream descriptions held in _streams.
 */
std::vector<adtf_file::Stream> CsvReader::getStreams() const
{
return _streams;
}
// NOTE(review): the signature line of this function is not present in this view.
// The body reads one line from _csv_file, converts its fields into a sample and
// returns an item holding that sample.
{
std::string line;
if (!std::getline(_csv_file, line))
{
if (_csv_file.eof())
{
// NOTE(review): this branch is empty in this view, so reaching the end of the file
// falls through to check_error_bits(), which throws on a failed stream. A statement
// may be missing here; confirm against the original file.
}
check_error_bits(_csv_file);
}
// Split the line into its fields using the configured separator.
auto items = split_line(line, _separator);
auto sample = _sample_factory->build();
// NOTE(review): read_sample is dereferenced below without checking that the cast succeeded.
auto read_sample = std::dynamic_pointer_cast<adtf_file::ReadSample>(sample);
{
// The codec writes into the buffer handed out by beginBufferWrite(); both the buffer
// request and the codec use getStaticBufferSize() as size.
auto codec = _codec_factory.makeStaticCodecFor(read_sample->beginBufferWrite(_codec_factory.getStaticBufferSize()),
_codec_factory.getStaticBufferSize());
// Position in _codec_indices of the next element to fill; only advanced for
// fields that are not the time stamp.
size_t current_codec_index_to_use = 0;
for (size_t item_index = 0; item_index < items.size(); ++item_index)
{
// Every field, including the time stamp, is parsed as double; std::stod throws
// if the field is not a number.
auto value = std::stod(items[item_index]);
if (item_index == _timestamp_column_index)
{
// Scale by the factor, truncate to whole microseconds, then add the offset.
_last_sample_timestamp = std::chrono::microseconds(static_cast<int64_t>(value * _timestamp_factor)) + _timestamp_offset;
read_sample->setTimeStamp(_last_sample_timestamp);
}
else
{
// Fields beyond the number of known codec indices are ignored.
if (current_codec_index_to_use < _codec_indices.size()) {
try
{
codec.setElementValue(_codec_indices[current_codec_index_to_use++], value);
}
catch (...)
{
// NOTE(review): the index has already been incremented by the statement above,
// so the number reported here is one higher than the element that failed.
std::throw_with_nested(std::runtime_error("unable to update element with index " + std::to_string(current_codec_index_to_use)));
}
}
}
}
read_sample->endBufferWrite();
}
// The first member is always 0. If the line had no time stamp field,
// _last_sample_timestamp still holds the value set by an earlier call.
return {0, _last_sample_timestamp, sample};
}
Plugin initializer class to use within an ADTF File Library plugin.
Definition: object.h:104
Default Reader factory implementation for readers using a standard default CTOR.
Definition: reader.h:339
#define A_UTIL_THROW_IF_FAILED(__exp, __msg)
Helper macro to throw an exception on a failed a_util::result::Result.
Definition: ddl_helpers.h:27
Objects & getObjects()
Get the objects singleton of the library.
std::shared_ptr< adtf_file::StreamType > createAdtfDefaultStreamType(const std::string &struct_name, const std::string &struct_definition, bool is_serialized=false)
Definition: ddl_helpers.h:74
utils5ext::exceptions::EndOfFile EndOfFile
Exception to indicate the end of file was reached.
Definition: indexedfile_types.h:114