原创 Linux 設備驅動中的並發控制

2010-8-2 15:55 2033 1 1 分类: MCU/ 嵌入式

並發與競態


並發(concurrency),多個執行單元同是時,並行被執行,而並發的執行單元對共享資源的訪問則很容易導致競態(race conditions)。


Linux內核中,主要的競態發生於如下幾種情況。


1.SMP(對稱多處理器)


SMP是一種緊耦合,共享存儲系統的系統模型,多個CPU使用共同的系統總線,訪問共同的外設和存儲器。


2.單CPU內進程與搶占它的進程


Linux 2.6內核支持搶占調度,一個進程在內核執行時可能被另一個高优先級進程打斷,進程與搶占它的進程訪問共同的共享資源。


3.中斷(硬中斷,軟中斷,Tasklet,底半部)與進程之間


中斷可以打斷正在執行的進程,中斷處理程序訪問與被中斷進程正在訪問的資源,則競態也會發生。


中斷也有可能被更高优先級的中斷打斷,多個中斷之間也可能引起並發而導致競態。


SMP是真正的並行,其它都 是“宏觀並行,微觀串行”的。


解決競態的途徑是對共享資源的互斥訪問。


訪問共享資源的代碼區稱為臨界區(critical section),臨界區需要以某種互斥機制加以保護。中斷屏蔽,原子操作,自旋鎖和信號量等是Linux設備驅動中可釆用的互斥機制。


 


中斷屏蔽


在單CPU范圍內避免競態的一種簡單的方法是進入臨界區之前屏蔽系統的中斷。中斷屏蔽的使用方法:


local_irq_disable()//屏蔽中斷


...


critical section


...


local_irq_enable()//開中斷


由于linux系統的异步I/O,進程調度等很多重要操作都依賴于中斷,中斷對于內核的運行非常重要,在屏蔽中斷期間都無法得到處理,因此長時間的中斷是很危險的,有可能造成數據丟失甚至系統崩潰。這就要求在屏蔽中斷之後,當前的內核執行路徑應當盡快地執行完全臨界區的代碼。


local_irq_disable()和local_irq_enable()都只能禁止和使能本CPU內的中斷,不能解決SMP引發的競態。因此,單獨使用中斷屏蔽通常不是一種值得推荐的避免競態方法,它適宜與自旋鎖聯合使用。


local_irq_save(flags)         禁止中斷,並保存中斷位信息 ,對應的開中斷:    local_irq_restore(flags)     與local_irq_save(flags)對應的開中斷


local_bh_disable()     禁止中斷的底半部,對應的開中斷 local_bh_enable()


 


原子操作


原子操作在執行過程中不會被別的代碼路徑中斷。


linux內核提供了針對位和整型變量的原子操作的兩類函數。它們的原子操作都依賴于CPU的原子操作來實現的,與CPU架构密切相關。


 


整型原子操作


1.設置原子變量的值


 void atomic_set(atmoic_t *v, int i);//設置原子變量的值為i


atmoic_t v = ATOMIC_INIT(0);//定義原子變量v並初始化為0


2.获得原子变量的值


atmoic_read(atomic_t *v); //返回原子变量的值


 


3.原子变量加/减


void atomic_add(int i, atomic_t *v);//原子变量增加1


void atomic_sub(int i, atomic_t *v);//原子变量减1


 


4.原子变量的自增/减


void atomic_inc(atomic_t *v);//原子变量自增1


void atomic_dec(atomic_t *v);//原子变量自减1


 


5.操作并测试


int atomic_inc_and_test(atomic_t *v);


int atomic_dec_and_test(atomic_t *v);


int atomic_sub_and_test(int i, atomic_t *v);


上述对原子变量操作(无增加)后测试其值是否为0,为0返回true,否则返回false.


 


6.操作并返回


int atomic_add_return(int i, atomic_t *v);


int atomic_sub_return(int i, atomic_t *v);


int atomic_inc_return(atomic_t *v);


int atomic_dec_return(atomic_t *v);


上述操作并返回新值。


 


位原子操作


1.       设置位


void set_bit(nr, void *addr);//设置addr地址的第nr位为1.


 


2.       清除位


void clear_bit(nr,void * addr);// 设置addr地址的第nr位为0.


 


3.       改变位


void change_bit(nr, void *addr);//对addr地址的第nr位置反.


 


4.       测试位


test_bit(nr, void *addr);//返回addr的第nr位.


 


5.       测试并操作位


int test_and_set_bit(nr, void *addr);


int test_and_clear_bit(nr, void *addr);


int test_and_change_bit(nr, void *addr);


上述操作先执行test_bit(nr, void *addr)再执行xxx_bit(nr, void *addr)。


 


自旋锁


自旋锁的使用


自旋锁(spin lock)是一种对临界资源进行互斥访问的典型手段,其名称来源于它的工作方式。为了获得一个自旋锁,在某种CPU上运行的代码需要先进行一个原子操作,该操作测试并设置(test-and-set)某个内存变量,由于它是原子操作,所以在该操作烤鱼顾之前不可能访问这个内存变量。


如果测试结果表明锁已空闲,则程序获得这个自旋锁并继续操作;否则,程序将在一个小的循环内重复这个“测试并设置”操作,即所谓的“自旋”。当自旋锁的持有者通过重置该变量释放这个自旋锁后,某个等待的“测试并设置”的操作向其调用者报告锁已释放。


Linux系统中与自旋锁相关的操作主要有如下4种。


1.       定义自旋锁


spinlock_t lock;


 


2.       初始化自旋锁


spin_lock_init(&lock);//动态初始化


 


3.       获得自旋锁


spin_lock(&lock);//阻塞直到获得锁


spin_trylock(&lock);//非阻塞,不能获得立即返回


 


4.       释放自旋锁


spin_unlock(&lock);


该宏与spin_lock/spin_trylock配对使用


//定义一个自旋锁


spinlock_t lock;


spin_lock_init(&lock);



critical section



spin_unlock(&lock);


 


自旋锁主要针对SMP或单CPU但内核可抢占的情况,对单CPU和内核不支持抢占的系统,自旋锁退化为空操作。


尽管用了自旋锁可以保证临界区不受到别的CPU和本CPU内的抢占进程打扰,但是得到锁的代码路径在执行临界区的时候还可能受到中断和底半部(BH)的影响。为了防止这种影响,就需要用到自旋锁的衍生。spin_lock()/spin-trylock()是自旋锁机制的基础,它们和开/关中断/中断底半部等结合形成整套的自旋锁机制。,


 


spin_lock_irq() =spin_lock() + local_irq_disable()


spin_unlock_irq() = spin_unlock() + local_irq_enable()


sping_lock_irqsave() = spin_lock() + local_irq_save()


spin_unlock_irqrestore() = spin_unlock + local_irq_restore()


spin_lock_bh() = spin_lock() + local_bh_disable()


spin_unlock_bh = spin_unlock + local_bh_enable()


 


自旋锁应谨慎使用,会降低系统性能或导致死锁。


自旋锁实际是忙等待,当锁不可用时,CPU一直循环执行“测试并设置”直到该锁可用,CPU在等待自旋锁时不做任何有用的工作,仅仅是等待。因此,只有在占用锁时间较短的情况下,使用自旋锁才是合理的。当临界区有大量或有共享设备的时候,需要较长时间占用锁,使用自旋锁会降低系统的性能。


自旋锁可能导致死锁。引发这个问题最常见的情况是递归使用一个自旋锁,即如果一个已经拥有某个自旋锁的CPU想第二次获得这个自旋锁,则该CPU将死锁。此外,若进程获得自旋锁之后再阻塞,也有可能导到处死锁的发生。copy_from_user(),copy_to_user()和kmalloc()等函数都有可能引起阻塞,因此在自旋锁占用期间不能调用这些函数。


 


使用自旋锁设备只能独享。


int xxx_count = o;//使用计数


 


static int xxx_open(struct inode * inode, struct file *filp )


{


       ..


       spinlock(&xxx_lock);


       if (xxx_count)


{


       spin_unlock(&xxx_lock)


return – EBUSY;;


}


 


xxx_count++;


spin_unlock(&xxx_lock);



reurn 0;


}


 


startic int xxx_release(struct inode *inode, struct file *filp)


{


       ..


       spin_lock(&xxx_lock);


       xxx_count--;


       spin_unlock(&xxx_lock);


 


       return 0;


}


 


读写自旋锁


自旋锁不管临界区究竟是进行怎样的操作,不管是读还是写,它都一视同仁。即便多个执行单元同时读取设备临界区资源也会被锁住。实际上,对共享资源并发访问时,多个执行单元同时读取它是不会有问题,自旋锁的衍生锁读写自旋锁(rwlock)可允许读的并发。


读写自旋锁是一种粒度比自旋锁更小的锁机制,它保留了“自旋”的概念,读-读,可同时执行,写-写是互斥的,读-写是互斥的。


