原创 设计自己的嵌入式操作系统内核之六 ----- 信号量与邮箱实现

2011-8-1 19:40 4369 5 5 分类: MCU/ 嵌入式

1、概述
在嵌入式操作系统中,信号量机制用于任务间的同步与通信。一般在信号量内部维护两种结构,一个是计数器,另一个是任务阻塞队列。计数器用于指明事件发生的次数或者当前某资源的可用数,而阻塞队列中各项则为等待外部事件或资源可用的任务。
典型的信号量操作为:发送信号量P(),用于通知外部事件发生或已释放资源;等待信号量V(),用于消耗外部事件或资源,当不可用时,则将任务进入信号量阻塞队列等待。

信号量作为一种任务间的同步机制,为任务间的相互协作提供了极大的方便。虽然内核提供了多任务机制能够简化整个系统的实现,但各任务间不可能是相互独立的。一个经典的问题便是基于有限缓冲区的生产者-消费者问题。该问题在操作系统原理课程中常作为引入信号量概念的问题而被提出来。作为生产者的任务不可能在有限缓冲的情况下任意写入数据,而作为消费者的任务也不可能从缓冲取数据。在<<嵌入式实时操作系统uc/os ii>>中信号量被比为旗帜。

邮箱机制用于进程间进行通信。嵌入式操作系统内维护多个邮箱,任务可以由邮箱收发消息。相互协作的进程间可以通过邮箱传递信息。从某种程序上来说,邮箱和信号量的功能类似,只是任务在向邮箱发信号时可以同时附加一消息,消息可用来指明发生的事件类型等。
当向邮箱发送空消息时,这里对邮箱的操作便退出化信号量操作。

eos提供了邮箱机制用于任务间通信。邮箱为所有任务共享,任何任务都可向邮箱发送、接收消息。并且一个邮箱可容纳多个消息。在传递消息时,并不传递消息本身,而是传递消息体的指针,以避免因信息的拷贝带来的性能损失。

eos也实现了信号量机制。其外部接口同其它的内核信号量相差不多,具体实现也比较简单。在介绍具本的实现之前,先看下任务与事件队列的关系。

2、事件队列与任务
因为eos除了实现信号量、邮箱之外,还实现了消息队列等机制。其中可提取出一些相同的操作,即对事件队列的处理。
所谓事件队列,即信号量等对像维护的因等待事件而阻塞的任务队列。其通用的操作为:事件队列的初始化、销毁、任务由就绪队列进入事件队列、由事件队列出进入就绪队列。

事件队列的结构一般为:
点击看大图

在有些场合下,等待同一事件的任务有不同的优先级,而因事件发生唤醒任务时首先是最高优先级任务,这时需要多级队列 mlist_t wait_list. 而有时不需要考虑优先级时,使用单级队列slist_t wait_list,既能简化操作,又可减少存储空间。

队列的初始化:即将队列清空。由mlist_init()或slist_init()完成。
队列的销毁:
  对于单级队列有:uint8 event_slist_del( slist_t * list );  /* 销毁阻塞队列*/
对于多级队列有:uint8 event_mlist_del( mlist_t * list );
这两个函数完成的操作一致,流程如下:

反复从list取出阻塞的任务task,直至队列为空
如果 task同时被置入延时队列
从延时队列移除
将task置入就绪队列
设置task恢复运行的状态.
task->wait_state = OS_ERR_DEL; /* 返回: OS_ERR_DEL*/
task->event = ( void * )0;
在task_struct结构中.wait_state用于保存任务等待外部事件的状态。OS_ERR_DEL指明因为阻塞队列被删除而就绪。当阻塞的任务重新执行时,通过.wait_state的值为OS_ERR_DEL可得知是因为阻塞队列被删除而就绪。

任务由就绪队列->阻塞队列:
支持函数为:
void event_slist_wait( void * event, uint8 wait_state, slist_t * list, uint16 ticks );
void event_mlist_wait( void * event, uint8 wait_state, mlist_t * list, uint16 ticks );

其操作流程大致如下:
如果 ticks > 0, 即同时使能了超时等待
将任务置入延时队列OSDelayQ中。
将当前任务从就绪队列移除,并插入list队列
设置任务状态,设置task_struct结构中的.event域,.wait_state, state域

前面已经提到,正在运行的任务也是处于就绪队列的。
这一操作发生于当前任务等待某一未发生的事件或不可用的资源时,邮运行态进行阻塞态,任务进入阻塞队列list中。.event值指明任务等待的对像,如信号量、邮箱控制块的指针,.wait_state指明等待哪种类型的对像,.list则是任务将被置入的阻塞队列。

任务由阻塞队列->就绪队列:
  支持函数:uint8 event_slist_rdy( slist_t * list, void * event, uint8 wait_state )
 uint8 event_mlist_rdy( mlist_t * list, void * event, uint8 wait_state )

其操作流程大致如下:
从list中取任务task.
如果task同时处于延时队列
将task从延时队列移除
将task插入就绪队列,并修改运行状态
修改任务返回状态,设置task_struct结构中的.wait_state, evernt域。

这一操作发生于当前任务某一事件发生或资源可用时,任务由阻塞态进入就绪态。调用该函数的任务将可将消息由.event 参数传送至唤醒的任务,wait_state参数则告诉唤醒的任务其是被唤醒的原因是什么。(如等待的对像被删除,等待超时等 )
当任务返回时,可从task_struct结构中取.wait_state, event的值获取等待的状态。

3、信号量实现
有了事件队列提供的基本操作,信号量的实现也就简单的多。
如下图所示,eos 的信号量包含了计数器和队列两部分,其中队列可为多级队列也可为单级队列,图中只显示了多级队列。

点击看大图

信号量结构的定义为:
typedef struct _semaphore_t /* 信号量结构 */
{
? uint8 cnt; /* 当前计数 */
? uint8 max_cnt; /* 最大计数 */

#if OS_SEM_SET_MLIST == 1
? mlist_t wait_list; /* 阻塞队列 */
#else
? slist_t wait_list;
#endif

}* semaphore_t;

typedef struct /* 信号量查询结构 */
{
? uint8 cnt; /* 信号量查询当前量 */
? uint8 max_cnt; /* 信号量最大允许计数 */
}sem_info_t;

相关的操作为:
uint8 semaphore_create( semaphore_t * sem, uint8 base_cnt, uint8 max_cnt );?
uint8 semaphore_wait( semaphore_t sem, uint16 ticks ); /* 等待信号量 */
uint8 semaphore_signal( semaphore_t sem ); /* 向信号量发送信号*/
uint8 semaphore_destroy( semaphore_t sem ); /* 删除信号量 */
uint8 semaphore_try_wait( semaphore_t sem ); * 无阻塞等待信号量*/
uint8 semaphore_info( semaphore_t sem, sem_info_t * sem_info );/* 查询状态*/
uint8 semaphore_set_cnt( semaphore_t sem, uint8 cnt ); /* 设置信号量计数值*/
 
各操作的详细说明:
smeaphore_create()操作流程:
由mpool_get()分配一块信号量结构的控制块
设定信号量计数值
初始化阻塞队列,队列为空

semaphore_wait()操作流程:
检查是否在中断中或调度器上锁时调用
否:
如果信号量计数 > 0?
计数减1
否,调用event_xlist_wait()将任务置入阻塞队列.

semaphore_try_wait()操作流程:
检查信号量计数是否 > 0
是,计数减1
返回原始计数

semaphore_signal()操作流程:
如果阻塞队列非空
调用event_xlist_wait()唤醒阻塞队列中某任务
调用scheduler()进行调度
否则
信号量计数加1

semaphore_destroy()操作流程:
调用event_xlist_del()销毁阻塞队列
如果有一个或多个任务从阻塞队列移除
进行任务调度
调用mpool_free()回收信号量控制块

semaphore_set_cnt()操作流程:
如果阻塞队列为空
设定信号量计数值

semaphore_info()操作流程:
将信号量结构中的计数信息复制到sem_info_t的结构体中


很显然,信号量由于内部只维护一个计数器,因而只能用于通知某个事件的发生。而像邮箱、消息队列这样的机制,还允许在通知任务事件发生的同时传递一个或多个消息,以确定事件的具体内容。

从后面有关邮箱、消息队列的实现来看,信号量的操作是邮箱消息队列操作集的一个子集,其完成了最基本的任务阻塞及唤醒功能。

四、邮箱机制的实现
1)、邮箱控制块:
邮箱控制块结构如下:
typedef struct _mbox_t /* 邮箱控制块 */
{
? uint8 mail_cnt; /* 消息槽中可用消息数*/
? uint8 read; /* 消息槽写指针 */
? uint8 write; /* 消息槽读(写)指针 */
? uint8 size; /* 最大消息槽数 */
? void ** mail; /* 消息槽 */

#if OS_MBOX_SET_MLIST == 1
? mlist_t wait_list; /* 接收消息阻塞队列 */
#else
? slist_t wait_list;
#endif
} * mbox_t;

typedef struct
{
? void ** mail; /* 邮箱中邮件存储首*/
? uint8 mail_cnt; /* 消息槽中可用消息数*/
? uint8 free_cnt; ?
? uint8 size; /* 最大消息槽数 */
}mbox_info_t;
  
  由上述代码所示,邮箱控制块结构可视为信号量结构(计数变量为mail_cnt)和一双循环缓冲区组合。其中双循环缓冲用于保存消息,并且只保存消息的指针。循环缓冲的首址为mail, 读写指针分别为read, write。缓冲内有效的消息数量为mail_cnt,最大可容许的消息数量为size.

基本的操作接口函数如下:
uint8 mbox_create( mbox_t * mbox, void ** buf, uint8 size );/* 创建一个邮箱 */
uint8 mbox_recv( mbox_t mbox, uint16 ticks, void ** msg ); /* 从邮箱 中取消息 */
uint8 mbox_flush( mbox_t mbox ); /* 刷新邮箱缓冲 */
uint8 mbox_destroy( mbox_t mbox );    /* 销毁邮箱 */
uint8 mbox_try_recv( mbox_t mbox, void ** mail ); /* 无阻塞的从邮箱取消息 */
uint8 mbox_send( mbox_t mbox, void * mail, uint8 opt ); //* 向邮箱发消息 */
?uint8 mbox_info( mbox_t mbox, mbox_info_t * info ); /* 查询邮箱信息 */

接口函数的基本操作流程:
mbox_create()操作流程:
调用mpool_get()分配一邮箱控制块
初始化邮箱控制块中的循环缓冲结构
初始化邮箱控制块中阻塞队列

mbox_flush()操作流程:
邮箱阻塞队列是否为空
是,恢复循环缓冲结构为初始化状态

mbox_destroy()操作流程:
检查邮箱内是否有消息
是,则退出
否。
邮箱阻塞队列是否有任务等待

调用event_mlist_del()销毁阻塞队列,就绪队列中任务
调用scheduler()进行调度
调用mpool_free()回收邮箱控制块

mbox_recv()操作流程:
检查是否在中断中或调度器上锁时调用
是,
立即退出
否,
检查邮箱中是否有邮箱
是,?
从邮箱中取邮件,并调用读指针read值
返回,退出函数
否,
调用event_xlist_wait()将自己阻塞,任务进入阻塞队列
调用scheduler()进行调度
任务返回时,从OSTaskCur->event取获取消息。
返回OSTaskCur->wait_state值。
注:前面已经说明。task_struct结构中的域/.event, .wait_state用于支持邮箱、信号量操作函数。在mbox_recv()中,当调用者由于邮箱为空而被挂起,当有其它任务发消息时,其消息将直接发送至阻塞任务,.阻塞任务.event将保存该消息的指针,.wait_state域将保存任务执行mbox_recv()的状态。


