缓冲区
为了避免传递具有不同和不明显的生命周期规则的原始数据指针,Arrow提供了一个称为arrow::Buffer的通用抽象。缓冲区封装了指针和数据大小,并通常将其生命周期与底层提供者的生命周期绑定(换句话说,缓冲区应始终指向有效的内存,直到被销毁)。缓冲区是无类型的:它们只表示物理内存区域,而不考虑其预期的含义或解释。
缓冲区可以由Arrow本身分配,也可以由第三方例程分配。例如,可以将Python字节字符串的数据作为Arrow缓冲区传递,以保持Python对象的存活。
此外,缓冲区有各种不同的类型:可变或不可变,可调整大小或不可调整大小。通常,在构建数据片段时,您将持有可变缓冲区,然后将其冻结为不可变容器,例如数组。
注意
某些缓冲区可能指向非CPU内存,例如由CUDA上下文提供的GPU支持内存。如果您正在编写一个GPU感知应用程序,需要小心不要将GPU内存指针解释为可由CPU访问的指针,反之亦然。
访问缓冲区内存
通过size()和data()访问器(或mutable_data()以可写方式访问可变缓冲区)可以快速访问底层内存。
切片
可以创建零拷贝的缓冲区切片,以获取指向底层数据的某个连续子集的缓冲区。可以通过调用arrow::SliceBuffer()和arrow::SliceMutableBuffer()函数来实现这一点。
分配缓冲区
可以通过调用arrow::AllocateBuffer()或arrow::AllocateResizableBuffer()的重载之一来自己分配缓冲区:
arrow::Result<std::unique_ptr<Buffer>> maybe_buffer = arrow::AllocateBuffer(4096); if (!maybe_buffer.ok()) { // ... 处理分配错误 } std::shared_ptr<arrow::Buffer> buffer = *std::move(maybe_buffer); uint8_t* buffer_data = buffer->mutable_data(); memcpy(buffer_data, "hello world", 11);
以这种方式分配缓冲区可以确保它是64字节对齐的,并按照Arrow内存规范建议进行了填充。
构建缓冲区
还可以使用arrow::BufferBuilder API来逐步分配和构建缓冲区:
BufferBuilder builder; builder.Resize(11); // 为11字节预留足够的空间 builder.Append("hello ", 6); builder.Append("world", 5); auto maybe_buffer = builder.Finish(); if (!maybe_buffer.ok()) { // ... 处理缓冲区分配错误 } std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;
如果缓冲区用于包含给定的固定宽度类型的值(例如List数组的32位偏移量),则使用模板arrow::TypedBufferBuilder API可能更方便:
TypedBufferBuilder<int32_t> builder; builder.Reserve(2); // 为两个int32_t值预留足够的空间 builder.Append(0x12345678); builder.Append(-0x765643210); auto maybe_buffer = builder.Finish(); if (!maybe_buffer.ok()) { // ... 处理缓冲区分配错误 } std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;
内存池
在使用Arrow C++ API分配缓冲区时,缓冲区的底层内存由arrow::MemoryPool实例分配。通常情况下,这将是进程范围内的默认内存池,但许多Arrow API允许您为其内部分配传递另一个MemoryPool实例。
内存池用于大型长寿命数据,例如数组缓冲区。其他数据,如小型C++对象和临时工作区,通常使用常规的C++分配器。
默认内存池
默认内存池取决于Arrow C++编译方式:
如果在编译时启用,使用jemalloc堆; 否则,如果在编译时启用,使用mimalloc堆; 否则,使用C库malloc堆。
覆盖默认内存池
可以通过设置ARROW_DEFAULT_MEMORY_POOL环境变量来覆盖上述选择算法。
STL集成
如果您希望使用Arrow内存池来分配STL容器的数据,可以使用arrow::stl::allocator包装器来实现。
另外,您还可以使用STL分配器来分配Arrow内存,使用arrow::stl::STLMemoryPool类。但是,这可能性能较差,因为STL分配器不提供调整大小的操作。
设备
许多Arrow应用程序仅访问主机(CPU)内存。但是,在某些情况下,也希望处理设备内存(例如GPU上的内存)以及主机内存。
Arrow使用arrow::Device抽象表示CPU和其他设备。相关的类arrow::MemoryManager指定如何在给定设备上分配内存。每个设备都有一个默认的内存管理器,但也可以构建其他实例(例如,包装自定义的arrow::MemoryPool在CPU上)。arrow::MemoryManager实例指定如何在给定设备上分配内存(例如,在CPU上使用特定的arrow::MemoryPool)。
设备无关编程
如果从第三方代码中接收到缓冲区,可以通过调用其is_cpu()方法来查询是否可以由CPU读取。
此外,您还可以以通用方式在给定设备上查看缓冲区,通过调用arrow::Buffer::View()或arrow::Buffer::ViewOrCopy()。如果源设备和目标设备相同,这将是一个无操作。否则,将尝试构建目标设备的内存地址,以便访问缓冲区内容。实际的设备到设备的传输可能会在读取缓冲区内容时延迟进行。
类似地,如果要在不假定有CPU可读取缓冲区的情况下进行I/O操作,可以调用arrow::Buffer::GetReader()和arrow::Buffer::GetWriter()。
例如,要获取任意缓冲区的CPU视图或副本,可以简单地执行:
std::shared_ptr<arrow::Buffer> arbitrary_buffer = ... ; std::shared_ptr<arrow::Buffer> cpu_buffer = arrow::Buffer::ViewOrCopy( arbitrary_buffer, arrow::default_cpu_memory_manager());
内存分析
在Linux上,可以使用perf record生成内存分配的详细配置文件,而不需要修改二进制文件。这些配置文件还可以显示回溯,以及分配大小。这需要调试符号,可以来自调试构建或具有调试符号的发布构建。
注意
如果在另一个平台上对Arrow的测试进行性能分析,可以使用Archery运行以下Docker容器以访问Linux环境:
archery docker run ubuntu-cpp bash # Inside the Docker container... /arrow/ci/scripts/cpp_build.sh /arrow /build cd build/cpp/debug ./arrow-array-test # Run a test apt-get update apt-get install -y linux-tools-generic alias perf=/usr/lib/linux-tools/<version-path>/perf
要跟踪分配,可以在所使用的每个分配器方法上创建探针点。收集$params允许我们记录所请求的分配大小,而收集$retval允许我们记录已分配的地址,以便可以将其与free/de-allocate调用相关联。
jemalloc
perf probe -x libarrow.so je_arrow_mallocx '$params' perf probe -x libarrow.so je_arrow_mallocx%return '$retval' perf probe -x libarrow.so je_arrow_rallocx '$params' perf probe -x libarrow.so je_arrow_rallocx%return '$retval' perf probe -x libarrow.so je_arrow_dallocx '$params' PROBE_ARGS="-e probe_libarrow:je_arrow_mallocx \ -e probe_libarrow:je_arrow_mallocx__return \ -e probe_libarrow:je_arrow_rallocx \ -e probe_libarrow:je_arrow_rallocx__return \ -e probe_libarrow:je_arrow_dallocx"
一旦设置了探针,就可以使用perf record记录带有关联回溯的调用。在此示例中,我们正在运行Arrow中的StructArray单元测试:
perf record -g --call-graph dwarf \ $PROBE_ARGS \ ./arrow-array-test --gtest_filter=StructArray*
如果要对运行中的进程进行性能分析,可以运行perf record -p <PID>,它将一直记录,直到使用CTRL+C中断。或者,您可以运行perf record -P <PID> sleep 10,以记录10秒钟。 生成的数据可以使用标准工具进行处理,以处理perf,或者可以使用perf script将数据的文本格式传输到自定义脚本中。以下脚本解析perf脚本输出并以新行分隔的JSON格式打印输出,以便更容易处理。
process_perf_events.py
import sys import re import json # Example non-traceback line # arrow-array-tes 14344 [003] 7501.073802: probe_libarrow:je_arrow_mallocx: (7fbcd20bb640) size=0x80 flags=6 current = {} current_traceback = '' def new_row(): global current_traceback current['traceback'] = current_traceback print(json.dumps(current)) current_traceback = '' for line in sys.stdin: if line == '\n': continue elif line[0] == '\t': # traceback line current_traceback += line.strip("\t") else: line = line.rstrip('\n') if not len(current) == 0: new_row() parts = re.sub(' +', ' ', line).split(' ') parts.reverse() parts.pop() # file parts.pop() # "14344" parts.pop() # "[003]" current['time'] = float(parts.pop().rstrip(":")) current['event'] = parts.pop().rstrip(":") parts.pop() # (7fbcd20bddf0) if parts[-1] == "<-": parts.pop() parts.pop() params = {} for pair in parts: key, value = pair.split("=") params[key] = value current['params'] = params
以下是该脚本的示例调用以及输出数据的预览:
perf script | python3 /arrow/process_perf_events.py > processed_events.jsonl head processed_events.jsonl | cut -c -120
输出的JSON数据可以进一步处理,以回答许多问题。例如,以下脚本将查找未释放的分配,以及与悬空分配的回溯及悬空分配的计数:
count_tracebacks.py
'''Find tracebacks of allocations with no corresponding free''' import sys import json from collections import defaultdict allocated = dict() for line in sys.stdin: line = line.rstrip('\n') data = json.loads(line) if data['event'] == "probe_libarrow:je_arrow_mallocx__return": address = data['params']['arg1'] allocated[address] = data['traceback'] elif data['event'] == "probe_libarrow:je_arrow_rallocx": address = data['params']['ptr'] del allocated[address] elif data['event'] == "probe_libarrow:je_arrow_rallocx__return": address = data['params']['arg1'] allocated[address] = data['traceback'] elif data['event'] == "probe_libarrow:je_arrow_dallocx": address = data['params']['ptr'] if address in allocated: del allocated[address] elif data['event'] == "probe_libarrow:mi_malloc_aligned__return": address = data['params']['arg1'] allocated[address] = data['traceback'] elif data['event'] == "probe_libarrow:mi_realloc_aligned": address = data['params']['p'] del allocated[address] elif data['event'] == "probe_libarrow:mi_realloc_aligned__return": address = data['params']['arg1'] allocated[address] = data['traceback'] elif data['event'] == "probe_libarrow:mi_free": address = data['params']['p'] if address in allocated: del allocated[address] traceback_counts = defaultdict(int) for traceback in allocated.values(): traceback_counts[traceback] += 1 for traceback, count in sorted(traceback_counts.items(), key=lambda x: -x[1]): print("Num of dangling allocations:", count) print(traceback)
可以像这样调用脚本:
cat processed_events.jsonl | python3 /arrow/count_tracebacks.py