0x00 前言
本文主要介绍eBPF应用开发涉及到的数据结构,基于内核版本6.6.47
BPF Map 是 eBPF 程序与用户态程序、以及 eBPF 程序之间共享数据的核心机制。Map 本质上是驻留在内核空间的key-value存储,通过 bpf_xxxx 系统调用进行创建和操作。不同类型的 Map 适用于不同场景,选择合适的 Map 类型对程序性能至关重要
0x01 BPF Hash Map
数据结构概览
数据结构设计如下:
+--------------------+
| struct bpf_map map |--> general BPF map (metadata)
|--------------------|
| struct bucket * |--> bucket linked-list
|--------------------|
| void *elems |--> elements (hash+key+value), link-listed
|--------------------|
| |
|--------------------|
| count, n_buckets, |--> hash map metadata
| elem_size, hashrnd |
+--------------------+
内核表示 BPF map 的结构体 struct bpf_map 是不区分 map 类型的,因此 hash map 在 BPF map 之上又封装了一层,即 struct bpf_htab,表示一个内核 hash map。Hash map 又主要分为两部分:
Buckets:对key进行哈希之后找到对应的buckets,但这里存放的只是buckets链表和锁等元数据,不存放数据Elements:即真正需要存放的数据,也组织成链表
内核核心结构
struct bpf_htab是内核哈希表的实现,内核 bpf map struct bpf_map 结构记录的只是通用 map 元数据(不区分 map 类型)。对于 hash map 来说,真正的数据是存储在哈希表 struct bpf_htab 里。以下是内核 kernel/bpf/hashtab.c 中的核心结构体定义
struct bucket {
struct hlist_nulls_head head;
raw_spinlock_t raw_lock;
};
// 哈希表(hash table)数据结构
struct bpf_htab {
struct bpf_map map; // 这个 hash map 对应的 BPF map(元数据)
struct bpf_mem_alloc ma;
struct bpf_mem_alloc pcpu_ma;
struct bucket *buckets; // 链表 + 锁
void *elems; // 每个 elem 的结构:struct htab_elem + key + value
union {
struct pcpu_freelist freelist;
struct bpf_lru lru; //LRU实现
};
struct htab_elem *__percpu *extra_elems;
struct percpu_counter pcount;
atomic_t count; // element 数量
u32 n_buckets; // buckets 数量
u32 elem_size; // element 大小,单位 bytes
u32 hashrnd; // 哈希随机数
struct lock_class_key lockdep_key;
int __percpu *map_locked[HASHTAB_MAP_LOCK_COUNT];
};
struct htab_elem {
union {
struct hlist_nulls_node hash_node;
struct {
void *padding;
union {
struct bpf_htab *htab;
struct pcpu_freelist_node fnode;
struct htab_elem *batch_flink;
};
};
};
union {
void *ptr_to_pptr;
struct bpf_lru_node lru_node;
};
u32 hash;
char key[] __aligned(8);
};
Hash Map 的内部结构可以用以下关系图表示:
graph TD
BpfHtab["bpf_htab"] --> BpfMap["bpf_map (metadata)"]
BpfHtab --> Buckets["buckets array"]
BpfHtab --> Elems["pre-allocated elements"]
BpfHtab --> Meta["count, n_buckets,\nelem_size, hashrnd"]
Buckets --> B0["bucket 0\n(hlist_nulls_head + raw_spinlock)"]
Buckets --> B1["bucket 1"]
Buckets --> Bn["bucket n"]
B0 --> E1["htab_elem\n(hash + key + value)"]
E1 --> E2["htab_elem\n(next in chain)"]
E2 --> Null["NULL"]
关键要点:
- 每个
bucket包含一把自旋锁raw_spinlock_t,保护该 bucket 下的链表并发操作 htab_elem通过hlist_nulls_node链接到 bucket 的哈希链表中key[]是柔性数组成员(flexible array member),key 之后紧跟 value 数据- 对于 LRU 类型的 hash map,
htab_elem中会使用lru_node字段替代普通的 freelist 节点
从bpf_htab的定义可以看出,这个哈希表支持下面的所有 hash map 类型(percpu、lru 等等)
BPF_MAP_TYPE_HASH
BPF_MAP_TYPE_PERCPU_HASH
BPF_MAP_TYPE_LRU_HASH
BPF_MAP_TYPE_LRU_PERCPU_HASH
BPF_MAP_TYPE_HASH_OF_MAPS
此外,这个结构体里有三段内存区域,分别存储 bucket 信息、 elements(hash+key+value 等)信息和额外的 elements 信息:
+--+--+--+--+--+----------+
buckets | | | | | | ... |
+--+--+--+--+--+----------+
+-------+--------+--------+------------------------------+
elems | | | | ... |
+-------+--------+--------+------------------------------+
+-------+--------+-----------------+
extra_elemes | | | ... | 只有 8 个空间,普通 hash map 用;PERCPU、LRU map 不用
+-------+--------+-----------------+
1、struct bucket:哈希槽(链表,不存放实际数据)
Bucket 存放的只是链表和锁,并不存放实际数据
// kernel/bpf/hashtab.c
struct bucket {
struct hlist_nulls_head head;
union {
raw_spinlock_t raw_lock;
spinlock_t lock;
};
};
2、struct htab_elem:存放 hash+key+value 等数据(实际的存储数据),特别注意下面的字段
u32 hash:对 key 进行哈希之后得到的哈希值char key[]:指针,在初始化时会指向 key+value 的连续内存区域
struct htab_elem {
// 不同 hash map 类型特定的字段
union {
struct hlist_nulls_node hash_node; // 内核通用链表类型之一 hlist_nulls
struct {
void *padding;
union {
struct bpf_htab *htab;
struct pcpu_freelist_node fnode;
struct htab_elem *batch_flink;
};
};
};
union {
struct rcu_head rcu;
struct bpf_lru_node lru_node;
};
// 公共字段
u32 hash; // 哈希值
char key[] __aligned(8); // 指针,指向接下来的 key+value 数据
};
BPF_MAP_TYPE_HASH的操作函数实现
1、操作函数回调的注册
注册的操作方法(回调函数)集合如下
// kernel/bpf/hashtab.c
const struct bpf_map_ops htab_map_ops = {
.map_alloc_check = htab_map_alloc_check,
.map_alloc = htab_map_alloc,
.map_free = htab_map_free,
.map_get_next_key = htab_map_get_next_key,
.map_lookup_elem = htab_map_lookup_elem, // 查找
.map_update_elem = htab_map_update_elem, // 创建或更新
.map_delete_elem = htab_map_delete_elem, // 删除
.map_gen_lookup = htab_map_gen_lookup,
.map_seq_show_elem = htab_map_seq_show_elem,
BATCH_OPS(htab),
};
2、创建 map的函数:struct bpf_map *htab_map_alloc()
// kernel/bpf/hashtab.c
static struct bpf_map *htab_map_alloc(union bpf_attr *attr)
实现在 kernel/bpf/hashtab.c, 调用栈:
htab_map_alloc # kernel/bpf/hashtab.c
|-htab = kzalloc(sizeof(*htab), GFP_USER)
|-bpf_map_init_from_attr(&htab->map, attr) # 初始化 map 元数据;htab->map 就是 struct bpf_map
|
|-htab->n_buckets = ...
|-htab->elem_size = ...
|-bpf_map_charge_init(&htab->map.memory, cost) # 确保 map size 不会过大
|
| # 分配 bucket 内存
|-htab->buckets = bpf_map_area_alloc(size, numa_node) // kernel/bpf/syscall.c
| |-__bpf_map_area_alloc(size, numa_node, false)
| |-if condition
| | kmalloc_node(GFP_USER)
| |-else
| __vmalloc_node_range(GFP_KERNEL)
|
|-htab->hashrnd = ... # 初始化哈希种子
|
|-if (prealloc) { # 提前为所有 elements 分配内存
prealloc_init(htab);
|-htab->elems = bpf_map_area_alloc(elem_size*n_entries, numa_node);
|-per-cpu and lru initiazations if needed
# 分配 extra 内存
if (!percpu && !lru) // lru itself can remove the least used element, so
alloc_extra_elems(htab); // there is no need for an extra elem during map_update
|-__alloc_percpu_gfp(sizeof(struct htab_elem *), 8, GFP_USER);
}
从上面的流程易知,需要分配三段内存:
buckets空间,只是一个链表,里面存放了链表、锁等简单数据- 如果启用了预分配,为一次性为所有 elements 分配内存空间。预分配的优点是性能更高(不需要为每个 element 动态分配内存)
- extra_elems
3、查询 map:void *htab_map_lookup_elem()
- 根据 key 计算出一个哈希值
hash - 以 hash 作为数组索引,直接定位到对应的 bucket,
O(1) - 顺序遍历 bucket 内的
struct htab_elem元素,如果hash和key都相同,就返回对应的value其实地址;否则返回空。这里虽然是顺序遍历,但除非有哈希冲突,否则第一次就返回了
传入参数是 bpf map 和 key,返回的是 value 的起始地址
// kernel/bpf/hashtab.c
// 返回 value 起始地址
static void *htab_map_lookup_elem(struct bpf_map *map, void *key)
{
struct htab_elem *l = __htab_map_lookup_elem(map, key);
if (l)
// l->key 指向的是 key+value 的地址,因此 l->key + key_size 指向的才是 value,
// 此外还需要考虑 key_size 对其 8 字节,因此得到下面一行代码:
return l->key + round_up(map->key_size, 8);
return NULL;
}
static void *__htab_map_lookup_elem(struct bpf_map *map, void *key)
{
struct bpf_htab *htab = container_of(map, struct bpf_htab, map);
hash = htab_map_hash(key, key_size, htab->hashrnd);
head = select_bucket(htab, hash); // 以 hash 作为数组索引,直接定位到 bucket 起始地址,返回 bucket 内的链表头指针
// 简化之后:head = htab->buckets[hash]->head
return lookup_nulls_elem_raw(head, hash, key, key_size, htab->n_buckets);
}
// can be called without bucket lock. it will repeat the loop in
// the unlikely event when elements moved from one bucket into another while link list is being walked
static struct htab_elem *
lookup_nulls_elem_raw(struct hlist_nulls_head *head, u32 hash, void *key, u32 key_size, u32 n_buckets)
{
struct hlist_nulls_node *n; // 通用链表类型之一,与普通链表的区别是最后一个元素不是 NULL 指针,而是 'nulls' 元素
struct htab_elem *l;
again:
// 顺序遍历 head 指向的(bucket)链表。
hlist_nulls_for_each_entry_rcu(l, n, head, hash_node)
if (l->hash == hash && !memcmp(&l->key, key, key_size)) // 哈希值和 key 都相同
return l;
// 为便于理解,以上代码展开和简化之后等价于下面的(伪)代码
// for (n=head; n!=nulls && l=(struct htab_elem *)n->hash_node; n=n->next)
// if (l->hash == hash && !memcmp(&l->key, key, key_size)) // 哈希值和 key 都相同
// return l;
if (get_nulls_value(n) != (hash & (n_buckets - 1)))
goto again;
return NULL;
}
4、插入或更新 map:int htab_map_update_elem(),插入和更新都是执行此函数,返回值0表示succ,其他对应错误码
调用栈
htab_map_update_elem
|-hash = htab_map_hash(key, key_size, htab->hashrnd)
|-b = __select_bucket(htab, hash)
|-l_old = lookup_nulls_elem_raw(head, hash, key, key_size, htab->n_buckets)
|
|-if l_old
| copy_map_value_locked(map, l_old->key + round_up(key_size, 8), value, false)
| return 0
|
|-l_old = lookup_elem_raw(head, hash, key, key_size);
|-check_flags(htab, l_old, map_flags);
|-l_new = alloc_htab_elem(htab, key, value, key_size, hash, false, false, l_old);
| |-if prealloc
| | ...
| |-else
| | atomic_inc_return(&htab->count)
| | l_new = kmalloc_node()
| |
| |-memcpy(l_new->key, key, key_size);
| |-copy_map_value(&htab->map, l_new->key + round_up(key_size, 8), value);
| |-l_new->hash = hash;
| |-return l_new
|
|-hlist_nulls_add_head_rcu(&l_new->hash_node, head);
|
|-if l_old
| hlist_nulls_del_rcu(&l_old->hash_node);
| if (!htab_is_prealloc(htab))
| free_htab_elem(htab, l_old);
|-return 0
实现代码如下:
// kernel/bpf/hashtab.c
/* Called from syscall or from eBPF program */
static int htab_map_update_elem(struct bpf_map *map, void *key, void *value, u64 map_flags)
{
struct bpf_htab *htab = container_of(map, struct bpf_htab, map);
struct htab_elem *l_new = NULL, *l_old;
key_size = map->key_size;
hash = htab_map_hash(key, key_size, htab->hashrnd); // 根据 key 计算出一个哈希值
struct bucket *b = __select_bucket(htab, hash);
head = &b->head;
if (unlikely(map_flags & BPF_F_LOCK)) { // BPF_F_LOCK: spin_lock-ed map_lookup/map_update, defined in include/uapi/linux/bpf.h
/* find an element without taking the bucket lock */
l_old = lookup_nulls_elem_raw(head, hash, key, key_size, htab->n_buckets);
ret = check_flags(htab, l_old, map_flags);
if (ret)
return ret;
if (l_old) { // 如果已经存在:获取 elem lock,然后原地更新 value
copy_map_value_locked(map, l_old->key + round_up(key_size, 8), value, false);
return 0;
}
}
// 至此,确认老记录不存在。
// 接下来:获取 bucket lock 然后再查询一次。99.9% 的概率仍然是查不到,但这一步必须做。
flags = htab_lock_bucket(htab, b);
l_old = lookup_elem_raw(head, hash, key, key_size);
check_flags(htab, l_old, map_flags);
if (unlikely(l_old && (map_flags & BPF_F_LOCK))) {
// first lookup without the bucket lock didn't find the element,
// but second lookup with the bucket lock found it.
// This case is highly unlikely, but has to be dealt with:
// grab the element lock in addition to the bucket lock and update element in place
copy_map_value_locked(map, l_old->key + round_up(key_size, 8), value, false);
ret = 0;
goto err;
}
// 分配新 element。内部:根据 map 是否是 prealloc 模式,处理逻辑会有所不同
l_new = alloc_htab_elem(htab, key, value, key_size, hash, false, false, l_old);
// 插到链表头,so that concurrent search will find it before old elem */
hlist_nulls_add_head_rcu(&l_new->hash_node, head);
// 如果老的还在,删掉
if (l_old) {
hlist_nulls_del_rcu(&l_old->hash_node);
if (!htab_is_prealloc(htab))
free_htab_elem(htab, l_old);
}
ret = 0;
err:
htab_unlock_bucket(htab, b, flags);
return ret;
}
static struct htab_elem *
alloc_htab_elem(struct bpf_htab *htab, void *key, void *value, u32 key_size, u32 hash,
bool percpu, bool onallcpus, struct htab_elem *old_elem)
{
u32 size = htab->map.value_size;
bool prealloc = htab_is_prealloc(htab);
struct htab_elem *l_new, **pl_new;
void __percpu *pptr;
if (prealloc) {
if (old_elem) {
pl_new = this_cpu_ptr(htab->extra_elems);
l_new = *pl_new;
htab_put_fd_value(htab, old_elem);
*pl_new = old_elem;
} else {
struct pcpu_freelist_node *l;
l = __pcpu_freelist_pop(&htab->freelist);
l_new = container_of(l, struct htab_elem, fnode);
}
} else {
if (atomic_inc_return(&htab->count) > htab->map.max_entries)
if (!old_elem) {
l_new = ERR_PTR(-E2BIG);
goto dec_count;
}
l_new = kmalloc_node(htab->elem_size, GFP_ATOMIC, htab->map.numa_node);
check_and_init_map_lock(&htab->map, l_new->key + round_up(key_size, 8));
}
memcpy(l_new->key, key, key_size);
if (percpu) {
...
} else if (fd_htab_map_needs_adjust(htab)) {
size = round_up(size, 8);
memcpy(l_new->key + round_up(key_size, 8), value, size);
} else {
copy_map_value(&htab->map, l_new->key + round_up(key_size, 8), value);
}
l_new->hash = hash;
return l_new;
dec_count:
atomic_dec(&htab->count);
return l_new;
}
5、获取下一个 key:int htab_map_get_next_key(map, key, void *next_key),通常用于用户态遍历map
获取第一个 key:只能顺序遍历 buckets 及每个 buckets 内的各元素
获取非第一个 key:
- 根据 key 计算一个哈希值
hash - 以
hash作为数组索引,直接定位到对应的 bucket - 在该 bucket 内查找给定 key;如果找到,按顺序返回下一个 key
需要注意的是,下一个 key 可能在当前 bucket,也可能在下一个 bucket
// kernel/bpf/hashtab.c
/* Called from syscall */
static int htab_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
{
struct bpf_htab *htab = container_of(map, struct bpf_htab, map);
struct htab_elem *l, *next_l;
key_size = map->key_size;
if (!key)
goto find_first_elem;
hash = htab_map_hash(key, key_size, htab->hashrnd);
head = select_bucket(htab, hash);
/* lookup the key */
l = lookup_nulls_elem_raw(head, hash, key, key_size, htab->n_buckets);
if (!l)
goto find_first_elem;
/* key was found, get next key in the same bucket */
next_l = hlist_nulls_entry_safe(hlist_nulls_next_rcu(&l->hash_node), struct htab_elem, hash_node);
if (next_l) { // if next elem in this hash list is non-zero, just return it
memcpy(next_key, next_l->key, key_size);
return 0;
}
/* no more elements in this hash list, go to the next bucket */
i = hash & (htab->n_buckets - 1);
i++;
find_first_elem:
for (; i < htab->n_buckets; i++) { // iterate over buckets
head = select_bucket(htab, i);
/* pick first element in the bucket */
next_l = hlist_nulls_entry_safe(hlist_nulls_first_rcu(head), struct htab_elem, hash_node);
if (next_l) { // if it's not empty, just return it
memcpy(next_key, next_l->key, key_size);
return 0;
}
}
/* iterated over all buckets and all elements */
return -ENOENT;
}
PERCPU VS NON-PERCPU
| 特性 | NON-PERCPU(普通) | PERCPU |
|---|---|---|
| 数据存储 | 所有 CPU 共享同一份数据 | 每个 CPU 各自维护一份独立的数据副本 |
| 并发访问 | 需要锁保护(bucket-level spinlock) | 无锁访问,每个 CPU 操作自己的副本 |
| 性能 | 高并发写入时锁争用较大 | 高吞吐,无锁竞争开销 |
| 内存开销 | 仅一份数据 | nr_cpus 份数据,内存随 CPU 数增长 |
| 用户态读取 | 直接读取 value | 需要汇总所有 CPU 的副本(sum/merge) |
| 典型场景 | 连接表、配置下发、进程信息记录 | 流量统计、计数器、直方图、延迟采样 |
PERCPU 类型适用于写多读少且各 CPU 独立统计的场景(如包计数、延迟直方图),可以完全消除锁争用。代价是用户态需要额外汇总各 CPU 的值,并且内存占用与 CPU 核数成正比
PERCPU的Lookup实现(坑)
基于cilium/ebpf的Lookup方法,需要注意 PERCPU map 在用户态的 value 大小计算:每个 CPU 的 value 按 8 字节对齐后,乘以可用 CPU 数量,得到 fullValueSize
// newMap allocates and returns a new Map structure.
// Sets the fullValueSize on per-CPU maps.
func newMap(fd *sys.FD, name string, typ MapType, keySize, valueSize, maxEntries, flags uint32) (*Map, error) {
m := &Map{
name,
fd,
typ,
keySize,
valueSize,
maxEntries,
flags,
"",
int(valueSize),
}
if !typ.hasPerCPUValue() {
return m, nil
}
possibleCPUs, err := PossibleCPU()
if err != nil {
return nil, err
}
m.fullValueSize = int(internal.Align(valueSize, 8)) * possibleCPUs
return m, nil
}
0x02 常用的MAP结构与使用注意点
BPF_MAP_TYPE_HASH
BPF_MAP_TYPE_HASH 是最基础的哈希表类型,初始化时需要指定支持的最大条目数(max_entries),满了之后继续插入数据时会返回 -E2BIG 错误。内核代码位于 kernel/bpf/hashtab.c
核心特性:
- 支持
bpf_map_lookup_elem()、bpf_map_update_elem()、bpf_map_delete_elem()等操作 - 默认执行内存预分配(pre-alloc),可通过
BPF_F_NO_PREALLOC标志关闭 - Lookup / Update / Delete 平均时间复杂度
O(1) - bucket 级别自旋锁保护并发安全
典型应用场景:
- 内核态数据传递给用户态(如流量统计信息)
- 进程跟踪表(以 PID/TGID 为 key)
- 配置参数下发(用户态写入,内核态读取)
与 BPF_MAP_TYPE_LRU_HASH 的核心区别在于:HASH 满后 insert 失败返回 -E2BIG,LRU_HASH 满后自动淘汰最久未使用的 entry
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10240);
__type(key, u32);
__type(value, u64);
} my_hash_map SEC(".maps");
SEC("kprobe/do_sys_openat2")
int trace_openat(struct pt_regs *ctx)
{
u32 pid = bpf_get_current_pid_tgid() >> 32;
u64 ts = bpf_ktime_get_ns();
bpf_map_update_elem(&my_hash_map, &pid, &ts, BPF_ANY);
return 0;
}
BPF_MAP_TYPE_LRU_HASH
BPF_MAP_TYPE_LRU_HASH 在普通 hash map 基础上增加了 LRU(Least Recently Used)淘汰机制。当 map 容量已满时,再插入新的 entry 会自动淘汰最久未被访问(lookup/update)的 entry,而不是返回错误。引入于内核 4.10,内核代码位于 kernel/bpf/hashtab.c 和 kernel/bpf/bpf_lru_list.c
内核实现原理
LRU Hash 在 bpf_htab 内部使用 struct bpf_lru 管理淘汰策略,采用 per-CPU local list + global LRU list 两级结构:
- Global LRU list:全局的双向链表,维护所有 entry 的 LRU 顺序。分为
active list和inactive list两个子链表 - Per-CPU local free list:每个 CPU 维护本地的空闲 entry 列表,减少对全局锁的争用
- 当 insert 时发现 map 已满,先从 per-CPU local list 取空闲节点,若没有则从 global inactive list 淘汰,最后尝试从其他 CPU “偷取”(steal)
struct bpf_lru {
union {
struct bpf_common_lru common_lru; /* global LRU */
struct bpf_lru_locallist __percpu *percpu_lru; /* per-CPU LRU */
};
...
};
struct bpf_lru_list {
struct list_head lists[NR_BPF_LRU_LIST_T]; /* active / inactive / free */
...
};
LRU 淘汰流程:
flowchart TD
Insert["bpf_map_update_elem"] --> CheckFull{"map full?"}
CheckFull -->|No| DoInsert["insert into hash bucket"]
CheckFull -->|Yes| TryLocal{"per-CPU local list\nhas free node?"}
TryLocal -->|Yes| ReuseLocal["reuse node from local list"]
TryLocal -->|No| TryGlobal{"global inactive list\nhas entry?"}
TryGlobal -->|Yes| EvictGlobal["evict LRU entry\nfrom inactive list"]
TryGlobal -->|No| Steal["steal node from\nother CPU local list"]
ReuseLocal --> DoInsert
EvictGlobal --> DoInsert
Steal --> DoInsert
DoInsert --> PromoteActive["place new entry\non active list"]
使用场景
- 连接跟踪表(conntrack):网络连接数量不可预知,满后淘汰最旧的连接
- NAT 映射表:地址转换记录,容量有限时自动释放不活跃映射
- IP 统计/限流:统计源 IP 的包数量/字节数,map 满时淘汰不活跃 IP
- DDoS 防御中的 rate limiter:记录每个源 IP 的请求速率
注意事项
- 淘汰是被动触发的,仅在 insert 时发现 map 满才执行淘汰,不会后台自动清理
bpf_map_lookup_elem()和bpf_map_update_elem()都会刷新 entry 的”最近访问”状态(将其提升到 active list)- 高频 update 场景下,
BPF_MAP_TYPE_LRU_PERCPU_HASH性能优于全局 LRU,因为减少了全局锁争用 - 在伪造源 IP 的 DDoS 攻击下,海量不同 IP 会导致 LRU map 条目快速轮转(频繁淘汰+插入),CPU 开销显著增大
max_entries设置需要权衡:过小会频繁淘汰有效数据,过大会浪费内存(预分配)
内核态 C 示例:XDP 统计源 IP 包数量
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
struct ip_stats {
__u64 packets;
__u64 bytes;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 65536);
__type(key, __u32); /* src IPv4 addr */
__type(value, struct ip_stats);
} src_ip_stats SEC(".maps");
SEC("xdp")
int xdp_count_src_ip(struct xdp_md *ctx)
{
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
struct ethhdr *eth = data;
if ((void *)(eth + 1) > data_end)
return XDP_PASS;
if (bpf_ntohs(eth->h_proto) != ETH_P_IP)
return XDP_PASS;
struct iphdr *iph = (void *)(eth + 1);
if ((void *)(iph + 1) > data_end)
return XDP_PASS;
__u32 src_ip = iph->saddr;
__u64 pkt_len = data_end - data;
struct ip_stats *stats = bpf_map_lookup_elem(&src_ip_stats, &src_ip);
if (stats) {
__sync_fetch_and_add(&stats->packets, 1);
__sync_fetch_and_add(&stats->bytes, pkt_len);
} else {
struct ip_stats new_stats = {
.packets = 1,
.bytes = pkt_len,
};
bpf_map_update_elem(&src_ip_stats, &src_ip, &new_stats, BPF_NOEXIST);
}
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";
上面的例子中,当 map 中已经有 65536 个不同的源 IP 记录后,再遇到新的 IP,LRU 机制会自动淘汰最久没有收到流量的 IP 记录,腾出空间给新 IP。无需开发者手动管理过期逻辑
BPF_MAP_TYPE_ARRAY
BPF_MAP_TYPE_ARRAY 是最简单的 Map 类型,底层就是一个连续数组。key 是 __u32 类型的索引(从 0 到 max_entries - 1),lookup 操作 O(1) 直接按下标寻址
核心特性:
- 创建时所有 entry 即被零初始化,无需手动插入
- 不支持删除 entry(
bpf_map_delete_elem()返回-EINVAL),只能 update - value 地址在 map 生命周期内固定,可安全持有指针
- 内存连续,CPU cache 友好
典型场景:
- 固定大小的统计数组(如按 softirq 类型的计数器)
- 全局配置表(用 index 0 存储一个配置结构体)
- 作为 “堆内存” 的替代(配合
BPF_MAP_TYPE_PERCPU_ARRAY,突破栈 512 字节限制)
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__uint(max_entries, 256);
__type(key, u32);
__type(value, u64);
} my_array SEC(".maps");
BPF_MAP_TYPE_PERCPU_HASH / PERCPU_ARRAY
PERCPU 变体(BPF_MAP_TYPE_PERCPU_HASH、BPF_MAP_TYPE_PERCPU_ARRAY)为每个 CPU 维护独立的 value 副本,核心优势是完全无锁的并发读写。引入于内核 4.6 版本
关键特性:
- 内核态
bpf_map_lookup_elem()返回的是当前 CPU 的 value 副本,无需加锁 - 用户态
bpf_map_lookup_elem()返回所有 CPU 副本的数组,需要自行汇总 - 内存开销 =
value_size * nr_possible_cpus(按8字节对齐)
使用场景:
- 流量计数器:每个 CPU 各自累加,用户态定期汇总
- 延迟直方图:per-CPU 独立记录,避免锁争用
- 临时堆空间:
PERCPU_ARRAY+max_entries=1作为无锁的 per-CPU 临时缓冲区,突破 BPF 栈 512 字节限制
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(max_entries, 1024);
__type(key, u32);
__type(value, u64);
} percpu_counter SEC(".maps");
SEC("tp_btf/sched_switch")
int count_context_switch(u64 *ctx)
{
u32 cpu = bpf_get_smp_processor_id();
u64 *val = bpf_map_lookup_elem(&percpu_counter, &cpu);
if (val)
*val += 1; /* no lock needed, per-CPU value */
else {
u64 init = 1;
bpf_map_update_elem(&percpu_counter, &cpu, &init, BPF_NOEXIST);
}
return 0;
}
BPF_MAP_TYPE_LPM_TRIE
BPF_MAP_TYPE_LPM_TRIE 是 eBPF 中专门用于最长前缀匹配(Longest Prefix Matching) 的数据结构,引入于内核 4.11。内核代码位于 kernel/bpf/lpm_trie.c
原理
LPM Trie 基于压缩前缀树(compressed trie) 实现。树中每个节点代表一个前缀,从根节点到叶子节点的路径表示 bit 级别的匹配过程。查找时逐 bit 比较 key 的数据部分,选择匹配最长前缀的节点作为结果,时间复杂度 O(prefixlen)
LPM Trie 的树形查找示意如下(以 IPv4 CIDR 为例):
graph TD
Root["root"] --> N1["10.0.0.0/8\nvalue=1"]
Root --> N2["192.168.0.0/16\nvalue=2"]
N2 --> N3["192.168.1.0/24\nvalue=3"]
N2 --> N4["192.168.2.0/24\nvalue=4"]
N3 --> N5["192.168.1.128/25\nvalue=5"]
QueryIP["Query: 192.168.1.200"] -.->|"match /16"| N2
QueryIP -.->|"match /24"| N3
QueryIP -.->|"longest match /25"| N5
查询 192.168.1.200 时,trie 按 bit 逐层匹配:先匹配到 /16,再匹配到 /24,最终匹配到 /25(最长前缀),返回 value=5
Key 结构
LPM Trie 的 key 由两部分组成:
prefixlen:前缀长度(以 bit 为单位,如 IPv4 的/24对应 prefixlen=24)data:实际数据(必须按网络字节序/大端序存储)
struct lpm_key_ipv4 {
__u32 prefixlen; /* in bits, e.g. 24 for /24 */
__u32 data; /* IPv4 addr in network byte order */
};
struct lpm_key_ipv6 {
__u32 prefixlen; /* in bits, e.g. 64 for /64 */
__u8 data[16]; /* IPv6 addr in network byte order */
};
使用场景
- XDP/TC 中的 CIDR 匹配:IP 黑白名单、ACL 规则
- 路由表最长前缀匹配:将路由条目(如
10.0.0.0/8 -> 网关A)存入 trie,查找最佳匹配路由 - IPv6 前缀匹配:key data 部分扩展为 16 字节即可支持 IPv6
- 域名后缀匹配:将反转后的域名字符按 bit 编码存入 trie
注意事项
- 必须设置
BPF_F_NO_PREALLOC标志,因为 trie 节点按需分配,预分配无意义且会浪费内存 - key 中的
data必须按网络字节序(大端) 存储,用bpf_htonl()/bpf_htons()转换 prefixlen = 0表示默认路由(catch-all),匹配所有 keymax_entries代表 trie 中最大节点数(包含中间节点),不等于叶子数或规则数- lookup 时传入的
prefixlen应设置为 key 数据的完整 bit 长度(如 IPv4 设为 32),trie 内部会自动找最长匹配 - 支持
bpf_map_get_next_key()遍历(内核 4.16+) - 不支持 PERCPU 变体
内核态 C 示例 1:IPv4 CIDR 黑名单(XDP drop)
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#define MAX_RULES 4096
struct lpm_key_ipv4 {
__u32 prefixlen;
__u32 data; /* network byte order */
};
struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
__uint(max_entries, MAX_RULES);
__type(key, struct lpm_key_ipv4);
__type(value, __u32); /* action: 1=drop, 0=pass */
__uint(map_flags, BPF_F_NO_PREALLOC);
} ipv4_blacklist SEC(".maps");
SEC("xdp")
int xdp_lpm_filter(struct xdp_md *ctx)
{
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
struct ethhdr *eth = data;
if ((void *)(eth + 1) > data_end)
return XDP_PASS;
if (bpf_ntohs(eth->h_proto) != ETH_P_IP)
return XDP_PASS;
struct iphdr *iph = (void *)(eth + 1);
if ((void *)(iph + 1) > data_end)
return XDP_PASS;
/* lookup 时 prefixlen 设为 32(完整 IPv4),trie 自动返回最长匹配 */
struct lpm_key_ipv4 key = {
.prefixlen = 32,
.data = iph->saddr, /* 已经是网络字节序 */
};
__u32 *action = bpf_map_lookup_elem(&ipv4_blacklist, &key);
if (action && *action == 1)
return XDP_DROP;
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";
用户态通过 bpf_map_update_elem() 插入 CIDR 规则,例如封禁 10.0.0.0/8 和 192.168.1.0/24:
/* userspace: insert CIDR rules */
struct lpm_key_ipv4 key1 = { .prefixlen = 8 };
inet_pton(AF_INET, "10.0.0.0", &key1.data);
__u32 drop_action = 1;
bpf_map_update_elem(map_fd, &key1, &drop_action, BPF_ANY);
struct lpm_key_ipv4 key2 = { .prefixlen = 24 };
inet_pton(AF_INET, "192.168.1.0", &key2.data);
bpf_map_update_elem(map_fd, &key2, &drop_action, BPF_ANY);
内核态 C 示例 2:IPv6 前缀匹配
struct lpm_key_ipv6 {
__u32 prefixlen;
__u8 data[16]; /* 128-bit IPv6 addr, network byte order */
};
struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
__uint(max_entries, 1024);
__type(key, struct lpm_key_ipv6);
__type(value, __u32);
__uint(map_flags, BPF_F_NO_PREALLOC);
} ipv6_lpm SEC(".maps");
static __always_inline __u32 *lookup_ipv6(struct in6_addr *addr)
{
struct lpm_key_ipv6 key = { .prefixlen = 128 };
__builtin_memcpy(key.data, addr, 16);
return bpf_map_lookup_elem(&ipv6_lpm, &key);
}
BPF_MAP_TYPE_STACK_TRACE
BPF_MAP_TYPE_STACK_TRACE 专门用于存储内核/用户态的调用栈信息,配合 bpf_get_stackid() helper 使用。引入于内核 4.6
- key 是
bpf_get_stackid()返回的 stack id(32位哈希值) - value 是
__u64数组,存储栈帧中每层的指令地址 - 典型用于 CPU profiling、off-CPU 分析、火焰图生成
struct {
__uint(type, BPF_MAP_TYPE_STACK_TRACE);
__uint(max_entries, 8192);
__uint(key_size, sizeof(u32));
__uint(value_size, 100 * sizeof(u64)); /* max 100 stack frames */
} stack_traces SEC(".maps");
SEC("perf_event")
int profile_handler(struct bpf_perf_event_data *ctx)
{
u32 pid = bpf_get_current_pid_tgid() >> 32;
int stack_id = bpf_get_stackid(ctx, &stack_traces,
BPF_F_USER_STACK | BPF_F_FAST_STACK_CMP);
/* record (pid, stack_id) pair for later symbolization */
return 0;
}
0x03 一些使用技巧
减少预分配(pre-allocation)开销
注意:在早期内核中(commit
94dacdbd5d2d),BPF_F_NO_PREALLOC在 kprobe 场景下可能触发死锁,参见 BCC PR #4044。在 6.6 内核中此问题已修复,但仍需了解其背景
从 Linux 4.6 开始,BPF hash maps 会默认执行内存预分配,并引入 BPF_F_NO_PREALLOC 标志。这样做的动机是为了避免 kprobe + bpf 死锁。社区尝试了其他解决方案,但最终,预分配所有 map 元素是最简单的解决方案,并且不影响用户空间的行为。
当完整的 map 预分配过于昂贵时,可使用 BPF_F_NO_PREALLOC 标志定义 map 以保持早期行为。详情请参阅 bpf: map pre-alloc。当 map 大小不大时(比如 MAX_ENTRIES = 256),这个标志是不必要的,因为 BPF_F_NO_PREALLOC 速度较慢(运行时动态分配)
以下是一个使用示例:
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, MAX_ENTRIES);
__type(key, u32);
__type(value, u64);
__uint(map_flags, BPF_F_NO_PREALLOC);
} start SEC(".maps");
运行时确定 map 大小
libbpf-tools 的一个优点是可移植,因此 map 所需的最大空间可能因不同的机器而异。在这种情况下,可以在加载之前定义 map 而不指定大小,然后运行时调整。例如:
在 <name>.bpf.c 中,定义 map:
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, u32);
__type(value, u64);
} start SEC(".maps");
在 open 阶段之后,调用 bpf_map__set_max_entries()(新版 libbpf API,替代了早期的 bpf_map__resize())进行动态调整。例如:
struct cpudist_bpf *obj;
obj = cpudist_bpf__open();
//bpf_map__resize(obj->maps.start, pid_max); //old version
bpf_map__set_max_entries(obj->maps.start, pid_max);
Per-CPU
在选择 map 类型时,如果与同一 CPU 相关联并发生多个事件,则可以使用 per-CPU 数组来跟踪时间戳,这比使用 hash map 更加简单和高效。然而必须确保内核在两次 BPF 程序调用之间不会将进程从一个 CPU 迁移到另一个 CPU。softirqs.bpf.c示例分析了软中断,并且满足了这两个条件:
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, 1);
__type(key, u32);
__type(value, u64);
} start SEC(".maps");
SEC("tp_btf/softirq_entry")
int BPF_PROG(softirq_entry, unsigned int vec_nr)
{
u64 ts = bpf_ktime_get_ns();
u32 key = 0;
bpf_map_update_elem(&start, &key, &ts, 0);
return 0;
}
SEC("tp_btf/softirq_exit")
int BPF_PROG(softirq_exit, unsigned int vec_nr)
{
u32 key = 0;
u64 *tsp;
[...]
tsp = bpf_map_lookup_elem(&start, &key);
[...]
}
全局变量
不仅可以使用全局变量来自定义 BPF 程序逻辑,还可以使用它们来替代 map,这使程序更加简单和高效。全局变量可以是任意大小。可设定全局变量为一个固定的大小
例如,因为 SOFTIRQ 类型的数量是固定的,可以在 softirq.bpf.c 中定义全局数组来保存计数和直方图:
__u64 counts[NR_SOFTIRQS] = {};
struct hist hists[NR_SOFTIRQS] = {};
然后,可以直接在用户空间遍历这个数组:
static int print_count(struct softirqs_bpf__bss *bss)
{
const char *units = env.nanoseconds ? "nsecs" : "usecs";
__u64 count;
__u32 vec;
printf("%-16s %6s%5s\n", "SOFTIRQ", "TOTAL_", units);
for (vec = 0; vec < NR_SOFTIRQS; vec++){
count = __atomic_exchange_n(&bss->counts[vec], 0,
__ATOMIC_RELAXED);
if (count > 0)
printf("%-16s %11llu\n", vec_names[vec], count);
}
return 0;
}
Map 声明风格说明
BPF map 有两种声明风格,推荐使用 BTF-defined 风格:
BTF-defined 风格(推荐,内核 5.2+):
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 1024);
__type(key, __u32);
__type(value, __u64);
} my_map SEC(".maps");
Legacy bpf_map_def 风格(不推荐):
struct bpf_map_def SEC("maps/my_map") my_map = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u32),
.value_size = sizeof(__u64),
.max_entries = 1024,
};
BTF-defined 风格的优势:类型信息完整、支持 map-in-map、支持 bpf_map__set_*() API 动态配置、兼容 CO-RE 重定位。本文后续的 perf buffer 示例中保留了 legacy 风格用于对比,实际开发中应统一使用 BTF-defined 风格
0x04 再看perf_event VS ringbuf
flowchart LR
subgraph PerfBuf ["perf buffer (per-CPU)"]
CPU0_P["CPU0 buffer"]
CPU1_P["CPU1 buffer"]
CPU2_P["CPU2 buffer"]
end
subgraph RingBuf ["ring buffer (shared, 5.8+)"]
SharedBuf["shared ring buffer\nMPSC queue"]
end
CPU0_P --> UserP["userspace reader\n(per-CPU poll)"]
CPU1_P --> UserP
CPU2_P --> UserP
SharedBuf --> UserR["userspace reader\n(single poll)"]
perf buffer
eBPF中提供了内核和用户空间之间高效地交换数据的机制即perf buffer,它是一种 per-CPU 的环形缓冲区,当需要将 eBPF 收集到的数据发送到用户空间记录或者处理时,就可以用 perf buffer 来完成。它还有如下特点:
- 能够记录可变长度数据
- 能够通过内存映射的方式在用户态读取数据,而无需通过系统调用陷入到内核去拷贝数据
- 实现 epoll 通知机制
eBPF 提供了专门的 map 和 helper function 来使用 perf buffer,如下是最常用的两个:
- map:
BPF_MAP_TYPE_PERF_EVENT_ARRAY:此类型 map 专门用于 perf buffer - helper function:
bpf_perf_event_output:用于将数据发送到 perf buffer 并通知用户态
以下示例使用 legacy bpf_map_def 风格(仅作对比参考,新代码请使用 BTF-defined 风格):
struct bpf_map_def SEC("maps/my_map") my_map = {
.type = BPF_MAP_TYPE_PERF_EVENT_ARRAY,
.key_size = sizeof(int),
.value_size = sizeof(u32),
.max_entries = 1024,
};
struct data_t {
u32 pid;
};
SEC("kprobe/vfs_mkdir")
int kprobe_vfs_mkdir(void *ctx)
{
bpf_printk("mkdir_perf_event (vfs hook point)%u\n", bpf_get_current_pid_tgid());
struct data_t data = {};
data.pid = bpf_get_current_pid_tgid();
bpf_perf_event_output(ctx, &my_map, BPF_F_CURRENT_CPU, &data, sizeof(data));
return 0;
}
在 BPF_MAP_TYPE_PERF_EVENT_ARRAY 类型的映射中,key_size 和 value_size 的含义与其他类型的映射略有不同:
key_size:键是 CPU 的编号,因此key_size通常被设置为sizeof(int)value_size:值是一个文件描述符(fd),该 fd 关联了一个 perf event。因此value_size通常被设置为sizeof(u32),而不是实际需要传输的结构体大小(不能写成sizeof(struct data_t))
上面的代码存在两次内存拷贝的问题:
struct data_t data = {};– 在栈上创建数据(第一次拷贝:初始化)bpf_perf_event_output(...)– 将栈上的数据拷贝到 perf buffer(第二次拷贝)
此外,因为 data 是在栈上分配的,eBPF verifier 会限制结构体不能超过 512 字节
perf buffer heap 优化
struct data_t data = {} 因为是在栈上创建的,所以会存在大小限制,结构体大小不能超过 512 字节。常见的做法是用 BPF_MAP_TYPE_PERCPU_ARRAY 作为 “堆内存”:
struct bpf_map_def SEC("maps/heap") heap = {
.type = BPF_MAP_TYPE_PERCPU_ARRAY,
.key_size = sizeof(int),
.value_size = sizeof(struct data_t),
.max_entries = 1,
};
struct data_t {
u32 pid;
};
struct bpf_map_def SEC("maps/perf_map") perf_map = {
.type = BPF_MAP_TYPE_PERF_EVENT_ARRAY,
.key_size = sizeof(int),
.value_size = sizeof(u32),
.max_entries = 1024,
};
SEC("kprobe/vfs_mkdir")
int kprobe_vfs_mkdir(void *ctx)
{
bpf_printk("mkdir_perf_event (vfs hook point)%u\n", bpf_get_current_pid_tgid());
int zero = 0;
struct data_t *data = bpf_map_lookup_elem(&heap, &zero);
if (!data) {
return 0;
}
data->pid = bpf_get_current_pid_tgid();
bpf_perf_event_output(ctx, &perf_map, BPF_F_CURRENT_CPU, data, sizeof(*data));
return 0;
}
通过 bpf_map_lookup_elem(&heap, &zero) 从 BPF_MAP_TYPE_PERCPU_ARRAY 中获取一块 per-CPU 的内存作为临时缓冲区,从而避免了 512 字节的栈大小限制。但仍然存在两次拷贝的问题
BPF_MAP_TYPE_PERCPU_ARRAY 的优势:
- 并发更新:每个 CPU 核心有独立的数据副本,并发更新无需加锁
- 无锁访问:避免了锁竞争的开销
- 高效的局部性:每个 CPU 核心只访问自己的数据副本,cache 友好
BPF_MAP_TYPE_PERF_EVENT_ARRAY 的优势:
- 内核到用户态数据传输:与 Linux 的 perf_event 子系统集成,允许将数据从内核空间高效传输到用户空间
- 适用于事件流:实时捕获和处理事件数据(如 kprobe/tracepoint 触发的事件)
总之,BPF_MAP_TYPE_PERCPU_ARRAY 适用于需要高性能并发访问的 per-CPU 临时存储,而 BPF_MAP_TYPE_PERF_EVENT_ARRAY 适用于内核到用户态的事件数据传输
perf buffer 的问题
1、内存效率
perfbuf 为每个 CPU 分配一个独立的缓冲区,这意味着开发者通常需要在内存效率和数据丢失之间做出折中:
- 越大的 per-CPU buffer 越能避免丢数据,但也意味着大部分时间里,大部分内存都是浪费的
- 尽量小的 per-CPU buffer 能提高内存使用效率,但在数据量陡增(毛刺)时将导致丢数据
- 对于那些大部分时间都比较空闲、周期性来一大波数据的场景,这个问题尤其突出,很难在两者之间取得一个很好的平衡
ringbuf 的解决方式是分配一个所有 CPU 共享的大缓冲区:
- 大缓冲区,意味着能更好地容忍数据量毛刺
- 共享,则意味着内存使用效率更高
- CPU 数量从
16增加到32时,perfbuf 的总 buffer 会翻倍(per-CPU),ringbuf 的总 buffer 不一定需要翻倍
2、事件顺序
perfbuf 的 per-CPU 特性导致内核调度器在不同 CPU 上调度进程时,对于那些存活时间很短的进程(fork→exec→exit 会在极短的时间内在不同 CPU 上执行),用户态无法按照事件发生的顺序获取数据
但对于 ringbuf 来说,因为它是共享的同一个缓冲区,ringbuf 保证如果事件 A 发生在事件 B 之前,那 A 一定会先于 B 被提交,也会在 B 之前被消费
3、数据复制
BPF 程序使用 perfbuf 时,必须先初始化一份事件数据,然后将它复制到 perfbuf,然后才能发送到用户空间。这意味着数据会被复制两次:
- 复制到一个局部变量或 per-CPU array(BPF 的栈空间很小,因此较大的变量无法放到栈上)
- 复制到 perfbuf 中
更糟糕的是,如果 perfbuf 已经没有足够空间放数据了,那第一步的复制完全是浪费的
BPF ringbuf 提供了一个可选的 reservation/submit API 来避免这种问题:
- 首先申请为数据预留空间(reserve the space)
- 预留成功后,应用就可以直接将准备发送的数据放到 ringbuf 了,从而节省了 perfbuf 中的第一次复制
- 将数据提交到用户空间将是一件极其高效、不会失败的操作,也不涉及任何额外的内存复制
- 如果因为 buffer 没有空间而预留失败了,BPF 程序马上就能知道,从而也不用再执行第一步复制
| 对比项 | perf buffer | ring buffer |
|---|---|---|
| 引入版本 | 4.4+ | 5.8+ |
| 缓冲区模型 | per-CPU 独立 | 所有 CPU 共享(MPSC) |
| 内存效率 | 低(per-CPU 预分配) | 高(共享,弹性利用) |
| 事件顺序 | 无法保证跨 CPU 顺序 | 全局有序 |
| 数据拷贝 | 2 次 | 1 次(reserve/submit)或 2 次(output) |
| API | bpf_perf_event_output() |
bpf_ringbuf_output() / bpf_ringbuf_reserve() + bpf_ringbuf_submit() |
ringbuf
由于 perf buffer 的 per-CPU 模型存在的两个严重缺陷:
- 内存使用效率低下(inefficient use of memory)
- 事件顺序无法保证(event re-ordering)
因此内核 5.8 引入了 ringbuf 来解决这个问题。ringbuf 是一个”多生产者、单消费者”(multi-producer, single-consumer,MPSC)队列,可安全地在多个 CPU 之间共享和操作。perfbuf 支持的一些功能它都支持,包括:
- 可变长数据(variable-length data records)
- 通过 memory-mapped region 来高效地从 userspace 读数据,避免内存复制或系统调用
- 支持 epoll notifications 和 busy-loop 两种获取数据方式
epoll notifications 和 busy-loop 两种消费模式:
- epoll notifications:使用 Linux 的 epoll 机制。用户空间应用将 ringbuf 的 fd 添加到 epoll 实例中,调用
epoll_wait()等待新数据。当内核有新数据写入 ringbuf 时触发 epoll 事件唤醒应用。优点是无新数据时不消耗 CPU;缺点是唤醒有延迟 - busy-loop:用户空间应用不断轮询 ringbuf 检查是否有新数据。优点是延迟最低;缺点是持续占用 CPU 资源
内核态示例:ringbuf 基础用法(bpf_ringbuf_output)
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24); /* 16MB, shared across all CPUs */
} events SEC(".maps");
struct task_info {
u32 pid;
u8 comm[80];
};
SEC("kprobe/vfs_mkdir")
int kprobe_vfs_mkdir(void *ctx)
{
struct task_info task_data = {};
u64 id = bpf_get_current_pid_tgid();
u32 tgid = id >> 32;
task_data.pid = tgid;
bpf_get_current_comm(&task_data.comm, 80);
bpf_ringbuf_output(&events, &task_data, sizeof(task_data), 0);
return 0;
}
对比 bpf_perf_event_output 方式的代码:
struct bpf_map_def SEC("maps/my_map") my_map = {
.type = BPF_MAP_TYPE_PERF_EVENT_ARRAY,
.key_size = sizeof(int),
.value_size = sizeof(u32),
.max_entries = 1024,
};
struct data_t {
u32 pid;
};
SEC("kprobe/vfs_mkdir")
int kprobe_vfs_mkdir(void *ctx)
{
struct data_t data = {};
data.pid = bpf_get_current_pid_tgid();
bpf_perf_event_output(ctx, &my_map, BPF_F_CURRENT_CPU, &data, sizeof(data));
return 0;
}
bpf_ringbuf_output 和 bpf_perf_event_output 有如下几点不同:
- ringbuf map 的大小(
max_entries)在 BPF 侧指定,这是所有 CPU 共享的总大小 bpf_ringbuf_output()不需要 BPF context 参数(ctx),API 更简洁- ringbuf map 类型为
BPF_MAP_TYPE_RINGBUF
用户态代码(基于 datadog/manager)
m := &manager.Manager{
Probes: []*manager.Probe{
&manager.Probe{
UID: "MyFirstHook",
Section: "kprobe/vfs_mkdir",
AttachToFuncName: "vfs_mkdir",
EbpfFuncName: "kprobe_vfs_mkdir",
},
},
RingbufMaps: []*manager.RingbufMap{
&manager.RingbufMap{
Map: manager.Map{
Name: "events",
},
RingbufMapOptions: manager.RingbufMapOptions{
DataHandler: myDataHandler,
},
},
},
}
相比之前的代码,只是将 perfbuf map 替换成了 ringbuf map,并设置 DataHandler 回调函数处理内核态传输过来的数据。DataHandler 的回调函数原型如下:
DataHandler func(CPU int, data []byte, perfMap *RingbufMap, manager *Manager)
实际程序运行得到的结果如下:
successfully started, head over to /sys/kernel/debug/tracing/trace_pipe
Generating events to trigger the probes ...
creating /tmp/test_folder
received: pid:1118833,comm:main
removing /tmp/test_folder
通过 /sys/kernel/debug/tracing/trace_pipe 得到的结果与用户态程序解析结果一致:
<...>-1118835 [010] d...1 702590.314924: bpf_trace_printk: pid: 1118833, comm: main
ringbuf reserve/commit 优化
bpf_ringbuf_output() API 的目的是确保从 perfbuf 到 ringbuf 迁移时无需对 BPF 代码做重大改动,但这也意味着它继承了 perfbuf API 的一些缺点:
- 额外的内存复制:需要额外的空间来构建 event 变量,然后将其复制到 buffer。不仅低效,而且经常需要引入只有一个元素的 per-CPU array
- 非常晚的 buffer 空间申请:如果 buffer 满导致发送失败,前面准备数据的工作完全浪费
为了解决 xxx_output() 存在的问题,引入了新的 bpf_ringbuf_reserve() / bpf_ringbuf_submit() API:
- 提前预留空间,或者能立即发现没有可用空间了(返回 NULL)
- 预留成功后,一旦数据写好了,将它发送到 userspace 是一个不会失败的操作(
bpf_ringbuf_submit()没有返回值,保证成功) - 预留的空间在被提交之前用户空间看不到,BPF 程序可以从容地组织 event 数据,避免了额外的内存复制和临时存储空间
限制:BPF 校验器在校验时(at verification time)必须知道预留数据的大小,因此不支持动态大小的事件数据。对于动态大小的数据,只能退回到用 bpf_ringbuf_output() 方式
struct task_info *task_data;
task_data = bpf_ringbuf_reserve(&events, sizeof(*task_data), 0);
if (!task_data) {
return 0;
}
task_data->pid = bpf_get_current_pid_tgid() >> 32;
bpf_get_current_comm(&task_data->comm, sizeof(task_data->comm));
bpf_ringbuf_submit(task_data, 0);
return 0;
bpf_ringbuf_submit 的函数原型:
void bpf_ringbuf_submit(void *data, u64 flags)
flags 参数有三个选项:
BPF_RB_NO_WAKEUP:提交数据后不唤醒用户空间进程。适用于 busy-loop 模式,用户空间进程总是在轮询,不需要被唤醒BPF_RB_FORCE_WAKEUP:提交数据后强制唤醒用户空间进程。适用于 epoll notifications 模式0:由内核根据当前条件自动决定是否唤醒用户空间进程(推荐默认值)
使用 bpf_ringbuf_reserve() / bpf_ringbuf_submit() 提交数据,用户态代码不需要改变