原创 并发和竞争情况总结

2009-6-5 10:14 1717 7 7 分类: MCU/ 嵌入式








zip

原文抄自http://blog.chinaunix.net/u1/34474/showart.php?id=408682

一、并发及其管理



竞争情况来自对资源的共享存取的结果.因此第一个经验法则是在你设计驱动时在任何可能的时候记住避免共享的资源.
如果没有并发存取,
就没有竞争情况.这个想法的最明显应用是避免使用全局变量.



事实是,
然而,
这样的共享常常是需要的.
硬件资源是,
由于它们的特性,
共享的,
软件资源也必须常常共享给多个线程.
也要记住全局变量远远不是共享数据的唯一方式;
任何时候你的代码传递一个指针给内核的其他部分,
潜在地它创造了一个新的共享情形.


这是资源共享的硬规则:


  1. 任何时候一个硬件或软件资源被超出一个单个执行线程共享,
    并且可能存在一个线程看到那个资源的不一致时,
    你必须明确地管理对那个资源的存取.
    存取管理的常用技术是加锁或者互斥 —
    确保在任何时间只有一个执行线程可以操作一个共享资源.


  2. 当内核代码创建一个会被内核其他部分共享的对象时,
    这个对象必须一直存在(并且功能正常)到它知道没有对它的外部引用存在为止.



二、信号量和互斥体



一个信号量(semaphore:
旗语,信号灯)本质上是一个整数值,它和一对函数联合使用,这一对函数通常称为PV。希望进入临届区的进程将在相关信号量上调用P;如果信号量的值大于
零,则该值会减小一,而进程可以继续。相反,如果信号量的值为零(或更小),进程必须等待知道其他人释放该信号。对信号量的解锁通过调用
V完成;该函数增
加信号量的值,并在必要时唤醒等待的进程。



当信号量用于互斥时(即避免多个进程同是在一个临界区运行),信号量的值应初始化为1。这种信号量在任何给定时刻只能由单个进程或线程拥有。在这种使用模式下,一个信号量有事也称为一个“互斥体(mutex)”,它是互斥(mutual
exclusion
)的简称。Linux内核中几乎所有的信号量均用于互斥



使用信号量,内核代码必须包含<asm/semaphore.h>


 


以下是信号量初始化的方法:




/*初始化函数*/
void
sema_init
(struct
semaphore
*sem,
int
val
);



 


由于信号量通常被用于互斥模式。所以以下是内核提供的一组辅助函数和宏:




/*方法一、声明+初始化宏*/
DECLARE_MUTEX
(name);
DECLARE_MUTEX_LOCKED
(name);

/*
方法二、初始化函数*/
void
init_MUTEX
(struct
semaphore
*sem);
void
init_MUTEX_LOCKED
(struct
semaphore
*sem);

/*
带有“_LOCKED”的是将信号量初始化为0,即锁定,允许任何线程访问时必须先解锁。没带的为1*/



 


P函数为:




void
down
(struct
semaphore
*sem);
/*
不推荐使用,会建立不可杀进程*/
int
down_interruptible
(struct
semaphore
*sem);/*
推荐使用,使用down_interruptible需要格外小心,若操作被中断,该函数会返回非零值,而调用这不会拥有该信号量。对down_interruptible的正确使用需要始终检查返回值,并做出相应的响应。*/
int
down_trylock
(struct
semaphore
*sem);/*
带有“_trylock”的永不休眠,若信号量在调用是不可获得,会返回非零值。*/



 


V函数为:




void
up
(struct
semaphore
*sem);/*
任何拿到信号量的线程都必须通过一次(只有一次)对up的调用而释放该信号量。在出错时,要特别小心;若在拥有一个信号量时发生错误,必须在将错误状态返回前释放信号量。*/






Using
Semaphores in scull——
scull中使用信号量




/*
Initialize each device. */

    
for
(i
=
0
;
i
<
scull_nr_devs
;
i
++)
{
        scull_devices
[i].quantum
=
scull_quantum
;
        scull_devices
[i].qset
=
scull_qset
;
        init_MUTEX
(&scull_devices[i].sem);/* 
注意顺序:先初始化好互斥信号量
,再使
scull_devices可用。*/
        scull_setup_cdev
(&scull_devices[i],
i
);
    
}



读取者/写入者信号量



只读任务可并行完成它们的工作,而不需要等待其他读取者退出临界区。Linux内核提供了读取者/写入者信号量“rwsem”,使用是必须包括<linux/rwsem.h>


