Arrow Flight是一种用于高效传输Flight数据的RPC框架,可通过网络进行数据传输。
另请参阅
Flight协议文档 包括如何在概念上使用Flight的Flight协议文档。
Flight API文档 列出所有不同客户端和服务器类型的C++ API文档。
C++ Cookbook 关于在C++中使用Arrow Flight的示例。
编写Flight服务
服务器是arrow::flight::FlightServerBase的子类。要实现单个RPC,需要在此类上覆盖RPC方法。
class MyFlightServer : public FlightServerBase { Status ListFlights(const ServerCallContext& context, const Criteria* criteria, std::unique_ptr<FlightListing>* listings) override { std::vector<FlightInfo> flights = ...; *listings = std::unique_ptr<SimpleFlightListing>(new SimpleFlightListing(flights)); return Status::OK(); } };
每个RPC方法总是使用arrow::flight::ServerCallContext来传递通用参数,并返回arrow::Status以指示成功或失败。可以通过arrow::flight::MakeFlightError()返回Flight特定的错误代码。
除状态之外还返回值的RPC方法将使用out参数,如上所示。通常情况下,有一些提供这些out参数的辅助类。例如,上面的arrow::flight::SimpleFlightListing使用arrow::flight::FlightInfo对象的向量作为ListFlights RPC的结果。
要启动服务器,请创建一个arrow::flight::Location来指定监听位置,并调用arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。使用arrow::flight::FlightServerBase::SetShutdownOnSignals()启用停止服务器的功能,如果接收到中断信号,则调用arrow::flight::FlightServerBase::Serve()来阻塞,直到服务器停止。
std::unique_ptr<arrow::flight::FlightServerBase> server; // 初始化服务器 arrow::flight::Location location; // 监听所有接口上的自由端口 ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location)); arrow::flight::FlightServerOptions options(location); // 启动服务器 ARROW_CHECK_OK(server->Init(options)); // 使用清除的错误代码(0)退出SIGTERM ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM})); std::cout << "服务器在localhost:" << server->port() << "上监听" << std::endl; ARROW_CHECK_OK(server->Serve());
使用Flight客户端
要连接到Flight服务,请通过调用Connect创建arrow::flight::FlightClient的实例。
每个RPC方法都返回arrow::Result以指示请求的成功/失败,并且如果请求成功,还返回结果对象。某些调用是流式调用,因此它们将返回reader和/或writer对象;直到流完成,最终的调用状态才知道。
取消和超时
在进行调用时,客户端可以选择提供FlightCallOptions。这允许客户端设置调用的超时或提供自定义HTTP标头等功能。此外,由客户端RPC调用返回的一些对象公开了Cancel方法,允许提前终止调用。
在服务器端,不需要额外的代码来实现超时。对于取消,服务器需要手动轮询ServerCallContext::is_cancelled来检查客户端是否取消了调用,如果是,则退出服务器当前正在执行的任何处理。
启用TLS
在设置服务器时,可以通过提供证书和密钥对到FlightServerBase::Init来启用TLS。
在客户端端使用Location::ForGrpcTls来构建arrow::flight::Location以进行监听。
启用身份验证
警告 未启用TLS时,身份验证是不安全的。
可以通过实现ServerAuthHandler并在构建期间提供它来启用基于握手的身份验证。
身份验证分为两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行所需的任何协商。然后,客户端身份验证处理程序提供将附加到未来调用的令牌。这是通过使用所需的客户端身份验证实现来调用Authenticate完成的。
此后的每个RPC,客户端处理程序的令牌会自动添加到请求头中的调用。服务器身份验证处理程序验证令牌并提供客户端的身份。在服务器上,可以从arrow::flight::ServerCallContext获取此身份。
自定义中间件
服务器和客户端支持在每个请求上调用的自定义中间件(或拦截器),并且可以在创建客户端或服务器时提供它们。
中间件功能有限,但它们可以添加请求/响应的标头。在服务器上,它们可以检查传入标头并拒绝请求;因此,它们可以用于实现自定义身份验证方法。
最佳实践
gRPC 使用默认的gRPC传输时,可以通过arrow::flight::FlightClientOptions::generic_options传递选项。例如:
auto options = FlightClientOptions::Defaults(); // 设置发送保持活跃ping的间隔时间。 options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
还请参阅最佳gRPC实践和可用的gRPC键。
尽量复用客户端
创建和关闭客户端需要在客户端和服务器端进行设置和拆除,这可能会减少实际处理RPC的时间。尽量在可能的情况下复用客户端以避免这种情况。请注意,客户端是线程安全的,因此可以在多个线程之间共享单个客户端。
不要循环负载均衡
轮询负载均衡意味着每个客户端可以与每个服务器建立开放式连接,从而导致意外的打开连接数量,消耗服务器资源。
调试连接问题
在长时间运行的连接上遇到意外断开时,使用netstat来监视打开连接的数量。如果连接数远远大于客户端的数量,可能会引起问题。
为调试,某些环境变量可用于在gRPC中启用日志记录。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http将打印初始标头(两侧都有),以便您可以查看gRPC是否建立了连接。它还将打印消息何时发送,以便您可以了解连接是否处于打开状态。
gRPC可能不会报告连接错误,直到实际发出调用。因此,要在创建客户端时检测连接错误,应该进行某种虚拟RPC。
Memory管理
Flight试图重用gRPC分配的内存,以避免多余的数据副本。但是,这意味着这些分配可能不会由Arrow内存池跟踪,并且内存使用行为(例如,是否将未使用的内存返回给系统)取决于gRPC使用的分配器(通常是系统分配器)。
快速测试的方法:使用调试器附加到进程并调用malloc_trim,或在系统池上调用ReleaseUnused。如果内存使用下降,那么很可能是由gRPC或应用程序分配的内存,系统分配器一直在保存。可以以特定于平台的方式进行调整;请参见ARROW-16697中关于如何在Linux/glibc上工作的示例。glibc malloc可以明确告知它释放缓存。
过多的流量
gRPC将为并发客户端生成最大线程配额数量的线程。这些线程不一定会被清理(在Java术语中称为“缓存线程池”)。
glibc malloc会清除某些线程的每个状态,而默认调整在某些工作负载中从不清除缓存。
gRPC的默认行为允许一个服务器接受来自许多不同客户端的连接,但如果请求执行了大量工作(如在Flight下可能发生的情况),则服务器可能无法跟上。配置客户端以使用退避进行重试(并可能连接到不同的节点)会提供更一致的服务质量。
auto options = FlightClientOptions::Defaults(); // 设置连续连接尝试之间的最小时间。 options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
限制DoPut的批处理大小
您可能希望限制客户端通过DoPut向服务器提交的最大批处理大小,以防止请求在服务器上占用过多内存。在客户端端,设置arrow::flight::FlightClientOptions::write_size_limit_bytes。在服务器端,设置gRPC选项GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。客户端端的选项将返回一个错误,可以使用较小的批次重试,而服务器端的限制将关闭连接。设置两者可能是明智的,因为前者提供更好的用户体验,但后者可能是必要的,以防御不礼貌的客户端。
关闭无响应的连接
可以使用arrow::flight::FlightCallOptions::stop_token关闭不响应的调用。这需要在建立调用时记录停止令牌。
StopSource stop_source; FlightCallOptions options; options.stop_token = stop_source.token(); stop_source.RequestStop(Status::Cancelled("StopSource")); flight_client->DoAction(options, {});
使用调用超时(这是一种通用的gRPC最佳实践)
FlightCallOptions options; options.timeout = TimeoutDuration{0.2}; Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
客户端超时对于长时间运行的流式调用不太适用,因为很难为整个操作选择超时。因此,通常希望的是每次读取或写入的超时,以便如果操作没有取得进展,则操作失败。可以使用后台线程来调用定时器上的Cancel(),主线程在每次操作成功完成时重置定时器。有关完全解决的示例,请参见Cookbook。
注意
已有一个用于每次写入/读取超时而不是每次调用超时的长期待定的票证(ARROW-6062),但这很难实现。使用阻塞gRPC API。
Alternative Transports
Arrow Flight的标准传输是gRPC。C++实现还支持基于UCX的传输,但仅作为实验性功能。要使用它,请在启动服务器或创建客户端时使用协议方案ucx:。
UCX传输gRPC传输的所有功能都不受支持。有关详细信息,请参阅Flight RPC。此外,请注意以下特定注意事项:
服务器为每个客户端创建一个独立的UCP工作线程。这消耗更多资源,但提供更好的吞吐量。
客户端为每个RPC调用创建一个独立的UCP工作线程。同样,这是在性能和资源消耗之间的权衡。这也意味着与gRPC不同,使用单个客户端或使用多个客户端基本上是等效的。
UCX传输会尽量避免复制数据。在某些情况下,它可以直接重用由UCX分配的缓冲区以支持arrow::Buffer对象,然而,这也将延长关联的UCX资源的生命周期,超出了Flight客户端或服务器对象的生命周期。
根据UCX自身选择的传输,您可能会发现将UCX_MM_SEG_SIZE从默认值(约8KB左右)增加到约60KB可以提高性能(UCX会在单次调用中复制更多数据)。