Tabular Datasets
另请参阅 数据集 API 参考
警告
arrow::dataset 命名空间是实验性的,尚未保证稳定 API。
Arrow Datasets 库提供了与表格数据、可能超出内存范围以及多文件数据集高效工作的功能。这包括:
支持不同源和文件格式以及不同文件系统(本地、云)的统一接口。
发现源(爬行目录、处理具有各种分区方案的数据集、基本模式规范化等)。
通过谓词下推(过滤行)、投影(选择和导出列)和可选并行读取进行优化。
目前支持的文件格式包括 Parquet、Feather / Arrow IPC、CSV 和 ORC(请注意,ORC 数据集目前只能读取,尚不能写入)。目标是将来扩展对其他文件格式和数据源(例如数据库连接)的支持。
读取数据集
在下面的示例中,让我们创建一个由两个 Parquet 文件组成的目录的小数据集:
// 为本示例的其余部分生成一些数据。 arrow::Result<std::shared_ptrarrow::Table> CreateTable() { auto schema = arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), arrow::field("c", arrow::int64())}); std::shared_ptrarrow::Array array_a; std::shared_ptrarrow::Array array_b; std::shared_ptrarrow::Array array_c; arrow::NumericBuilderarrow::Int64Type builder; ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); ARROW_RETURN_NOT_OK(builder.Finish(&array_a)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); ARROW_RETURN_NOT_OK(builder.Finish(&array_b)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); ARROW_RETURN_NOT_OK(builder.Finish(&array_c)); return arrow::Table::Make(schema, {array_a, array_b, array_c}); } // 通过写入两个 Parquet 文件设置数据集。 arrow::Resultstd::string CreateExampleParquetDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::string& root_path) { auto base_path = root_path + "/parquet_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // 创建 Arrow 表 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable()); // 将其写入两个 Parquet 文件 ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(base_path + "/data1.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(0, 5), arrow::default_memory_pool(), output, /chunk_size=/2048)); ARROW_ASSIGN_OR_RAISE(output, filesystem->OpenOutputStream(base_path + "/data2.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(5), arrow::default_memory_pool(), output, /chunk_size=/2048)); return base_path; }
(查看底部的完整示例:有关事务和 ACID 保证的注释。)
数据集发现
可以使用不同的 arrow::dataset::DatasetFactory 对象创建 arrow::dataset::Dataset 对象。在这里,我们将使用 arrow::dataset::FileSystemDatasetFactory,它可以根据基本目录路径创建数据集:
// Read the whole dataset with the given format, without partitioning. arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset( const std::shared_ptr<fs::FileSystem>& filesystem, const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { // Create a dataset by scanning the filesystem for files fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // Print out the fragments ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments()) for (const auto& fragment : fragments) { std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; } // Read the entire dataset as a Table ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
我们还将传递要使用的文件系统和要用于读取的文件格式。这使我们能够在(例如)读取本地文件或Amazon S3中的文件之间进行选择,或在 Parquet 和 CSV 之间进行选择。
除了搜索基本目录之外,我们还可以手动列出文件路径。
创建 arrow::dataset::Dataset 不会开始读取数据本身。它仅会遍历目录以查找所有文件(如果需要的话),这些文件可以使用 arrow::dataset::FileSystemDataset::files() 方法检索:
// 打印已遍历的文件(仅适用于 FileSystemDataset) for (const auto& filename : dataset->files()) { std::cout << filename << std::endl; }
…并推断数据集的模式(默认从第一个文件推断):
std::cout << dataset->schema()->ToString() << std::endl;
使用 arrow::dataset::Dataset::NewScan() 方法,我们可以构建一个 arrow::dataset::Scanner 并将数据集(或其一部分)读入 arrow::Table,使用 arrow::dataset::Scanner::ToTable() 方法:
// 使用给定格式读取整个数据集,无需分区。 arrow::Result<std::shared_ptrarrow::Table> ScanWholeDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::shared_ptrds::FileFormat& format, const std::string& base_dir) { // 通过扫描文件系统以查找文件来创建数据集 fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // 打印片段 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments()) for (const auto& fragment : fragments) { std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; } // 作为表格读取整个数据集 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
注意
根据数据集的大小,这可能需要大量内存;请参阅下面的关于过滤/投影的数据过滤部分。
阅读不同的文件格式
上述示例使用本地磁盘上的 Parquet 文件,但 Dataset API 提供了一致的接口,支持多种文件格式和文件系统。 (有关后者的更多信息,请参见从云存储中读取。)目前支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;将来计划支持更多格式。
如果我们将表保存为 Feather 文件而不是 Parquet 文件:
// 设置一个通过写两个 Feather 文件来创建的数据集。 arrow::Resultstd::string CreateExampleFeatherDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::string& root_path) { auto base_path = root_path + "/feather_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // 创建 Arrow 表 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable()); // 将其写入两个 Feather 文件 ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(base_path + "/data1.feather")); ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeFileWriter(output.get(), table->schema())); ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5))); ARROW_RETURN_NOT_OK(writer->Close()); ARROW_ASSIGN_OR_RAISE(output, filesystem->OpenOutputStream(base_path + "/data2.feather")); ARROW_ASSIGN_OR_RAISE(writer, arrow::ipc::MakeFileWriter(output.get(), table->schema())); ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5))); ARROW_RETURN_NOT_OK(writer->Close()); return base_path; }
...然后,我们可以通过传递 arrow::dataset::IpcFileFormat 来读取 Feather 文件:
auto format = std::make_sharedds::ParquetFileFormat(); // ... auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options) .ValueOrDie();
自定义文件格式
arrow::dataset::FileFormat 对象具有控制文件读取方式的属性。例如:
auto format = std::make_sharedds::ParquetFileFormat(); format->reader_options.dict_columns.insert("a");
将配置列 "a" 在读取时采用字典编码。类似地,设置 arrow::dataset::CsvFileFormat::parse_options 可以让我们更改例如读取逗号分隔或制表符分隔的数据等内容。
此外,通过传递 arrow::dataset::FragmentScanOptions 到 arrow::dataset::ScannerBuilder::FragmentScanOptions() 方法,可以对数据扫描进行细粒度控制。例如,对于 CSV 文件,我们可以更改在扫描时将哪些值转换为布尔值 true 和 false。
过滤数据
到目前为止,我们已经读取整个数据集,但如果只需要数据的一部分,这可能会浪费时间或内存,读取不需要的数据。arrow::dataset::Scanner 提供了控制要读取哪些数据的选项。
在这段代码中,我们使用 arrow::dataset::ScannerBuilder::Project() 来选择要读取的列:
// 读取数据集,但仅选择列 "b" 并仅选择 b < 4 的行。 // // 当您只需要数据集中的少数列时,这非常有用。在可能的情况下, // 数据集将推送列选择,从而减少工作量。 arrow::Result<std::shared_ptrarrow::Table> FilterAndSelectDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::shared_ptrds::FileFormat& format, const std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // 使用行筛选读取指定的列 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_RETURN_NOT_OK(scan_builder->Project({"b"})); ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4)))); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
一些格式,如 Parquet,可以在这里减少 I/O 成本,只从文件系统中读取指定的列。
可以使用 arrow::dataset::ScannerBuilder::Filter() 提供过滤器,这样不符合过滤器谓词的行将不包括在返回的表中。同样,一些格式,如 Parquet,可以使用此过滤器减少所需的 I/O 量。
// 读取数据集,但仅选择列“b”和仅选择b < 4的行。 // // 当您只需要数据集中的几列时,这很有用。在可能的情况下, // 数据集将推送列选择,以减少工作量。 arrow::Result<std::shared_ptrarrow::Table> FilterAndSelectDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::shared_ptrds::FileFormat& format, const std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // 使用行筛选读取指定列 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_RETURN_NOT_OK(scan_builder->Project({"b"})); ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4)))); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
列投影
除了选择列外,arrow::dataset::ScannerBuilder::Project()还可用于更复杂的投影,例如重命名列、将它们转换为其他类型,甚至根据评估表达式派生新列。
在这种情况下,我们传递了用于构建列值的表达式向量和列名称向量:
// 读取数据集,但进行列投影。 // // 这对于从现有数据派生新列非常有用。例如,在这里 // 我们演示将列转换为不同类型,并根据谓词将数值列转换为布尔列。 // 您还可以重命名列或执行涉及多个列的计算。 arrow::Result<std::shared_ptrarrow::Table> ProjectDataset( const std::shared_ptrfs::FileSystem& filesystem, const std::shared_ptrds::FileFormat& format, const std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // 使用行筛选读取指定列 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_RETURN_NOT_OK(scan_builder->Project( { // 保留列“a”不变。 cp::field_ref("a"), // 将列“b”转换为float32。 cp::call("cast", {cp::field_ref("b")}, arrow::compute::CastOptions::Safe(arrow::float32())), // 从“c”派生布尔列。 cp::equal(cp::field_ref("c"), cp::literal(1)), }, {"a_renamed", "b_as_float32", "c_1"})); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
这还确定了列选择;在生成的表中仅会包含给定的列。如果要在现有列之外包含派生列,可以根据数据集模式构建表达式:
// 读取数据集,但进行列投影。 // // 这一次,我们读取所有原始列以及一个派生列。这只是结合了 // 前面两个示例:按名称选择列的子集,以及使用表达式派生新列。 arrow::Result<std::shared_ptrarrow::Table> SelectAndProjectDataset( const std::shared_ptrfs::FileSystem& filesystem, the shared_ptrds::FileFormat& format, the std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; ARROW_ASSIGN_OR_RAISE( auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // 使用行筛选读取指定列 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); std::vectorstd::string names; std::vectorcp::Expression exprs; // 读取所有原始列。 for (const auto& field : dataset->schema()->fields()) { names.push_back(field->name()); exprs.push_back(cp::field_ref(field->name())); } // 同样派生一个新列。 names.emplace_back("b_large"); exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1))); ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names)); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
注意
在组合筛选和投影时,Arrow 将确定需要读取的所有必要列。例如,如果对不会最终被选择的列进行筛选,Arrow 仍然会读取列以评估筛选条件。