arrow系列23---读写Parquet文件
作者:yunjinqi   类别:    日期:2023-10-16 21:11:42    阅读:271 次   消耗积分:0 分    

读写Parquet文件 

另请参见 Parquet读取器和写入器API参考。

Parquet格式是一种高效的复杂数据的列式存储格式。Parquet C++实现是Apache Arrow项目的一部分,并受益于与Arrow C++类和工具的紧密集成。

读取Parquet文件 

arrow::FileReader类将数据读入Arrow表和记录批次。

StreamReader类允许使用C++输入流方法按列和按行读取数据。此方法提供了使用的便利性和类型安全性。当数据必须被按列读取并逐步读取和写入文件时,此方法也很有用。

请注意,StreamReader的性能可能不如其他方法好,因为它需要进行类型检查,并且列值会逐个处理。

FileReader 要将Parquet数据读入Arrow结构,使用arrow::FileReader。要构造它,需要一个表示输入文件的::arrow::io::RandomAccessFile实例。要一次读取整个文件,请使用arrow::FileReader::ReadTable():


//#include "arrow/io/api.h"
 
//#include "arrow/parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool(); 
std::shared_ptrarrow::io::RandomAccessFile input; 
ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(path_to_file));
// 打开Parquet文件读取器 
std::unique_ptrparquet::arrow::FileReader arrow_reader; 
ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, pool, &arrow_reader));
// 将整个文件读取为单个Arrow表 
std::shared_ptrarrow::Table table; 
ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));


arrow::FileReaderBuilder帮助类可提供更细粒度的选项,它接受ReaderProperties和ArrowReaderProperties类。

要以批量记录流的形式读取,使用arrow::FileReader::GetRecordBatchReader()方法来检索arrow::RecordBatchReader。它将使用ArrowReaderProperties中设置的批量大小。

// #include "arrow/io/api.h" 
// #include "arrow/parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool();
// 配置通用Parquet读取器设置 
auto reader_properties = parquet::ReaderProperties(pool); 
reader_properties.set_buffer_size(4096 * 4); 
reader_properties.enable_buffered_stream();
// 配置特定于Arrow的Parquet读取器设置 
auto arrow_reader_props = parquet::ArrowReaderProperties(); 
arrow_reader_props.set_batch_size(128 * 1024); // 默认值为64 * 1024
parquet::arrow::FileReaderBuilder reader_builder; 
ARROW_RETURN_NOT_OK(
reader_builder.OpenFile(path_to_file, /memory_map=/false, reader_properties)); 
reader_builder.memory_pool(pool); 
reader_builder.properties(arrow_reader_props);
std::unique_ptrparquet::arrow::FileReader arrow_reader; 
ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader; 
ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(&rb_reader));
for (arrow::Result<std::shared_ptrarrow::RecordBatch> maybe_batch : *rb_reader) {
// 操作每个批次...
}

另请参见

有关读取多文件数据集或下推筛选以修剪行组的信息,请参阅Tabular Datasets。

性能和内存效率 

对于远程文件系统,使用读取合并(预缓冲)来减少API调用的数量:

auto arrow_reader_props = parquet::ArrowReaderProperties(); 
reader_properties.set_prebuffer(true);

默认情况下,性能是调优的,但并行列解码默认情况下处于关闭状态。在ArrowReaderProperties的构造函数中启用它:

auto arrow_reader_props = parquet::ArrowReaderProperties(/use_threads=/true);

如果内存效率比性能更重要,那么:

  1. 在parquet::ArrowReaderProperties中不要启用读取合并(预缓冲)。

  2. 使用arrow::FileReader::GetRecordBatchReader()批量读取数据。

  3. 在parquet::ReaderProperties中启用enable_buffered_stream。

此外,如果你知道某些列包含许多重复的值,你可以将它们读取为字典编码的列。使用ArrowReaderProperties上的set_read_dictionary设置启用此功能。如果文件是使用Arrow C++编写的,并且启用了store_schema,则原始的Arrow模式将自动读取并将覆盖此设置。