读写自旋锁的操作如下。


1.       定义和初始化读写自旋锁


rwlock_t my_rwlock = RW_LOCk_UNLOCKED;//静态初始化


rwlock_t my_rwlock;


rwlock_init(&my_rwlock);//动态初始化


 


2.       读锁锁


void read_lock(rwlock_t *lock);


void read_lock_irq(rwlock _t *lock);


void read_lock_irqsave(rwlock _t *lock, unsigned long flags);


void read_lock_bh(rwlock _t *lock);


 


3.读解锁


void read_unlock(rwlock_t *lock);


void read_unlock_irq(rwlock _t *lock);


void read_unlock_irqrestore(rwlock _t *lock, unsigned long flags);


void read_unlock_bh(rwlock _t *lock);


 


4.写锁锁


void write_lock(rwlock_t *lock);


void write_trylock(rwlock_t *lock);//非阻塞


void write _lock_irq(rwlock _t *lock);


void write _lock_irqsave(rwlock _t *lock, unsigned long flags);


void write _lock_bh(rwlock _t *lock);


 


5.写解锁


void write _unlock(rwlock_t *lock);


void write _unlock_irq(rwlock _t *lock);


void write _unlock_irqrestore(rwlock _t *lock, unsigned long flags);


void write _unlock_bh(rwlock _t *lock);


 


顺序锁


顺序锁(seqlock)是读写锁的优化,读写不再互斥,写写互斥。若读操作期间,写执行单元已发生写操作,那么读操作必须重新操作,以确保得到的数据是完整的。顺序锁有一个限制,必须要求被保护的共享资源不能含有指针,因为写执行单元有可能使指针失效,读操作访问该指针,就导致Oops.


写执行单元的顺序锁操作:


1.       获得锁


void write_seqlock(seqlock_t *sl);


int write_tryseqlock(seqlock_t *sl);


write_seqlock_irqsave(lock,flags);


write_seqlock_irq(lock);


write_seqlock_bh(lock);


2.       释放锁


void write_sequnlock(seqlock_t *sl);


write_ sequnlock _irqsave(lock,flags);


write_ sequnlock _irq(lock);


write_ sequnlock _bh(lock);


 


 


读执行单元的顺序锁操作


1.       读开始


unsigned read_seqbegin(const seqlock_t *sl);


read_seqbeing_irqsave(lock,flags);


 


读执行单元在对被顺序锁sl保护的共享资源进行访问前需要调用该函数,该函数仅返回顺序锁s1的当前顺序号。


 


2.       重读


int read_seqretry(const seqlock *sl, unsigned iv);


read_seqretry_irqrestore(lock, iv, flags);


读执行单元在访问完被顺序锁s1保护的共享资源后需要调用该函数来检查,在读访问期间是否有写操作。如果有写操作,读执行单元就需要重新进行读操作。


 


do {


       seqnum = read_seqbegin(&seqlock_a);


       …..


} while (read_seqretry(&seqlock_a,seqnum));


 


读-拷贝-更新


RCU(Read-Copy-Update).对于被RCU保护的数据,读执行单元不需要获得任何锁就可以访问它,不使用原子指令,而且在除了Alpha的所有架构上不需要内存栅(Memory Barrier),因此不会导致锁竞争,内存延迟以及流水线停滞。


使用RCU的写执行单元在访问共享数据前先复制一个副本,然后对副本进行操作,最后使用一个回调机制在适当的时机把指向原数据的指针重新指向被修改的副本。这个时机就是在


所有引用该数据的CPU都退出对共享数据的操作的时候。读执行单元没有任何同步开销,而写执行单元的同步开销则取决于使用的写执行单元间的同步机制。


RCU可以看做读写锁的高性能版本,相比读写锁,RCU的优点在于既允许多个读执行单元同时访问被保护的数据,又允许多个读执行单元和多个写执行单元同时访问被保护的数据。


但是RCU不能替代读写锁,如果写比较多时,对读执行的性能提高不能弥补写执行导致的损失。使用RCU,写执行单元之间的同步开销会比较大,它需要延迟数据的释放,复制被修改的数据,它也必须使用某种锁机制同步并行的其它写执行单元的修改操作。


RCU操作:


1.       读锁定


rcu_read_lock()


rcu_read_lock_bh()


 


2读解锁


rcu_read_unlock()


rcu_read_unlock_bh()


 


rcu_read_lock()和rcu_read_unlock()实质是禁止和使能内核的抢占调度。


#define rcu_read_lock() preempt_disable()


