arrow系列13---内存管理(Memory Management)
作者:yunjinqi   类别:    日期:2023-10-15 20:06:25    阅读:498 次   消耗积分:0 分    

缓冲区 

为了避免传递具有不同和不明显的生命周期规则的原始数据指针,Arrow提供了一个称为arrow::Buffer的通用抽象。缓冲区封装了指针和数据大小,并通常将其生命周期与底层提供者的生命周期绑定(换句话说,缓冲区应始终指向有效的内存,直到被销毁)。缓冲区是无类型的:它们只表示物理内存区域,而不考虑其预期的含义或解释。

缓冲区可以由Arrow本身分配,也可以由第三方例程分配。例如,可以将Python字节字符串的数据作为Arrow缓冲区传递,以保持Python对象的存活。

此外,缓冲区有各种不同的类型:可变或不可变,可调整大小或不可调整大小。通常,在构建数据片段时,您将持有可变缓冲区,然后将其冻结为不可变容器,例如数组。

注意

某些缓冲区可能指向非CPU内存,例如由CUDA上下文提供的GPU支持内存。如果您正在编写一个GPU感知应用程序,需要小心不要将GPU内存指针解释为可由CPU访问的指针,反之亦然。


访问缓冲区内存 

通过size()和data()访问器(或mutable_data()以可写方式访问可变缓冲区)可以快速访问底层内存。

切片 

可以创建零拷贝的缓冲区切片,以获取指向底层数据的某个连续子集的缓冲区。可以通过调用arrow::SliceBuffer()和arrow::SliceMutableBuffer()函数来实现这一点。

分配缓冲区 

可以通过调用arrow::AllocateBuffer()或arrow::AllocateResizableBuffer()的重载之一来自己分配缓冲区:

arrow::Result<std::unique_ptr<Buffer>> maybe_buffer = arrow::AllocateBuffer(4096);
if (!maybe_buffer.ok()) {
   // ... 处理分配错误
}

std::shared_ptr<arrow::Buffer> buffer = *std::move(maybe_buffer);
uint8_t* buffer_data = buffer->mutable_data();
memcpy(buffer_data, "hello world", 11);

以这种方式分配缓冲区可以确保它是64字节对齐的,并按照Arrow内存规范建议进行了填充。


构建缓冲区 

还可以使用arrow::BufferBuilder API来逐步分配和构建缓冲区:

BufferBuilder builder;
builder.Resize(11);  // 为11字节预留足够的空间
builder.Append("hello ", 6);
builder.Append("world", 5);

auto maybe_buffer = builder.Finish();
if (!maybe_buffer.ok()) {
   // ... 处理缓冲区分配错误
}
std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;

如果缓冲区用于包含给定的固定宽度类型的值(例如List数组的32位偏移量),则使用模板arrow::TypedBufferBuilder API可能更方便:

TypedBufferBuilder<int32_t> builder;
builder.Reserve(2);  // 为两个int32_t值预留足够的空间
builder.Append(0x12345678);
builder.Append(-0x765643210);

auto maybe_buffer = builder.Finish();
if (!maybe_buffer.ok()) {
   // ... 处理缓冲区分配错误
}
std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;

内存池 

在使用Arrow C++ API分配缓冲区时,缓冲区的底层内存由arrow::MemoryPool实例分配。通常情况下,这将是进程范围内的默认内存池,但许多Arrow API允许您为其内部分配传递另一个MemoryPool实例。

内存池用于大型长寿命数据,例如数组缓冲区。其他数据,如小型C++对象和临时工作区,通常使用常规的C++分配器。

默认内存池 

默认内存池取决于Arrow C++编译方式:

如果在编译时启用,使用jemalloc堆; 否则,如果在编译时启用,使用mimalloc堆; 否则,使用C库malloc堆。

覆盖默认内存池 

可以通过设置ARROW_DEFAULT_MEMORY_POOL环境变量来覆盖上述选择算法。

STL集成 

如果您希望使用Arrow内存池来分配STL容器的数据,可以使用arrow::stl::allocator包装器来实现。

另外,您还可以使用STL分配器来分配Arrow内存,使用arrow::stl::STLMemoryPool类。但是,这可能性能较差,因为STL分配器不提供调整大小的操作。

设备 

许多Arrow应用程序仅访问主机(CPU)内存。但是,在某些情况下,也希望处理设备内存(例如GPU上的内存)以及主机内存。

Arrow使用arrow::Device抽象表示CPU和其他设备。相关的类arrow::MemoryManager指定如何在给定设备上分配内存。每个设备都有一个默认的内存管理器,但也可以构建其他实例(例如,包装自定义的arrow::MemoryPool在CPU上)。arrow::MemoryManager实例指定如何在给定设备上分配内存(例如,在CPU上使用特定的arrow::MemoryPool)。

设备无关编程 

如果从第三方代码中接收到缓冲区,可以通过调用其is_cpu()方法来查询是否可以由CPU读取。

此外,您还可以以通用方式在给定设备上查看缓冲区,通过调用arrow::Buffer::View()或arrow::Buffer::ViewOrCopy()。如果源设备和目标设备相同,这将是一个无操作。否则,将尝试构建目标设备的内存地址,以便访问缓冲区内容。实际的设备到设备的传输可能会在读取缓冲区内容时延迟进行。

类似地,如果要在不假定有CPU可读取缓冲区的情况下进行I/O操作,可以调用arrow::Buffer::GetReader()和arrow::Buffer::GetWriter()。

例如,要获取任意缓冲区的CPU视图或副本,可以简单地执行:

