原创 异步通知与异步I/O

2010-9-15 08:55 2900 6 6 分类: MCU/ 嵌入式

异步通知的概念与作用

异步通知,一旦设备就绪,则主动通知应用程序,这样应用程序不用去查询设备状态,类似于硬件上的“中断”,应该是“信号驱动的异步I/O”。信号是软件层次上对中断机制的一种模拟,在原理上,进程收到信号和处理器收到中断请求一样。信号是异步的,一个进程不必通过任何操作来等待信号的到达。

阻塞I/O意味着一直等待设备可访问后再访问,非阻塞I/O中使用poll()意味着查询设备是否可访问,而异步通知意味着设备通知自身可访问,实现异步I/O。由此可见,这几种方式I/O可以互为补充。

   e98e760b-7792-4573-88ee-2dd63a0520ed.JPG

             阻塞,非阻塞I/O,异步通知的区别

Linux异步通知编程

Linux信号

Linux系统中,异步通知使用信号来实现。除SIGSTOP和SIGKILL两个信号外,进程能够忽略或捕获其它的全部信号。

 

信号的接收

在用户程序中,为了捕获信号,可用signal()来设置对应信号的处理函数。

void (*signal(int signo, void (*handler)(int)))(int);

可分解为:

typedef void  (*sighandler_t)(int);

sighandler_t signal(int signo,sighandler_t handler);

第一个参数为指定信号的值,第二参数指定针对前面信号值的处理函数,若为SIG_IGN,表示忽略该信号;若为SIG_DFL,表示采用系统默认方式处理信号,若为用户自定义的函数,则信号被捕获到后,该函数将被执行。

如果signal()调用成功,它返回最后一次为信号signum绑定的处理函数handler值,失败则返回SIG_ERR。

在进程执行时,按【Ctrl+c】组合键将向其发出SIGINT信号,kill正在运行的进程将向其发出SIGTERM信号。

sigaction()函数可用于改变进程接收到特定信号后的行为,它的原型如下:

int sigaction(int signo, const struct sigaction *act, struct signaction *oldact)

struct sigaction{

       void (*sa_handler)(int);   //handler or SIG_IGN or SIG_DFL

       sigset_a sa_mask;       // signals to block   

       int sa_flags;           //signal options

       void (*sa_sigaction)(int, siginfo_t *, void *);//if sa_flags==SA_SIGINFO,handler

};

第三个参数用来保存原来的处理函数,oldact可为NULL,如果第二三参数均为NULL,那么该函数可用于检查信号的有颜效性。

 

用信号异步通知

#include <sys/types.h>

#include <sys/stat.h>

#include <stdio.h>

#include <fcntl.h>

#include <signal.h>

#define    MAX_LEN 100

 

void input_handler(int num)

{

       char data[MAX_LEN];

       int len;

 

       //读取并输出STDIN_FILENO上的输入

       len = read(STDIN_FILENO, &data, MAX_LEN);

       data[len] = 0;

       printf(“input available:%s\n”,data);      

}

 

main()

{

       into flags;      

       //启动信号驱动机制

        signal(SIGIO,input_handler);

       //设置STD_FILENO文件的拥有者为本进程

        fcntl(STDIN_FILENO, F_SETOWN, getpid());

       //启用异步通知机制

      oflags = fcntl(STDIN_FILENO, F_GETFL);   

       fcntl(STDIN_FILENO, F_SETFL, oflags|FASYNC); 

       while(1);

}

 

为了在用户空间中能处理一个设备释放的信号,它必须完成如下3项工作

●     通过SET_FILEOWN IO控制命令设置设备文件的拥有者为本进程,这样从设备驱动发出的信号才能被本进程接收到。

●     通过F_SETFL IO控制命令设置设备文件支持FASYNC,即异步通知模式。

●     通过signal()函数连接信号和信号处理函数。

 

信号的释放

设备驱动和应用程序的异步通知交互中,仅仅在应用程序端捕获信号是不够的,因为信号没有源头在设备驱动端。因此,应该在合适的时机让设备驱动释放信号,在设备驱动程序增加信号释放的相关代码。

为了使设备支持异步通用通知机制,驱动程序中涉及以下3项工作。

●     支持F_SETOWN命令,能在这个控制命令处理中设置filp->f_owner为对应进程ID,不过此项工作已由内核完成,设备驱动无须处理。