#define rcu_read_unlock() preempt_enable()


#define rcu_read_lock_bh()  local_bh_disbal()


#define rcu_read_unlock_bh() local_bh_enable()


 


3.同步RCU


 sysnchronize_rcu()


由RCU写执行单元调用,它将阻塞写执行单元,直到所有的读执行单元已经完成读执行单元临界区,才会继续下一下操作。如果有多个RCU写执行单元调用该函数,它们将在下一个gace period(所有的读执行已经完成对临界区的访问)之后全部被唤醒。synchronize_rcu()保证所有的CPU都处理完正在运行的读执行单元临界区。


synchronize_kernel()


内核代码使用该函数来等待所有CPU处于可抢占状态,目前功能等同于sysnchronize_rcu(),


但现在是使用sysnchronize_sched(),它能保证正在运行的中断处理函数运行完毕,但不能保证正在运行的软中断处理完毕。


 


4.挂接回调


void fastcall call_rcu(struct rcu_head *head,


void (*func)(struct rcu_head *rcu));


函数call_rcu()也由RCU写执行单元调用,它不会使写执行单元阻塞,因面可以在中断上下文或软中断使用。该函数将把函数func挂接到RCU回调函数上,然后立即返回。函数


synchronize_rcu()的实现实际上使用了call_rcu()函数。


void fastcall call_rcu_bh(struct rcu_head *head,


void (*func)(struct rcu_head *rcu));


call_rcu_bh()的功能几乎与call_rcu()完全相同,唯一的差别就是它把软中断的完成当做经历一个quiesecent state(静默状态),因此如果写执行单元使用了该函数,在进程上下文的读执行单元必须使用rcu_read_lock_bh().


每个CPU维护两个数据结构rcu_date和rcu_bh_date,它们用于保存回调函数,函数call_rcu()把回调函数注册到rcu_date,call_rcu_bh()则把回调函数注册到rcu_date)bh,在每一个数据结构上,回调函数被注册到一个链表,先注册的排到前面,后注册排到末尾。


使用CRU时,读执行单元必须提供一个信号给写执行者能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾回收器不探测读执行单元的信号,一旦所有的读执行单元都已经发送信号告知它们都不在使用被CRU保护的数据结构,垃圾回收器就调用回收函数完成最后的数据释放或修改操作。


RCU的链表操作:


       static inline void list_add_rcu(struct list_head *new, struct list_head *head);


该函数把链表元素插入到CRU保护的链表head的开头,内存栅保证了在引用这个新插入的链表元素之前,新链表元素的链接指针的修改对所有读执行单元是可见的。


       static inline void list_add_tail_rcu(struct list_head *new, struct list_head *head);


该函数类似于list_add_rcu(),它将新的链表元素new被添加到被CRU保护的链表的末尾。


static inline void list_del_rcu(struct list_head *entry);
该函数从RCU保护的链表中删除指定的链表元素entry。


static inleine void list_replace_rcu(struct list_head* old, struct list_head *new);


该函数是RCU新添加的函数,度不存在非RCU版本。它使用新指针new代替old,内存栅保证在引用新元素之前,它对链接指针的修正对所有读者是可见的。


       list_for_each_rcu(pos,head);


该宏用于遍历由RCU保护的链表head,只要在读执行单元临界区使用该函数,它就可以安全地和其它_rcu链表操作函数并发运行如list_add_rcu().


       list_for_each_safe_rcu(pos, n, head);


该宏类似于list_for_rcu(),不同之处在于它允许安全在删除当前的链表元素pos。


       list_for_each_entry_rcu(pos, head, member);


该宏类似于list_for_each(),不同之处在于它用于遍历指定类型的数据结构链表,当前元素pos为一包含struct list_head结构的特定的数据结构。


       static inline hlist_del_rcu(struct hlist_node *n);


它从由RCU保护的哈希链表中移走链表元素n。


       static inline void hlist_add_head_rcu(struct hlist_node *n, struct list_head *h);


该函数用于把链表元素n插入到被RCU保护的哈希链表的开头,但同时允许读执行者对该哈希链表的遍历。内存栅确保在引用新链表元素之前,它对指针的修改对所有读执行单元可见。


       hlist_for_each(pos, head);


该宏用于遍历由RCU保护的哈希链表head,只要在读端临界区使用该函数,它就可安全地和其它_rcu哈希链表操作函数(如list_add_rcu)并发地运行。


     hlist_for_each_entry_rcu(tpos, pos, head, member); 


