2012/6

perfbook 读书笔记:Non-Blocking Synchronization

perfbook 中对 NBS 这一节的描述尚未完成。我来简单补充一下。

Non-Blocking 其实很简单,它和 Blocking 相反,也就是非阻塞的意思。阻塞大家都知道,典型的 mutex,semaphore 就是阻塞型的锁,如果其它线程持有锁,该线程会进入阻塞状态,直到它可以获取锁。

非阻塞型的锁就是竞争时不会阻塞的锁,最典型的就是家喻户晓的自旋锁了,在竞争的情况下它会一直自旋,即忙等(busy wait),而不是阻塞。所以 spin lock 是 NBS 的一种。

NBS 的另一类比较高级的形式是 lock-free,从名字上也很容易看出,就是无锁。判断 lock-free 的一个标准是:如果正在进行操作的线程休眠了,别的线程依旧可以进来照常操作。这样就直接把 spin lock 给排除在外了。lock-free 通常使用的是 RMW (read-modify-write)原语,由硬件来支持,它最常见的一种实现是 CAS,即 Compare And Swap,在 x86 上对应的是 cmpxchg 指令。Linux 内核中把 cmpxchg 给封装成了一个通用的函数。第一次看到这个函数感觉有些别扭,估计是出于性能的考虑最新的内核代码中的实现还不如之前的实现容易让人读懂,下面是 2.6.32 中它 generic 的实现:

[c]
static inline unsigned long __cmpxchg(volatile unsigned long *m,
unsigned long old, unsigned long new)
{
unsigned long retval;
unsigned long flags;

    local_irq_save(flags);
    retval = *m;
    if (retval == old)
            *m = new;
    local_irq_restore(flags);
    return retval;

}
[/c]

比较新的 gcc 中有和它对应的一个内置原子函数 __sync_bool_compare_and_swap()。cmpxchg 一个典型的用例是 lockless list 的实现,见其插入操作的实现:

[c]
static inline bool llist_add(struct llist_node new, struct llist_head head)
{
struct llist_node entry, old_entry;

    entry = head->first;
    for (;;) {
            old_entry = entry;
            new->next = entry;
            entry = cmpxchg(&head->first, old_entry, new);
            if (entry == old_entry)
                    break;
    }

    return old_entry == NULL;

}
[/c]

我们可以看出,它虽然是 lock-free 的,但是避免不了等待(wait),因为这种 cmpxchg 一般都套在一个循环中直到操作成功才会退出,不成功就会一直尝试。不运行的话你根本不会知道它到底会尝试多少次才能成功。

另外一种更加高级的 NBS 就是 wait-free 了,这个可以说是大家梦寐以求的东西,它不阻塞,不用忙等,连等待都不用,也可以避免竞争!减少了很多 CPU 的浪费!显然 wait-free 一定是 lock-free 的,它们之间最大的区别是:等待的次数是否有上限。Wait-free 要求每个线程都可以在有限的步骤之内完成操作,不管其它线程如何!无奈这个太难实现,而且一般都比较复杂,即使实现了它的 overhead 也未必就比等价的 lock-free 的小。网上有很多相关的资料,你可以自行搜索一下。

参考资料:

1. http://julien.benoist.name/lockfree.html
2. http://audidude.com/?p=363
3. http://burtleburtle.net/bob/hash/lockfree.html
4. http://en.wikipedia.org/wiki/Non-blocking_synchronization
5. http://en.wikipedia.org/wiki/Lock-free_and_wait-free_algorithms

Python heapq 模块的实现

无意间看了一下 Python heapq 模块的源代码,发现里面的源代码写得很棒,忍不住拿出来和大家分享一下。:)

heapq 的意思是 heap queue,也就是基于 heap 的 priority queue。说到 priority queue,忍不住吐槽几句。我上学的时候学优先级队列的时候就没有碰到像 wikipedia 上那样透彻的解释,priority queue 并不是具体的某一个数据结构,而是对一类数据结构的概括!比如栈就可以看作是后进入的优先级总是大于先进入的,而队列就可以看成是先进入的优先级一定高于后进来的!这还没完!如果我们是用一个无序的数组实现一个priority queue,对它进行出队操作尼玛不就是选择排序么!尼玛用有序数组实现入队操作尼玛不就是插入排序么!尼玛用堆实现(即本文要介绍的)就是堆排序啊!用二叉树实现就是二叉树排序啊!数据结构课学的这些东西基本上都出来了啊!T_T

