arrow系列27---表格数据集2(Tabular Datasets)
作者:yunjinqi   类别:    日期:2023-10-17 19:14:17    阅读:478 次   消耗积分:0 分    

读写分区数据 

到目前为止,我们一直在处理包含平坦目录和文件的数据集。很多情况下,数据集将具有一个或多个经常进行筛选的列。与其读取然后筛选数据,通过将文件组织成嵌套的目录结构,我们可以定义一个分区数据集,其中子目录名称包含关于存储在该目录中的数据子集的信息。然后,我们可以使用这些信息更有效地筛选数据,以避免加载不匹配筛选条件的文件。

例如,按年份和月份分区的数据集可能具有以下布局:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上述的分区方案使用了类似于Apache Hive中的"/key=value/"目录名称。按照这种约定,位于dataset_name/year=2007/month=01/data0.parquet的文件仅包含year == 2007和month == 01的数据。

让我们创建一个小的分区数据集。为此,我们将使用Arrow的数据集写入功能。


// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}


上述操作创建了一个目录,其中包含两个子目录("part=a"和"part=b"),并且写入这些目录中的Parquet文件不再包括"part"列。

读取此数据集时,我们现在指定数据集应使用类似于Hive的分区方案:

// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}


尽管分区字段未包含在实际的Parquet文件中,但在扫描此数据集时,它们将被添加回结果表中:

./debug/dataset_documentation_example file:///tmp parquet_hive partitioned

Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...

现在,我们可以根据分区键进行筛选,这样如果文件不匹配筛选条件,就不需要加载它们:

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

不同的分区方案 

上述示例使用类似于Hive的目录方案,例如"/year=2009/month=11/day=15"。我们通过传递Hive分区工厂来指定这一点。在这种情况下,分区键的类型是从文件路径中推断出来的。

也可以直接构建分区并明确定义分区键的模式。例如:

auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
arrow::field("year", arrow::int16()),
arrow::field("month", arrow::int8()),
arrow::field("day", arrow::int32())
}));

Arrow还支持另一种分区方案,即“目录分区”,其中文件路径中的段表示分区键的值,而不包括名称(字段名称在段的索引中是隐式的)。例如,给定字段名称“year”、“month”和“day”,一个路径可能是“/2019/11/15”。

由于文件路径中不包括名称,因此在构建目录分区时必须指定这些名称:

auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});

目录分区还支持提供完整模式,而不是从文件路径中推断类型。

分区性能考虑 

分区数据集涉及两个影响性能的方面:它增加了文件的数量,并在文件周围创建了目录结构。这两者都有好处和成本。根据配置和数据集的大小,成本可能会超过好处。

因为分区将数据集分成多个文件,所以可以使用并行读取和写入分区数据集。但是,每个额外的文件会在文件系统交互方面增加一些处理开销。它还增加了整个数据集的大小,因为每个文件都有一些共享的元数据。例如,每个Parquet文件包含模式和组级别统计信息。分区数量是文件数量的下限。如果将数据集按日期分区,一年的数据将至少有365个文件。如果进一步按另一个维度进行分区,具有1,000个唯一值,将有多达365,000个文件。这种细粒度的分区通常会导致主要由元数据组成的小文件。

分区数据集创建了嵌套的文件夹结构,允许我们修剪加载的扫描文件。但是,这会增加在数据集中查找文件的开销,因为我们需要递归地“列出目录”以查找数据文件。分区太细可以在这里引起问题:按日期分区一个年度数据需要365次列出调用以查找所有文件;如果再添加具有1,000个基数的另一列,将需要365,365次调用。

最佳的分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统,包括Arrow,应该适用于各种文件大小和分区布局,但应避免极端情况。以下几点建议有助于避免一些已知的最差情况:

避免小于20MB和大于2GB的文件。 避免具有超过10,000个不同分区的分区布局。 对于具有文件内组概念的文件格式,例如Parquet,适用类似的指导原则。行组可以在读取时提供并行性,允许基于统计信息跳过数据,但非常小的组可能会导致元数据成为文件大小的重要部分。在大多数情况下,Arrow的文件写入器提供了组大小的合理默认值。

从其他数据源读取 

读取内存中的数据 如果您已经有内存中的数据,希望使用Datasets API(例如,对数据进行筛选/投影或将其写入文件系统),您可以将其封装在arrow::dataset::InMemoryDataset中:

auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter, it, etc.
auto scanner_builder = dataset->NewScan();

在这个示例中,我们使用InMemoryDataset将示例数据写入本地磁盘,然后在示例的其余部分中使用它:

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

从云存储中读取 

除了本地文件,Arrow Datasets还支持从云存储系统(例如Amazon S3)中读取,通过传递不同的文件系统。

有关可用文件系统的更多详细信息,请参阅文件系统文档。

关于事务和ACID保证的注意事项 

数据集API不提供事务支持或任何ACID保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取并发执行可能会导致意外的行为。可以使用各种方法来避免操作相同文件,例如为每个写入者使用唯一的基本名称模板、为新文件使用临时目录或单独存储文件列表,而不是依赖于目录发现。

在写入过程中意外终止进程可能会导致系统处于不一致状态。写入调用通常在字节已完全传递到操作系统页面缓存后立即返回。即使写操作已完成,如果在写操作之后立即发生突然断电,仍然有可能会丢失部分文件。

大多数文件格式都有在文件末尾写入的魔术数。这意味着可以安全地检测并丢弃部分文件写入。CSV文件格式没有这种概念,部分写入的CSV文件可能会被检测为有效。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/360
上一篇:arrow系列26---表格数据集1(Tabular Datasets)
下一篇:arrow系列28---Arrow Flight RPC