读取和写入CSV文件
Arrow提供了一个快速的CSV阅读器,允许摄取外部数据以创建Arrow表或Arrow RecordBatches的流。
另请参见 CSV读取器/写入器API参考。
读取CSV文件
CSV文件中的数据可以使用TableReader读取为单个Arrow表,也可以使用StreamingReader流式传输为RecordBatches。有关这两种方法之间的权衡讨论,请参见权衡。
这两个阅读器都需要表示输入文件的arrow::io::InputStream实例。它们的行为可以使用ReadOptions、ParseOptions和ConvertOptions的组合进行自定义。
TableReader
#include "arrow/csv/api.h"
{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;
   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();
   // 从输入流和选项实例化TableReader
   auto maybe_reader =
     arrow::csv::TableReader::Make(io_context,
                                   input,
                                   read_options,
                                   parse_options,
                                   convert_options);
   if (!maybe_reader.ok()) {
     // 处理TableReader实例化错误...
   }
   std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;
   // 从CSV文件中读取表格
   auto maybe_table = reader->Read();
   if (!maybe_table.ok()) {
     // 处理CSV读取错误
     //(例如CSV语法错误或类型转换失败)
   }
   std::shared_ptr<arrow::Table> table = *maybe_table;
}StreamingReader
#include "arrow/csv/api.h"
{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;
   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();
   // 从输入流和选项实例化StreamingReader
   auto maybe_reader =
     arrow::csv::StreamingReader::Make(io_context,
                                       input,
                                       read_options,
                                       parse_options,
                                       convert_options);
   if (!maybe_reader.ok()) {
     // 处理StreamingReader实例化错误...
   }
   std::shared_ptr<arrow::csv::StreamingReader> reader = *maybe_reader;
   // 为在流式传输时重用的RecordBatch指针保留
   std::shared_ptr<RecordBatch> batch;
   while (true) {
       // 尝试读取第一个RecordBatch
       arrow::Status status = reader->ReadNext(&batch);
       if (!status.ok()) {
         // 处理读取错误
       }
       if (batch == NULL) {
         // 处理文件末尾
         break;
       }
       // 对批处理执行某些操作
   }
}权衡
选择使用TableReader还是StreamingReader最终取决于用例,但需要注意以下一些权衡:
- 内存使用:TableReader一次性将所有数据加载到内存中,根据数据量的不同,可能需要比StreamingReader多得多的内存。这对用户来说可能是最重要的权衡之一。 
- 速度:在读取整个CSV内容时,TableReader通常会比StreamingReader更快,因为它更好地利用了可用的核心。有关更多详细信息,请参见性能。 
- 灵活性:StreamingReader可能被认为不如TableReader灵活,因为它只对首个读取的数据块执行类型推断,在此后,类型将被冻结,任何后续数据块中不能转换为这些类型的数据将导致错误。请注意,可以通过将ReadOptions::block_size设置为足够大的值或使用ConvertOptions::column_types显式设置所需的数据类型来解决这个问题。 
写入CSV文件
CSV文件写入到OutputStream。
#include <arrow/csv/api.h>
{
    // 单次写入
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    if (WriteCSV(table, write_options, output.get()).ok()) {
        // 处理写入错误...
    }
}
{
    // 逐步写入
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    auto maybe_writer = arrow::csv::MakeCSVWriter(output, schema, write_options);
    if (!maybe_writer.ok()) {
        // 处理写入器实例化错误...
    }
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer = *maybe_writer;
    // 写入批处理...
    if (!writer->WriteRecordBatch(*batch).ok()) {
        // 处理写入错误...
    }
    if (!writer->Close().ok()) {
        // 处理关闭错误...
    }
    if (!output->Close().ok()) {
        // 处理文件关闭错误...
    }
}注意
该写入器尚不支持所有Arrow类型。
列名称
有三种可能的方式可以从CSV文件中推断列名称:
- 默认情况下,列名从CSV文件的第一行中读取 
- 如果设置了ReadOptions::column_names,它会强制将表格中的列名设置为这些值(CSV文件的第一行将被读取为数据) 
- 如果ReadOptions::autogenerate_column_names为true,列名将自动生成,模式为“f0”、“f1”...(CSV文件的第一行将被读取为数据) 
列选择
默认情决中,Arrow会读取CSV文件中的所有列。您可以使用ConvertOptions::include_columns选项来缩小列的选择。
如果在ConvertOptions::include_columns中的某些列在CSV文件中缺失,除非ConvertOptions::include_missing_columns为true,否则会发出错误,此时假定缺失的列包含全空值。
与列名的交互
如果同时指定了ReadOptions::column_names和ConvertOptions::include_columns,那么ReadOptions::column_names将被认为是CSV列的映射,而ConvertOptions::include_columns是Arrow表中将包含的那些列名的子集。
数据类型
默认情况下,CSV读取器会为每列推断最合适的数据类型。类型推断考虑以下数据类型,按顺序:
- Null 
- Int64 
- Boolean 
- Date32 
- Time32(带秒的单位) 
- Timestamp(带秒的单位) 
- Timestamp(带纳秒的单位) 
- Float64 
- Dictionary<String>(如果ConvertOptions::auto_dict_encode为true) 
- Dictionary<Binary>(如果ConvertOptions::auto_dict_encode为true) 
- String 
- Binary 
您可以通过设置ConvertOptions::column_types选项来覆盖对选定列的类型推断。可以从以下列表中选择显式的数据类型:
- Null 
- 所有整数类型 
- Float32和Float64 
- Decimal128 
- Boolean 
- Date32和Date64 
- Time32和Time64 
- Timestamp 
- Binary和Large Binary 
- String和Large String(具有可选的UTF8输入验证) 
- 固定大小的二进制 
- 字典,索引类型为Int32,值类型为以下之一:Binary、String、LargeBinary、LargeString、Int32、UInt32、Int64、UInt64、Float32、Float64、Decimal128 
其他数据类型不支持从CSV值进行转换,将导致错误。
字典推断
如果启用了类型推断并且ConvertOptions::auto_dict_encode为true,CSV读取器首先尝试将类似字符串的列转换为字典编码的字符串样式数组。当ConvertOptions::auto_dict_max_cardinality中的阈值达到时,它切换到普通的字符串样式数组。
时间戳推断/解析
如果启用了类型推断,CSV读取器首先尝试将类似字符串的列解释为时间戳。如果所有行都有某个区域偏移量(例如Z或+0100),即使偏移量不一致,推断的类型也将是UTC时间戳。如果没有行具有区域偏移量,那么推断的类型将是没有时区的时间戳。具有/没有偏移的行混合将导致一个字符串列。
如果明确指定了时间戳的类型(带/不带时区),那么读取器将在该列中没有/有区域偏移量的值上报错。请注意,这意味着目前无法使读取器将没有区域偏移量的时间戳列解析为特定时区的本地时间;相反,请将列解析为没有时区的时间戳,然后使用assume_timezone计算函数后转换值。

Nulls
从ConvertOptions::null_values中存储的拼写中识别空值。ConvertOptions::Defaults()工厂方法将初始化一些常规的空值拼写,例如N/A。
字符编码
CSV文件应编码为UTF8。但是,Binary列可以接受非UTF8数据。
写入选项
可以通过WriteOptions自定义写入的CSV文件格式。目前提供了少量选项;将在未来的发布版本中添加更多选项。
性能
默认情况下,TableReader将并行化读取,以充分利用计算机上的所有CPU核心。您可以在ReadOptions::use_threads中更改此设置。对于性能良好的台式机或笔记本电脑,合理的期望是每个核心至少为100 MB/s(以源CSV字节而不是目标Arrow数据字节来衡量)。
 
                    
系统当前共有 481 篇文章