StreamReader 


StreamReader允许使用标准的C++输入运算符来读取Parquet文件,以确保类型安全。

请注意,类型必须与模式完全匹配,即如果模式字段是无符号的16位整数,那么必须提供uint16_t类型。

异常用于表示错误。在以下情况下会引发ParquetException:

  • 尝试使用不正确的类型读取字段。

  • 尝试超出行的末尾。

  • 尝试超出文件的末尾。

#include "arrow/io/file.h" 
#include "parquet/stream_reader.h"
{
std::shared_ptrarrow::io::ReadableFile infile;
PARQUET_ASSIGN_OR_THROW(
infile,
arrow::io::ReadableFile::Open("test.parquet"));
parquet::StreamReader stream{parquet::ParquetFileReader::Open(infile)};
std::string article;
float price;
uint32_t quantity;
while ( !stream.eof() )
{
stream >> article >> price >> quantity >> parquet::EndRow;
// ...
}
}

写入Parquet文件 

WriteTable 

arrow::WriteTable()函数将整个::arrow::Table写入输出文件。

// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
ARROW_ASSIGN_OR_RAISE(std::shared_ptrarrow::Table table, GetTable());
// 选择压缩
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// 选择存储Arrow模式以便后续更容易读取到Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
std::shared_ptrarrow::io::FileOutputStream outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(),
arrow::default_memory_pool(), outfile,
/chunk_size=/3, props, arrow_props));

请注意

默认情况下,C++中不启用列压缩。有关如何选择写入属性中的压缩编解码器的详细信息,请参阅下文。


要按批次写出数据,请使用arrow::FileWriter。


// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
// 数据以RBR格式存在
std::shared_ptrarrow::RecordBatchReader batch_stream;
ARROW_ASSIGN_OR_RAISE(batch_stream, GetRBR());
// 选择压缩
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// 选择存储Arrow模式以便后续更容易读取到Arrow
std的shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
// 创建一个写入器
std::shared_ptrarrow::io::FileOutputStream outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
std::unique_ptrparquet::arrow::FileWriter writer;
ARROW_ASSIGN_OR_RAISE(
writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(),
arrow::default_memory_pool(), outfile,
props, arrow_props));
// 将每个批次写入为一个row_group
for (arrow::Result<std::shared_ptrarrow::RecordBatch> maybe_batch : *batch_stream) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
ARROW_ASSIGN_OR_RAISE(auto table,
arrow::Table::FromRecordBatches(batch->schema(), {batch}));
ARROW_RETURN_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}
// 写入文件页脚并关闭
ARROW_RETURN_NOT_OK(writer->Close());

StreamWriter 

StreamWriter允许使用标准C++输出运算符将Parquet文件写入,类似于使用StreamReader类进行读取。这种类型安全的方法还确保行是按列写入的,允许在数据量达到一定程度后自动(或显式)创建新的行组(使用EndRowGroup流修改器)。

异常用于表示错误。在以下情况下会引发ParquetException:

  • 尝试使用不正确的类型写字段。

  • 尝试在一行中写入太多字段。

  • 尝试跳过必填字段。

#include "arrow/io/file.h"
#include "parquet/stream_writer.h"
{
std::shared_ptrarrow::io::FileOutputStream outfile;
PARQUET_ASSIGN_OR_THROW(
outfile,
arrow::io::FileOutputStream::Open("test.parquet"));
parquet::WriterProperties::Builder builder;
std::shared_ptrparquet::schema::GroupNode schema;
// 使用需要的压缩类型等设置构建器。
// 定义模式。
// ...
parquet::StreamWriter os{
parquet::ParquetFileWriter::Open(outfile, schema, builder.build())};
// 遍历提供所需字段的数据结构,并写入每行。
for (const auto& a : getArticles())
{
os << a.name() << a.price() << a.quantity() << parquet::EndRow;
}
}

写入属性 

要配置如何写入Parquet文件,请使用WriterProperties::Builder:

#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using parquet::ParquetVersion;
using parquet::ParquetDataPageVersion;
using arrow::Compression;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.max_row_group_length(64 * 1024)
.created_by("My Application")
.version(ParquetVersion::PARQUET_2_6)
.data_page_version(ParquetDataPageVersion::V2)
.compression(Compression::SNAPPY)
.build();

max_row_group_length设置了每个行组的行数的上限,这会优先于写方法中传递的chunk_size。

您可以选择要使用的Parquet版本,version确定可用的逻辑类型。此外,您可以设置data_page_version。它默认为V1;设置为V2将允许更优化的压缩(跳过无法获得空间优势的页面的压缩),但并非所有读取器都支持此数据页版本。

默认情况下不启用压缩,但为了充分利用Parquet,您还应该选择压缩编解码器。您可以为整个文件选择一个,或为各列选择一个。如果选择混合,文件级选项将应用于没有特定压缩编解码器的列。有关选项,请参阅::arrow::Compression。

列数据编码也可以应用于文件级别或列级别。默认情况下,编写器将尝试对所有支持的列进行字典编码,除非字典太大。此行为可以在文件级别或列级别使用disable_dictionary()更改。当不使用字典编码时,它将回退到为列或整个文件设置的编码;默认情况下是Encoding::PLAIN,但可以使用encoding()更改。

#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using arrow::Compression;
using parquet::Encoding;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.compression(Compression::SNAPPY) // 回退
->compression("colA", Compression::ZSTD) // 仅应用于列"colA"
->encoding(Encoding::BIT_PACKED) // 回退
->encoding("colB", Encoding::RLE) // 仅应用于列"colB"
->disable_dictionary("colB") // 不要为列"colB"进行字典编码
->build();

默认情况下,为所有列启用统计信息。您可以使用disable_statistics构建器上的方法为所有列或特定列禁用统计信息。max_statistics_size可以限制用于最小值和最大值的最大字节数,对于诸如字符串或二进制块之类的类型很有用。如果列启用了页面索引,使用enable_write_page_index,则它将不会将统计信息写入页面标头,因为它在ColumnIndex中重复。

还可以使用parquet::ArrowWriterProperties配置与Arrow特定的设置:

#include "parquet/arrow/writer.h"
using parquet::ArrowWriterProperties;
std::shared_ptr<ArrowWriterProperties> arrow_props = ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps() // 默认为False
->store_schema() // 默认为False
->build();

这些选项主要规定如何将Arrow类型转换为Parquet类型。启用store_schema将使编写器将序列化的Arrow模式存储在文件元数据中。由于Parquet模式和Arrow模式之间没有双射,存储Arrow模式允许Arrow读取器更准确地重新创建原始数据。从Parquet类型到原始Arrow类型的映射包括:

  • 使用原始时区信息读取时间戳(Parquet不支持时区);

  • 从存储类型中读取Arrow类型(例如从int64列读取的Duration);

  • 将字符串和二进制列读回具有64位偏移量的大变体;

  • 读回列作为字典编码的列(无论Arrow列和序列化的Parquet版本是否是字典编码都是独立的)。

支持的Parquet功能 

Parquet格式具有许多功能,Parquet C++支持其中的一个子集。

页面类型 


  • DATA_PAGE

  • DATA_PAGE_V2

  • DICTIONARY_PAGE

  • 不支持的页面类型:INDEX_PAGE。在读取Parquet文件时,会忽略此类型的页面。

压缩 压缩编解码器

  • SNAPPY

  • GZIP

  • BROTLI

  • LZ4 (1)

  • ZSTD

  • 不支持的压缩编解码器:LZO。

(1) 在读取方面,Parquet C++能够解压缩常规LZ4块格式和参考Parquet实现使用的自定义Hadoop LZ4格式。在写入方面,Parquet C++始终生成自定义Hadoop LZ4格式。


编码 编码

截图 2023-10-16 21-02-40.png

(1)仅支持编码定义和重复级别以及布尔值。

