arrow系列10 --- arrow数据集(Arrow Datasets)
作者:yunjinqi   类别:    日期:2023-10-15 15:38:46    阅读:278 次   消耗积分:0 分    

Arrow Datasets 

Arrow C++提供了数据集(Datasets)的概念和实现,用于处理分段的数据,这些数据可以超出内存大小,可能是由于生成大量数据、从流中读取数据或在磁盘上具有大型文件。在本文中,您将学到如何:

  • 读取一个多文件分区数据集并将其放入一个Table中。

  • 从Table中写出一个分区数据集。

先决条件 

在继续之前,请确保您具备以下条件:

  • 已安装Arrow,您可以在此处设置:在您自己的项目中使用Arrow C++。

  • 对Arrow的基本数据结构有一定的了解,可参考基本的Arrow数据结构。

为了更好地理解,可能还有必要阅读Arrow文件I/O。但这不是必需的。

设置 

在运行一些计算之前,我们需要填补一些空白:

  • 我们需要包括必要的头文件。

  • 需要一个main()函数来将所有部分连接在一起。

  • 我们需要一些磁盘上的数据来进行操作。

包括 

在编写C++代码之前,我们需要一些包括。我们将引入用于输出的iostream,然后导入parquet和arrow,以便在本文中使用的每种文件类型:

#include <arrow/api.h>
#include <arrow/dataset/api.h>
// 我们使用Parquet头文件来设置示例;它们不是使用数据集所必需的。
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <iostream>

主函数 

为了将所有部分连接在一起,我们将使用先前介绍的基本数据结构教程中的main()模式:

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

与之前的用法类似,此主函数与RunMain()函数配对使用:

arrow::Status RunMain() {
    return arrow::Status::OK();
}


生成用于读取的文件 

我们需要一些实际操作的文件。在实践中,您可能会有一些用于自己的应用程序的输入数据。然而,在这里,我们希望探索而不需要提供或查找数据集,因此让我们生成一些数据以便于后续的演示。可以阅读下面的内容,但本文将更详细地讨论这些概念。现在,只需将其复制并意识到最后生成了一个分区数据集在磁盘上:

// 为本示例生成一些数据。
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  // 此代码应该看起来与基本的Arrow示例相似,不是本示例的重点。然而,我们需要数据来处理,这就是数据生成的目的!
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// 通过写入两个Parquet文件来设置一个数据集。
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
    const std::string& root_path) {
  // 与CreateTable()类似,这是帮助我们读取的数据集。不用担心,我们在示例中也会写入一个数据集。
  auto base_path = root_path + "parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // 创建一个Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // 将其写入两个Parquet文件
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, 2048));
  return base_path;
}

arrow::Status PrepareEnv() {
  // 通过设置一些快速的写入来为我们的环境准备好以供读取。
  ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
  std::shared_ptr<arrow::fs::FileSystem> setup_fs;
  // 请注意,此操作在可执行文件构建的目录中执行。
  char setup_path[256];
  char* result = getcwd(setup_path, 256);
  if (result == NULL) {
    return arrow::Status::IOError("获取PWD失败。");
  }

  ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
  ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));

  return arrow::Status::OK();
}

为了确实生成这些文件,确保RunMain()函数中的第一个调用是我们的辅助函数PrepareEnv(),它将为我们提供磁盘上的一个数据集:

  ARROW_RETURN_NOT_OK(PrepareEnv());

读取分区数据集 

读取数据集与读取单个文件是不同的任务。这个任务需要比读取单个文件更多的工作,因为它需要能够解析多个文件和/或文件夹。这个过程可以分为以下步骤:

  • 获取用于本地文件系统的fs::FileSystem对象

  • 创建一个fs::FileSelector并使用它来准备一个dataset::FileSystemDatasetFactory

  • 使用dataset::FileSystemDatasetFactory构建一个dataset::Dataset

  • 使用dataset::Scanner读入一个Table

