arrow系列11---datasets小例子的运行
作者:yunjinqi   类别:    日期:2023-10-15 16:33:09    阅读:127 次   消耗积分:0 分    

1、CMakeLists.txt


cmake_minimum_required(VERSION 3.16)
project(MyExample)
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)
find_package(ArrowDataset REQUIRED)
add_executable(my_example my_example.cc)
target_link_libraries(my_example PRIVATE Arrow::arrow_shared Parquet::parquet_shared ArrowDataset::arrow_dataset_shared)


2. my_example.cc

// (文档部分: 包含)
#include <arrow/api.h>
#include <arrow/dataset/api.h>
// 我们在示例设置中使用Parquet头文件,但在使用数据集时它们不是必需的。
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <iostream>
// (文档部分: 包含)
// (文档部分: 辅助函数)
// 为接下来的示例生成一些数据。
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  // 这段代码应该看起来很熟悉,与基本的Arrow示例类似,不是本示例的重点。然而,我们需要数据来处理,这就是生成数据的目的!
auto schema =
arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
arrow::field("c", arrow::int64())});
std::shared_ptr<arrow::Array> array_a;
std::shared_ptr<arrow::Array> array_b;
std::shared_ptr<arrow::Array> array_c;
arrow::NumericBuilder<arrow::Int64Type> builder;
ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
builder.Reset();
ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
builder.Reset();
ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
return arrow::Table::Make(schema, {array_a, array_b, array_c});
}
// 通过编写两个Parquet文件来设置一个数据集。
arrow::Result<std::string> CreateExampleParquetDataset( const std::shared_ptr<arrow::fs::FileSystem>& filesystem, const std::string& root_path)
{
  // 与CreateTable()类似,这是一个实用程序,使我们可以读取的数据集,不必担心,示例中我们还会编写一个数据集。
auto base_path = root_path + "parquet_dataset";
ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // 创建一个Arrow表
ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // 将其写入两个Parquet文件
ARROW_ASSIGN_OR_RAISE(auto output,
filesystem->OpenOutputStream(base_path + "/data1.parquet"));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
*table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
ARROW_ASSIGN_OR_RAISE(output,
filesystem->OpenOutputStream(base_path + "/data2.parquet"));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
*table->Slice(5), arrow::default_memory_pool(), output, 2048));
return base_path;
}
arrow::Status PrepareEnv() {
  // 通过设置一些快速写入来准备好我们的环境以供读取。
ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
std::shared_ptr<arrow::fs::FileSystem> setup_fs;
  // 请注意,这是在构建可执行文件的目录中运行的。
char setup_path[256];
char* result = getcwd(setup_path, 256);
if (result == NULL) {
return arrow::Status::IOError("获取PWD失败。");
}
ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));
return arrow::Status::OK();
}
// (文档部分: 辅助函数)
// (文档部分: RunMain)
arrow::Status RunMain() {
  // (文档部分: RunMain)
  // (文档部分: PrepareEnv)
ARROW_RETURN_NOT_OK(PrepareEnv());
  // (文档部分: PrepareEnv)
  // (文档部分: FileSystem声明)
  // 首先,我们需要一个文件系统对象,它允许我们从给定的路径开始与本地文件系统交互。为了简单起见,这将是当前目录。
std::shared_ptr<arrow::fs::FileSystem> fs;
  // (文档部分: FileSystem声明)
  // (文档部分: FileSystem初始化)
  // 获取当前工作目录并使用它来创建FileSystem对象。
char init_path[256];
char* result = getcwd(init_path, 256);
if (result == NULL) {
return arrow::Status::IOError("获取PWD失败。");
}
ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));
  // (文档部分: FileSystem初始化)
  // (文档部分: FileSelector声明)
  // 文件选择器允许我们实际遍历多文件数据集。
arrow::fs::FileSelector selector;
  // (文档部分: FileSelector声明)
  // (文档部分: FileSelector配置)
selector.base_dir = "parquet_dataset";
  // 如果不知道数据集的嵌套结构,递归是一个安全的选择。
selector.recursive = true;
  // (文档部分: FileSelector配置)
  // (文档部分: FileSystemFactoryOptions)
  // 创建选项对象允许我们配置数据集读取。
arrow::dataset::FileSystemFactoryOptions options;
  // 我们将使用Hive风格的分区。我们将让Arrow Datasets推断分区模式。我们不会设置其他选项,使用默认值就足够了。