(2)在写入路径上,只有当WriterProperties::version()中选择了Parquet格式版本2.4或更高版本时,才启用RLE_DICTIONARY。

类型 

物理类型 

截图 2023-10-16 21-06-54.png


  • (1)可以映射到其他Arrow类型,具体取决于逻辑类型(见下文)。

  • (2)在写入端,始终会发出一个FIXED_LENGTH_BYTE_ARRAY。

  • (3)在写入端,Arrow Date64也映射到Parquet DATE INT32。

逻辑类型 

特定的逻辑类型可以覆盖给定物理类型的默认Arrow类型映射。

截图 2023-10-16 21-07-37.png

  • (1)在写入端,Parquet的物理类型INT32是生成的。

  • (2)在写入端,始终会发出一个FIXED_LENGTH_BYTE_ARRAY。

  • (3)在写入端,Arrow Date64也映射到Parquet DATE INT32。

  • (4)在写入端,Arrow LargeUtf8也映射到Parquet STRING。

  • (5)在写入端,Arrow LargeList或FixedSizedList也映射到Parquet LIST。

  • (6)在读取端,具有多个值的键不会被去重,与Parquet规范相矛盾。

不支持的逻辑类型:JSON、BSON、UUID。如果在读取Parquet文件时遇到这种类型,将使用默认的物理类型映射(例如,Parquet的JSON列可能被读取为Arrow的Binary或FixedSizeBinary)。

已转换类型 

尽管已转换类型在Parquet格式中已不建议使用(它们已被逻辑类型取代),但Parquet C++实现可以识别并发出这些类型,以便最大限度地提高与其他Parquet实现的兼容性。

特殊情况 

Arrow扩展类型按其存储类型写出。可以在读取时使用Parquet元数据重新创建它(请参见下文的“Arrow类型的回转”)。

Arrow字典类型按其值类型写出。可以在读取时使用Parquet元数据重新创建它(请参见下文的“Arrow类型的回转”)。

Arrow类型的回转 

虽然Arrow类型和Parquet类型之间没有双射,但可以将Arrow模式作为Parquet文件元数据的一部分进行序列化。这是通过启用ArrowWriterProperties::store_schema()来实现的。

在读取路径上,将自动识别序列化的模式,并将重新创建原始的Arrow数据,根据需要转换Parquet数据(例如,如果在写入文件时启用了ArrowWriterProperties::store_schema(),则将从Parquet LIST类型重新创建Arrow LargeList)。

例如,当将Arrow LargeList序列化为Parquet时:

数据被写出为Parquet LIST

在读取时,如果在写入文件时启用了ArrowWriterProperties::store_schema(),则Parquet LIST数据将被解码为Arrow LargeList;否则,它将被解码为Arrow List。

序列化细节 

Arrow模式被序列化为Arrow IPC模式消息,然后进行base64编码,并存储在Parquet文件元数据中的ARROW:schema元数据键下。

限制 

不支持写入或读取带有空条目的FixedSizeList数据。

加密 

Parquet C++实现了加密规范中指定的所有功能,但不支持列索引和布隆过滤器模块的加密。

具体而言,Parquet C++支持:

  • AES_GCM_V1和AES_GCM_CTR_V1加密算法。

  • Footer、ColumnMetaData、Data Page、Dictionary Page、Data PageHeader、Dictionary PageHeader模块类型的AAD后缀。不支持其他模块类型(ColumnIndex、OffsetIndex、BloomFilter Header、BloomFilter Bitset)。

  • EncryptionWithFooterKey和EncryptionWithColumnKey模式。

  • 加密Footer和明文Footer模式。

其他 

截图 2023-10-16 21-10-50.png

(1)提供对列和偏移索引结构的访问,但数据读取API目前不会使用它们。

(2)提供用于创建、序列化和反序列化布隆过滤器的API,但它们尚未集成到数据读取API中。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/356
上一篇:arrow系列22---读写ORC文件
下一篇:arrow系列24---读写csv文件