arrow系列19---Gandiva表达式编译器(The Gandiva Expression Compiler)
作者:yunjinqi   类别:    日期:2023-10-16 20:27:13    阅读:480 次   消耗积分:0 分    


Gandiva是一个运行时表达式编译器,使用LLVM生成用于在Arrow记录批次上进行计算的高效本机代码。Gandiva仅处理投影和过滤操作;对于其他转换,请参见Compute Functions(计算函数)。

Gandiva的设计旨在利用Arrow内存格式和现代硬件。从Arrow内存模型出发,由于Arrow数组为值和有效性位图分别提供了单独的缓冲区,因此通常可以独立处理值及其空值状态,从而实现更好的指令流水线处理。在现代硬件上,使用LLVM编译表达式使执行能够针对本地运行时环境和硬件进行优化,包括可用的SIMD指令。为减少优化开销,许多Gandiva函数都预先编译成LLVM IR(中间表示)。

构建表达式 

Gandiva提供了一个通用的表达式表示,其中表达式由节点树表示。使用TreeExprBuilder构建表达式树。表达式树的叶子通常是字段引用,由TreeExprBuilder::MakeField()创建,以及字面值,由TreeExprBuilder::MakeLiteral()创建。可以使用以下方式将节点组合成更复杂的表达式树:

  • 使用TreeExprBuilder::MakeFunction()创建函数节点。(您可以调用GetRegisteredFunctionSignatures()获取有效函数签名列表。)

  • 使用TreeExprBuilder::MakeIf()创建if-else逻辑。

  • 使用TreeExprBuilder::MakeAnd()和TreeExprBuilder::MakeOr()创建布尔表达式。(对于“not”,使用MakeFunction中的not(bool)函数。)

  • 使用TreeExprBuilder::MakeInExpressionInt32()和其他“in表达式”函数创建集合成员测试。

每个这些函数都创建新的复合节点,其中包含叶子节点(字面值和字段引用)或其他复合节点作为子节点。通过组合它们,您可以创建任意复杂的表达式树。

一旦构建了表达式树,它们将被包装为Expression(表达式)或Condition(条件),具体取决于它们将如何使用。Expression用于投影,而Condition用于过滤。

例如,以下是如何创建一个表示x + 3的Expression和一个表示x < 3的Condition:

std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());

std::shared_ptr<Node> add_node =
    TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
    TreeExprBuilder::MakeExpression(add_node, field_result);

std::shared_ptr<Node> less_than_node =
    TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);

投影器和过滤器 

Gandiva的两个执行内核是Projector(投影器)和Filter(过滤器)。Projector用于消耗记录批次并将其投影到新的记录批次中。Filter用于消耗记录批次并生成包含匹配条件的索引的SelectionVector(选择向量)。

对于Projector和Filter,表达式IR的优化发生在创建实例时。它们根据静态模式进行编译,因此必须在此时了解记录批次的模式。

继续使用上一节中创建的表达式和条件,以下是如何创建Projector和Filter的示例:

std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);

std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);

一旦创建了Projector或Filter,就可以在Arrow记录批次上进行评估。这些执行内核本身是单线程的,但设计成可重复使用,以并行处理不同的记录批次。

评估投影 

使用Projector::Evaluate()执行投影。这将输出一个数组的向量,可以与输出模式一起传递给arrow::RecordBatch::Make()。

auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});

arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);

评估过滤器 

Filter::Evaluate()生成SelectionVector,即匹配过滤条件的行索引向量。选择向量是一个围绕arrow整数数组的包装,由位宽参数化。在创建选择向量时(在传递给Evaluate()之前必须初始化),必须选择位宽,它确定它可以容纳的最大索引值,以及最大的槽位数,它确定它可以包含的索引数量。通常情况下,最大槽位数应设置为批量大小,位宽应选择可以表示小于批量大小的所有整数的最小整数大小。例如,如果批量大小为100k,将最大槽位数设置为100k,位宽设置为32(因为2^16 = 64k,这将太小)。

一旦运行Evaluate()并填充了SelectionVector,使用SelectionVector::ToArray()方法获取底层数组,然后使用::arrow::compute::Take()将其实现为输出记录批次。

std.shared_ptr<gandiva::SelectionVector> result_indices;
// 为索引使用16位整数。结果不能长于输入大小,
// 所以使用批量num_rows作为max_slots。
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
                                             &result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
                      arrow::compute::Take(Datum(in_batch), Datum(take_indices),
                                           TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();

评估投影和过滤器 

最后,您还可以在应用选择向量时进行投影,使用Projector::Evaluate()。要这样做,首先确保使用SelectionVector::GetMode()初始化Projector,以便投影器使用正确的位宽进行编译。然后,可以将选择向量传递到Projector::Evaluate()方法中。

// 确保投影器已针对适当的选择向量模式进行编译
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
                         ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);

arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);

result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/352
上一篇:arrow系列18---计算函数2(Computer Functions)
下一篇:arrow系列20---输入/输出和文件系统