原创 semaphore的实现机制详解

2010-5-12 17:21 1737 7 7 分类: MCU/ 嵌入式

 


作者:李强,华清远见嵌入式学院讲师。


semaphore是内核中比较重要和常用的同步方式之一,他主要的特点是实现了Sleep机制下的同步。也就是当获取一个semaphore但是又不能立刻获取的时候,他使当前的执行进程进入到Sleep状态中等待,当semaphore可以获取的时候,从新开始运行,而不像splin lock在获取锁的时候是BusyWait。


首先看其定义:


struct semaphore {
                atomic_t count; // 原子变量,是后续的实际代码中,我们能看到其即为我们在初始化时所设置的信号量。
                int sleepers; // 有几个等待者。
                wait_queue_head_t wait; // 等待队列
        };


初始化函数:
        static inline void sema_init (struct semaphore *sem, int val)
        {
                atomic_set(&sem->count, val); // 原子操作,把信号量的值设为原子操作的值。
                sem->sleepers = 0; // 设为等待者为0。
                init_waitqueue_head(&sem->wait); // 初始化等待队列。
        }


好看完了初始化函数,我们来看一下一个特例PV操作:


static inline void init_MUTEX (struct semaphore *sem)
        {
                sema_init(sem, 1);
        }


static inline void init_MUTEX_LOCKED (struct semaphore *sem)
        {
                sema_init(sem, 0);
        }


从中我们可以看到,其实我们所说的PV操作就是调用sema_init来把其中的原子变量分别置0或者1。


下面我们来看下具体的操作函数:


static inline void down(struct semaphore * sem)
        {
                might_sleep();
                __asm__ __volatile__(
                "# atomic down operation\n\t"
                LOCK_PREFIX "decl %0\n\t" /* --sem->count */
                "jns 2f\n"
                "\tlea %0,%%eax\n\t"
                "call __down_failed\n"
                "2:"
                :"+m" (sem->count)
                :
                :"memory","ax");
        }


关于might_sleep():
        #define might_sleep() \
                do { __might_sleep(__FILE__, __LINE__); might_resched(); } while (0)


从中可以看到其根本是调用might_resched():
        #define might_resched() cond_resched()


其调用了cond_resched():


int __sched cond_resched(void)
        {
                if (need_resched() && !(preempt_count() & PREEMPT_ACTIVE) &&
                system_state == SYSTEM_RUNNING) {
                        __cond_resched();
                        return 1;
                 }
                return 0;
        }


其中:


static inline int need_resched(void)
        {
                return unlikely(test_thread_flag(TIF_NEED_RESCHED));
                // TIF_NEED_RESCHED 这个值在arm是2,在i386中是3。
        }


其是判断进程的状态是否是需要调度。


其具体实现是:


#define test_thread_flag(flag) \
        test_ti_thread_flag(current_thread_info(), flag)


得到当前进程的状态,然后和flag做位&操作,判断是否可以做调度。


看下关于抢占机制的判断:
        #define preempt_count() (current_thread_info()->preempt_count)


调用preempt_count()得到当前进程关于抢占的状态,preempt_count & PREEMPT_ACTIVE 为TRUE,取反为当前不可以抢占,就不能调度,否则为可调度。


看些系统状态:system_state 为运行状态。


综合上面的几个条件,我们看出当前进程可以被调度或者说抢占,于是当前进程被调度放弃CPU资源。
might_sleep宏就是检查是否需要重新调度。


下面我们进入汇编代码区:


__asm__ __volatile__(
        "# atomic up operation\n\t"
        LOCK_PREFIX "decl %0\n\t" /* --sem->count */
        "jns 2f\n"        /* 当符号标志位为0的时候跳转到标号2地方,标志获取锁,结束本次操作。 */
        "\tlea %0,%%eax\n\t"
        "call __down_failed\n"        /* 获取不到的话,压栈,调用 __down_failed */
        "2:"
        :"+m" (sem->count)
        :
        :"memory","ax");


LOCK_PREFIX宏:


在多处理器环境中 LOCK_PREFIX 实际被定义为 “lock”前缀。


x86 处理器使用“lock”前缀的方式提供了在指令执行期间对总线加锁的手段。芯片上有一条引线 LOCK,如果在一条汇编指令(ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, XCHG)前加上“lock” 前缀,经过汇编后的机器代码就使得处理器执行该指令时把引线 LOCK 的电位拉低,从而把总线锁住,这样其它处理器或使用DMA的外设暂时无法通过同一总线访问内存。


注意:从 P6 处理器开始,如果指令访问的内存区域已经存在于处理器的内部缓存中,则“lock” 前缀并不将引线 LOCK 的电位拉低,而是锁住本处理器的内部缓存,然后依靠缓存一致性协议保证操作的原子性。


在比较新的内核版本中Version >2.6.20, i386平台找不到关于__down_failed的具体定义:


linux-2.6.13.4>arch>i386>kernel>semaphore.c
        asm(
        ".section .sched.text\n"
        ".align 4\n"
        ".globl __down_failed\n"
        "__down_failed:\n\t"
        #if defined(CONFIG_FRAME_POINTER)
                "pushl %ebp\n\t"
                "movl %esp,%ebp\n\t"
        #endif
                "pushl %edx\n\t"
                "pushl %ecx\n\t"
                "call __down\n\t"
                "popl %ecx\n\t"
                "popl %edx\n\t"
        #if defined(CONFIG_FRAME_POINTER)
                "movl %ebp,%esp\n\t"
                "popl %ebp\n\t"
        #endif
                "ret"
        );


从汇编代码中我们可以看到其调用了__down()来实现其等待获取锁的过程。


fastcall void __sched __down(struct semaphore * sem)
        {
                struct task_struct *tsk = current; // 当前进程task_struct。
                DECLARE_WAITQUEUE(wait, tsk); // 定义等待队列。
                unsigned long flags;


        tsk->state = TASK_UNINTERRUPTIBLE; // 设置当前进程状态为不可中断状态。
                spin_lock_irqsave(&sem->wait.lock, flags);
                // spin_loc_irqsave 在获得自旋锁之前禁止中断(只在当前处理器);之前的中断状态保存在 flags 里。
                add_wait_queue_exclusive_locked(&sem->wait, &wait);
                        // 将当前的进程添加到等待队列的队尾。操作是不需要锁,因为此函数一般是在获取锁之后调用。


        sem->sleepers++;
                for (;;) {
                        int sleepers = sem->sleepers;


                /*
                        * Add "everybody else" into it. They aren't
                        * playing, because we own the spinlock in
                        * the wait_queue_head.
                        */
                        if (!atomic_add_negative(sleepers - 1, &sem->count)) {
                                        // 将(sleepers - 1) + ( &sem->count),如果当结果值为负数时返回真。
                                        /* First:初始化的时候,count = val, sleeper = 0,
                                        * down()中, count -- 成为 val - 1,以PV操作为例,其val值为1,count = 0,
                                        * __down()中,sleeper ++ 为 1,
                                        * 此时(sleepers - 1, &sem->count) 为0,不为负值,故此为FALSE,跳出循环。
                                        *
                                        * Two:由于在跳出的时候sleepers设置为0,故此Other进程尝试想再次获取锁资源的时候,
                                        * (sleepers - 1, &sem->count) 为 ( 0 - 1, 0 )为负值,此时为真,进入下次循环。
                                        */
                               sem->sleepers = 0; //
                               break;
                       }
                       sem->sleepers = 1;        /* us - see -1 above */
                                                                     /* 设置sleeper为1,标识有进程在等待。*/
                       spin_unlock_irqrestore(&sem->wait.lock, flags);
                                                                     /* 解开锁,开始准备被调度。 */


                schedule();


                spin_lock_irqsave(&sem->wait.lock, flags);
                        tsk->state = TASK_UNINTERRUPTIBLE;
                }
                remove_wait_queue_locked(&sem->wait, &wait);
                                /* 获取到了锁资源,把当前进程从等待队列中删掉。 */
                wake_up_locked(&sem->wait);
                                /* 唤醒等待队列 */
                spin_unlock_irqrestore(&sem->wait.lock, flags);
                tsk->state = TASK_RUNNING;
        }


这里要注意的是对sleeper成员的理解结合后面的代码:


进程在没有获得锁时会将其设为1,在获得了锁的时候将其设为0。


所以,sleeper = 1时表示有进程在等待,sleeper = 0时,有进程获得信号量退出,或者没有进程在等待此信号量。


如果在等待队列中进程被唤醒并获得了锁,则将sleeper设为0,然后用break退出循环,再用wake_up_locked(&sem->wait)唤醒等待队列中的一个进程。
这个进程下次被调度的时候从shedule()后面开始运行,更改进程状态。


调用if (!atomic_add_negative(sleepers - 1, &sem->count))判断,此时sleeper = 0,sleep - 1 = -1,经过atomic_add_negative()使sem->count - 1了。


因此前面论述过,进程进来取信号量 down()里count有减1,然后如果有等待线程在获取到这个信号,刚又会将count + 1,使其变到了原值。


现在在我们这个情景中,已经有线程释放信号了,应该要 + 1。


那思考一下:


为什么要对count sleeper做这样的处理呢?直接使用使用计数不就完了吗?进程等待获取信号量的时候sleep+1 count-1.释放信号量的时候sleep-1.count+1不可以吗?


表面上上述的方法也能工作的很好,但是如果进程数目一多就要考虑到溢出问题了。