初始化:




void
init_rwsem
(struct
rw_semaphore
*sem);



 


只读接口:




void
down_read
(struct
rw_semaphore
*sem);
int
down_read_trylock
(struct
rw_semaphore
*sem);
void
up_read
(struct
rw_semaphore
*sem);



 


写入接口:




void
down_write
(struct
rw_semaphore
*sem);
int
down_write_trylock
(struct
rw_semaphore
*sem);
void
up_write
(struct
rw_semaphore
*sem);




void
downgrade_write
(struct
rw_semaphore
*sem);/*
该函数用于把写者降级为读者,这有时是必要的。因为写者是排他性的,因此在写者保持读写信号量期间,任何读者或写者都将无法访问该读写信号量保护的共享资源,对于那些当前条件下不需要写访问的写者,降级为读者将,使得等待访问的读者能够立刻访问,从而增加了并发性,提高了效率。*/



 



一个
rwsem
允许一个写者或无限多个读者来拥有该信号量.
写者有优先权;
当某个写者试图进入临界区,
就不会允许读者进入直到写者完成了它的工作.
如果有大量的写者竞争该信号量,则这个实现可能导致读者“饿死”,即可能会长期拒绝读者访问。因此,
rwsem
最好用在很少请求写的时候,
并且写者只占用短时间.



completion
completion
是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。代码必须包含<linux/completion.h>。使用的代码如下:


 




DECLARE_COMPLETION(my_completion);/*
创建completion(声明+初始化)
*/

/////////////////////////////////////////////////////////

struct
completion my_completion
;/*
动态声明completion
结构体*/
static
inline
void
init_completion
(&my_completion);/*
动态初始化completion*/

///////////////////////////////////////////////////////

void
wait_for_completion
(struct
completion
*c);/*
等待completion
*/

void
complete
(struct
completion
*c);/*
唤醒一个等待completion的线程*/
void
complete_all
(struct
completion
*c);/*
唤醒所有等待completion的线程*/

/*
如果未使用completion_allcompletion可重复使用;否则必须使用以下函数重新初始化completion*/
INIT_COMPLETION
(struct
completion c
);/*
快速重新初始化completion*/




 completion的典型应用是模块退出时的内核线程终止。在这种远行中,某些驱动程序的内部工作有一个内核线程在while(1)循环中完成。当内核准备清楚该模块时,exit函数会告诉该线程退出并等待completion。为此内核包含了用于这种线程的一个特殊函数:




void
complete_and_exit
(struct
completion
*c,
long
retval
);






三、自旋锁



对于互斥,
旗标是一个有用的工具,
但是它们不是内核提供的唯一这样的工具.
相反,
大部分加锁是由一种称为自旋锁的机制来实现.
不象旗标,
自旋锁可用在不能睡眠的代码中,
例如中断处理.
当正确地使用了,
通常自旋锁提供了比旗标更高的性能.
然而,
它们确实带来对它们用法的一套不同的限制.



自旋锁概念上简单.
一个自旋锁是一个互斥设备,
只能有
2
个值:"上锁""解锁".



这个"测试并置位"操作必须以原子方式进行,
以便只有一个线程能够获得锁,
就算如果有多个进程在任何给定时间自旋.


适用于自旋锁的核心规则:


  1. 任何拥有自旋锁的代码都必须使原子的,除服务中断外(某些情况下也
    不能放弃CPU,如中断服务也要获得自旋锁。为了避免这种锁陷阱,需要在拥有自旋锁时禁止中断),不能放弃CPU(如休眠,休眠可发生在许多无法预期的地
    方)。否则CPU将有可能永远自旋下去(死机)。


  2. 拥有自旋锁的时间越短越好。





spinlock_t
my_lock
=
SPIN_LOCK_UNLOCKED
;/*
编译时初始化spinlock*/
void
spin_lock_init
(spinlock_t
*lock);/*
运行时初始化spinlock*/

/*
所有spinlock等待本质上是不可中断的,一旦调用spin_lock,在获得锁之前一直处于自旋状态*/
void
spin_lock
(spinlock_t
*lock);/*
获得spinlock*/
void
spin_lock_irqsave
(spinlock_t
*lock,
unsigned
long
flags
);/*
获得spinlock,禁止本地cpu中断,保存中断标志于flags*/
void
spin_lock_irq
(spinlock_t
*lock);/*
获得spinlock,禁止本地cpu中断*/
void
spin_lock_bh
(spinlock_t
*lock)/*
获得spinlock,禁止软件中断,保持硬件中断打开*/
/*
以下是对应的锁释放函数*/
void
spin_unlock
(spinlock_t
*lock);
void
spin_unlock_irqrestore
(spinlock_t
*lock,
unsigned
long
flags
);
void
spin_unlock_irq
(spinlock_t
*lock);
void
spin_unlock_bh
(spinlock_t
*lock);