●     支持F_SETFL命令的处理,每当FASYNC标志改变时,驱动程序中的fasync()函数将得以执行。因此,驱动中应该实现fasync()函数。

●     在设备资源可获得时,调用kill_fasync()函数激发相应的信号。

驱动程序中的以上3项和应用程序中的3项一一对应的。

6eb5886f-bd4e-447e-8157-4a2b35fcf50d.JPG

                                异步通知中设备驱动和异步通知的交互

设备驱动中异步通知编程比较简单,主要用到一项数据结构和两个函数。数据结构是fasync_struct结构体,两个函数分别如下。

处理FASYNC标志变更的函数。

int fasync_helper(int fd, struct file *filp, int mode, struct fasync_struct **fa);

释放信号用的函数。

void kill_fasync(struct fasync_struct **fa, int sig, int band);

 

在设备驱动的fasync()函数中,只需简单地将该函数的3个参数以及fasync_struct结构体指针的指针作为第4个参数传入fasync_helper()函数即可。

struct  xxx_dev

{

       struct cdev dev;

       ….

       struct fasync_struct *async_queue;

}

 

static int xxx_fasync(int fd, struct file *filp, int mode)

{

       struct xxx_dev *dev = filp->private_data;

       return fasync_helper(fd, filp, mode, &dev->async_queue);

}

在资源可获得时,应调用kill_fasync()函数释放SIGIO信号,可读时第3个参数设置为POLL_IN,可写时,设置为POLL_OUT。

static ssize_t xxx_write(struct file *filp, const char __user *buf, size_t count, loff_t *f_pos)

{

       struct xxx_dev *dev = filp->private_data;

       …

       if (dev->async_queue)

              kill_fasync(&dev->async_queue, SIGIO,POLL_IN);

         …

}

最后,关闭文件时,在设备驱动的release()函数中,应调用设备驱动的fasync()函数将文件夹从异步通知的列表中删除。

static int xxx_release(struct inode *inode, struct file *filp)

{

       struct xx_dev *dev =filp->private_data;

       …

       xxx_fasync(-1,filp,0);

}

 

Linux 2.6 与 AIO

AIO

Linux异步I/O是不是2.6版内核的一个标准特性。AIO基本思想是允许进程发起很多I/O操作,而不是用阻塞或等待任何操作完成。稍后在接收到I/O操作完成的通知时,进程可以检索I/O操作的结果。

select()所提供的功能与AIO类似,它对通知事件阻塞,而不是对I/O调用进行阻塞。

在异步非阻塞I/O中,可以同时发起多个传输操作。这需要每个操作都是惟一的上下文,这样才能在它们完成时区分到底是哪个操作完成了。在AIO中,通过aiocb(AIO I/O Control Block)结构进行区分。这个结构体包含了有关传输操作的所有信息,包括为数据准备用户缓冲区。在产生I/O(称为完成)通知时,aiocb结构就被用来惟一标识所完成的I/O操作。

AIO系列API被GNU C库函数所包含,它被POSIX.1b所要求,主要包括如下函数。

1.       aio_read

aio_read()函数请求对一个有效的文件描述符进行异步读操作。这个文件描述符可以表示一个文件、套接字甚至管道。

       int aio_read(struct aiocb * aiocb);

aio_read()函数在请求进行排队之后会立即返回。执行成功,返回值为0;错误返回-1,并设置errno的值。

2.       aio_write

aio_write()函数用来请求一个异步写操作。

int aio_write(struct aiocb *aiocb);

aio_write()函数会立即返回,说明请求已经进行排队(成功返回0,失败返回-1.并设置errno值)。

3.       aio_error

aio_error函数用来确定请求的状态。

       int aio_error(struct aiocb *aiocbp)

这个函数可以返回以下内容。

EINPROGRESS:说明请求尚未完成

ECANCELLED:说明请求被应用程序取消了

-1:具体错误看error值

4.aio_return

异步I/O和超标准I/O之间的另外一个区别是不能立即访问这个函数的返回状态,因为没有阻塞在家read()调用上。超标准的read()调用中,返回状态是在函数返回时提供的。但在异步I/O中,要由aio_return()提供。

       ssize_t aio_return(struct aiocb *aiocbp);