static inline void up(struct semaphore * sem)
        {
        __asm__ __volatile__(
                "# atomic up operation\n\t"
                LOCK_PREFIX "incl %0\n\t" /* ++sem->count */
                "jg 1f\n\t"
                "lea %0,%%eax\n\t"
                "call __up_wakeup\n"
                "1:"
                :"+m" (sem->count)
                :
                :"memory","ax");
        }

        asm(
        ".section .sched.text\n"
        ".align 4\n"
        ".globl __up_wakeup\n"
        "__up_wakeup:\n\t"
                "pushl %edx\n\t"
                "pushl %ecx\n\t"
                "call __up\n\t"
                "popl %ecx\n\t"
                "popl %edx\n\t"
                "ret"
        );


fastcall void __up(struct semaphore *sem)
        {
                wake_up(&sem->wait);
        }


释放锁资源就比较简单了,首先是利用汇编判断其是否是有符号大于,我们这里count是为0,不是大于则call __up_wakeup(),最终执行__up(),唤醒等待队列。


至于相应信号的分析如下:


static inline int down_interruptible(struct semaphore * sem)
        {
                int result;


        might_sleep();
                __asm__ __volatile__(
                        "# atomic interruptible down operation\n\t"
                        "xorl %0,%0\n\t"
                        LOCK_PREFIX "decl %1\n\t"        /* --sem->count */
                        "jns 2f\n\t"
                        "lea %1,%%eax\n\t"
                        "call __down_failed_interruptible\n"
                        "2:"
                        :"=&a" (result), "+m" (sem->count)
:
                        :"memory");
                return result;
        }


fastcall int __sched __down_interruptible(struct semaphore * sem)
        {
                int retval = 0; /* 返回值 */
                struct task_struct *tsk = current;
                DECLARE_WAITQUEUE(wait, tsk);
                unsigned long flags;


        tsk->state = TASK_INTERRUPTIBLE;
                spin_lock_irqsave(&sem->wait.lock, flags);
                add_wait_queue_exclusive_locked(&sem->wait, &wait);


        sem->sleepers++;
                for (;;) {
                        int sleepers = sem->sleepers;


                /*
                        * With signals pending, this turns into
                        * the trylock failure case - we won't be
                        * sleeping, and we* can't get the lock as
                        * it has contention. Just correct the count
                        * and exit.
                        */
                        if (signal_pending(current)) {
                                /* 判断是否有中断触发,如果有则返回非0值,否则返回0。*/
                                retval = -EINTR;
                                sem->sleepers = 0;
                                /* 重置sleepers = 0 */
                                atomic_add(sleepers, &sem->count);
                                /* 让等待者等于count。*/
                                break;
                        }


                /*
                        * Add "everybody else" into it. They aren't
                        * playing, because we own the spinlock in
                        * wait_queue_head. The "-1" is because we're
                        * still hoping to get the semaphore.
                        */
                        if (!atomic_add_negative(sleepers - 1, &sem->count)) {
                                sem->sleepers = 0;
                                break;
                        }
                        sem->sleepers = 1; /* us - see -1 above */
                        spin_unlock_irqrestore(&sem->wait.lock, flags);


                schedule();


                spin_lock_irqsave(&sem->wait.lock, flags);
                        tsk->state = TASK_INTERRUPTIBLE;
                }
                remove_wait_queue_locked(&sem->wait, &wait);
                wake_up_locked(&sem->wait);
                spin_unlock_irqrestore(&sem->wait.lock, flags);


        tsk->state = TASK_RUNNING;
                return retval;
        }


static inline int down_trylock(struct semaphore * sem)
       {
                  int result;


         __asm__ __volatile__(
                           "# atomic interruptible down operation\n\t"
                           "xorl %0,%0\n\t"
                           LOCK_PREFIX "decl %1\n\t" /* --sem->count */
                           "jns 2f\n\t" /* 可以获取锁,锁之。*/
                           "lea %1,%%eax\n\t"
                           "call __down_failed_trylock\n\t"
                           "2:\n"
                           :"=&a" (result), "+m" (sem->count)
                           :
                           :"memory");
                return result;
        }


fastcall int __down_trylock(struct semaphore * sem)
        {
                int sleepers;
                unsigned long flags;


        spin_lock_irqsave(&sem->wait.lock, flags);
                sleepers = sem->sleepers + 1;
                        /* 不能获取锁,把sleepers + 1,判断是否有等待进程,如果有唤醒其。*/
                sem->sleepers = 0;


        /*
                * Add "everybody else" and us into it. They aren't
                * playing, because we own the spinlock in the
                * wait_queue_head.
                */
                if (!atomic_add_negative(sleepers, &sem->count)) {
                wake_up_locked(&sem->wait);
                }


        spin_unlock_irqrestore(&sem->wait.lock, flags);
                return 1;
        }

PARTNER CONTENT

文章评论0条评论)

登录后参与讨论
EE直播间
更多
我要评论
0
7
关闭 站长推荐上一条 /3 下一条