/*
以下非阻塞自旋锁函数,成功获得,返回非零值;否则返回零*/
int
spin_trylock
(spinlock_t
*lock);
int
spin_trylock_bh
(spinlock_t
*lock);




/*
新内核的<linux/spinlock.h>包含了更多函数*/



读取者/写入者自旋锁:




rwlock_t
my_rwlock
=
RW_LOCK_UNLOCKED
;/*
编译时初始化*/




rwlock_t
my_rwlock
;
rwlock_init
(&my_rwlock);
/*
运行时初始化*/




void
read_lock
(rwlock_t
*lock);
void
read_lock_irqsave
(rwlock_t
*lock,
unsigned
long
flags
);
void
read_lock_irq
(rwlock_t
*lock);
void
read_lock_bh
(rwlock_t
*lock);




void
read_unlock
(rwlock_t
*lock);
void
read_unlock_irqrestore
(rwlock_t
*lock,
unsigned
long
flags
);
void
read_unlock_irq
(rwlock_t
*lock);
void
read_unlock_bh
(rwlock_t
*lock);

/*
新内核已经有了read_trylock*/

void
write_lock
(rwlock_t
*lock);
void
write_lock_irqsave
(rwlock_t
*lock,
unsigned
long
flags
);
void
write_lock_irq
(rwlock_t
*lock);
void
write_lock_bh
(rwlock_t
*lock);
int
write_trylock
(rwlock_t
*lock);




void
write_unlock
(rwlock_t
*lock);
void
write_unlock_irqrestore
(rwlock_t
*lock,
unsigned
long
flags
);
void
write_unlock_irq
(rwlock_t
*lock);
void
write_unlock_bh
(rwlock_t
*lock);

/*
新内核的<linux/spinlock.h>包含了更多函数*/



锁陷阱



锁定模式必须在一开始就安排好,否则其后的改进将会非常困难。


不明确规则



如果某个获得锁的函数要调用其他同样试图获取这个锁的函数,代码就会锁死。(不允许锁的拥有者第二次获得同个锁。)为了锁的正确工作,不得不编写一些函数,这些函数假定调用这已经获得了相关的锁。


锁的顺序规则



再必须获取多个锁时,应始终以相同顺序获取。



若必须获得一个局部锁和一个属于内核更中心位置的锁,应先获得局部锁。



若我们拥有信号量和自旋锁的组合,必须先获得信号量。



不得再拥有自旋锁时调用down。(可导致休眠)



尽量避免需要多个锁的情况。


细颗粒度和粗颗粒度的对比



应该在最初使用粗颗粒度的锁,除非有真正的原因相信竞争会导致问题。


四、锁之外的办法


 


1)免锁算法


经常用于免锁的生产者/消费者任务的数据结构之一是循环缓冲区。内核里有一个通用的循环缓冲区的实现在
<linux/kfifo.h>


 





2)原子变量



完整的锁机制对一个简单的整数来讲显得浪费。内核提供了一种原子的整数类型,称为atomic_t,定义在<asm/atomic.h>原子变量操作是非常快的,
因为它们在任何可能时编译成一条单个机器指令。


以下是其接口函数:




void
atomic_set
(atomic_t
*v,
int
i
);
/*
设置原子变量
v
为整数值
i.*/
atomic_t
v
=
ATOMIC_INIT
(0);  /*
编译时使用宏定义
ATOMIC_INIT
初始化原子值.*/

int
atomic_read
(atomic_t
*v);
/*
返回
v
的当前值.*/

void
atomic_add
(int
i
,
atomic_t
*v);/*

v
指向的原子变量加
i.
返回值是
void*/
void
atomic_sub
(int
i
,
atomic_t
*v);
/*

*v
减去
i.*/

void
atomic_inc
(atomic_t
*v);
void
atomic_dec
(atomic_t
*v);
/*
递增或递减一个原子变量.*/

int
atomic_inc_and_test
(atomic_t
*v);
int
atomic_dec_and_test
(atomic_t
*v);
int
atomic_sub_and_test
(int
i
,
atomic_t
*v);
/*
进行一个特定的操作并且测试结果;
如果,
在操作后,
原子值是
0,
那么返回值是真;
否则,
它是假.
注意没有
atomic_add_and_test.*/