std::shared_ptr<arrow::Buffer> arbitrary_buffer = ... ;
std::shared_ptr<arrow::Buffer> cpu_buffer = arrow::Buffer::ViewOrCopy(
   arbitrary_buffer, arrow::default_cpu_memory_manager());

内存分析 

在Linux上,可以使用perf record生成内存分配的详细配置文件,而不需要修改二进制文件。这些配置文件还可以显示回溯,以及分配大小。这需要调试符号,可以来自调试构建或具有调试符号的发布构建。

注意

如果在另一个平台上对Arrow的测试进行性能分析,可以使用Archery运行以下Docker容器以访问Linux环境:

archery docker run ubuntu-cpp bash
# Inside the Docker container...
/arrow/ci/scripts/cpp_build.sh /arrow /build
cd build/cpp/debug
./arrow-array-test # Run a test
apt-get update
apt-get install -y linux-tools-generic
alias perf=/usr/lib/linux-tools/<version-path>/perf

要跟踪分配,可以在所使用的每个分配器方法上创建探针点。收集$params允许我们记录所请求的分配大小,而收集$retval允许我们记录已分配的地址,以便可以将其与free/de-allocate调用相关联。

jemalloc
perf probe -x libarrow.so je_arrow_mallocx '$params'
perf probe -x libarrow.so je_arrow_mallocx%return '$retval'
perf probe -x libarrow.so je_arrow_rallocx '$params'
perf probe -x libarrow.so je_arrow_rallocx%return '$retval'
perf probe -x libarrow.so je_arrow_dallocx '$params'
PROBE_ARGS="-e probe_libarrow:je_arrow_mallocx \
   -e probe_libarrow:je_arrow_mallocx__return \
   -e probe_libarrow:je_arrow_rallocx \
   -e probe_libarrow:je_arrow_rallocx__return \
   -e probe_libarrow:je_arrow_dallocx"


一旦设置了探针,就可以使用perf record记录带有关联回溯的调用。在此示例中,我们正在运行Arrow中的StructArray单元测试:

perf record -g --call-graph dwarf \  $PROBE_ARGS \  ./arrow-array-test --gtest_filter=StructArray*

如果要对运行中的进程进行性能分析,可以运行perf record -p <PID>,它将一直记录,直到使用CTRL+C中断。或者,您可以运行perf record -P <PID> sleep 10,以记录10秒钟。 生成的数据可以使用标准工具进行处理,以处理perf,或者可以使用perf script将数据的文本格式传输到自定义脚本中。以下脚本解析perf脚本输出并以新行分隔的JSON格式打印输出,以便更容易处理。

process_perf_events.py

import sys
import re
import json

# Example non-traceback line
# arrow-array-tes 14344 [003]  7501.073802: probe_libarrow:je_arrow_mallocx: (7fbcd20bb640) size=0x80 flags=6

current = {}
current_traceback = ''

def new_row():
    global current_traceback
    current['traceback'] = current_traceback
    print(json.dumps(current))
    current_traceback = ''

for line in sys.stdin:
    if line == '\n':
        continue
    elif line[0] == '\t':
        # traceback line
        current_traceback += line.strip("\t")
    else:
        line = line.rstrip('\n')
        if not len(current) == 0:
            new_row()
        parts = re.sub(' +', ' ', line).split(' ')

        parts.reverse()
        parts.pop() # file
        parts.pop() # "14344"
        parts.pop() # "[003]"

        current['time'] = float(parts.pop().rstrip(":"))
        current['event'] = parts.pop().rstrip(":")

        parts.pop() # (7fbcd20bddf0)
        if parts[-1] == "<-":
            parts.pop()
            parts.pop()

        params = {}

        for pair in parts:
            key, value = pair.split("=")
            params[key] = value

        current['params'] = params

以下是该脚本的示例调用以及输出数据的预览:

perf script | python3 /arrow/process_perf_events.py > processed_events.jsonl
head processed_events.jsonl | cut -c -120


输出的JSON数据可以进一步处理,以回答许多问题。例如,以下脚本将查找未释放的分配,以及与悬空分配的回溯及悬空分配的计数:

count_tracebacks.py

'''Find tracebacks of allocations with no corresponding free'''
import sys
import json
from collections import defaultdict

allocated = dict()

for line in sys.stdin:
    line = line.rstrip('\n')
    data = json.loads(line)

    if data['event'] == "probe_libarrow:je_arrow_mallocx__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:je_arrow_rallocx":
        address = data['params']['ptr']
        del allocated[address]
    elif data['event'] == "probe_libarrow:je_arrow_rallocx__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:je_arrow_dallocx":
        address = data['params']['ptr']
        if address in allocated:
            del allocated[address]
    elif data['event'] == "probe_libarrow:mi_malloc_aligned__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:mi_realloc_aligned":
        address = data['params']['p']
        del allocated[address]
    elif data['event'] == "probe_libarrow:mi_realloc_aligned__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:mi_free":
        address = data['params']['p']
        if address in allocated:
            del allocated[address]

traceback_counts = defaultdict(int)

for traceback in allocated.values():
    traceback_counts[traceback] += 1

for traceback, count in sorted(traceback_counts.items(), key=lambda x: -x[1]):
    print("Num of dangling allocations:", count)
    print(traceback)


可以像这样调用脚本:

cat processed_events.jsonl | python3 /arrow/count_tracebacks.py


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/346
上一篇:arrow系列12---arrow的组成部分(High-Level Overview)
下一篇:arrow系列14---数组(Arrays)