arrow系列28---Arrow Flight RPC
作者:yunjinqi   类别:    日期:2023-10-17 19:27:04    阅读:375 次   消耗积分:0 分    

Arrow Flight是一种用于高效传输Flight数据的RPC框架,可通过网络进行数据传输。

另请参阅

Flight协议文档 包括如何在概念上使用Flight的Flight协议文档。

Flight API文档 列出所有不同客户端和服务器类型的C++ API文档。

C++ Cookbook 关于在C++中使用Arrow Flight的示例。

编写Flight服务 

服务器是arrow::flight::FlightServerBase的子类。要实现单个RPC,需要在此类上覆盖RPC方法。

class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<SimpleFlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};

每个RPC方法总是使用arrow::flight::ServerCallContext来传递通用参数,并返回arrow::Status以指示成功或失败。可以通过arrow::flight::MakeFlightError()返回Flight特定的错误代码。

除状态之外还返回值的RPC方法将使用out参数,如上所示。通常情况下,有一些提供这些out参数的辅助类。例如,上面的arrow::flight::SimpleFlightListing使用arrow::flight::FlightInfo对象的向量作为ListFlights RPC的结果。

要启动服务器,请创建一个arrow::flight::Location来指定监听位置,并调用arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。使用arrow::flight::FlightServerBase::SetShutdownOnSignals()启用停止服务器的功能,如果接收到中断信号,则调用arrow::flight::FlightServerBase::Serve()来阻塞,直到服务器停止。

std::unique_ptr<arrow::flight::FlightServerBase> server;
// 初始化服务器
arrow::flight::Location location;
// 监听所有接口上的自由端口
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);

// 启动服务器
ARROW_CHECK_OK(server->Init(options));
// 使用清除的错误代码(0)退出SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "服务器在localhost:" << server->port() << "上监听" << std::endl;
ARROW_CHECK_OK(server->Serve());

使用Flight客户端 

要连接到Flight服务,请通过调用Connect创建arrow::flight::FlightClient的实例。

每个RPC方法都返回arrow::Result以指示请求的成功/失败,并且如果请求成功,还返回结果对象。某些调用是流式调用,因此它们将返回reader和/或writer对象;直到流完成,最终的调用状态才知道。

取消和超时

 在进行调用时,客户端可以选择提供FlightCallOptions。这允许客户端设置调用的超时或提供自定义HTTP标头等功能。此外,由客户端RPC调用返回的一些对象公开了Cancel方法,允许提前终止调用。

在服务器端,不需要额外的代码来实现超时。对于取消,服务器需要手动轮询ServerCallContext::is_cancelled来检查客户端是否取消了调用,如果是,则退出服务器当前正在执行的任何处理。

启用TLS 

在设置服务器时,可以通过提供证书和密钥对到FlightServerBase::Init来启用TLS。

在客户端端使用Location::ForGrpcTls来构建arrow::flight::Location以进行监听。

启用身份验证 

警告 未启用TLS时,身份验证是不安全的。

可以通过实现ServerAuthHandler并在构建期间提供它来启用基于握手的身份验证。

身份验证分为两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行所需的任何协商。然后,客户端身份验证处理程序提供将附加到未来调用的令牌。这是通过使用所需的客户端身份验证实现来调用Authenticate完成的。

此后的每个RPC,客户端处理程序的令牌会自动添加到请求头中的调用。服务器身份验证处理程序验证令牌并提供客户端的身份。在服务器上,可以从arrow::flight::ServerCallContext获取此身份。

自定义中间件 

服务器和客户端支持在每个请求上调用的自定义中间件(或拦截器),并且可以在创建客户端或服务器时提供它们。

中间件功能有限,但它们可以添加请求/响应的标头。在服务器上,它们可以检查传入标头并拒绝请求;因此,它们可以用于实现自定义身份验证方法。

最佳实践 

gRPC 使用默认的gRPC传输时,可以通过arrow::flight::FlightClientOptions::generic_options传递选项。例如:

auto options = FlightClientOptions::Defaults();
// 设置发送保持活跃ping的间隔时间。
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);

还请参阅最佳gRPC实践和可用的gRPC键。


尽量复用客户端 

创建和关闭客户端需要在客户端和服务器端进行设置和拆除,这可能会减少实际处理RPC的时间。尽量在可能的情况下复用客户端以避免这种情况。请注意,客户端是线程安全的,因此可以在多个线程之间共享单个客户端。

不要循环负载均衡 

轮询负载均衡意味着每个客户端可以与每个服务器建立开放式连接,从而导致意外的打开连接数量,消耗服务器资源。

调试连接问题 

