Arrow Datasets
Arrow C++提供了数据集(Datasets)的概念和实现,用于处理分段的数据,这些数据可以超出内存大小,可能是由于生成大量数据、从流中读取数据或在磁盘上具有大型文件。在本文中,您将学到如何:
读取一个多文件分区数据集并将其放入一个Table中。
从Table中写出一个分区数据集。
先决条件
在继续之前,请确保您具备以下条件:
已安装Arrow,您可以在此处设置:在您自己的项目中使用Arrow C++。
对Arrow的基本数据结构有一定的了解,可参考基本的Arrow数据结构。
为了更好地理解,可能还有必要阅读Arrow文件I/O。但这不是必需的。
设置
在运行一些计算之前,我们需要填补一些空白:
我们需要包括必要的头文件。
需要一个main()函数来将所有部分连接在一起。
我们需要一些磁盘上的数据来进行操作。
包括
在编写C++代码之前,我们需要一些包括。我们将引入用于输出的iostream,然后导入parquet和arrow,以便在本文中使用的每种文件类型:
#include <arrow/api.h> #include <arrow/dataset/api.h> // 我们使用Parquet头文件来设置示例;它们不是使用数据集所必需的。 #include <parquet/arrow/reader.h> #include <parquet/arrow/writer.h> #include <iostream>
主函数
为了将所有部分连接在一起,我们将使用先前介绍的基本数据结构教程中的main()模式:
int main() { arrow::Status st = RunMain(); if (!st.ok()) { std::cerr << st << std::endl; return 1; } return 0; }
与之前的用法类似,此主函数与RunMain()函数配对使用:
arrow::Status RunMain() { return arrow::Status::OK(); }
生成用于读取的文件
我们需要一些实际操作的文件。在实践中,您可能会有一些用于自己的应用程序的输入数据。然而,在这里,我们希望探索而不需要提供或查找数据集,因此让我们生成一些数据以便于后续的演示。可以阅读下面的内容,但本文将更详细地讨论这些概念。现在,只需将其复制并意识到最后生成了一个分区数据集在磁盘上:
// 为本示例生成一些数据。 arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() { // 此代码应该看起来与基本的Arrow示例相似,不是本示例的重点。然而,我们需要数据来处理,这就是数据生成的目的! auto schema = arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), arrow::field("c", arrow::int64())}); std::shared_ptr<arrow::Array> array_a; std::shared_ptr<arrow::Array> array_b; std::shared_ptr<arrow::Array> array_c; arrow::NumericBuilder<arrow::Int64Type> builder; ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); ARROW_RETURN_NOT_OK(builder.Finish(&array_a)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); ARROW_RETURN_NOT_OK(builder.Finish(&array_b)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); ARROW_RETURN_NOT_OK(builder.Finish(&array_c)); return arrow::Table::Make(schema, {array_a, array_b, array_c}); } // 通过写入两个Parquet文件来设置一个数据集。 arrow::Result<std::string> CreateExampleParquetDataset( const std::shared_ptr<arrow::fs::FileSystem>& filesystem, const std::string& root_path) { // 与CreateTable()类似,这是帮助我们读取的数据集。不用担心,我们在示例中也会写入一个数据集。 auto base_path = root_path + "parquet_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // 创建一个Arrow Table ARROW_ASSIGN_OR_RAISE(auto table, CreateTable()); // 将其写入两个Parquet文件 ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(base_path + "/data1.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048)); ARROW_ASSIGN_OR_RAISE(output, filesystem->OpenOutputStream(base_path + "/data2.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(5), arrow::default_memory_pool(), output, 2048)); return base_path; } arrow::Status PrepareEnv() { // 通过设置一些快速的写入来为我们的环境准备好以供读取。 ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable()) std::shared_ptr<arrow::fs::FileSystem> setup_fs; // 请注意,此操作在可执行文件构建的目录中执行。 char setup_path[256]; char* result = getcwd(setup_path, 256); if (result == NULL) { return arrow::Status::IOError("获取PWD失败。"); } ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path)); ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, "")); return arrow::Status::OK(); }
为了确实生成这些文件,确保RunMain()函数中的第一个调用是我们的辅助函数PrepareEnv(),它将为我们提供磁盘上的一个数据集:
ARROW_RETURN_NOT_OK(PrepareEnv());
读取分区数据集
读取数据集与读取单个文件是不同的任务。这个任务需要比读取单个文件更多的工作,因为它需要能够解析多个文件和/或文件夹。这个过程可以分为以下步骤:
获取用于本地文件系统的fs::FileSystem对象
创建一个fs::FileSelector并使用它来准备一个dataset::FileSystemDatasetFactory
使用dataset::FileSystemDatasetFactory构建一个dataset::Dataset
使用dataset::Scanner读入一个Table
为了开始,我们需要能够与本地文件系统进行交互。为了做到这一点,我们需要一个 fs::FileSystem 对象。fs::FileSystem 是一个抽象,允许我们使用相同的接口,无论是使用Amazon S3、Google Cloud Storage还是本地磁盘,而我们将使用本地磁盘。所以,让我们声明它:
// 首先,我们需要一个文件系统对象,它允许我们与本地文件系统交互,从给定的路径开始。为了简单起见,这将是当前目录。 std::shared_ptr<arrow::fs::FileSystem> fs;
在本例中,我们的文件系统的基本路径将存在于与可执行文件相同的目录中。fs::FileSystemFromUriOrPath() 允许我们获取适用于支持的文件系统类型的 fs::FileSystem 对象。在这里,我们只需传递我们的路径:
// 获取当前工作目录并使用它来创建FileSystem对象。 char init_path[256]; char* result = getcwd(init_path, 256); if (result == NULL) { return arrow::Status::IOError("获取PWD失败。"); } ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));
创建一个 FileSystemDatasetFactory:
// 获取一个数据集::Dataset的时候需要准备一个数据集::FileSystemDatasetFactory。 //这是一个很长但非常描述性的名称 - 它将为我们提供一个从我们的fs::FileSystem获取数据的工厂。 //首先,我们通过填充一个数据集::FileSystemFactoryOptions结构来配置它: // 创建一个选项对象允许我们配置数据集读取。 arrow::dataset::FileSystemFactoryOptions options; // 我们将使用Hive风格的分区。我们将让Arrow Datasets推断分区模式。我们不会设置其他选项,使用默认值就足够了。 options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();
有很多文件格式可用,我们必须选择一个在实际读取时会被期望的文件格式。由于磁盘上有Parquet,所以当我们进行读取时,当然会要求Parquet:
auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
设置好 fs::FileSystem、fs::FileSelector、选项和文件格式后,我们可以创建 FileSystemDatasetFactory。这只需要将我们准备的所有内容传递并分配给一个变量:
// 现在,我们得到一个工厂,允许我们获取我们的数据集 - 我们还没有数据集! ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make( fs, selector, read_format, options));
使用工厂构建数据集:
// 现在,我们从工厂构建数据集。 ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());
现在,我们在内存中有一个 Dataset 对象。这并不意味着整个数据集都在内存中,但我们现在可以访问工具,允许我们探索和使用磁盘上的数据集。例如,我们可以获取构成整个数据集的片段(文件),并打印出来,以及一些小信息:
// 打印出片段 ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments()); for (const auto& fragment : fragments) { std::cout << "找到片段:" << (*fragment)->ToString() << std::endl; std. cout << "分区表达式:" << (*fragment)->partition_expression().ToString() << std::endl; }
将数据集移入表格:
// 一个处理数据集的方法是将它们转换为表格,其中我们可以对表格进行的操作与之前我们了解的操作相同。 // 将数据集扫描为表格,完成后,您可以对其执行常规表格操作,如计算和打印。不过,现在您也需要在内存中运行。 ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan()); // (文档部分: 读取扫描生成器) // (文档部分: 读取扫描器) ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish()); // 获取数据集进入表格 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable()); std::cout << table->ToString();
从表格将数据集写入磁盘:
// 写入数据集::Dataset与写入单个文件不同。这项任务比写入单个文件需要更多的工作,因为需要能够处理跨多个文件和文件夹的分区方案。这个过程可以分为以下步骤: // 准备TableBatchReader // 创建一个数据集::Scanner来从TableBatchReader中提取数据 // 准备模式、分区和文件格式选项 // 配置数据集::FileSystemDatasetWriteOptions - 一个配置写入函数的结构 // 将数据集写入磁盘 // 准备从表格写入的数据 // 我们有一个表格,想要将数据集::Dataset写入磁盘。事实上,为了进行探索,我们将使用不同的分区方案,而不仅仅是像原始片段那样划分为两半, //而是根据每行在 "a" 列中的值进行分区。 // 为了开始这项任务,让我们获取一个TableBatchReader!这样很容易将数据写入数据集,并且可以在其他地方使用,每当需要将表格分解成RecordBatches的流时, //我们可以使用它。在这里,我们只需使用TableBatchReader的构造函数,传入我们的表格: // 获取一个Table以进行写入准备 std::shared_ptr<arrow::TableBatchReader> write_dataset = std::make_shared<arrow::TableBatchReader>(table);
创建用于移动表格数据的 Scanner:
// 写入数据集::Dataset的过程,一旦有数据源可用,与读取相反。在之前,我们使用了一个数据集::Scanner,以便将数据扫描到表格中 - 现在,我们需要一个数据集::Scanner,以从我们的TableBatchReader中读取数据。为了获取数据集::Scanner,我们将基于我们的TableBatchReader创建一个数据集::ScannerBuilder,然后使用该构建器构建数据集::Scanner: auto write_scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset); ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())
准备模式、分区和文件格式选项:
// 由于我们想根据 "a" 列分区,我们需要声明这一点。在定义我们的分区模式时,我们只有一个包含 "a" 的字段: // 分区模式确定用于分区的键是哪些字段。 auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});
选择分区算法,我们将再次使用 Hive 风格分区,这次将模式传递给它作为配置:
// 我们将使用 Hive 风格分区,这将创建带有 "key=value" 对的目录。 auto partitioning = std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
选择文件格式,通常使用 Parquet:
// 现在,我们声明我们将写入Parquet文件。 auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
配置 FileSystemDatasetWriteOptions:
// 为了将数据写入磁盘,我们需要一些配置。我们将通过在一个`dataset::FileSystemDatasetWriteOptions`结构中设置值来完成这项任务。我们将尽量使用默认值进行初始化: // 这一次,我们创建用于写入的选项,但会进行更多配置。 arrow::dataset::FileSystemDatasetWriteOptions write_options; // 默认值开始。 write_options.file_write_options = write_format->DefaultWriteOptions();
目标文件系统:
// 目标是使用我们已经有的文件系统。 write_options.filesystem = fs;
目标目录:
// 写入到当前目录中的 "write_dataset" 文件夹。 write_options.base_dir = "write_dataset";
分区方法:
// 使用上面声明的分区。 write_options.partitioning = partitioning;
文件名称模板:
// 定义构成数据集的文件的名称。 write_options.basename_template = "part{i}.parquet";
处理已存在的数据的行为:
// 设置行为以覆盖现有数据 - 具体来说,这使得可以多次运行此示例,并允许您的代码覆盖已有的内容。 write_options.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
将数据集写入磁盘:
// 一旦配置了`dataset::FileSystemDatasetWriteOptions`,并且准备好了`dataset::Scanner`来解析数据,我们可以将选项和`dataset::Scanner`传递给`dataset::FileSystemDataset::Write()`,将数据写入磁盘: ARROW_RETURN_NOT_OK( arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));
最后,返回 Status::OK(),以便 main() 知道我们已经完成了,一切正常,就像前面的教程一样:
return arrow::Status::OK();
这样,您已经读取和写入了分区数据集!这种方法在进行任何支持的数据集格式时,经过一些配置,都可以工作。对于这样一个数据集的示例,纽约出租车数据集是一个众所周知的数据集,您可以在这里找到。
这意味着现在我们必须能够处理这些数据,而不需要一次将所有数据都加载到内存中。对于这一点,可以尝试使用 Acero。
系统当前共有 404 篇文章