options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();
  // (文档部分: FileSystemFactoryOptions)
  // (文档部分: 文件格式设置)
auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  // (文档部分: 文件格式设置)
  // (文档部分: FileSystemDatasetFactory创建)
  // 现在,我们获得一个工厂,它允许我们获取数据集,我们还没有数据集!
ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
fs, selector, read_format, options));
  // (文档部分: FileSystemDatasetFactory创建)
  // (文档部分: FileSystemDatasetFactory完成)
  // 现在,我们从工厂构建数据集。
ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());
  // (文档部分: FileSystemDatasetFactory完成)
  // (文档部分: 数据集片段)
  // 打印出数据片段
ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
for (const auto& fragment : fragments) {
std::cout << "找到片段:" << (*fragment)->ToString() << std::endl;
std::cout << "分区表达式:" << (*fragment)->partition_expression().ToString() << std::endl;
}
  // (文档部分: 数据集片段)
  // (文档部分: 读取扫描生成器)
  // 将数据集扫描为表格,完成后,您可以对其执行常规表格操作,如计算和打印。不过,现在您也需要在内存中运行。
ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());
  // (文档部分: 读取扫描生成器)
  // (文档部分: 读取扫描器)
ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());
  // (文档部分: 读取扫描器)
  // (文档部分: 转为表格)
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
std::cout << table->ToString();
  // (文档部分: 转为表格)
  // (文档部分: TableBatchReader)
  // 现在,让我们将表格输出到磁盘作为数据集!
  // 我们从表格中创建一个RecordBatchReader,然后设置扫描器,以便我们可以保存到文件。
std::shared_ptr<arrow::TableBatchReader> write_dataset =
std::make_shared<arrow::TableBatchReader>(table);
  // (文档部分: TableBatchReader)
  // (文档部分: 写扫描器)
auto write_scanner_builder =
arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())
  // (文档部分: 写扫描器)
  // (文档部分: 分区模式)
  // 分区模式确定用于分区的键是哪些字段。
auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});
  // (文档部分: 分区模式)
  // (文档部分: 分区创建)
  // 我们将使用Hive风格的分区,这将创建具有 "key=value" 对的目录。
auto partitioning =
std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // (文档部分: 分区创建)
  // (文档部分: 写入格式)
  // 现在,声明我们将写入Parquet文件。
auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  // (文档部分: 写入格式)
  // (文档部分: 写入选项)
  // 这一次,我们创建用于写入的选项,但会进行更多配置。
arrow::dataset::FileSystemDatasetWriteOptions write_options;
  // 默认值开始。
write_options.file_write_options = write_format->DefaultWriteOptions();
  // (文档部分: 写入选项)
  // (文档部分: 选项文件系统)
  // 使用我们已经有的文件系统。
write_options.filesystem = fs;
  // (文档部分: 选项文件系统)
  // (文档部分: 选项目标)
  // 写入到当前目录中的 "write_dataset" 文件夹。
write_options.base_dir = "write_dataset";
  // (文档部分: 选项目标)
  // (文档部分: 选项分区)
  // 使用上面声明的分区。
write_options.partitioning = partitioning;
  // (文档部分: 选项分区)
  // (文档部分: 选项文件名称模板)
  // 定义构成数据集的文件的名称。
write_options.basename_template = "part{i}.parquet";
  // (文档部分: 选项文件名称模板)
  // (文档部分: 选项已存在数据行为)
  // 设置行为以覆盖现有数据 - 具体来说,这使得可以多次运行此示例,并允许您的代码覆盖已有的内容。
write_options.existing_data_behavior =
arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
  // (文档部分: 选项已存在数据行为)
  // (文档部分: 写入数据集)
  // 将数据写入磁盘!
ARROW_RETURN_NOT_OK(
arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));
  // (文档部分: 写入数据集)
  // (文档部分: 返回)
return arrow::Status::OK();
}
// (文档部分: 返回)
// (文档部分: 主函数)
int main() {
arrow::Status st = RunMain();
if (!st.ok()) {
std::cerr << st << std::endl;
return 1;
}
return 0;
}
// (文档部分: 主函数)



把这两个文件放到一个文件夹下,在该文件夹下运行:

cmake -B build
cmake --build build
./build/my_example


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/344
上一篇:arrow系列10 --- arrow数据集(Arrow Datasets)
下一篇:arrow系列12---arrow的组成部分(High-Level Overview)