只有在aio_error()调用时返回请求已经完成(可能成功,也可能错误之后),才会调用此函数。

aio_return()返回值等价于同步情况中read或write系统调用的返回值。

       #include <aio.h>

       …

       int fd,ret;

       struct aiocb my_aiocb;

 

       fd = fopen(“myfile”,O_RDONLY);

       if (fd < 0)

              perror(“open”);

      

       bzero((char*)&my_aiocb,sizeof(struct aiocb));

      

       my_aiocb.aio_buf = malloc(BUFSIZE+1);

       if (!my_aiocb.aio_buf)

              perror(“malloc”);

 

       my_aiocb.aio_fildes = fd;

       my_aiocb.aio_nbytes = BUFFSIZE;

       my_aiocb.aio_offset = 0;

       ret = aio_read(&my_aiocb);

       if (ret < 0)

              perror(“read”);

      

       while (aio_error(&my_aiocb)) == EINPROGRESS);

 

       if  ((ret=aio_return(&my_aiocb)) > 0)

       {

              return ret;

       }

        else

       {

       //分析errorno

       }

用户可以使用aio_suspend()函数挂起(阻塞)调用进程,直到异步请求完成,此时会产生一个信号,或者发生其它超时操作。商用者提供一个aiocb引用列表,其中任何一个完成都会导致aio_suspend()返回。

       int aio_suspend(const struct aiocb *const cblist[], int n, const struct timespec *timeout);

 

       struct aiocb *cblist[MAX_KIST];

       bzero((char*)cblist,sizeof(cblist));

       cblist[0] = &my_aiocb;

       ret = aio_read(&my_aiocb);

       ret = aio_suspend(cblist, MAX_LIST, NULL);

aio_cancel()允许用户取消对某个文件执行的一个或所有I/O请求。

       int aio_cancel(int fd, struct aiocb *aiocbp);

取消一个请求,

return:

       AIO_CANCELED 请求成功取消

       AIO_NOTCANCELED 请求已完成

 

取消对某个文件的所有I/O请求,aiocb==NULL

return:

       AIO_CANCELED 请求成功取消

       AIO_NOT_CANCELED 至少一个请求没有被取消

       AIO_ALLDONE 没有一个可以被取消

lio_listio()函数可以同时发起多个传输。它使得用户可以在一个系统调用(一次内核上下文切换)中启动大量的I/O操作。

       int lio_listio(int mode, struct aiocb *list[], int nent, struct sigevent *sig);

mode参数可以是LIO_WAIT或LIO_NOWAIT。LIO_WAIT会阻塞这个调用,直到所有的I/O都完成。在操作进行排队之后,LIO_NOWAIT就会立即返回。list是一个aiocb引用的列表,最大元素的个数是由nent定义的。如果list的元素为NULL。lio_list()会将其忽略。

       struct aiocb aiocb1, aiocb2;

       struct aiocb *list[MAX_LIST];

       aiocb1.aio_fildes = fd;

       aiocb1.aio_buf = malloc(BUFFSIZE+1);

       aiocb1.aio_nbytes = BUFFSIZE;

       aiocb1.aio_offset = next_offset;

       aiocb1.aio_lio_opcode = LIO_READ;//异步读操作//LIO_WRITE,LIO_NOP

       ……//多个aiocb

       bzero((char*)list,sizeof()list);

 

       list[0]= &aiocb1;

       list[1] = *aiocb2;

       …

       ret = lio_list(LIO_WAIT, list, MAX_LIST,NULL);

 

使用信号作为AIO的通知

       void setup_io(…)

       {

              int fd;

              struct sigaction sig_act;

              stuct aiocb my_aiocb;

             

              sigemptyset(&sig_act.sa_mask);

              sig_act.sa_flags = SA_SIGINFO;

              sig_act.sa_sigaction = aio_completion_handler;

 

              bzero((char*)&my_aiocb, sizeof(stuct aiocb));

              my_aiocb.aio_fildes = fd;

              my_aiocb.aio_buf = malloc(BUF_SIZE +1);

              my_aiocb.aio_nbytes = BUF_SIZE;

              my_aiocb.aio_offset = next_offset;

              //连接AIO请求和信号修理函数

              my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;

              my_aiocb.aio_sigevent.sigev_signo = SIGIO;

              my_aiocb.aio_sigevent.sigvalue.sival_ptr = &my_aiocb;

              //将信号与信号处理函数绑定

              ret = sigaction(SIGIO, &sig_act, NULL);

              ..

              ret = aio_read(&my_aiocb);//发出异步请求

}

 