在长时间运行的连接上遇到意外断开时,使用netstat来监视打开连接的数量。如果连接数远远大于客户端的数量,可能会引起问题。

为调试,某些环境变量可用于在gRPC中启用日志记录。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http将打印初始标头(两侧都有),以便您可以查看gRPC是否建立了连接。它还将打印消息何时发送,以便您可以了解连接是否处于打开状态。

gRPC可能不会报告连接错误,直到实际发出调用。因此,要在创建客户端时检测连接错误,应该进行某种虚拟RPC。

Memory管理

Flight试图重用gRPC分配的内存,以避免多余的数据副本。但是,这意味着这些分配可能不会由Arrow内存池跟踪,并且内存使用行为(例如,是否将未使用的内存返回给系统)取决于gRPC使用的分配器(通常是系统分配器)。

快速测试的方法:使用调试器附加到进程并调用malloc_trim,或在系统池上调用ReleaseUnused。如果内存使用下降,那么很可能是由gRPC或应用程序分配的内存,系统分配器一直在保存。可以以特定于平台的方式进行调整;请参见ARROW-16697中关于如何在Linux/glibc上工作的示例。glibc malloc可以明确告知它释放缓存。

过多的流量

gRPC将为并发客户端生成最大线程配额数量的线程。这些线程不一定会被清理(在Java术语中称为“缓存线程池”)。

glibc malloc会清除某些线程的每个状态,而默认调整在某些工作负载中从不清除缓存。

gRPC的默认行为允许一个服务器接受来自许多不同客户端的连接,但如果请求执行了大量工作(如在Flight下可能发生的情况),则服务器可能无法跟上。配置客户端以使用退避进行重试(并可能连接到不同的节点)会提供更一致的服务质量。

auto options = FlightClientOptions::Defaults();
// 设置连续连接尝试之间的最小时间。
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);

限制DoPut的批处理大小 

您可能希望限制客户端通过DoPut向服务器提交的最大批处理大小,以防止请求在服务器上占用过多内存。在客户端端,设置arrow::flight::FlightClientOptions::write_size_limit_bytes。在服务器端,设置gRPC选项GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。客户端端的选项将返回一个错误,可以使用较小的批次重试,而服务器端的限制将关闭连接。设置两者可能是明智的,因为前者提供更好的用户体验,但后者可能是必要的,以防御不礼貌的客户端。

关闭无响应的连接 

可以使用arrow::flight::FlightCallOptions::stop_token关闭不响应的调用。这需要在建立调用时记录停止令牌。

StopSource stop_source;
FlightCallOptions options;
options.stop_token = stop_source.token();
stop_source.RequestStop(Status::Cancelled("StopSource"));
flight_client->DoAction(options, {});

使用调用超时(这是一种通用的gRPC最佳实践)

FlightCallOptions options;
options.timeout = TimeoutDuration{0.2};
Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();

客户端超时对于长时间运行的流式调用不太适用,因为很难为整个操作选择超时。因此,通常希望的是每次读取或写入的超时,以便如果操作没有取得进展,则操作失败。可以使用后台线程来调用定时器上的Cancel(),主线程在每次操作成功完成时重置定时器。有关完全解决的示例,请参见Cookbook。

注意

已有一个用于每次写入/读取超时而不是每次调用超时的长期待定的票证(ARROW-6062),但这很难实现。使用阻塞gRPC API。

Alternative Transports

Arrow Flight的标准传输是gRPC。C++实现还支持基于UCX的传输,但仅作为实验性功能。要使用它,请在启动服务器或创建客户端时使用协议方案ucx:。

UCX传输gRPC传输的所有功能都不受支持。有关详细信息,请参阅Flight RPC。此外,请注意以下特定注意事项:

服务器为每个客户端创建一个独立的UCP工作线程。这消耗更多资源,但提供更好的吞吐量。

客户端为每个RPC调用创建一个独立的UCP工作线程。同样,这是在性能和资源消耗之间的权衡。这也意味着与gRPC不同,使用单个客户端或使用多个客户端基本上是等效的。

UCX传输会尽量避免复制数据。在某些情况下,它可以直接重用由UCX分配的缓冲区以支持arrow::Buffer对象,然而,这也将延长关联的UCX资源的生命周期,超出了Flight客户端或服务器对象的生命周期。

根据UCX自身选择的传输,您可能会发现将UCX_MM_SEG_SIZE从默认值(约8KB左右)增加到约60KB可以提高性能(UCX会在单次调用中复制更多数据)。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/361
上一篇:arrow系列27---表格数据集2(Tabular Datasets)
下一篇:arrow系列29---使用Arrow调试代码