为了开始,我们需要能够与本地文件系统进行交互。为了做到这一点,我们需要一个 fs::FileSystem 对象。fs::FileSystem 是一个抽象,允许我们使用相同的接口,无论是使用Amazon S3、Google Cloud Storage还是本地磁盘,而我们将使用本地磁盘。所以,让我们声明它:

// 首先,我们需要一个文件系统对象,它允许我们与本地文件系统交互,从给定的路径开始。为了简单起见,这将是当前目录。
std::shared_ptr<arrow::fs::FileSystem> fs;

在本例中,我们的文件系统的基本路径将存在于与可执行文件相同的目录中。fs::FileSystemFromUriOrPath() 允许我们获取适用于支持的文件系统类型的 fs::FileSystem 对象。在这里,我们只需传递我们的路径:

// 获取当前工作目录并使用它来创建FileSystem对象。
char init_path[256];
char* result = getcwd(init_path, 256);
if (result == NULL) {
  return arrow::Status::IOError("获取PWD失败。");
}
ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));

创建一个 FileSystemDatasetFactory:

// 获取一个数据集::Dataset的时候需要准备一个数据集::FileSystemDatasetFactory。
//这是一个很长但非常描述性的名称 - 它将为我们提供一个从我们的fs::FileSystem获取数据的工厂。
//首先,我们通过填充一个数据集::FileSystemFactoryOptions结构来配置它:

// 创建一个选项对象允许我们配置数据集读取。
arrow::dataset::FileSystemFactoryOptions options;
// 我们将使用Hive风格的分区。我们将让Arrow Datasets推断分区模式。我们不会设置其他选项,使用默认值就足够了。
options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();

有很多文件格式可用,我们必须选择一个在实际读取时会被期望的文件格式。由于磁盘上有Parquet,所以当我们进行读取时,当然会要求Parquet:

auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

设置好 fs::FileSystem、fs::FileSelector、选项和文件格式后,我们可以创建 FileSystemDatasetFactory。这只需要将我们准备的所有内容传递并分配给一个变量:

// 现在,我们得到一个工厂,允许我们获取我们的数据集 - 我们还没有数据集!
ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                                      fs, selector, read_format, options));

使用工厂构建数据集:

// 现在,我们从工厂构建数据集。
ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());

现在,我们在内存中有一个 Dataset 对象。这并不意味着整个数据集都在内存中,但我们现在可以访问工具,允许我们探索和使用磁盘上的数据集。例如,我们可以获取构成整个数据集的片段(文件),并打印出来,以及一些小信息:

// 打印出片段
ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
for (const auto& fragment : fragments) {
  std::cout << "找到片段:" << (*fragment)->ToString() << std::endl;
  std.	cout << "分区表达式:" << (*fragment)->partition_expression().ToString() << std::endl;
}

将数据集移入表格:

// 一个处理数据集的方法是将它们转换为表格,其中我们可以对表格进行的操作与之前我们了解的操作相同。
// 将数据集扫描为表格,完成后,您可以对其执行常规表格操作,如计算和打印。不过,现在您也需要在内存中运行。
ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());
  // (文档部分: 读取扫描生成器)
  // (文档部分: 读取扫描器)
ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());
// 获取数据集进入表格
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
std::cout << table->ToString();

从表格将数据集写入磁盘:

// 写入数据集::Dataset与写入单个文件不同。这项任务比写入单个文件需要更多的工作,因为需要能够处理跨多个文件和文件夹的分区方案。这个过程可以分为以下步骤:

// 准备TableBatchReader

// 创建一个数据集::Scanner来从TableBatchReader中提取数据

// 准备模式、分区和文件格式选项

// 配置数据集::FileSystemDatasetWriteOptions - 一个配置写入函数的结构

// 将数据集写入磁盘

// 准备从表格写入的数据