viod aio_completion_handler(int signo, siginfo_t *info, void *context)

{

       struct aiocb *req;

      

       if (info->si_signo == SIGIO)

      {

              req = (struct aiocb*)info->si_value.sival_ptr;//获得aiocb

             

              if (aio_error(req) == 0)

              {

                     ret = aio_return(req);

              }

        }

      

        return;    

}

 

使用回调函数作为AIO的通知

除了信号之外,应用程序还可提供一个回调的函数给内核,以便AIO的请求完成后内核调用这个函数。

      void setup_io(…)

       {

              int fd;

              struct sigaction sig_act;

              stuct aiocb my_aiocb;          

 

              bzero((char*)&my_aiocb, sizeof(stuct aiocb));

              my_aiocb.aio_fildes = fd;

              my_aiocb.aio_buf = malloc(BUF_SIZE +1);

              my_aiocb.aio_nbytes = BUF_SIZE;

              my_aiocb.aio_offset = next_offset;

              //连接AIO请求和线程回调函数

              my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;

              my_aiocb.aio_sigevent.notify_function = aio_completion_handler;

              ;

              //设置回调函数

              my_aiocb.aio_sigevent.notify_attributes = NULL;

              my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

              ..

              ret = aio_read(&my_aiocb);//发出异步请求

}

 

viod aio_completion_handler(sigval_t sigval)

{

       struct aiocb *req;

       req = (struct aiocb*)sigval.sival_ptr;//获得aiocb

                           

       if (aio_error(req) == 0)

       {

                     ret = aio_return(req);

         }

      

         return;    

}

 

proc 文件系统包含两个虚拟文件,它们可以用来对异步I/O的性能进优化。

/proc/sys/fs/aio-nr 提供了系统范围异步I/O请求的最大数目。

/proc/sys/fs/aio-max-nr 所允许的并发请求的最大个数,最大个数通常为64KB。

AIO与设备驱动

内核中每个IO请求对应一个kiocb结构体,其ki_filp成员指向对应的file指针。通过is_sync_kiocb()可以判断kiocb是否为同步IO请求。
file_operations :


ssize_t (*aio_read)(struct kiocb *iocb, char *buffer,size_t count, loff_t offset);
ssize_t (*aio_write)(struct kiocb *iocb, const char *buffer,size_t count, loff_t offset);
ssize_t (*aio_read)(struct kiocb *iocb, int datasync);
--
static ssize_t xxx_aio_read(struct kiocb *iocb, char *buf, size_t count, loff_t pos)
{
     return xxx_defer_op(0,iocb,buf,count, pos);

}

 

static ssize_t xxx_aio_write(struct kiocb *iocb,const char *buf, size_t count, loff_t pos)
{
      return xxx_defer_op(1,iocb,(char *)buf,count, pos);

}


static int xxx_defer_op(int write,struct kiocb *iocb,const char *buf, size_t count, loff_t pos)

struct async_work *async_wk;

int reult;


if(write)

result =xxx_write(iocb->ki_filp, buf,count, &pos);

else

result = xxx_read(iocb->ki_filp,buf, count , &pos);


if (is_sync_kiocb(iocb))

return result;


async_wk = kmalloc(sizeof(*async_wk),GFP_KERNEL);

if(async_wk ==NULL)

return result;

async_wk->iocb =iocb;

async_wk->result = result;

INIT_WORK(&async_wk->work,xxx_do_deferred_op,async_wk);

schedule_delayed_work(&async_wk->work, HZ/100);//调度async_work

return -EIOCBQUEUED;

}

 

static void xxx_do_deferred_op(void *p)
{

struct  async_work *async_wk = (struct async_work *)p;

aio_complete(async_wk->iocb,async_wk->result,0);//通知内核已完成

kfree(async_wk);

}


struct async_work

{

struct kiocb *iocb;

int result;

struct work_struct  work;

}

 

 

PARTNER CONTENT

文章评论0条评论)

登录后参与讨论
EE直播间
更多
我要评论
0
6
关闭 站长推荐上一条 /3 下一条