arrow系列26---表格数据集1(Tabular Datasets)
作者:yunjinqi   类别:    日期:2023-10-17 09:32:29    阅读:469 次   消耗积分:0 分    

Tabular Datasets 

另请参阅 数据集 API 参考

警告

arrow::dataset 命名空间是实验性的,尚未保证稳定 API。

Arrow Datasets 库提供了与表格数据、可能超出内存范围以及多文件数据集高效工作的功能。这包括:

  • 支持不同源和文件格式以及不同文件系统(本地、云)的统一接口。

  • 发现源(爬行目录、处理具有各种分区方案的数据集、基本模式规范化等)。

  • 通过谓词下推(过滤行)、投影(选择和导出列)和可选并行读取进行优化。

目前支持的文件格式包括 Parquet、Feather / Arrow IPC、CSV 和 ORC(请注意,ORC 数据集目前只能读取,尚不能写入)。目标是将来扩展对其他文件格式和数据源(例如数据库连接)的支持。

读取数据集 

在下面的示例中,让我们创建一个由两个 Parquet 文件组成的目录的小数据集:

// 为本示例的其余部分生成一些数据。
arrow::Result<std::shared_ptrarrow::Table> CreateTable() {
auto schema =
arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
arrow::field("c", arrow::int64())});
std::shared_ptrarrow::Array array_a;
std::shared_ptrarrow::Array array_b;
std::shared_ptrarrow::Array array_c;
arrow::NumericBuilderarrow::Int64Type builder;
ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
builder.Reset();
ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
builder.Reset();
ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
return arrow::Table::Make(schema, {array_a, array_b, array_c});
}
// 通过写入两个 Parquet 文件设置数据集。
arrow::Resultstd::string CreateExampleParquetDataset(
const std::shared_ptrfs::FileSystem& filesystem, const std::string& root_path) {
auto base_path = root_path + "/parquet_dataset";
ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
// 创建 Arrow 表
ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
// 将其写入两个 Parquet 文件
ARROW_ASSIGN_OR_RAISE(auto output,
filesystem->OpenOutputStream(base_path + "/data1.parquet"));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
*table->Slice(0, 5), arrow::default_memory_pool(), output, /chunk_size=/2048));
ARROW_ASSIGN_OR_RAISE(output,
filesystem->OpenOutputStream(base_path + "/data2.parquet"));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
*table->Slice(5), arrow::default_memory_pool(), output, /chunk_size=/2048));
return base_path;
}

(查看底部的完整示例:有关事务和 ACID 保证的注释。)

数据集发现 

可以使用不同的 arrow::dataset::DatasetFactory 对象创建 arrow::dataset::Dataset 对象。在这里,我们将使用 arrow::dataset::FileSystemDatasetFactory,它可以根据基本目录路径创建数据集:

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

我们还将传递要使用的文件系统和要用于读取的文件格式。这使我们能够在(例如)读取本地文件或Amazon S3中的文件之间进行选择,或在 Parquet 和 CSV 之间进行选择。

除了搜索基本目录之外,我们还可以手动列出文件路径。

创建 arrow::dataset::Dataset 不会开始读取数据本身。它仅会遍历目录以查找所有文件(如果需要的话),这些文件可以使用 arrow::dataset::FileSystemDataset::files() 方法检索:

// 打印已遍历的文件(仅适用于 FileSystemDataset)
for (const auto& filename : dataset->files()) {
std::cout << filename << std::endl;
}

…并推断数据集的模式(默认从第一个文件推断):

std::cout << dataset->schema()->ToString() << std::endl;

使用 arrow::dataset::Dataset::NewScan() 方法,我们可以构建一个 arrow::dataset::Scanner 并将数据集(或其一部分)读入 arrow::Table,使用 arrow::dataset::Scanner::ToTable() 方法:

// 使用给定格式读取整个数据集,无需分区。
arrow::Result<std::shared_ptrarrow::Table> ScanWholeDataset(
const std::shared_ptrfs::FileSystem& filesystem,
const std::shared_ptrds::FileFormat& format, const std::string& base_dir) {
// 通过扫描文件系统以查找文件来创建数据集
fs::FileSelector selector;
selector.base_dir = base_dir;
ARROW_ASSIGN_OR_RAISE(
auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// 打印片段
ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
for (const auto& fragment : fragments) {
std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
}
// 作为表格读取整个数据集
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
return scanner->ToTable();
}

注意

根据数据集的大小,这可能需要大量内存;请参阅下面的关于过滤/投影的数据过滤部分。


阅读不同的文件格式 

上述示例使用本地磁盘上的 Parquet 文件,但 Dataset API 提供了一致的接口,支持多种文件格式和文件系统。 (有关后者的更多信息,请参见从云存储中读取。)目前支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;将来计划支持更多格式。

如果我们将表保存为 Feather 文件而不是 Parquet 文件:

// 设置一个通过写两个 Feather 文件来创建的数据集。
arrow::Resultstd::string CreateExampleFeatherDataset(
const std::shared_ptrfs::FileSystem& filesystem, const std::string& root_path) {
auto base_path = root_path + "/feather_dataset";
ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
// 创建 Arrow 表
ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
// 将其写入两个 Feather 文件
ARROW_ASSIGN_OR_RAISE(auto output,
filesystem->OpenOutputStream(base_path + "/data1.feather"));
ARROW_ASSIGN_OR_RAISE(auto writer,
arrow::ipc::MakeFileWriter(output.get(), table->schema()));
ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_ASSIGN_OR_RAISE(output,
filesystem->OpenOutputStream(base_path + "/data2.feather"));
ARROW_ASSIGN_OR_RAISE(writer,
arrow::ipc::MakeFileWriter(output.get(), table->schema()));
ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
ARROW_RETURN_NOT_OK(writer->Close());
return base_path;
}

...然后,我们可以通过传递 arrow::dataset::IpcFileFormat 来读取 Feather 文件:

auto format = std::make_sharedds::ParquetFileFormat();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
.ValueOrDie();

自定义文件格式 

arrow::dataset::FileFormat 对象具有控制文件读取方式的属性。例如:

auto format = std::make_sharedds::ParquetFileFormat();
format->reader_options.dict_columns.insert("a");

将配置列 "a" 在读取时采用字典编码。类似地,设置 arrow::dataset::CsvFileFormat::parse_options 可以让我们更改例如读取逗号分隔或制表符分隔的数据等内容。

此外,通过传递 arrow::dataset::FragmentScanOptions 到 arrow::dataset::ScannerBuilder::FragmentScanOptions() 方法,可以对数据扫描进行细粒度控制。例如,对于 CSV 文件,我们可以更改在扫描时将哪些值转换为布尔值 true 和 false。

过滤数据 

到目前为止,我们已经读取整个数据集,但如果只需要数据的一部分,这可能会浪费时间或内存,读取不需要的数据。arrow::dataset::Scanner 提供了控制要读取哪些数据的选项。

在这段代码中,我们使用 arrow::dataset::ScannerBuilder::Project() 来选择要读取的列:

// 读取数据集,但仅选择列 "b" 并仅选择 b < 4 的行。
//
// 当您只需要数据集中的少数列时,这非常有用。在可能的情况下,
// 数据集将推送列选择,从而减少工作量。
arrow::Result<std::shared_ptrarrow::Table> FilterAndSelectDataset(
const std::shared_ptrfs::FileSystem& filesystem,
const std::shared_ptrds::FileFormat& format, const std::string& base_dir) {
fs::FileSelector selector;
selector.base_dir = base_dir;
ARROW_ASSIGN_OR_RAISE(
auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// 使用行筛选读取指定的列
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
return scanner->ToTable();
}

一些格式,如 Parquet,可以在这里减少 I/O 成本,只从文件系统中读取指定的列。

可以使用 arrow::dataset::ScannerBuilder::Filter() 提供过滤器,这样不符合过滤器谓词的行将不包括在返回的表中。同样,一些格式,如 Parquet,可以使用此过滤器减少所需的 I/O 量。

// 读取数据集,但仅选择列“b”和仅选择b < 4的行。
//
// 当您只需要数据集中的几列时,这很有用。在可能的情况下,
// 数据集将推送列选择,以减少工作量。
arrow::Result<std::shared_ptrarrow::Table> FilterAndSelectDataset(
const std::shared_ptrfs::FileSystem& filesystem,
const std::shared_ptrds::FileFormat& format, const std::string& base_dir) {
fs::FileSelector selector;
selector.base_dir = base_dir;
ARROW_ASSIGN_OR_RAISE(
auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// 使用行筛选读取指定列
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
return scanner->ToTable();
}

列投影 

除了选择列外,arrow::dataset::ScannerBuilder::Project()还可用于更复杂的投影,例如重命名列、将它们转换为其他类型,甚至根据评估表达式派生新列。

在这种情况下,我们传递了用于构建列值的表达式向量和列名称向量:

// 读取数据集,但进行列投影。
//
// 这对于从现有数据派生新列非常有用。例如,在这里
// 我们演示将列转换为不同类型,并根据谓词将数值列转换为布尔列。
// 您还可以重命名列或执行涉及多个列的计算。
arrow::Result<std::shared_ptrarrow::Table> ProjectDataset(
const std::shared_ptrfs::FileSystem& filesystem,
const std::shared_ptrds::FileFormat& format, const std::string& base_dir) {
fs::FileSelector selector;
selector.base_dir = base_dir;
ARROW_ASSIGN_OR_RAISE(
auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// 使用行筛选读取指定列
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
ARROW_RETURN_NOT_OK(scan_builder->Project(
{
// 保留列“a”不变。
cp::field_ref("a"),
// 将列“b”转换为float32。
cp::call("cast", {cp::field_ref("b")},
arrow::compute::CastOptions::Safe(arrow::float32())),
// 从“c”派生布尔列。
cp::equal(cp::field_ref("c"), cp::literal(1)),
},
{"a_renamed", "b_as_float32", "c_1"}));
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
return scanner->ToTable();
}

这还确定了列选择;在生成的表中仅会包含给定的列。如果要在现有列之外包含派生列,可以根据数据集模式构建表达式:

// 读取数据集,但进行列投影。
//
// 这一次,我们读取所有原始列以及一个派生列。这只是结合了
// 前面两个示例:按名称选择列的子集,以及使用表达式派生新列。
arrow::Result<std::shared_ptrarrow::Table> SelectAndProjectDataset(
const std::shared_ptrfs::FileSystem& filesystem,
the shared_ptrds::FileFormat& format, the std::string& base_dir) {
fs::FileSelector selector;
selector.base_dir = base_dir;
ARROW_ASSIGN_OR_RAISE(
auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// 使用行筛选读取指定列
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
std::vectorstd::string names;
std::vectorcp::Expression exprs;
// 读取所有原始列。
for (const auto& field : dataset->schema()->fields()) {
names.push_back(field->name());
exprs.push_back(cp::field_ref(field->name()));
}
// 同样派生一个新列。
names.emplace_back("b_large");
exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
return scanner->ToTable();
}

注意

在组合筛选和投影时,Arrow 将确定需要读取的所有必要列。例如,如果对不会最终被选择的列进行筛选,Arrow 仍然会读取列以评估筛选条件。



版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/359
上一篇:arrow系列25---读取json文件
下一篇:arrow系列27---表格数据集2(Tabular Datasets)