From: Chris Mason <mason@suse.com> AIO support for pipes (using the retry infrastructure). They were easier than I expected ;-) This goes on top of your fsaio patches. This is only lightly tested. I missed the obvious for the pipe aio cancel routine, which is to just wake up the pipe wait queue (which is what the retry is waiting on). Here's a new pipe aio patch, along with a change to sys_aio_cancel to set the cancel bit (See aio-cancel.patch). I'm not 100% sure we need it, but it seems like a good idea. fs/pipe.c | 102 +++++++++++++++++++++++++++++++++++++--------- include/linux/pipe_fs_i.h | 2 2 files changed, 84 insertions(+), 20 deletions(-) Index: linux.aio/fs/pipe.c =================================================================== --- linux.aio.orig/fs/pipe.c 2004-02-05 16:56:29.000000000 -0500 +++ linux.aio/fs/pipe.c 2004-02-06 10:02:57.000000000 -0500 @@ -33,15 +33,21 @@ */ /* Drop the inode semaphore and wait for a pipe event, atomically */ -void pipe_wait(struct inode * inode) +int pipe_wait(struct inode * inode) { - DEFINE_WAIT(wait); + DEFINE_WAIT(local_wait); + wait_queue_t *wait = &local_wait; - prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE); + if (current->io_wait) + wait = current->io_wait; + prepare_to_wait(PIPE_WAIT(*inode), wait, TASK_INTERRUPTIBLE); + if (!is_sync_wait(wait)) + return -EIOCBRETRY; up(PIPE_SEM(*inode)); schedule(); - finish_wait(PIPE_WAIT(*inode), &wait); + finish_wait(PIPE_WAIT(*inode), wait); down(PIPE_SEM(*inode)); + return 0; } static inline int @@ -81,11 +87,11 @@ iov->iov_base += copy; iov->iov_len -= copy; } - return 0; + return 0; } static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, +pipe_aio_readv(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; @@ -93,6 +99,7 @@ ssize_t ret; struct iovec *iov = (struct iovec *)_iov; size_t total_len; + ssize_t retry; /* pread is not allowed on pipes. */ if (unlikely(ppos != &filp->f_pos)) @@ -156,7 +163,12 @@ wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + retry = pipe_wait(inode); + if (retry == -EIOCBRETRY) { + if (!ret) + ret = retry; + break; + } } up(PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. */ @@ -173,11 +185,15 @@ pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) { struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); + ssize_t ret; + ret = pipe_aio_readv(filp, &iov, 1, ppos); + if (ret == -EIOCBRETRY) + BUG(); + return ret; } static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, +pipe_aio_writev(struct file *filp, const struct iovec *_iov, unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; @@ -186,6 +202,7 @@ int do_wakeup; struct iovec *iov = (struct iovec *)_iov; size_t total_len; + int retry; /* pwrite is not allowed on pipes. */ if (unlikely(ppos != &filp->f_pos)) @@ -254,7 +271,12 @@ do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); + retry = pipe_wait(inode); + if (retry == -EIOCBRETRY) { + if (!ret) + ret = retry; + break; + } PIPE_WAITING_WRITERS(*inode)--; } up(PIPE_SEM(*inode)); @@ -272,7 +294,41 @@ size_t count, loff_t *ppos) { struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; - return pipe_writev(filp, &iov, 1, ppos); + return pipe_aio_writev(filp, &iov, 1, ppos); +} + +static int +pipe_aio_cancel(struct kiocb *iocb, struct io_event *evt) +{ + struct inode *inode = iocb->ki_filp->f_dentry->d_inode; + evt->obj = (u64)(unsigned long)iocb->ki_user_obj; + evt->data = iocb->ki_user_data; + evt->res = iocb->ki_nbytes - iocb->ki_left; + if (evt->res == 0) + evt->res = -EINTR; + evt->res2 = 0; + wake_up_interruptible(PIPE_WAIT(*inode)); + aio_put_req(iocb); + return 0; +} + +static ssize_t +pipe_aio_write(struct kiocb *iocb, const char __user *buf, + size_t count, loff_t pos) +{ + struct file *file = iocb->ki_filp; + struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; + iocb->ki_cancel = pipe_aio_cancel; + return pipe_aio_writev(file, &iov, 1, &file->f_pos); +} + +static ssize_t +pipe_aio_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + struct file *file = iocb->ki_filp; + struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; + iocb->ki_cancel = pipe_aio_cancel; + return pipe_aio_readv(file, &iov, 1, &file->f_pos); } static ssize_t @@ -467,7 +523,8 @@ struct file_operations read_fifo_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -480,7 +537,8 @@ .llseek = no_llseek, .read = bad_pipe_r, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -491,9 +549,11 @@ struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, + .aio_read = pipe_aio_read, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -504,7 +564,8 @@ struct file_operations read_pipe_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .aio_read = pipe_aio_read, + .readv = pipe_aio_readv, .write = bad_pipe_w, .poll = pipe_poll, .ioctl = pipe_ioctl, @@ -517,7 +578,8 @@ .llseek = no_llseek, .read = bad_pipe_r, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -528,9 +590,11 @@ struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, .read = pipe_read, - .readv = pipe_readv, + .readv = pipe_aio_readv, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .write = pipe_write, - .writev = pipe_writev, + .writev = pipe_aio_writev, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, Index: linux.aio/include/linux/pipe_fs_i.h =================================================================== --- linux.aio.orig/include/linux/pipe_fs_i.h 2004-01-09 01:59:46.000000000 -0500 +++ linux.aio/include/linux/pipe_fs_i.h 2004-02-05 22:19:14.000000000 -0500 @@ -41,7 +41,7 @@ #define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode)) /* Drop the inode semaphore and wait for a pipe event, atomically */ -void pipe_wait(struct inode * inode); +int pipe_wait(struct inode * inode); struct inode* pipe_new(struct inode* inode);