Arrow Compute
Apache Arrow提供计算函数以促进高效和可移植的数据处理。在本文中,您将使用Arrow的计算功能来执行以下操作:
计算某列的总和
计算两列的逐元素和
在某列中搜索特定值
先决条件
在继续之前,请确保您具备以下条件:
Arrow安装,您可以在此处设置:在您自己的项目中使用Arrow C++
对来自基本Arrow数据结构的基本了解
设置
在运行一些计算之前,我们需要填补一些空白:
我们需要包含必要的头文件。
需要一个main()函数将所有内容粘合在一起。
我们需要一些数据来进行操作。
包含
在编写C++代码之前,我们需要一些包含。首先,我们引入iostream以进行输出,然后导入Arrow的计算功能:
#include <arrow/api.h> #include <arrow/compute/api.h> #include <iostream>
Main()
作为我们的粘合,我们将使用前一个教程中有关数据结构的main()模式:
int main() { arrow::Status st = RunMain(); if (!st.ok()) { std::cerr << st << std::endl; return 1; } return 0; }
就像以前使用它时一样,它与RunMain()配对使用:
arrow::Status RunMain() { return arrow::Status::OK(); }
生成用于计算的Table
在开始之前,我们将初始化一个包含两列的Table,以便进行操作。我们将使用基本Arrow数据结构中的方法,因此如果有什么令人困惑的地方,请回顾一下:
// 创建一些32位整数数组。 arrow::Int32Builder int32builder; int32_t some_nums_raw[5] = {34, 624, 2223, 5654, 4356}; ARROW_RETURN_NOT_OK(int32builder.AppendValues(some_nums_raw, 5)); std::shared_ptr<arrow::Array> some_nums; ARROW_ASSIGN_OR_RAISE(some_nums, int32builder.Finish()); int32_t more_nums_raw[5] = {75342, 23, 64, 17, 736}; ARROW_RETURN_NOT_OK(int32builder.AppendValues(more_nums_raw, 5)); std::shared_ptr<arrow::Array> more_nums; ARROW_ASSIGN_OR_RAISE(more_nums, int32builder.Finish()); // 使用我们的一对数组创建一个Table。 std::shared_ptr<arrow::Field> field_a, field_b; std::shared_ptr<arrow::Schema> schema; field_a = arrow::field("A", arrow::int32()); field_b = arrow::field("B", arrow::int32()); schema = arrow::schema({field_a, field_b}); std::shared_ptr<arrow::Table> table; table = arrow::Table::Make(schema, {some_nums, more_nums}, 5);
计算一个数组的总和
使用计算函数有两个一般步骤,我们在这里分开:
为输出准备一个Datum
调用compute::Sum(),这是一个方便的函数,用于对数组求和
检索并打印输出
为Datum准备内存以供输出 计算完成后,我们需要一个地方来存储结果。在Arrow中,用于存储此类输出的对象称为Datum。此对象用于在计算函数中传递输入和输出,并可以包含许多不同形状的Arrow数据结构。我们需要它来检索计算函数的输出。
// Datum类是所有计算函数的输出,它们可以接受Datum作为输入。 arrow::Datum sum;
调用Sum()
在这里,我们将获取我们的Table,其中包含列“A”和“B”,然后对列“A”进行求和。对于求和,有一个方便的函数,称为compute::Sum(),它简化了计算接口的复杂性。我们将在下一个计算中查看更复杂的版本。对于给定的函数,请参考计算函数以查看是否存在方便的函数。
compute::Sum()接受给定的Array或ChunkedArray - 在这里,我们使用Table::GetColumnByName()来传递列A。然后,它输出到Datum。将所有这些组合在一起,我们得到以下内容:
// 在这里,我们可以使用arrow::compute::Sum。这是一个方便的函数,下一个计算将不那么简单。然而,在可能的情况下,使用这些函数有助于提高可读性。 ARROW_ASSIGN_OR_RAISE(sum, arrow::compute::Sum({table->GetColumnByName("A")}));
从Datum中获取结果
前面的步骤留下了一个包含我们的总和的Datum。但是,我们不能直接打印它 - 由于它可以包含任意Arrow数据结构的灵活性,我们必须仔细地检索我们的数据。首先,为了了解它包含的内容,我们可以检查它是哪种数据结构,然后包含的是什么类型的基元数据:
// 获取Datum的种类和它包含的内容 - 这是一个Scalar,包含int64。 std::cout << "Datum kind: " << sum.ToString() << " content type: " << sum.type()->ToString() << std::endl;
这应该报告Datum存储了一个包含64位整数的标量。为了查看值是什么,我们可以像这样打印它,这将产生12891:
// 请注意,我们明确要求一个标量 - Datum不能简单地提供它是什么,您必须请求正确的类型。 std::cout << sum.scalar_as<arrow::Int64Scalar>().value << std::endl;
现在,我们已经使用compute::Sum()并从中获取了我们想要的结果!
使用CallFunction()进行逐元素数组加法计算
下一层复杂度使用了compute::Sum()隐藏的内容:compute::CallFunction()。在本示例中,我们将探讨如何使用更强大的compute::CallFunction()与“add”计算函数。
模式保持相似:
为输出准备一个Datum
使用“add”调用compute::CallFunction()
检索并打印输出
为Datum准备内存以供输出 再次,我们需要一个Datum以存储任何输出:
arrow::Datum element_wise_sum;
使用CallFunction()与“add” compute::CallFunction()的第一个参数是所需函数的名称,第二个参数是所需函数的数据输入的向量。现在,我们想要在列“A”和“B”之间进行逐元素加法。因此,我们将请求“add”,传入列“A和B”,并输出到我们的Datum。将所有这些组合在一起,我们得到以下内容:
// 获取Table中列A和B的逐元素和。请注意,这里我们使用了CallFunction(),它的第一个参数是函数的名称。 ARROW_ASSIGN_OR_RAISE(element_wise_sum, arrow::compute::CallFunction( "add", {table->GetColumnByName("A"), table->GetColumnByName("B")}));
另请参见
可用函数以获取compute::CallFunction()的其他函数列表
从Datum中获取结果 再次,需要谨慎处理Datum。当我们知道其中包含什么内容时,处理会更容易。此Datum包含一个包含32位整数的ChunkedArray,但我们可以打印以确认:
// 获取Datum的种类和它包含的内容 -- 这是一个ChunkedArray,包含int32。 std::cout << "Datum kind: " << element_wise_sum.ToString() << " content type: " << element_wise_sum.type()->ToString() << std::endl;
由于它是一个ChunkedArray,我们从Datum中请求它 - ChunkedArray有一个ChunkedArray::ToString()方法,因此我们将使用它来打印其内容:
// 这一次,我们得到了一个ChunkedArray,而不是标量。 std::cout << element_wise_sum.chunked_array()->ToString() << std.endl;
输出如下:
Datum kind: ChunkedArray content type: int32 [ [ 75376, 647, 2287, 5671, 5092 ] ]
现在,我们已经使用了compute::CallFunction(),而不是使用方便的函数!这使得更多可用的计算变得更加广泛。
使用CallFunction()和Options进行值搜索
仍然有一类计算。compute::CallFunction()使用数据输入的向量,但计算通常需要函数的附加参数。为了提供这些参数,计算函数可以关联到可以定义其参数的结构中。您可以检查给定函数,以查看它使用哪个结构。
在本示例中,我们将使用“index”计算函数在列“A”中搜索一个值。这个过程与之前的两个步骤不同,它有三个步骤:
为输出准备一个Datum
准备compute::IndexOptions
使用“index”和compute::IndexOptions调用compute::CallFunction()
检索并打印输出
为Datum准备内存以供输出 我们将需要一个Datum以存储任何输出:
// 使用一个选项结构来设置在列A中搜索2223(第三个项目)。 arrow::Datum third_item;
配置IndexOptions以进行“index” 对于这个探索,我们将使用“index”函数 - 这是一种搜索方法,它返回输入值的索引。为了传递这个输入值,我们需要一个compute::IndexOptions结构。所以,让我们制作这个结构:
// 一个选项结构用于代替传递任意数量的参数。 arrow::compute::IndexOptions index_options;
在搜索函数中,需要一个目标值。在这里,我们将使用2223,即列A中的第三个项目,并相应地配置我们的结构:
// 我们需要一个Arrow标量,而不是原始值。 index_options.value = arrow::MakeScalar(2223);
使用“index”和IndexOptions调用CallFunction() 要实际运行函数,我们再次使用compute::CallFunction(),这次将我们的IndexOptions结构通过引用作为第三个参数传递。与之前一样,第一个参数是函数的名称,第二个是我们的数据输入:
ARROW_ASSIGN_OR_RAISE( third_item, arrow::compute::CallFunction("index", {table->GetColumnByName("A")}, &index_options));
从Datum中获取结果 最后一次,让我们看看我们的Datum包含什么!这将是一个包含64位整数的标量,输出将是2:
// 获取Datum的种类和它包含的内容 -- 这是一个Scalar,包含int64。 std::cout << "Datum kind: " << third_item.ToString() << " content type: " << third_item.type()->ToString() << std::endl; // 我们得到一个标量 -- 列A中2223的位置,基于0的索引是2。 std::cout << third_item.scalar_as<arrow::Int64Scalar>().value << std::endl;
结束程序 最后,我们只需返回arrow::Status::OK(),这样main()函数就知道我们已经完成了,一切都正常,就像前面的教程一样。
return arrow::Status::OK();
有了这些,您已经使用了三种主要类型的compute函数 - 使用方便函数,使用带有Options结构和不使用方便函数。现在,您可以处理需要的任何Table,并解决适合内存的数据问题!
这意味着现在我们必须看看如何处理大于内存的数据集,通过下一篇文章中的Arrow数据集。
有关完整代码的副本,请参考下文: