读写分区数据
到目前为止,我们一直在处理包含平坦目录和文件的数据集。很多情况下,数据集将具有一个或多个经常进行筛选的列。与其读取然后筛选数据,通过将文件组织成嵌套的目录结构,我们可以定义一个分区数据集,其中子目录名称包含关于存储在该目录中的数据子集的信息。然后,我们可以使用这些信息更有效地筛选数据,以避免加载不匹配筛选条件的文件。
例如,按年份和月份分区的数据集可能具有以下布局:
dataset_name/ year=2007/ month=01/ data0.parquet data1.parquet ... month=02/ data0.parquet data1.parquet ... month=03/ ... year=2008/ month=01/ ... ...
上述的分区方案使用了类似于Apache Hive中的"/key=value/"目录名称。按照这种约定,位于dataset_name/year=2007/month=01/data0.parquet的文件仅包含year == 2007和month == 01的数据。
让我们创建一个小的分区数据集。为此,我们将使用Arrow的数据集写入功能。
// Set up a dataset by writing files with partitioning arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset( const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) { auto base_path = root_path + "/parquet_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // Create an Arrow Table auto schema = arrow::schema( {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())}); std::vector<std::shared_ptr<arrow::Array>> arrays(4); arrow::NumericBuilder<arrow::Int64Type> builder; ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0])); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1])); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2])); arrow::StringBuilder string_builder; ARROW_RETURN_NOT_OK( string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"})); ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3])); auto table = arrow::Table::Make(schema, arrays); // Write it using Datasets auto dataset = std::make_shared<ds::InMemoryDataset>(table); ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish()); // The partition schema determines which fields are part of the partitioning. auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())}); // We'll use Hive-style partitioning, which creates directories with "key=value" pairs. auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema); // We'll write Parquet files. auto format = std::make_shared<ds::ParquetFileFormat>(); ds::FileSystemDatasetWriteOptions write_options; write_options.file_write_options = format->DefaultWriteOptions(); write_options.filesystem = filesystem; write_options.base_dir = base_path; write_options.partitioning = partitioning; write_options.basename_template = "part{i}.parquet"; ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner)); return base_path; }
上述操作创建了一个目录,其中包含两个子目录("part=a"和"part=b"),并且写入这些目录中的Parquet文件不再包括"part"列。
读取此数据集时,我们现在指定数据集应使用类似于Hive的分区方案:
// Read an entire dataset, but with partitioning information. arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset( const std::shared_ptr<fs::FileSystem>& filesystem, const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; selector.recursive = true; // Make sure to search subdirectories ds::FileSystemFactoryOptions options; // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition // schema. options.partitioning = ds::HivePartitioning::MakeFactory(); ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make( filesystem, selector, format, options)); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); // Print out the fragments ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments()); for (const auto& fragment : fragments) { std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; std::cout << "Partition expression: " << (*fragment)->partition_expression().ToString() << std::endl; } ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
尽管分区字段未包含在实际的Parquet文件中,但在扫描此数据集时,它们将被添加回结果表中:
./debug/dataset_documentation_example file:///tmp parquet_hive partitioned Found fragment: /tmp/parquet_dataset/part=a/part0.parquet Partition expression: (part == "a") Found fragment: /tmp/parquet_dataset/part=b/part1.parquet Partition expression: (part == "b") Read 20 rows a: int64 -- field metadata -- PARQUET:field_id: '1' b: double -- field metadata -- PARQUET:field_id: '2' c: int64 -- field metadata -- PARQUET:field_id: '3' part: string ---- # snip...
现在,我们可以根据分区键进行筛选,这样如果文件不匹配筛选条件,就不需要加载它们:
// Read an entire dataset, but with partitioning information. Also, filter the dataset on // the partition values. arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset( const std::shared_ptr<fs::FileSystem>& filesystem, const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { fs::FileSelector selector; selector.base_dir = base_dir; selector.recursive = true; ds::FileSystemFactoryOptions options; options.partitioning = ds::HivePartitioning::MakeFactory(); ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make( filesystem, selector, format, options)); ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); // Filter based on the partition values. This will mean that we won't even read the // files whose partition expressions don't match the filter. ARROW_RETURN_NOT_OK( scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b")))); ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); return scanner->ToTable(); }
不同的分区方案
上述示例使用类似于Hive的目录方案,例如"/year=2009/month=11/day=15"。我们通过传递Hive分区工厂来指定这一点。在这种情况下,分区键的类型是从文件路径中推断出来的。
也可以直接构建分区并明确定义分区键的模式。例如:
auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({ arrow::field("year", arrow::int16()), arrow::field("month", arrow::int8()), arrow::field("day", arrow::int32()) }));
Arrow还支持另一种分区方案,即“目录分区”,其中文件路径中的段表示分区键的值,而不包括名称(字段名称在段的索引中是隐式的)。例如,给定字段名称“year”、“month”和“day”,一个路径可能是“/2019/11/15”。
由于文件路径中不包括名称,因此在构建目录分区时必须指定这些名称:
auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});
目录分区还支持提供完整模式,而不是从文件路径中推断类型。
分区性能考虑
分区数据集涉及两个影响性能的方面:它增加了文件的数量,并在文件周围创建了目录结构。这两者都有好处和成本。根据配置和数据集的大小,成本可能会超过好处。
因为分区将数据集分成多个文件,所以可以使用并行读取和写入分区数据集。但是,每个额外的文件会在文件系统交互方面增加一些处理开销。它还增加了整个数据集的大小,因为每个文件都有一些共享的元数据。例如,每个Parquet文件包含模式和组级别统计信息。分区数量是文件数量的下限。如果将数据集按日期分区,一年的数据将至少有365个文件。如果进一步按另一个维度进行分区,具有1,000个唯一值,将有多达365,000个文件。这种细粒度的分区通常会导致主要由元数据组成的小文件。
分区数据集创建了嵌套的文件夹结构,允许我们修剪加载的扫描文件。但是,这会增加在数据集中查找文件的开销,因为我们需要递归地“列出目录”以查找数据文件。分区太细可以在这里引起问题:按日期分区一个年度数据需要365次列出调用以查找所有文件;如果再添加具有1,000个基数的另一列,将需要365,365次调用。
最佳的分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统,包括Arrow,应该适用于各种文件大小和分区布局,但应避免极端情况。以下几点建议有助于避免一些已知的最差情况:
避免小于20MB和大于2GB的文件。 避免具有超过10,000个不同分区的分区布局。 对于具有文件内组概念的文件格式,例如Parquet,适用类似的指导原则。行组可以在读取时提供并行性,允许基于统计信息跳过数据,但非常小的组可能会导致元数据成为文件大小的重要部分。在大多数情况下,Arrow的文件写入器提供了组大小的合理默认值。
从其他数据源读取
读取内存中的数据 如果您已经有内存中的数据,希望使用Datasets API(例如,对数据进行筛选/投影或将其写入文件系统),您可以将其封装在arrow::dataset::InMemoryDataset中:
auto table = arrow::Table::FromRecordBatches(...); auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table)); // Scan the dataset, filter, it, etc. auto scanner_builder = dataset->NewScan();
在这个示例中,我们使用InMemoryDataset将示例数据写入本地磁盘,然后在示例的其余部分中使用它:
// Set up a dataset by writing files with partitioning arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset( const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) { auto base_path = root_path + "/parquet_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // Create an Arrow Table auto schema = arrow::schema( {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())}); std::vector<std::shared_ptr<arrow::Array>> arrays(4); arrow::NumericBuilder<arrow::Int64Type> builder; ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0])); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1])); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2])); arrow::StringBuilder string_builder; ARROW_RETURN_NOT_OK( string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"})); ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3])); auto table = arrow::Table::Make(schema, arrays); // Write it using Datasets auto dataset = std::make_shared<ds::InMemoryDataset>(table); ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish()); // The partition schema determines which fields are part of the partitioning. auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())}); // We'll use Hive-style partitioning, which creates directories with "key=value" pairs. auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema); // We'll write Parquet files. auto format = std::make_shared<ds::ParquetFileFormat>(); ds::FileSystemDatasetWriteOptions write_options; write_options.file_write_options = format->DefaultWriteOptions(); write_options.filesystem = filesystem; write_options.base_dir = base_path; write_options.partitioning = partitioning; write_options.basename_template = "part{i}.parquet"; ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner)); return base_path; }
从云存储中读取
除了本地文件,Arrow Datasets还支持从云存储系统(例如Amazon S3)中读取,通过传递不同的文件系统。
有关可用文件系统的更多详细信息,请参阅文件系统文档。
关于事务和ACID保证的注意事项
数据集API不提供事务支持或任何ACID保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取并发执行可能会导致意外的行为。可以使用各种方法来避免操作相同文件,例如为每个写入者使用唯一的基本名称模板、为新文件使用临时目录或单独存储文件列表,而不是依赖于目录发现。
在写入过程中意外终止进程可能会导致系统处于不一致状态。写入调用通常在字节已完全传递到操作系统页面缓存后立即返回。即使写操作已完成,如果在写操作之后立即发生突然断电,仍然有可能会丢失部分文件。
大多数文件格式都有在文件末尾写入的魔术数。这意味着可以安全地检测并丢弃部分文件写入。CSV文件格式没有这种概念,部分写入的CSV文件可能会被检测为有效。