mbox_try_recv()操作流程:
检查邮箱是否有邮件
是,?
从邮箱取邮件,并调用.read读指针
返回 OS_ERR_OK
返回 OS_ERR_MBOX_EMPTY

mbox_send()操作流程:
检查阻塞队列是否为空
否,?
调用event_xlist_rdy()唤醒阻塞队列的某任务。被唤醒任务的.event指由发送的消息。.wait_state值为OS_ERR_OK
返回,退出函数
是,
检查邮箱是否已满
否,
检查是否以OS_MBOX_POST_LIFO方式按LIFO方式发消息
是?
将消息由.read指针处写入,调整read指针

将消息由write指针处写入,调整write指针
返回 OS_ERR_OK

返回OS_ERR_MBOX_FULL

mbox_info()操作流程:
将邮箱控制块结构中的相应域复制到mbox_inof_t结构中

比较邮箱与信号量的实现,其基本操作流程基本一致,只是邮箱还需要处理从循环缓冲中读写消息操作。

这里的邮箱实现很显可以看出是仿照的uc/os。虽然在任务收发消息时,传送的是指向消息体的指针,不存在拷贝,操作快速,效率很高;但是,在一些情况下应用并不是很方便。比如说,发消息的任务将往某块内存写入消息后,调用mbox_send()发消息,因为只发送该块内存的指针,而不是将该块内存的内容复制至邮箱。发送者怎么知道接收者已接收到消息?而且何时知道接收者已处理完消息?因为传送的是消息体指针,对于收/发者而言,该块内存则成为二者共享的,在接收任务处理完消息前,发送者不能再对其写入。除非任务间存在某种协调,否则,发消息的任务如果要多次发消息,是应该避免使用同一内存块传递消息,那对消息体的必要存储管理是必须的。
因而考虑以上的问题,存储消息的内存可交给存储管理器管理,由发消息任务申请分配,由接消息队列负责释放。也可以考虑实现共享内存的管理,共享内存区内有多块内存,可以容许有多个生产者与消费者。生产者产生数据写入某个共享内存区,消费者则从共共享内存区读数据并处理。但与邮箱机制不同,生产者、消息是直接对共享内存区内的内存进行读写。可以考虑将内存区内的各块内存划分为同样大小的内存块,以简化问题。

四、简单的演示
提供了一个简单的测试实例 app_sem.c 演示了信号量的操作。
提供了一个简单的测试实例 app_mbox.c 演示了邮箱的操作。

5、问题与思考
最初我的想法是同时在struct _semaphore_t 结构中维护一个阻塞的任务计数,并且也尝试实现。但后来,在实现任务删除函数时就碰到一个问题:当将等待信号量的任务从队列中删除时,仅调用event_xlist_del()显然是不够的,必须同时修改计数值。这样就有必要在任务的task_struct结构中保存指向等待的信号量的指针。这种做法行得通。但如果对邮箱、消息队列也使用类似的方法,显然就需要添加指向邮箱、消息队列的指针。并且还需要提供一种方法判定当前阻塞的任务是在等待哪种对像。 
当然有一种简单的解决方法。可以将等待的任务计数附加在队列结构中,这样就不必知道任务在等待何种对像了。

提出之个问题,主要是想说明在这个简单的内核设计过程中,因为自己的知识水平不足。在最开始实现时,没有做比较完整的初步设计,而是参考ucos的代码,在实现一个模块后再实现另一个。这样由于没有考虑周全,前一模块的实现,往往因为设计不当,导致在后面的模块实现时,遇到很大问题,反过来导致又需要前面模块的修改。这样的情况出现过多次,并且有几次的修改幅度较大。这大概就是软件工程学中的需求分析、初步设计没搞好的原因。

六、相关源码
core.c / core.h ---------- 内核支持函数文件
sem.c /ipc.h --- 信号量实现文件
mbox.c/ip.h ----------- 邮箱实现文件

pdf


PARTNER CONTENT

文章评论0条评论)

登录后参与讨论
我要评论
0
5
关闭 站长推荐上一条 /3 下一条