似于hlist_for_each(),不同之处在于它用于遍历指定类型的数据结构链表,当前元素pos为一包含struct list_head结构的特定的数据结构。


 


信号量


信号量与使用


semaphore是用于保护临界区的一种常用方法,它的使用方式和自旋锁类似。与自旋锁不同的是得不到信号量的进程不会在原地打转,而是进入休眠等待状态。


信号量的操作:


1.       定义信号量


struct seamaphore sem;


2.       初始化信号量


void sema_init(struct semaphore *sem, int val);//初始化信号量sema的值为val


void init_MUTES(struct semaphore *seam);//初始化互斥信号量值为1


void init_MUTEX_LOCKED(struct semaphore *sema);// //初始化信号量sema的值为0


DECLARE_MUTEX(name)//初始化信号量为1的宏


DECLARE_MUTEX_LOCKED(name)//初始化信号量为0的宏


3.       获得信号量


void down(struct semaphore *sem);//获得信号量,会导致睡眠,不能用在中断上下文使用


int down_interruptible(struct semaphore *sem);//信号能打断睡眠,也会导致该函数返回非0


       int doen_trylock(struct semaphore *sem);//获得返回0,否则返回非0,不会让调用者睡眠


4.       释放信号


void up(struct semaphore *sem);//释放信号量,唤醒等待者


 


信号量用于同步


信号量被初始化为0,可用于同步。


 


完成量用于同步


completion,用于一个执行单元等待另一个执行单元处理完成。


1.       定义


struct completion comp;


2.       初始化


init_completion(&comp);


DECLARE_COMPLETION(comp);//定义并初始化


3.       等待


void wait_for_completion(struct completion *comp);


4.       唤醒


void complete(struct completion *comp);


void complete_all(struct completion *comp);//唤醒所有等待的同一completion的执行单元


void complete_all_and_exit(struct completion *comp, long retval);//唤醒并将调用它的进程终止


 


自旋锁与信号量


自旋锁和信号量都是是解决互斥问题的基本方法。


严格意义上说,自旋锁和信号量是不同层次的互斥手段,后者的实现依赖于前者。在信号量本身的实现上,为了保证信号量结构存取的原子性,在多CPU中需要自旋锁来互斥。


信号量是进程线,用于多个进程之间对资源的互斥,虽然是在内核中,但是该内核执行路径是以进程的身份,代表进程来争夺资源的。如果竞争失败,会发生进程上下文切换。


自旋锁和信号量选择的原则:


当锁不能被获得时,使用信号量的开销是进程上下文的切换时间Tsw,使用自旋锁的开销是等待获取锁的时间(由临界区执行时间决定)Tcs,Tcs<Tsw,使用自旋锁,Tcs>Tsw,使用信号量。


信号量所保护的临界区可以包含可能引起阻塞的代码,而自旋锁绝对保护要避免包含这样的代码的临界区。阻塞意味要进行进程的切换,如果进程被切换出去后,若其它进程企图获取同一自旋锁,死锁就会产生。


信号量存在于进程上下文,因此,如果被保护的共享资源需要在中断或软中断情况下使用,则在信号量和自旋锁之间只能选择自旋锁。如果,一定要使用信号量,则只能通过down_trylock()方式进行,不能获取就立即返回以出口避免阻塞。


 


读写信号量


读写信号量允许多个读执行单元同时访问共享资源,但最多只有一个写执行单元。


1.       定义和初始化读写信号量


struct rw_semaphore  rw_sem;//定义读写信号量


void init_rwsem(struct rw_semaphore * sem);//


2.       读信号量获取


void down_read(struct rw_semaphore *sem);


int down_read_trylock(struct rw_semaphore *sem);


3.       读信号量释放


void up_read(struct rw_semaphore *sem);


4.       写信号量获取


void down_write(struct rw_semaphore *sem);


int down_write_trylock(struct rw_semaphore *sem);


5.       释放写信号量


void up_write(struct rw_semaphore* sem);


 


互斥体


内核中的互斥体


struct mutex my_mutex;


mutext_init(&my_mutex);//初始化


void fastcall mutex_lock(struct mutex *lock)


init fastcall mutext_lock_interruptible(struct mutex *lock);//引起的休眠可被信号打断


init factcall mutex_trylock(struct mutex *lock);//获取不到,返回不引起睡眠


void fastcall mutex_unlock(struct mutex * lock); 


 

文章评论0条评论)

登录后参与讨论
我要评论
0
1
关闭 站长推荐上一条 /2 下一条