int
atomic_add_negative
(int
i
,
atomic_t
*v);
/*
加整数变量
i

v.
如果结果是负值返回值是真,
否则为假.*/

int
atomic_add_return
(int
i
,
atomic_t
*v);
int
atomic_sub_return
(int
i
,
atomic_t
*v);
int
atomic_inc_return
(atomic_t
*v);
int
atomic_dec_return
(atomic_t
*v);
/*

atomic_add
和其类似函数,
除了它们返回原子变量的新值给调用者.*/




atomic_t
数据项必须通过这些函数存取。
如果你传递一个原子项给一个期望一个整数参数的函数,
你会得到一个编译错误。需要多个
atomic_t
变量的操作仍然需要某种其他种类的加锁。


 


3)位操作



内核提供了一套函数来原子地修改或测试单个位。原子位操作非常快,
因为它们使用单个机器指令来进行操作,
而在任何时候低层平台做的时候不用禁止中断.
函数是体系依赖的并且在
<asm/bitops.h>
中声明.
以下函数中的数据是体系依赖的.
nr
参数(描述要操作哪个位)ARM体系中定义为unsigned
int





void
set_bit
(nr,
void
*addr);
/*
设置第
nr
位在
addr
指向的数据项中。*/

void
clear_bit
(nr,
void
*addr);
/*
清除指定位在
addr
处的无符号长型数据.*/

void
change_bit
(nr,
void
*addr);/*
翻转nr.*/

test_bit
(nr,
void
*addr);
/*
这个函数是唯一一个不需要是原子的位操作;
它简单地返回这个位的当前值.*/



/*以下原子操作如同前面列出的,
除了它们还返回这个位以前的值.*/



int
test_and_set_bit
(nr,
void
*addr);
int
test_and_clear_bit
(nr,
void
*addr);
int
test_and_change_bit
(nr,
void
*addr);



  


4seqlock



2.6内核包含了一对新机制打算来提供快速地,
无锁地存取一个共享资源。
seqlock要保护的资源小,简单,并且常常被存取,并且很少写存取但是必须要快。seqlock
通常不能用在保护包含指针的数据结构。seqlock
定义在
<linux/seqlock.h>





/*两种初始化方法*/
seqlock_t
lock1
=
SEQLOCK_UNLOCKED
;

seqlock_t
lock2
;
seqlock_init
(&lock2);




这个类型的锁常常用在保护某种简单计算,读存取通过在进入临界区入口获取一个(无符号的)整数序列来工作.
在退出时,
那个序列值与当前值比较;
如果不匹配,
读存取必须重试.读者代码形式




unsigned
int
seq
;
do
{
    seq
=
read_seqbegin
(&the_lock);
    
/*
Do what you need to do */

}
while
read_seqretry
(&the_lock,
seq
);




 如果你的
seqlock
可能从一个中断处理里存取,
你应当使用
IRQ
安全的版本来代替:




unsigned
int
read_seqbegin_irqsave
(seqlock_t
*lock,
unsigned
long
flags
);
int
read_seqretry_irqrestore
(seqlock_t
*lock,
unsigned
int
seq
,
unsigned
long
flags
);




写者必须获取一个排他锁来进入由一个
seqlock
保护的临界区,写锁由一个自旋锁实现,调用:




void
write_seqlock
(seqlock_t
*lock);
void
write_sequnlock
(seqlock_t
*lock);




因为自旋锁用来控制写存取,
所有通常的变体都可用:




void
write_seqlock_irqsave
(seqlock_t
*lock,
unsigned
long
flags
);
void
write_seqlock_irq
(seqlock_t
*lock);
void
write_seqlock_bh
(seqlock_t
*lock);

void
write_sequnlock_irqrestore
(seqlock_t
*lock,
unsigned
long
flags
);
void
write_sequnlock_irq
(seqlock_t
*lock);
void
write_sequnlock_bh
(seqlock_t
*lock);




 还有一个
write_tryseqlock
在它能够获得锁时返回非零.



5)读取-复制-更新



读取-拷贝-更新(RCU)
是一个高级的互斥方法,
在合适的情况下能够有高效率.
它在驱动中的使用很少。





















PARTNER CONTENT

文章评论0条评论)

登录后参与讨论
EE直播间
更多
我要评论
0
7
关闭 站长推荐上一条 /3 下一条