heapq 的代码也是用 Python 写的,用到了一些其它 Python 模块,如果你对 itertools 不熟悉,在阅读下面的代码之前请先读文档。heapq 模块主要有5个函数:heappush(),把一个元素放入堆中;heappop(),从堆中取出一个元素;heapify(),把一个列表变成一个堆;nlargest() 和 nsmallest() 分别提供列表中最大或最小的N个元素。

先从简单的看起:

[python]
def heappush(heap, item):
“””Push item onto heap, maintaining the heap invariant.”””
heap.append(item)
_siftdown(heap, 0, len(heap)-1)

def heappop(heap):
“””Pop the smallest item off the heap, maintaining the heap invariant.”””
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
_siftup(heap, 0)
else:
returnitem = lastelt
return returnitem
[/python]

从源代码我们不难看出,这个堆也是用数组实现的,而且是最小堆,即 a[k] <= a[2*k+1] and a[k] startpos:
parentpos = (pos - 1) >> 1
parent = heap[parentpos]
if cmp_lt(newitem, parent):
heap[pos] = parent
pos = parentpos
continue
break
heap[pos] = newitem

def _siftup(heap, pos):
endpos = len(heap)
startpos = pos
newitem = heap[pos]

# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1    # leftmost child position
while childpos &lt; endpos:
    # Set childpos to index of smaller child.
    rightpos = childpos + 1
    if rightpos &lt; endpos and not cmp_lt(heap[childpos], heap[rightpos]):
        childpos = rightpos
    # Move the smaller child up.
    heap[pos] = heap[childpos]
    pos = childpos
    childpos = 2*pos + 1
# The leaf at pos is empty now.  Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)

[/python]

上面的代码加上注释很容易理解,不是吗?在此基础上实现 heapify() 就很容易了:

[python]
def heapify(x):
"""Transform list into a heap, in-place, in O(len(x)) time."""
n = len(x)

