0x00 前言
这篇文章来源于对一个问题的思考:Linux中两个进程通过有名(mkfifo
)或者匿名(pipe
)管道进行通信时,在这两个进程的内核VFS视图中,数据是如何流转的?代码基于 v4.11.6 版本
前置基础:
管道
管道应用场景:
- zero copy
- 进程间通信,又分匿名(
pipe
)与有名(mkfifo
)
linux的pipe和FIFO都是基于pipe文件系统(pipefs)的,pipe和FIFO都是半双工,即数据流向只能是一个方向。pipe机制(匿名管道)只能在pipe的创建进程及其后代进程(后代进程fork/exec时,通过继承父进程的打开文件描述符表)之间使用,来实现通信;有名pipe FIFO,即可以通过名称查找到pipe,所以无上述匿名管道限制,可以通过名称找到pipe文件,创建相应的pipe,可以实现跨进程间的通信
管道在Linux零拷贝中也有应用,零拷贝是一种优化数据传输的技术,它可以减少数据在内核态和用户态之间的拷贝次数,提高数据传输的效率。在传统的数据传输过程中,数据需要从内核缓冲区拷贝至应用程序的缓冲区,然后再从应用程序缓冲区拷贝到网络设备的缓冲区,最后才能发送出去。而零拷贝技术通过直接在应用程序和网络设备之间传输数据,避免了中间的拷贝过程,从而提高了数据传输的效率
TODO
0x01 管道实现分析:数据结构
在内核中,管道本质一个环形缓冲区,通过管道可以将数据从一个文件拷贝另外一个文件
pipefs:文件系统
pipe 是一个伪文件系统(pipefs),内核初始化时会注册到 Linux 系统
TODO
内核数据结构
1、pipe_buffer
:管道缓存,用于暂存写入管道的数据;写进程通过管道写入端将数据写入管道缓存中,读进程通过管道读出端将数据从管道缓存中读出,成员定义如下:
page
:页帧,用于存储pipe数据;pipe缓存与页帧是一对一的关系offset
:页内偏移,用于记录有效数据在页帧的超始地址(只能用偏移,而不能用地址,因为高内存页帧在内核空间中没有虚拟地址与之对应)len
:有效数据长度ops
:缓存操作集(pipe_buf_operations
)flags
:缓存标识private
:缓存操作私有数据
//https://elixir.bootlin.com/linux/v4.11.6/source/include/linux/pipe_fs_i.h#L20
struct pipe_buffer {
struct page *page;
unsigned int offset, len;
const struct pipe_buf_operations *ops;
unsigned int flags;
unsigned long private;
};
2、pipe_inode_info
:定义为管道描述符,用于表示一个管道,存储管道相应的信息
wait
:读/写/poll等待队列;由于读/写不可能同时出现在等待的情况,所以可以共用等待队列;poll读与读,poll写与写可以共存出现在等待队列中nrbufs
:非空的pipe_buffer
数量curbuf
:数据的起始pipe_buffer
buffers
:整个pipe_inode_info
关联pipe_buffer
结构的长度tmp_page
:页缓存,可以加速页帧的分配过程;当释放页帧时将页帧记入tmp_page
,当分配页帧时,先从tmp_page
中获取,如果tmp_page
为空才从伙伴系统中获取readers
:当前管道的读者个数;每次以读方式打开时,readers
加1
;关闭时readers
减1
writers
:当前管道的写者个数;每次以写方式打开时,writers
加1
;关闭时writers
减1
waiting_writers
:被阻塞的管道写者个数;写进程被阻塞时,waiting_writers
加1
;被唤醒时,waiting_writers
减1
r_counter
:管道读者记数器,每次以读方式打开管道时,r_counter
加1
;关闭时不变w_counter
:管道读者计数器;每次以写方式打开时,w_counter
加1
;关闭时不变fasync_readers
:读端异步描述符fasync_writers
:写端异步描述符inode
:pipe对应的inodebufs
:pipe_buffer
回环数据
注意:版本pipe_inode_info
的定义做了优化,参考
struct pipe_inode_info {
struct mutex mutex;
wait_queue_head_t wait;
unsigned int nrbufs, curbuf, buffers;
unsigned int readers;
unsigned int writers;
unsigned int files;
unsigned int waiting_writers;
unsigned int r_counter;
unsigned int w_counter;
struct page *tmp_page;
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
struct pipe_buffer *bufs;
struct user_struct *user;
};
此外,注意从pipe_inode_info
和pipe_buffer
的初始化的过程可以得知,在本文版本 Linux 内核中,使用了 16
个内存页(单页是4k
)作为环形缓冲区,所以环形缓冲区的大小为 64KB(16*4KB)
3、pipe_buf_operations
:记录pipe缓存的操作集
can_merge
:合并标识;如果pipe_buffer
中有空闲空间,有数据写入时,如果can_merge
置位,会先写pipe_buffer
的空闲空间;否则重新分配一个pipe_buffer
来存储写入数据map
:由于pipe_buffer
的page
可能是高内存页帧,由于内核空间页表没有相应的页表项,所以内核不能直接访问page
;只有通过map
将page
映射到内核地址空间后,内核才能访问unmap
:map
的逆过程;因为内核地址空间有限,所以page
访问完后释文地址映射confirm
:检验pipe_buffer
中的数据release
:当pipe_buffer
中的数据被读完后,用于释放pipe_buffer
get
:增加pipe_buffer
的引用计数器
//https://elixir.bootlin.com/linux/v4.11.6/source/include/linux/pipe_fs_i.h#L74
struct pipe_buf_operations {
/*
* This is set to 1, if the generic pipe read/write may coalesce
* data into an existing buffer. If this is set to 0, a new pipe
* page segment is always used for new data.
*/
int can_merge;
/*
* ->confirm() verifies that the data in the pipe buffer is there
* and that the contents are good. If the pages in the pipe belong
* to a file system, we may need to wait for IO completion in this
* hook. Returns 0 for good, or a negative error value in case of
* error.
*/
int (*confirm)(struct pipe_inode_info *, struct pipe_buffer *);
/*
* When the contents of this pipe buffer has been completely
* consumed by a reader, ->release() is called.
*/
void (*release)(struct pipe_inode_info *, struct pipe_buffer *);
/*
* Attempt to take ownership of the pipe buffer and its contents.
* ->steal() returns 0 for success, in which case the contents
* of the pipe (the buf->page) is locked and now completely owned
* by the caller. The page may then be transferred to a different
* mapping, the most often used case is insertion into different
* file address space cache.
*/
int (*steal)(struct pipe_inode_info *, struct pipe_buffer *);
/*
* Get a reference to the pipe buffer.
*/
void (*get)(struct pipe_inode_info *, struct pipe_buffer *);
};
4、anon_pipe_buf_ops
&& packet_pipe_buf_ops
anon_pipe_buf_ops
:匿名管道,默认can_merge
开启(支持)packet_pipe_buf_ops
:
//https://elixir.bootlin.com/linux/v4.11.6/source/fs/pipe.c#L233
static const struct pipe_buf_operations anon_pipe_buf_ops = {
.can_merge = 1, //匿名管道:默认can_merge为1
.confirm = generic_pipe_buf_confirm,
.release = anon_pipe_buf_release,
.steal = anon_pipe_buf_steal,
.get = generic_pipe_buf_get,
};
static const struct pipe_buf_operations packet_pipe_buf_ops = {
.can_merge = 0,
.confirm = generic_pipe_buf_confirm,
.release = anon_pipe_buf_release,
.steal = anon_pipe_buf_steal,
.get = generic_pipe_buf_get,
};
pipefs 在VFS的四大对象及对应的operations
当创建pipe/FIFO时,内核会分配VFS的file/dentry/inode/pipe_inode_info对象,并将file对象的f_op
指向fifo_open/pipe_read/pipe_write/pipe_poll
等方法,当后续的read/write/poll
等系统调用,会通过vfs调用相应的f_op
中方法
pipe/FIFO VFS相关结构及操作集如下:
1、文件对象(File):struct file
,需要区分读写端fd[0]
为读端,fd[1]
为写端,操作集file_operations
如下:
const struct file_operations pipefifo_fops = {
.open = fifo_open,
.llseek = no_llseek,
.read_iter = pipe_read, //pipe读端文件操作/FIFO只读方式文件操作
.write_iter = pipe_write, //pipe写端文件操作/FIFO只写方式文件操作
.poll = pipe_poll,
.unlocked_ioctl = pipe_ioctl,
.release = pipe_release,
.fasync = pipe_fasync,
};
在一个进程创建打开了pipe/fifo之后,在进程打开的文件描述符数组中是占据两个fd,其中对于fd[0]
读端而言,pipe_write
是无用的;反之对写端而言,pipe_read
是无用的
//https://elixir.bootlin.com/linux/v4.11.6/source/fs/pipe.c
int do_pipe_flags(int *fd, int flags)
{
struct file *files[2];
int error = __do_pipe_flags(fd, files, flags);
if (!error) {
fd_install(fd[0], files[0]);
fd_install(fd[1], files[1]);
}
return error;
}
2、目录项对象(Dentry):通过 mount_pseudo()
创建根目录项,无实际磁盘路径。操作集为simple_dentry_operations
static const struct dentry_operations pipefs_dentry_operations = {
.d_dname = pipefs_dname, //仅需处理内存回收,无磁盘同步逻辑,终删除 dentry(内存临时对象)
};
3、 索引节点对象(Inode),关联扩展结构 struct pipe_inode_info
,存储管道核心数据(如环形缓冲区、等待队列),对应于struct inode
的struct pipe_inode_info *i_pipe
成员
//https://elixir.bootlin.com/linux/v4.11.6/source/include/linux/fs.h#L554
struct inode {
......
const struct inode_operations *i_op;
struct super_block *i_sb;
struct address_space *i_mapping;
......
const struct file_operations *i_fop; /* former ->i_op->default_file_ops */
......
union {
struct pipe_inode_info *i_pipe; // 管道procfs
struct block_device *i_bdev;
struct cdev *i_cdev;
char *i_link;
unsigned i_dir_seq;
};
......
};
4、super_block,超级块对象(Superblock)
//https://elixir.bootlin.com/linux/v4.11.6/source/fs/pipe.c#L1173
static const struct super_operations pipefs_ops = {
.destroy_inode = free_inode_nonrcu,
.statfs = simple_statfs,
};
0x02 管道操作分析
本节仅分析pipe
匿名管道的在内核的实现机制
管道创建
系统调用pipe/pipe2
,用于创建管道,函数调用链为pipe2->__do_pipe_flags->create_pipe_files
//https://elixir.bootlin.com/linux/v4.11.6/source/fs/pipe.c#L839
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
// files 数组,0和1
struct file *files[2];
int fd[2];
int error;
error = __do_pipe_flags(fd, files, flags);
if (!error) {
......
fd_install(fd[0], files[0]);
fd_install(fd[1], files[1]);
}
return error;
}
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
int error;
int fdw, fdr;
if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))
return -EINVAL;
error = create_pipe_files(files, flags);
if (error)
return error;
error = get_unused_fd_flags(flags);
if (error < 0)
goto err_read_pipe;
fdr = error;
error = get_unused_fd_flags(flags);
if (error < 0)
goto err_fdr;
fdw = error;
audit_fd_pair(fdr, fdw);
fd[0] = fdr;
fd[1] = fdw;
return 0;
......
}
create_pipe_files
是创建pipe的核心实现:
int create_pipe_files(struct file **res, int flags)
{
int err;
// 创建inode节点
struct inode *inode = get_pipe_inode();
struct file *f;
struct path path;
static struct qstr name = { .name = "" };
......
path.dentry = d_alloc_pseudo(pipe_mnt->mnt_sb, &name);
if (!path.dentry)
goto err_inode;
path.mnt = mntget(pipe_mnt);
// 关联dentry与inode的关系
d_instantiate(path.dentry, inode);
// 构建file结构,设置file_operations为管理文件操作符集pipefifo_fops
f = alloc_file(&path, FMODE_WRITE, &pipefifo_fops);
if (IS_ERR(f)) {
err = PTR_ERR(f);
goto err_dentry;
}
f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT));
// 注意:file结构的private_data指向了inode->i_pipe,即管道的pipe_inode_info节点
f->private_data = inode->i_pipe;
// 创建读端的file结构
res[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops);
if (IS_ERR(res[0])) {
err = PTR_ERR(res[0]);
goto err_file;
}
path_get(&path);
// 0:读端,设置了O_RDONLY标志
res[0]->private_data = inode->i_pipe;
res[0]->f_flags = O_RDONLY | (flags & O_NONBLOCK);
// 1:写端,设置了O_WRONLY标志
res[1] = f;
return 0;
.......
}
create_pipe_files
函数中有几个细节:
get_pipe_inode
:创建并初始化pipe的inode核心结构struct pipe_inode_info
,对应函数alloc_pipe_info
d_alloc_pseudo/d_instantiate
:构建dentry及inode关联关系alloc_file
:两次调用,先初始化写端的file结构、再初始化读端的file结构
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb);
struct pipe_inode_info *pipe;
......
inode->i_ino = get_next_ino();
pipe = alloc_pipe_info();
if (!pipe)
goto fail_iput;
// 初始化pipe_inode_info对象
inode->i_pipe = pipe;
pipe->files = 2;
pipe->readers = pipe->writers = 1;
//inode的i_fop与file的fop是一样的
inode->i_fop = &pipefifo_fops;
inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = current_time(inode);
return inode;
......
}
pipe匿名共享的本质
问题:两个进程共享pipe,在内核的本质是什么?回到上面的create_pipe_files
函数实现,在关键字节用序号注释标识
1、在get_pipe_inode
中初始化了inode节点,并且将inode->i_pipe
(union成员)指向了pipe管理结构pipe_inode_info
对象
2、在create_pipe_files
中,d_instantiate(path.dentry, inode)
函数相当于执行了path.dentry->d_inode = inode
,将dentry关联到inode结构
3、同时在create_pipe_files
函数中,完成了读端写端的struct file
的初始化,即files[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops);files[1] = alloc_file(&path, FMODE_WRITE, &pipefifo_fops);
,从第二个参数就知道,这两个file限制了各自的功能,也就意味着对应的两个fd也限制了各自的功能
4、在alloc_file
函数中,file->f_path = *path
,就相当于file[x]->f_path.dentry->d_inode = inode
,也即两个file指向了同一个inode,这个就是所谓的两个file关联(记得struct path
中存在dentry成员),这就是pipe共享的本质
5、下面这段代码将把pipe放在file中是方便取值,否则给定一个files,若需要获取pipe,就需要从file->f_path.dentry->d_inode->i_pipe
来获取,这显然不够优雅
files[0]->private_data = inode->i_pipe;
files[1]->private_data = inode->i_pipe;
6、至此pipe2
系统调用执行完毕,它创建了两个个互相关联的file,然后创建了2个fd,fd与file一一对应,其次file都指向了同一个inode,读写均操作统一inode(即pipe结构pipe_inode_info
);此外,由于创建父子进程时,task_struct
关联的打开文件描述符表fdtable也要复制一份,即父子进程的文件描述符表均指向上述的pipe结构(当然需要关闭一方的读、另一方的写),这样父子进程便可以借助pipe实现单向通信了
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb);
struct pipe_inode_info *pipe;
......
pipe = alloc_pipe_info();
......
// 初始化pipe_inode_info对象
// 将这个inode
inode->i_pipe = pipe;
pipe->files = 2;
pipe->readers = pipe->writers = 1;
//inode的i_fop与file的fop是一样的
//告诉vfs如何读写pipefs(文件系统)等操作
inode->i_fop = &pipefifo_fops;
......
return inode;
......
}
// alloc_file:创建file结构,并关联到path
struct file *alloc_file(const struct path *path, fmode_t mode, const struct file_operations *fop)
{
struct file *file;
file = get_empty_filp();
if (IS_ERR(file))
return file;
// file关联到dentry
file->f_path = *path;
file->f_inode = path->dentry->d_inode;
file->f_mapping = path->dentry->d_inode->i_mapping;
if ((mode & FMODE_READ) &&
likely(fop->read || fop->read_iter))
mode |= FMODE_CAN_READ;
if ((mode & FMODE_WRITE) &&
likely(fop->write || fop->write_iter))
mode |= FMODE_CAN_WRITE;
file->f_mode = mode;
file->f_op = fop;
if ((mode & (FMODE_READ | FMODE_WRITE)) == FMODE_READ)
i_readcount_inc(path->dentry->d_inode);
return file;
}
int create_pipe_files(struct file **res, int flag s)
{
int err;
// 1. 创建inode节点并绑定到pipe的管理结构
struct inode *inode = get_pipe_inode();
struct file *f;
struct path path;
static struct qstr name = { .name = "" };
......
path.dentry = d_alloc_pseudo(pipe_mnt->mnt_sb, &name);
if (!path.dentry)
goto err_inode;
path.mnt = mntget(pipe_mnt);
// 2. 关联dentry与inode的关系,将dentry指向inode
d_instantiate(path.dentry, inode);
// 3. 构建(写端)file结构,设置file_operations为管理文件操作符集pipefifo_fops,同时将file->path指向 *path
f = alloc_file(&path, FMODE_WRITE, &pipefifo_fops);
if (IS_ERR(f)) {
err = PTR_ERR(f);
goto err_dentry;
}
f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT));
// 注意:file结构的private_data指向了inode->i_pipe,即管道的pipe_inode_info节点
f->private_data = inode->i_pipe;
// 4. 创建读端的file结构
res[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops);
if (IS_ERR(res[0])) {
err = PTR_ERR(res[0]);
goto err_file;
}
path_get(&path);
// 5. 快捷访问设置:读/写端file直接访问到pipe结构
// 0:读端,设置了O_RDONLY标志
res[0]->private_data = inode->i_pipe;
res[0]->f_flags = O_RDONLY | (flags & O_NONBLOCK);
// 1:写端,设置了O_WRONLY标志
res[1] = f;
return 0;
.......
}
alloc_pipe_info
//https://elixir.bootlin.com/linux/v4.11.6/source/fs/pipe.c#L620
struct pipe_inode_info *alloc_pipe_info(void)
{
struct pipe_inode_info *pipe;
unsigned long pipe_bufs = PIPE_DEF_BUFFERS; //默认16长度
struct user_struct *user = get_current_user();
unsigned long user_bufs;
pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL_ACCOUNT);
if (pipe == NULL)
goto out_free_uid;
......
// 初始化bufs成员
pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer),
GFP_KERNEL_ACCOUNT);
if (pipe->bufs) {
init_waitqueue_head(&pipe->wait);
pipe->r_counter = pipe->w_counter = 1;
pipe->buffers = pipe_bufs; //16
pipe->user = user;
mutex_init(&pipe->mutex);
return pipe;
}
......
}
pipe读写的实现原理(数据结构)
内核管道实现本质上是基于环形缓冲区(RingBuffer)的设计理念,如 pipe_inode_info
和 pipe_buffer
构成的环形数组)
1、环形存储结构,管道数据存储在内核维护的环形数组中,数组元素为 pipe_buffer
,每个对应一个物理内存页(默认 16
页),通过 curbuf
(当前读位置索引)和 nrbufs
(有效缓冲区数量)实现环形遍历
// 计算下一个缓冲区索引
// 此操作通过位运算(&)替代取模,实现高效的环形索引
int newbuf = (pipe->curbuf + bufs) & (pipe->buffers - 1);
2、读写指针分离,其中读指针为pipe->curbuf
,标记当前读取的缓冲区起始位置;写指针通过 (curbuf + nrbufs) % buffers
计算得出,指向下一个可写入位置,这与经典环形缓冲区的 head(读)和 tail(写)指针逻辑一致
3、管道的阻塞与唤醒机制,当缓冲区空时,读进程阻塞;缓冲区满时,写进程阻塞。通过等待队列(pipe->wait
)和信号(SIGIO
)实现读写协同
4、基于环形缓冲区,管道针对进程通信场景也做了若干优化策略
- 按需分配内存页:管道默认不预分配内存,仅在写入数据时
alloc_page()
动态申请页 - 小数据合并写入:若最后一个缓冲区(
lastbuf
)有剩余空间,新数据可追加到同一页,减少碎片 - 原子性保证:当单次写入
<=PIPE_BUF
(通常4KB
)时,保证数据完整写入,避免分片
0x03 pipe的读实现
对于普通的ringbuffer,读过程步骤如下:
- 首先通过读指针head来定位到读取数据的起始地址
- 判断环形缓冲区中是否有数据可读
- 读取数据,成功读取后移动读指针head
对于内核pipe的ringbuffer,其读指针是由 pipe_inode_info
对象的 curbuf
字段与 pipe_buffer
对象的 offset
字段组合而成,类似于页间序号/页内下标
pipe_inode_info
对象的curbuf
字段表示读操作要从bufs
数组的哪个pipe_buffer
中读取数据(初始化为0
)pipe_buffer
对象的offset
字段表示读操作要从内存页的哪个位置开始读取数据- 可能存在读取长度超过一个page size(
4k
)的情况,需要注意 - 从缓冲区中读取到
n
个字节的数据后,会相应移动读指针n
个字节的位置(即增加pipe_buffer
对象的offset
字段),并且减少n
个字节的可读数据长度(即减少pipe_buffer
对象的len
字段) - 当
pipe_buffer
对象的len
字段变为0
时,表示当前pipe_buffer
没有可读数据,那么将会对pipe_inode_info
对象的curbuf
字段移动一个位置,并且其nrbufs
字段进行减一操作
pipe_read
的实现如下,对于管道的ringbuffer,读操作步骤如下:
- 通过VFS file/inode 对象来获取到管道的
pipe_inode_info
对象 - 通过
pipe_inode_info
对象的nrbufs
成员,获取管道未读数据占有多少个内存页 - 通过
pipe_inode_info
对象的curbuf
成员,获取读操作应该从ringbuffer的内存页哪个序号处读取数据 - 通过
pipe_buffer
对象的offset
成员,获取真正的读指针(位置), 并且从管道中读取数据到用户缓冲区 - 如果当前内存页的数据已经被读取完毕,那么移动
pipe_inode_info
对象的curbuf
指针,并且减少其nrbufs
字段的值 - 如果读取到用户期望的数据长度,退出循环;反之,则移动
curbuf
到下一个ringbuffer位置,继续上面的操作
static ssize_t
pipe_read(struct kiocb *iocb, struct iov_iter *to)
{
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
// 1、从file结构获取管道对象pipe_inode_info
struct pipe_inode_info *pipe = filp->private_data;
int do_wakeup;
ssize_t ret;
/* Null read succeeds. */
if (unlikely(total_len == 0))
return 0;
do_wakeup = 0;
ret = 0;
__pipe_lock(pipe);
// 想想这里为啥是循环
for (;;) {
// 2、获取管道未读数据占有多少个内存页
int bufs = pipe->nrbufs;
if (bufs) {
// 3、获取读操作应该从环形缓冲区的哪个内存页处读取数据(序号)
int curbuf = pipe->curbuf;
// 4、获取页内偏移
struct pipe_buffer *buf = pipe->bufs + curbuf;
size_t chars = buf->len;
size_t written;
int error;
if (chars > total_len)
chars = total_len;
error = pipe_buf_confirm(pipe, buf);
if (error) {
if (!ret)
ret = error;
break;
}
// 5、通过 pipe_buffer 的 offset 字段获取真正的读指针,
// 并且从管道中读取数据到用户缓冲区
written = copy_page_to_iter(buf->page, buf->offset, chars, to);
if (unlikely(written < chars)) {
if (!ret)
ret = -EFAULT;
break;
}
ret += chars;
// 页内:增加 pipe_buffer 对象的 offset 字段的值
buf->offset += chars;
// 页内:减少 pipe_buffer 对象的 len 字段的值
buf->len -= chars;
/* Was it a packet buffer? Clean up and exit */
if (buf->flags & PIPE_BUF_FLAG_PACKET) {
total_len = chars;
buf->len = 0;
}
// 6、如果当前内存页的数据已经被读取完毕
if (!buf->len) {
pipe_buf_release(pipe, buf);
// 移动页间读指针
curbuf = (curbuf + 1) & (pipe->buffers - 1);
// 移动 pipe_inode_info 对象的 curbuf 指针
pipe->curbuf = curbuf;
// 减少 pipe_inode_info 对象的 nrbufs 字段(减1)
pipe->nrbufs = --bufs;
do_wakeup = 1;
}
total_len -= chars;
// 7、如果读取到用户期望的数据长度, 退出循环
if (!total_len)
break; /* common path: read succeeded */
}
if (bufs) /* More to do? */
continue;
if (!pipe->writers)
break;
if (!pipe->waiting_writers) {
/* syscall merging: Usually we must not sleep
* if O_NONBLOCK is set, or if we got some data.
* But if a writer sleeps in kernel space, then
* we can wait for that data without violating POSIX.
*/
if (ret)
break;
if (filp->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
}
if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
if (do_wakeup) {
wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
pipe_wait(pipe);
}
__pipe_unlock(pipe);
/* Signal writers asynchronously that there is more room. */
if (do_wakeup) {
wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
if (ret > 0)
file_accessed(filp);
return ret;
}
0x04 pipe的写实现
内核pipe的ringbuffer结构没有写指针这个成员,实际上是通过读指针计算出来的:写指针 = 读指针 + 未读数据长度,实际上只需要理解未读取内容的开始位置约等于已写入数据的开始位置这个概念就清楚了
- 首先通过
pipe_inode_info
的curbuf
字段和nrbufs
字段来定位到,应该向哪个pipe_buffer
写入数据 - 然后再通过
pipe_buffer
对象的offset
字段和len
字段来定位到,应该写入到内存页的哪个位置(通常情况下offset+len
即是写入的开始位置)
pipe_write的实现如下,主要步骤是:
- 如果上次写操作写入的
pipe_buffer
还有空闲的空间,那么就将数据写入到此pipe_buffer
中,并且增加其len
字段的值 - 如果上次写操作写入的
pipe_buffer
没有足够的空闲空间,那么就新申请一个内存页,并且把数据保存到新的内存页中,并且增加pipe_inode_info
的nrbufs
字段的值 - 如果写入的数据已经全部写入成功,那么就退出写操作
当然,这里还涉及到一些细节问题,如:
- 什么情况下
pipe_buffer
page是不能够被can_merge
的? 4k
情况下,大于或者小于的数据量写入管道,原子性是如何保证的?为什么说数据小于4k
才能保证其原子性?pipe_inode_info
的tmp_page
的作用是什么?
static ssize_t
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
ssize_t ret = 0;
int do_wakeup = 0;
// from:用户空间的待写入数据
size_t total_len = iov_iter_count(from);
ssize_t chars;
/* Null write succeeds. */
if (unlikely(total_len == 0))
return 0;
__pipe_lock(pipe);
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
goto out;
}
/* We try to merge small writes */
// chars:保存用户空间待写入数据的长度
// < 4k:返回实际长度
// >=4k:返回0
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
// 1、如果最后写入的 pipe_buffer 还有空闲的空间
if (pipe->nrbufs && chars != 0) {
// 获取写入数据的位置
int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
(pipe->buffers - 1);
struct pipe_buffer *buf = pipe->bufs + lastbuf;
int offset = buf->offset + buf->len;
// anon_pipe_buf_ops
// buf->ops->can_merge:管道类型(参考下文)
if (buf->ops->can_merge && offset + chars <= PAGE_SIZE) {
ret = pipe_buf_confirm(pipe, buf);
if (ret)
goto out;
ret = copy_page_from_iter(buf->page, offset, chars, from);
if (unlikely(ret < chars)) {
ret = -EFAULT;
goto out;
}
do_wakeup = 1;
buf->len += ret;
// 2、如果要写入的数据已经全部写入成功
if (!iov_iter_count(from))
goto out;
}
}
// 3、如果最后写入的 pipe_buffer 空闲空间不足, 那么申请一个新的内存页来存储数据
for (;;) {
int bufs;
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}
bufs = pipe->nrbufs;
if (bufs < pipe->buffers) {
int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1);
struct pipe_buffer *buf = pipe->bufs + newbuf;
struct page *page = pipe->tmp_page;
int copied;
if (!page) {
// 申请一个新的内存页
page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
if (unlikely(!page)) {
ret = ret ? : -ENOMEM;
break;
}
pipe->tmp_page = page;
}
/* Always wake up, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging.
* FIXME! Is this really true?
*/
do_wakeup = 1;
copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
if (!ret)
ret = -EFAULT;
break;
}
ret += copied;
/* Insert it into the buffer array */
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
buf->len = copied;
buf->flags = 0;
if (is_packetized(filp)) {
buf->ops = &packet_pipe_buf_ops;
buf->flags = PIPE_BUF_FLAG_PACKET;
}
pipe->nrbufs = ++bufs;
pipe->tmp_page = NULL;
// 如果要写入的数据已经全部写入成功, 退出循环
if (!iov_iter_count(from))
break;
}
if (bufs < pipe->buffers)
continue;
if (filp->f_flags & O_NONBLOCK) {
if (!ret)
ret = -EAGAIN;
break;
}
if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
if (do_wakeup) {
wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
do_wakeup = 0;
}
pipe->waiting_writers++;
// 注意这里!
pipe_wait(pipe);
pipe->waiting_writers--;
//end of for1
}
out:
__pipe_unlock(pipe);
if (do_wakeup) {
wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
int err = file_update_time(filp);
if (err)
ret = err;
sb_end_write(file_inode(filp)->i_sb);
}
return ret;
}
0x05 附录
can_merge
的作用及场景
const struct pipe_buf_operations page_cache_pipe_buf_ops = {
.can_merge = 0,
.confirm = page_cache_pipe_buf_confirm,
.release = page_cache_pipe_buf_release,
.steal = page_cache_pipe_buf_steal,
.get = generic_pipe_buf_get,
};
管道操作的原子性
在APUE中遇到这么一句话:小于 PIPE_BUF
的写操作必须是原子的,要写的数据应被连续地写到管道;大于 PIPE_BUF
的写操作可能是非原子的,内核可能会将数据与其它进程写入的数据交织在一起。POSIX 规定 PIPE_BUF
至少为512
字节(Linux 中为4096
),语义如下:(n
为要写的字节数)
n <= PIPE_BUF
且O_NONBLOCK
为 disable:写入具有原子性。如果没有足够的空间供n
个字节全部立即写入,则阻塞直到有足够空间将n
个字节全部写入管道n <= PIPE_BUF
且O_NONBLOCK
为 enable: 写入具有原子性。如果有足够的空间写入n
个字节,则write
立即成功返回,并写入所有n
个字节;否则一个都不写入,write
返回错误,并将errno
设置为EAGAIN
n > PIPE_BUF
且O_NONBLOCK
为 disable:写入不具有原子性。可能会和其它的写进程交替写,直到将n
个字节全部写入才返回,否则阻塞等待写入n > PIPE_BUF
且O_NONBLOCK
为 enable:写入不具有原子性。如果管道已满,则写入失败,write
返回错误,并将errno
设置为EAGAIN
;否则,可以写入1--n
个字节,即部分写入,此时write
返回实际写入的字节数,并且写入这些字节时可能与其他进程交错写入
所以,从管道的内核视角容易理解上述原子性及非原子性