Apache Arrow提供了文件I/O功能,以便从应用程序的开始到结束使用Arrow。在本文中,您将学习:
将Arrow文件读入RecordBatch,然后再将其写出
将CSV文件读入Table,然后再将其写出
将Parquet文件读入Table,然后再将其写出
先决条件
在继续之前,请确保您具备以下条件:
安装了Arrow,您可以在此处设置:在您自己的项目中使用Arrow C++
了解来自基本Arrow数据结构的基本Arrow数据结构
拥有一个用于运行最终应用程序的目录 - 该程序将生成一些文件,因此请做好准备。
设置
在进行文件I/O之前,我们需要填补一些空白:
我们需要包含必要的头文件。
需要一个主函数(main())来将所有内容粘合在一起。
我们需要一些文件来进行操作。
包含 在编写C++代码之前,我们需要一些包含文件。我们将包含输出所需的iostream,然后为本文中将使用的每种文件类型导入Arrow的I/O功能:
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <iostream>
主函数(main())
作为粘合剂,我们将使用前一篇关于数据结构的教程中的main()模式:
int main() {
arrow::Status st = RunMain();
if (!st.ok()) {
std::cerr << st << std::endl;
return 1;
}
return 0;
}
和之前一样,这将与RunMain()一起使用:
arrow::Status RunMain() {
return arrow::Status::OK()
}
生成用于读取的文件
我们需要一些文件来进行实际操作。在实践中,您可能会有自己应用程序的输入。然而,在这里,我们只是为了探索I/O而生成一些文件,以便更容易跟踪。
为了创建这些文件,我们将定义一个帮助函数,首先运行它。随时阅读此内容,但稍后的文章中将解释使用的概念。请注意,我们使用了先前教程中的日期/月份/年份数据。现在,只需复制该函数:
(以下是生成文件的帮助函数,可以稍后使用)
arrow::Status GenInitialFile() {
// 创建一些8位整数数组和一个16位整数数组 - 就像
// 基本的Arrow示例一样。
arrow::Int8Builder int8builder;
int8_t days_raw[5] = {1, 12, 17, 23, 28};
ARROW_RETURN_NOT_OK(int8builder.AppendValues(days_raw, 5));
std::shared_ptr<arrow::Array> days;
ARROW_ASSIGN_OR_RAISE(days, int8builder.Finish());
int8_t months_raw[5] = {1, 3, 5, 7, 1};
ARROW_RETURN_NOT_OK(int8builder.AppendValues(months_raw, 5));
std::shared_ptr<arrow::Array> months;
ARROW_ASSIGN_OR_RAISE(months, int8builder.Finish());
arrow::Int16Builder int16builder;
int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
ARROW_RETURN_NOT_OK(int16builder.AppendValues(years_raw, 5));
std::shared_ptr<arrow::Array> years;
ARROW_ASSIGN_OR_RAISE(years, int16builder.Finish());
// 获取包含我们数组的向量
std::vector<std::shared_ptr<arrow::Array>> columns = {days, months, years};
// 创建用于初始化Table的模式
std::shared_ptr<arrow::Field> field_day, field_month, field_year;
std::shared_ptr<arrow::Schema> schema;
field_day = arrow::field("Day", arrow::int8());
field_month = arrow::field("Month", arrow::int8());
field_year = arrow::field("Year", arrow::int16());
schema = arrow::schema({field_day, field_month, field_year});
// 使用模式和数据,创建Table
std::shared_ptr<arrow::Table> table;
table = arrow::Table::Make(schema, columns);
// 将示例文件写入IPC、CSV和Parquet以供使用。
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.arrow"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
arrow::ipc::MakeFileWriter(outfile, schema));
ARROW_RETURN_NOT_OK(ipc_writer->WriteTable(*table));
ARROW_RETURN_NOT_OK(ipc_writer->Close());
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.csv"));
ARROW_ASSIGN_OR_RAISE(auto csv_writer,
arrow::csv::MakeCSVWriter(outfile, table->schema()));
ARROW_RETURN_NOT_OK(csv_writer->WriteTable(*table));
ARROW_RETURN_NOT_OK(csv_writer->Close());
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.parquet"));
PARQUET_THROW_NOT_OK(
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 5));
return arrow::Status::OK();
}
为了获取其余代码所需的文件,确保在RunMain()的第一行调用GenInitialFile()以初始化环境:
// 生成每种格式的初始文件的帮助函数 - 别担心,
// 我们也将在本示例中写入一个表。
ARROW_RETURN_NOT_OK(GenInitialFile());
Arrow文件I/O
我们将逐步进行读取然后写入,如下所示:
读取文件
打开文件
将文件绑定到ipc::RecordBatchFileReader
从文件中读取到RecordBatch
写文件
获取io::FileOutputStream
从RecordBatch写入文件
打开文件 要实际读取文件,我们需要获取某种指向文件的方式。在Arrow中,这意味着我们将获得一个io::ReadableFile对象 - 就像ArrayBuilder可以清除并创建新数组一样,我们可以将其重新分配给新文件,因此我们将在示例中使用此实例:
// 首先,我们必须设置一个ReadableFile对象,它只是让我们指向磁盘上正确数据的读取器。我们将重用这个对象,并将其重新绑定到多个文件中。
std::shared_ptr<arrow::io::ReadableFile> infile;
单独使用io::ReadableFile并不多 - 我们实际上将它绑定到文件并使用io::ReadableFile::Open()。对于我们的目的,使用默认参数就足够了:
// 获取"test_in.arrow"到我们的文件指针
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(
"test_in.arrow", arrow::default_memory_pool()));
打开Arrow文件阅读器
io::ReadableFile太通用,无法提供读取Arrow文件所需的所有功能。我们需要使用它来获取ipc::RecordBatchFileReader对象。此对象实现了正确格式的Arrow文件所需的所有逻辑。我们可以通过ipc::RecordBatchFileReader::Open()来获取一个
// 使用该读取器,我们可以读取Record Batches。请注意,这仅适用于IPC;对于其他格式,我们专注于Table,但在这里,使用RecordBatches。
std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
ARROW_ASSIGN_OR_RAISE(ipc_reader, arrow::ipc::RecordBatchFileReader::Open(infile));
从已打开的Arrow文件读取到RecordBatch 要读取Arrow文件,我们必须使用RecordBatch,因此我们将获取一个RecordBatch。一旦获取了它,我们就可以实际读取文件。Arrow文件可以包含多个RecordBatches,因此我们必须传递一个索引。此文件只有一个RecordBatch,因此传递0:
// 使用读取器,我们可以读取Record Batches。请注意,这仅适用于IPC;对于其他格式,我们专注于Table,但在这里,使用RecordBatches。
std::shared_ptr<arrow::RecordBatch> rbatch;
ARROW_ASSIGN_OR_RAISE(rbatch, ipc_reader->ReadRecordBatch(0));
准备一个FileOutputStream
对于输出,我们需要一个io::FileOutputStream。与我们的io::ReadableFile一样,我们将重复使用它,所以做好准备。打开文件的方式与读取时相同:
// 就像输入一样,我们获取一个输出文件对象。
std::shared_ptr<arrow::io::FileOutputStream> outfile;
// 将其绑定到 "test_out.arrow"
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.arrow"));
从RecordBatch中写Arrow文件 现在,我们获取了先前读入的RecordBatch,并将其与目标文件一起使用,以创建一个ipc::RecordBatchWriter。
ipc::RecordBatchWriter需要两个东西:
目标文件
我们的RecordBatch的Schema(以防需要写入相同格式的多个RecordBatches)
Schema来自我们现有的RecordBatch,目标文件是我们刚刚创建的输出流。
// 使用输出文件和Schema来设置写入器!我们在这里定义了一切,准备开始。
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
arrow::ipc::MakeFileWriter(outfile, rbatch->schema()));
我们可以直接调用ipc::RecordBatchWriter::WriteRecordBatch()来填充我们的文件:
// 写入RecordBatch。
ARROW_RETURN_NOT_OK(ipc_writer->WriteRecordBatch(*rbatch));
特别是对于IPC,写入器必须显式关闭,因为它预计可能会写入多个批次。要这样做:
// 特别是对于IPC,写入器需要明确关闭。
ARROW_RETURN_NOT_OK(ipc_writer->Close());
现在,我们已经读取并写入了IPC文件!
CSV文件I/O
我们将逐步进行这些操作,首先是读取,然后是写入,如下所示:
读取文件
打开文件
准备Table
使用csv::TableReader读取文件
写入文件
获取io::FileOutputStream
从Table写入文件
打开CSV文件
对于CSV文件,我们需要打开一个io::ReadableFile,就像对待Arrow文件一样,并重复使用之前的io::ReadableFile对象来完成:
// 将输入文件绑定到 "test_in.csv"
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.csv"));
准备Table CSV可以读取到一个Table,因此声明一个指向Table的指针:
std::shared_ptr<arrow::Table> csv_table;
读取CSV文件到Table
CSV读取器具有需要传递的选项结构 - 幸运的是,对于这些选项,我们可以直接使用默认值。有关其他选项的参考,请转到这里:文件格式。由于没有特殊的分隔符,而且文件很小,因此我们可以使用默认值创建读取器:
// CSV读取器有用于各种选项的几个对象。现在,我们将使用默认值。
ARROW_ASSIGN_OR_RAISE(
auto csv_reader,
arrow::csv::TableReader::Make(
arrow::io::default_io_context(), infile, arrow::csv::ReadOptions::Defaults(),
arrow::csv::ParseOptions::Defaults(), arrow::csv::ConvertOptions::Defaults()));
有了CSV读取器,我们可以使用它的csv::TableReader::Read()方法来填充我们的Table:
// 读取Table。
ARROW_ASSIGN_OR_RAISE(csv_table, csv_reader->Read())
从Table写入CSV文件
将CSV写入Table的操作与将IPC写入RecordBatch的操作几乎相同,只是使用我们的Table,并使用ipc::RecordBatchWriter::WriteTable(),而不是ipc::RecordBatchWriter::WriteRecordBatch()。请注意,我们使用相同的写入器类 - 我们之所以使用ipc::RecordBatchWriter::WriteTable()是因为我们有一个Table。我们将定位到一个文件,使用我们Table的Schema,然后写入Table:
// 将输出文件绑定到 "test_out.csv"
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.csv"));
// CSV写入器具有更简单的默认值,如果需要更复杂的用法,请查看API文档。
ARROW_ASSIGN_OR_RAISE(auto csv_writer,
arrow::csv::MakeCSVWriter(outfile, csv_table->schema()));
ARROW_RETURN_NOT_OK(csv_writer->WriteTable(*csv_table));
// 不是必需的,但是一种安全的做法。
ARROW_RETURN_NOT_OK(csv_writer->Close());
现在,我们已经读取并写入了CSV文件!
使用Parquet进行文件I/O
我们将逐步进行这些操作,首先是读取,然后是写入,如下所示:
读取文件
打开文件
准备parquet::arrow::FileReader
读取文件到Table
写入文件
将Table写入文件
打开Parquet文件 再次,这种文件格式,Parquet,需要一个io::ReadableFile,我们已经有了,需要在文件上调用io::ReadableFile::Open()方法:
// 将输入文件绑定到 "test_in.parquet"
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.parquet"));
设置Parquet读取器
像以往一样,我们需要一个读取器来实际读取文件。我们一直在从Arrow命名空间中获取各种文件格式的读取器。这一次,我们进入Parquet命名空间以获取parquet::arrow::FileReader:
std::unique_ptr<parquet::arrow::FileReader> reader;
现在,为了设置我们的读取器,我们调用parquet::arrow::OpenFile()。是的,即使我们使用了io::ReadableFile::Open(),这仍是必要的。请注意,我们通过引用传递我们的parquet::arrow::FileReader,而不是将其分配给输出:
// 请注意,Parquet的OpenFile()方法通过引用接受读取器,而不是返回读取器。
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
从Parquet文件读取到Table
有了准备好的parquet::arrow::FileReader,我们可以将数据读取到Table中,除非我们必须通过引用传递Table而不是将其分配给输出:
std::shared_ptr<arrow::Table> parquet_table;
// 读取Table。
PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
从Table写入Parquet文件
对于一次性写入,写入Parquet文件不需要编写器对象。相反,我们将提供Table,指向其必要内存消耗的内存池,告诉它在哪里写入以及如果需要在文件中拆分数据的块大小:
// 写入Parquet不需要声明编写器对象。只需获取输出文件绑定,然后传递Table,内存池,输出和块大小以在磁盘上拆分Table。
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(
*parquet_table, arrow::default_memory_pool(), outfile, 5));
结束程序 最后,我们只需返回Status::OK(),以便main()函数知道我们已经完成,一切都正常,就像在第一个教程中一样:
return arrow::Status::OK();
通过这种方式,您已经学会了在Arrow中读取和写入IPC、CSV和Parquet文件,可以正确加载数据并进行输出!现在,我们可以继续学习下一篇文章中的计算函数来处理数据。
系统当前共有 404 篇文章