# Transform bottom-up.  The largest index there&#039;s any point to looking at
# is the largest with a child index in-range, so must have 2*i + 1 &lt; n,
# or i &lt; (n-1)/2\.  If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1\.  If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that&#039;s again n//2-1.
for i in reversed(xrange(n//2)):
    _siftup(x, i)

[/python]
这里用了一个技巧,正如注释中所说,其实只要 siftup 后面一半就可以了,前面的一半自然就是一个heap了。heappushpop() 也可以用它来实现了,而且用不着调用 heappush()+heappop():

[python]
def heappushpop(heap, item):
"""Fast version of a heappush followed by a heappop."""
if heap and cmp_lt(heap[0], item):
item, heap[0] = heap[0], item
_siftup(heap, 0)
return item
[/python]

第一眼看到这个函数可能觉得它放进去一个再取出来一个有什么意思嘛!仔细想想它很有用,尤其是在后面实现 nlargest() 的时候:

[python]
def nlargest(n, iterable):
"""Find the n largest elements in a dataset.

Equivalent to:  sorted(iterable, reverse=True)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
it = iter(iterable)
result = list(islice(it, n))
if not result:
    return result
heapify(result)
_heappushpop = heappushpop
for elem in it:
    _heappushpop(result, elem)
result.sort(reverse=True)
return result

[/python]

先从 list 中取出 N 个元素来,然后把这个 list 转化成 heap,把原先的 list 中的所有元素在此 heap 上进行 heappushpop() 操作,最后剩下的一定是最大的!因为你每次 push 进去的不一定是最大的,但你 pop 出来的一定是最小的啊!

但 nsmallest() 的实现就截然不同了:

[python]
def nsmallest(n, iterable):
"""Find the n smallest elements in a dataset.

Equivalent to:  sorted(iterable)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
if hasattr(iterable, &#039;__len__&#039;) and n * 10  Largest of the nsmallest
    for elem in it:
        if cmp_lt(elem, los):
            insort(result, elem)
            pop()
            los = result[-1]
    return result
# An alternative approach manifests the whole iterable in memory but
# saves comparisons by heapifying all at once.  Also, saves time
# over bisect.insort() which has O(n) data movement time for every
# insertion.  Finding the n smallest of an m length iterable requires
#    O(m) + O(n log m) comparisons.
h = list(iterable)
heapify(h)
return map(heappop, repeat(h, min(n, len(h))))

[/python]

这里做了一个优化,如果 N 小于 list 长度的1/10的话,就直接用插入排序(因为之前就是有序的,所以很快)。如果 N 比较大的话,就用 heap 来实现了,把整个 list 作成一个堆,然后 heappop() 出来的前 N 个就是最后的结果了!不得不说最后一行代码写得太精炼了!

Python memoize decorator

偶然看 Python 代码,看到这么一个叫memoize decorator 的东西,你会发现它非常有用。

看下面的例子,我们最常见的 fibonacci 数列的递归实现。

~% python
>>> def fib(n):
...     if n in (0, 1):
...             return n
...     else:
...             return fib(n-1)+fib(n-2)
...
>>> import timeit
>>> timeit.timeit("fib(5)", "from __main__ import fib")
3.946546792984009
>>> timeit.timeit("fib(7)", "from __main__ import fib")
10.944302082061768

很慢,不是么?你当然可以把它的实现改成迭代,但是,如果前提是我们不对fib()函数本身做任何代码修改的话,我们还有别的方法去改进它。

我们注意到,fib(n-1) 和 fib(n-2) 实际上重复计算了很多结果,根本没必要,一个自然而然的想法就是把这些结果缓存起来,于是就有了下面的:

>>> def memoize(f):
...     memo = {}
...     def wrapper(x):
...         if x not in memo:
...             memo[x] = f(x)
...         return memo[x]
...     return wrapper
...
>>>
>>> def fib(n):
...     if n in (0, 1):
...         return n
...     else:
...         return fib(n-1) + fib(n-2)
...
>>> fib = memoize(fib)
>>> import timeit
>>> timeit.timeit("fib(7)", "from __main__ import fib")
0.35535502433776855

还不够,Python 还有自己的解决办法,那就是用 decorator

[python]
def memoize(function):
memo = {}
def wrapper(args):
if args in memo:
return memo[args]
else:
rv = function(
args)
memo[args] = rv
return rv
return wrapper

@memoize
def fib(n):
if n in (0, 1):
return n
return fib(n-1) + fib(n-2)
[/python]

为此,在 Python3 中还专门引入了一个 lru_cache

这个东西的牛逼之处在于,有了它你可以放心地去写递归的代码,不用为了效率去把它改成迭代形式,你也知道,有些时候递归代码比迭代代码更容易理解。这样可读性有了,效率也提升了,不用修改函数的实现,直接让它从次方级降到了线性级。

为什么Linux内核不允许在中断中休眠?

我以前一直以为 Linux 内核之所以不允许在中断中休眠是因为中断上下文中无法获取 thread_info,task_struct 等进程相关的结构体,从而无法调度。

今天又重新看了一下相关的代码,发现实际上不是。在最新的代码中,x86 32 使用的 irq stack 中也保存了thread_info,我们可以看 execute_on_irq_stack() 的定义:

[c]
union irqctx {
struct threadinfo tinfo;
u32 stack[THREAD_SIZE/sizeof(u32)];
} __attribute
((aligned(THREAD_SIZE)));

static inline int
execute_on_irq_stack(int overflow, struct irq_desc desc, int irq)
{
union irq_ctx
curctx, irqctx;
u32
isp, arg1, arg2;

    curctx = (union irq_ctx *) current_thread_info();
    irqctx = __this_cpu_read(hardirq_ctx);

    /*
     * this is where we switch to the IRQ stack. However, if we are
     * already using the IRQ stack (because we interrupted a hardirq
     * handler) we can't do that and just have to keep using the
     * current stack (which is the irq stack already after all)
     */
    if (unlikely(curctx == irqctx))
            return 0;

    /* build the stack frame on the IRQ stack */
    isp = (u32 *) ((char *)irqctx + sizeof(*irqctx));
    irqctx-&gt;tinfo.task = curctx-&gt;tinfo.task;
    irqctx-&gt;tinfo.previous_esp = current_stack_pointer;

    /* Copy the preempt_count so that the [soft]irq checks work. */
    irqctx-&gt;tinfo.preempt_count = curctx-&gt;tinfo.preempt_count;

    if (unlikely(overflow))
            call_on_stack(print_stack_overflow, isp);

    asm volatile("xchgl     %%ebx,%%esp     n"
                 "call      *%%edi          n"
                 "movl      %%ebx,%%esp     n"
                 : "=a" (arg1), "=d" (arg2), "=b" (isp)
                 :  "0" (irq),   "1" (desc),  "2" (isp),
                    "D" (desc-&gt;handle_irq)
                 : "memory", "cc", "ecx");
    return 1;

}
[/c]

(注:x86 都已经使用 irq stack 了,和进程的内核栈独立开了,这样即使 irq 处理函数中占用了很多栈也不会影响外面的了。)

所以这不是问题。也就是说,技术上我们完全可以做到在中断中休眠。Linux 内核之所以没有这么做是出于设计上的原因。

如果这还不足以说明问题的话,还有个例子:do_page_fault() 也是在中断上下文中调用的(注:此处更严格的讲是 trap,而非 interrupt,但无论哪个都肯定不是普通的进程上下文,相对而言还算是中断上下文),但是它是可能休眠的,至少它明显调用了可能休眠的函数 down_read() 。

为什么要这么设计呢?因为在 Linux 内核中能让 do_page_fault() 休眠的地方基本上只有copy_from_user(),copy_to_user()(其它地方触发 page fault 会导致 oops),它们在使用用户空间的页面时可能会因为对应的页面不在物理内存中而触发 swap 等,即 handle_mm_fault() 所做的。这种情况下休眠是合理的,因为调用 copy_from_user() 的进程本身就是需要等待这个资源。休眠不就是为了等待资源吗?

为什么其它中断(注:此处是指 IRQ,或者严格意义上的 interrupt,非 trap)不能休眠?假设 CPU 上正在执行的是某个高优先级的进程 A,它本身没有使用任何网络通信,突然网卡中断来了,它被中断而且被休眠了,那么,这看起来就像是进程 A 本身在等待某网络资源,而实际上根本不是。

换句话说,如果在 IRQ 中休眠,被打断的当前进程很可能不是需要等待的进程,更进一步讲,在中断中我们根本无法知道到底是哪个进程需要休眠。更何况,这里还没有考虑到在某个中断中休眠会不会影响下一个中断之类的更复杂的问题。由此可见,如果真允许在中断中休眠的话,那么设计将会是很复杂的。

因此,这完全是一个设计的问题。中断中休眠技术上可实现,但设计上不好。

另外,Linux 内核中很多会休眠的函数会调用 might_sleep(),它是用来检测是否在中断上下文中,如果是就是触发一个警告 “BUG: sleeping function called from invalid context”。可是为什么 do_page_fault() 中的函数没有呢?这是因为普通的 IRQ 在进入处理函数之前都会调用 irq_enter(),它里面改变了进程的 preempt 的值:

[c]

define __irq_enter()

    do {                                            
            account_system_vtime(current);          
            add_preempt_count(HARDIRQ_OFFSET);      
            trace_hardirq_enter();                  
    } while (0)

void irq_enter(void)
{
int cpu = smp_processor_id();

    rcu_irq_enter();
    if (is_idle_task(current) &amp;&amp; !in_interrupt()) {
            /*
             * Prevent raise_softirq from needlessly waking up ksoftirqd
             * here, as softirq will be serviced on return from interrupt.
             */
            local_bh_disable();
            tick_check_idle(cpu);
            _local_bh_enable();
    }

    __irq_enter();

}
[/c]

而 page fault 的话不会经过这个流程。这也从另一面证明了这是个设计的问题。

顺便说一句,比较新的 Linux 内核中已经支持中断线程化了:

[c]
int request_threaded_irq(unsigned int irq, irq_handler_t handler,
irq_handler_t thread_fn, unsigned long irqflags,
const char devname, void dev_id)
[/c]

所以一部分中断处理可以在进程上下文中完成了。