// 我们有一个表格,想要将数据集::Dataset写入磁盘。事实上,为了进行探索,我们将使用不同的分区方案,而不仅仅是像原始片段那样划分为两半,
//而是根据每行在 "a" 列中的值进行分区。

// 为了开始这项任务,让我们获取一个TableBatchReader!这样很容易将数据写入数据集,并且可以在其他地方使用,每当需要将表格分解成RecordBatches的流时,
//我们可以使用它。在这里,我们只需使用TableBatchReader的构造函数,传入我们的表格:

// 获取一个Table以进行写入准备
std::shared_ptr<arrow::TableBatchReader> write_dataset =
std::make_shared<arrow::TableBatchReader>(table);

创建用于移动表格数据的 Scanner:

// 写入数据集::Dataset的过程,一旦有数据源可用,与读取相反。在之前,我们使用了一个数据集::Scanner,以便将数据扫描到表格中 - 现在,我们需要一个数据集::Scanner,以从我们的TableBatchReader中读取数据。为了获取数据集::Scanner,我们将基于我们的TableBatchReader创建一个数据集::ScannerBuilder,然后使用该构建器构建数据集::Scanner:

auto write_scanner_builder =
  arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())

准备模式、分区和文件格式选项:

// 由于我们想根据 "a" 列分区,我们需要声明这一点。在定义我们的分区模式时,我们只有一个包含 "a" 的字段:

// 分区模式确定用于分区的键是哪些字段。
auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});

选择分区算法,我们将再次使用 Hive 风格分区,这次将模式传递给它作为配置:

// 我们将使用 Hive 风格分区,这将创建带有 "key=value" 对的目录。
auto partitioning = std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);

选择文件格式,通常使用 Parquet:

// 现在,我们声明我们将写入Parquet文件。
auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

配置 FileSystemDatasetWriteOptions:

// 为了将数据写入磁盘,我们需要一些配置。我们将通过在一个`dataset::FileSystemDatasetWriteOptions`结构中设置值来完成这项任务。我们将尽量使用默认值进行初始化:

// 这一次,我们创建用于写入的选项,但会进行更多配置。
arrow::dataset::FileSystemDatasetWriteOptions write_options;
// 默认值开始。
write_options.file_write_options = write_format->DefaultWriteOptions();

目标文件系统:

// 目标是使用我们已经有的文件系统。
write_options.filesystem = fs;

目标目录:

// 写入到当前目录中的 "write_dataset" 文件夹。
write_options.base_dir = "write_dataset";

分区方法:

// 使用上面声明的分区。
write_options.partitioning = partitioning;

文件名称模板:

// 定义构成数据集的文件的名称。
write_options.basename_template = "part{i}.parquet";

处理已存在的数据的行为:

// 设置行为以覆盖现有数据 - 具体来说,这使得可以多次运行此示例,并允许您的代码覆盖已有的内容。
write_options.existing_data_behavior =
  arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;

将数据集写入磁盘:

// 一旦配置了`dataset::FileSystemDatasetWriteOptions`,并且准备好了`dataset::Scanner`来解析数据,我们可以将选项和`dataset::Scanner`传递给`dataset::FileSystemDataset::Write()`,将数据写入磁盘:

ARROW_RETURN_NOT_OK(
  arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));

最后,返回 Status::OK(),以便 main() 知道我们已经完成了,一切正常,就像前面的教程一样:

return arrow::Status::OK();

这样,您已经读取和写入了分区数据集!这种方法在进行任何支持的数据集格式时,经过一些配置,都可以工作。对于这样一个数据集的示例,纽约出租车数据集是一个众所周知的数据集,您可以在这里找到。

这意味着现在我们必须能够处理这些数据,而不需要一次将所有数据都加载到内存中。对于这一点,可以尝试使用 Acero。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/343
上一篇:arrow系列9---arrow计算的小例子
下一篇:arrow系列11---datasets小例子的运行