稍微了解 Linux 内核的人都知道,在 x86 上内核中所有的 struct page 是放在一个数组中管理的,它就是 mem_map,通过它我们就可以用 pfn 作为 index 来找到对应的 struct page 了:

[c]

define __pfn_to_page(pfn) (mem_map + ((pfn) - ARCH_PFN_OFFSET))

define __page_to_pfn(page) ((unsigned long)((page) - mem_map) +

                             ARCH_PFN_OFFSET)

[/c]

这是以前旧的 memory model(这个词很流行,你应该在很多地方见过,比如 C/C++标准),现在情况变了。

因为对 NUMA 和内存热插拔(memory hot-plug)的支持,Linux 内核中现在又引入两个新的 memory model,之前那个旧的被称为 Flat memory,新的两个被称为 Discontiguous memory 和 Sparse memory。它们对应的选项是:CONFIG_FLATMEM,CONFIG_DISCONTIGMEM,CONFIG_SPARSEMEM。让我们来看看这两个新的 memory model。

顺便多说两句,NUMA 和内存热插拔并没有直接的联系,NUMA 系统不一定支持内存的热插拔,而可以进行内存热插拔的系统也不一定是 NUMA 的!NUMA 支持对应的选项是 CONFIG_NUMA,而内存热插拔对应的选项是 CONFIG_MEMORY_HOTPLUG 和 CONFIG_MEMORY_HOTREMOVE。由此也可以看出 Linux 内核配置是多么灵活,你可以任意搭配你需要的选项。

CONFIG_DISCONTIGMEM

mm/Kconfig 中对它的介绍是:

This option provides enhanced support for discontiguous memory systems, over FLATMEM. These systems have holes in their physical address spaces, and this option provides more efficient handling of these holes. However, the vast majority of hardware has quite flat address spaces, and can have degraded performance from the extra overhead that this option imposes.

Many NUMA configurations will have this as the only option.
Discontiguous memory 其实很简单,它上从 flat memory 的基础上对 NUMA 进行扩展得出来的。每一个 node 都有一个 struct pglist_data,对于 discontiguous memory 其中记录了每个 node 的 node_start_pfn、node_spanned_pages、node_mem_map,分别表示该 node 的起始页的 PFN、物理页的数量、这些页面的 mem_map。我们可以看 alloc_node_mem_map() 是如何初始化这几个值的:

[c]
static void __init_refok alloc_node_mem_map(struct pglist_data pgdat)
{
/
Skip empty nodes */
if (!pgdat->node_spanned_pages)
return;

ifdef CONFIG_FLAT_NODE_MEM_MAP

    /* ia64 gets its own node_mem_map, before this, without bootmem */
    if (!pgdat->node_mem_map) {
            unsigned long size, start, end;
            struct page *map;

            /*
             * The zone's endpoints aren't required to be MAX_ORDER
             * aligned but the node_mem_map endpoints must be in order
             * for the buddy allocator to function correctly.
             */
            start = pgdat->node_start_pfn & ~(MAX_ORDER_NR_PAGES - 1);
            end = pgdat->node_start_pfn + pgdat->node_spanned_pages;
            end = ALIGN(end, MAX_ORDER_NR_PAGES);
            size =  (end - start) * sizeof(struct page);
            map = alloc_remap(pgdat->node_id, size);
            if (!map)
                    map = alloc_bootmem_node_nopanic(pgdat, size);
            pgdat->node_mem_map = map + (pgdat->node_start_pfn - start);
    }
    //....

}
[/c]

所以,它对应的 pfn_to_page() 和 page_to_pfn() 定义如下:

[c]

define arch_local_page_offset(pfn, nid)

    ((pfn) - NODE_DATA(nid)->node_start_pfn)

define __pfn_to_page(pfn)

({ unsigned long pfn = (pfn);
unsigned long
nid = arch_pfn_to_nid(pfn);
NODE_DATA(
nid)->node_mem_map + arch_local_page_offset(pfn, nid);
})

define __page_to_pfn(pg)

({ const struct page __pg = (pg);
struct pglist_data
pgdat = NODE_DATA(page_to_nid(pg));
(unsigned long)(pg - pgdat->node_mem_map) +
__pgdat->node_start_pfn;
})
[/c]

两个 node 之间的内存未必是连续的,中间可能有内存空洞,空洞的单位是 64M,但整个内存依然是 flat 的:

[c]
/*

  • generic node memory support, the following assumptions apply:
    *
  • 1) memory comes in 64Mb contiguous chunks which are either present or not
  • 2) we will not have more than 64Gb in total
    *
  • for now assume that 64Gb is max amount of RAM for whole system
  • 64Gb / 4096bytes/page = 16777216 pages
    */

    define MAX_NR_PAGES 16777216

    define MAX_SECTIONS 1024

    define PAGES_PER_SECTION (MAX_NR_PAGES/MAX_SECTIONS)

extern s8 physnode_map[];

static inline int pfn_to_nid(unsigned long pfn)
{

ifdef CONFIG_NUMA

    return((int) physnode_map[(pfn) / PAGES_PER_SECTION]);

else

    return 0;

endif

}
[/c]

CONFIG_SPARSEMEM

Sparse memory 是一个相对比较复杂的模型。mm/Kconfig 中对它的介绍是:

This will be the only option for some systems, including memory hotplug systems. This is normal. For many other systems, this will be an alternative to “Discontiguous Memory”. This option provides some potential performance benefits, along with decreased code complexity, but it is newer, and more experimental.

它主要是因为支持内存热插拔引入的。对于支持内存热插拔的系统,系统中的某一个部分内存可以随时被移除和添加,这就是得原本连续的内存空间变得稀疏。这个内存模型是把所有的内存空间划分成一个个 section,每个 section 都是同样大小的,可以进行热插拔的内存大小就是以 section 为单位的。在 x86_64 上面,每个 section 是 128M:

[c]

ifdef CONFIG_X86_32

ifdef CONFIG_X86_PAE

define SECTION_SIZE_BITS 29

define MAX_PHYSADDR_BITS 36

define MAX_PHYSMEM_BITS 36

else

define SECTION_SIZE_BITS 26

define MAX_PHYSADDR_BITS 32

define MAX_PHYSMEM_BITS 32

endif

else / CONFIG_X86_32 /

define SECTION_SIZE_BITS 27 / matt - 128 is convenient right now /

define MAX_PHYSADDR_BITS 44

define MAX_PHYSMEM_BITS 46

endif

[/c]

每个 section 有自己的 mem_map,所以其 __pfn_to_page 的定义如下:

[c]
/*

  • Note: section’s mem_map is encorded to reflect its start_pfn.
  • section[i].section_mem_map == mem_map’s address - start_pfn;
    */

    define __page_to_pfn(pg)

    ({ const struct page *__pg = (pg);
     int __sec = page_to_section(__pg);                      
     (unsigned long)(__pg - __section_mem_map_addr(__nr_to_section(__sec))); 
    
    })

define __pfn_to_page(pfn)

({ unsigned long pfn = (pfn);
struct mem_section *
sec = pfn_to_section(pfn);
section_mem_map_addr(sec) + __pfn;
})
[/c]

Sparse memory 还有两个变异:SPARSEMEM_EXTREME 和 SPARSEMEM_VMEMMAP,前者是对更稀疏的内存做了优化,采用了两层 memory section,即二维数组:

[c]

ifdef CONFIG_SPARSEMEM_EXTREME

define SECTIONS_PER_ROOT (PAGE_SIZE / sizeof (struct mem_section))

else

define SECTIONS_PER_ROOT 1

endif

define SECTION_NR_TO_ROOT(sec) ((sec) / SECTIONS_PER_ROOT)

define NR_SECTION_ROOTS DIV_ROUND_UP(NR_MEM_SECTIONS, SECTIONS_PER_ROOT)

define SECTION_ROOT_MASK (SECTIONS_PER_ROOT - 1)

ifdef CONFIG_SPARSEMEM_EXTREME

extern struct mem_section *mem_section[NR_SECTION_ROOTS];

else

extern struct mem_section mem_section[NR_SECTION_ROOTS][SECTIONS_PER_ROOT];

endif

static inline struct mem_section *__nr_to_section(unsigned long nr)
{
if (!mem_section[SECTION_NR_TO_ROOT(nr)])
return NULL;
return &mem_section[SECTION_NR_TO_ROOT(nr)][nr & SECTION_ROOT_MASK];
}
[/c]

后者是针对 x86_64 这种虚拟地址空间比较大的平台做了速度的优化,把 mem_map 里所有 struct page 的地址一一映射进 vmemmap 地址中去,这样它们的虚拟地址就是连续的了:

[c]

define VMEMMAP_START _AC(0xffffea0000000000, UL)

define vmemmap ((struct page *)VMEMMAP_START)

/ memmap is virtually contiguous. /

define __pfn_to_page(pfn) (vmemmap + (pfn))

define __page_to_pfn(page) (unsigned long)((page) - vmemmap)

[/c]

不过额外的代价就是初始化的时候要对每一个 struct page 做 populate,参见 sparse_mem_map_populate() 函数。另外可参考内存热插拔代码中函数 add_section() 和 remove_section() 的实现。

treap 是一个很有意思的数据结构,从名字也能看得出来,它是 tree 和 heap 的混合产物。为什么会有这么一个数据结构还得从二叉树(BST)说起。

我们都知道,普通的二叉树是不平衡的,在某些情况下进行插入删除操作可以导致时间复杂度从 O(logN) 下降到 O(N)。为了解决平衡的问题,有很多新的二叉树被引入,比如大家熟知的一些:Linux 内核中 CFS 使用到的红黑树(Red-black tree),数据结构课上都会讲到的 AVL 树。这些树都因为要进行复杂的旋转操作而不容易实现。

那么有没有一个实现容易的平衡二叉树呢?Treap 就是一个。普通的二叉树节点只有 key,而 treap 又多了一个 priority,这里的 priority 就是 heap (优先队列)中的 priority。这样, treap 既可以利用二叉树的特性,又可以利用 heap 的特性了。

看它是怎么利用的,以 Perl 的 Tree::Treap 模块为例。

1) 对于搜索,使用二叉树的 key 即可,和普通二叉树没有区别:

[perl]
sub _get_node {
my $self = shift;
my $key = shift;
while(!$self->_is_empty() and $self->ne($key)){
$self = $self->{$self->lt($key)?”left”:”right”}
}
return $self->_is_empty() ? 0 : $self;
}
[/perl]

2) 插入一个新的节点 key=x 时,随机一个整数值 y 作为 priority,利用二叉树搜索 x,在它应该出现的位置创建一个新的节点,只要 x 不是根节点而且优先级高于它的父节点,那么旋转这个节点使其和其父节点交换位置。

[perl]
sub insert {
my $self = shift;
my $key = shift;
my $data = shift;
$data = defined($data)? $data : $key;
my $priority = shift() || rand();

if($self->_is_empty()) {
    $self->{priority} = $priority,
    $self->{key}      = $key;
    $self->{data}     = $data;
    $self->{left}     = $self->new($self->{cmp});
    $self->{right}    = $self->new($self->{cmp});
    return $self;
}

if($self->gt($key)){
    $self->{right}->insert($key,$data,$priority);
    if($self->{right}->{priority} > $self->{priority}){
        $self->_rotate_left();
    }
}elsif($self->lt($key)){
    $self->{left}->insert($key,$data,$priority);
    if($self->{left}->{priority} > $self->{priority}){
        $self->_rotate_right();
    }

}else{
    $self->_delete_node();
    $self->insert($key,$data,$priority);
}
return $self;

}
[/perl]

从代码可以看出,旋转就是为了保持 heap 的属性,优先级高的节点在上层。

3) 删除一个节点相对比较麻烦:如果要删除的节点 x 是一个叶子,直接删掉即可;如果 x 有一个孩子节点 z,把 x 删掉,然后把 z 作为 x 父亲的孩子;如果 x 有两个孩子节点,则把 x 和它的下一个节点(successor)交换,然后进行相应的旋转。实现是递归实现的,很容易理解。注意:这里实现删除时并没有进行实质的删除操作,而只是把优先级设为了最低的 -100,这也使得代码变得比上面的理论更简单。

[perl]
sub delete {
my $self = shift;
my $key = shift;
return 0 unless $self = $self->_get_node($key);
$self->_delete_node();
}

sub _delete_node {
my $self = shift;
if($self->_is_leaf()) {
%$self = (priority => -100, cmp => $self->{cmp});
} elsif ($self->{left}->{priority} > $self->{right}->{priority}) {
$self->_rotate_right();
$self->{right}->_delete_node();
} else {
$self->_rotate_left();
$self->{left}->_delete_node();
}
}
[/perl]

这里的两个旋转操作很容易实现:

[perl]
sub _clone_node {
my $self = shift;
my $other = shift;
%$self = %$other;
}

sub _rotate_left {
my $self = shift;
my $tmp = $self->new($self->{cmp});
$tmp->_clone_node($self);
$self->_clone_node($self->{right});
$tmp->{right} = $self->{left};
$self->{left} = $tmp;

}

sub _rotate_right {
my $self = shift;
my $tmp = $self->new($self->{cmp});
$tmp->_clone_node($self);
$self->_clone_node($self->{left});
$tmp->{left} = $self->{right};
$self->{right} = $tmp;
}
[/perl]

或者借助于这个图来理解右旋:

      B            A
     /           / 
    A   2   -->  0   B
   /               / 
  0   1            1   2

参考:
1. http://acs.lbl.gov/~aragon/treaps.html
2. http://www.perlmonks.org/?node_id=289584
3. C 语言实现:http://www.canonware.com/trp/
4. Python 实现:http://stromberg.dnsalias.org/~strombrg/python-tree-and-heap-comparison/

我们知道,在KVM里测试内核会碰到一个很严重的问题:那就是在 host 编译的内核不能直接在 guest 里使用。有两个原因:一是 host 和 guest 的硬件可能不一样,所以需要的 config 不一样;二是内核模块即便是安装进了 initramfs 也仍有很多需要安装到 /lib/modules/uname -r

有三种解决办法:

1. host 和 guest 都使用发行版自带的 config,这个 config 是可以在很多机器上运行,所以基本上一定可以在你的guest 里使用,然后用 guestfish 把编译的内核模块拷贝进 guest;这么做的好处是 host 和 guest 都使用同一个 config;坏处也显然,需要进行太多host => guest 拷贝操作。

2. 在 1) 的基础上进行改进,把 host 作为 build server,为 guest 提供 PXE,NFS 服务,这样 guest 里就可以直接使用编译好的内核以及内核模块。这个方法的缺点是,需要一些系统管理技巧,我暂时还没有时间折腾;优点是,如果你的 build server 搭建得比较好,你甚至可以给其它机器(非虚拟)提供服务。

3. guest 使用和 host 不同的 config,guest 的 config 是经过专门定制的,只编译本机器所需要的内核特性和模块,并且把所有的模块都编译进内核(builtin),这样我们甚至不需要 initramfs!优点是,config 很少,即使重新编译花费时间也很少;缺点也很显然,config 很难用于别的机器,尤其是非虚拟化的机器。需要特别注意:如果你的系统用了 LVM 那么它将无法使用!因为内核无法自己检测 LVM!另外,编译进内核的模块和不像单独加载的模块那样灵活,比如 netconsole 模块,我们通常是通过模块参数来指定其功能,而一旦变成 builtin 我们就不得不通过 configfs 来进行操作了,也就是说测试脚本就得重写了。

为了定制上述 3) 中专门的 config,我花了不少时间把我的 config 精简到最少,但同时又包含了 KVM 里需要功能以及我调试用到的选项,我把最终的 config 放到了 github 上进行管理:

https://github.com/congwang/kernelconfig/blob/master/kvm-mini-config

我的目标是让它可以在尽可能多的 KVM guest 上可以运行(显然不可能保证所有),同时尽量不破坏用户空间的程序比如 systemd(取决于发行版),当然还要保持编译出的内核可能的小。欢迎帮忙测试。:-)

下一步是尝试实现第2种方法,我们需要搭建一个集编译内核、PXE服务器、NFS服务器于一身的服务器,准备弄一个新的 github 项目去搞一下。

当然,如果你有其它更好的主意,希望不吝赐教!

附我常用的一些内核调试选项:

一定要有的:
CONFIG_DEBUG_KERNEL=y

内存相关:
CONFIG_SLUB_DEBUG=y
CONFIG_DEBUG_VM=y
CONFIG_DEBUG_LIST=y

内核加锁相关:
CONFIG_DEBUG_ATOMIC_SLEEP=y
CONFIG_LOCKDEP=y
CONFIG_DEBUG_SPINLOCK=y
CONFIG_DEBUG_MUTEXES=y
CONFIG_DEBUG_LOCK_ALLOC=y
CONFIG_PROVE_LOCKING=y
CONFIG_PROVE_RCU=y

ftrace 相关:
CONFIG_FTRACE=y
CONFIG_FUNCTION_TRACER=y

锁死检测:
CONFIG_LOCKUP_DETECTOR=y
CONFIG_HARDLOCKUP_DETECTOR=y
CONFIG_BOOTPARAM_HARDLOCKUP_PANIC=y
CONFIG_BOOTPARAM_HARDLOCKUP_PANIC_VALUE=1
CONFIG_BOOTPARAM_SOFTLOCKUP_PANIC=y
CONFIG_BOOTPARAM_SOFTLOCKUP_PANIC_VALUE=1
CONFIG_DETECT_HUNG_TASK=y
CONFIG_DEFAULT_HUNG_TASK_TIMEOUT=300

kdump 相关:
CONFIG_KEXEC=y
CONFIG_CRASH_DUMP=y
CONFIG_PHYSICAL_START=0x1000000
CONFIG_RELOCATABLE=y

netconsole 相关:
CONFIG_NETCONSOLE=y
CONFIG_NETCONSOLE_DYNAMIC=y
CONFIG_NETPOLL=y

kprobe,jump label 相关:
CONFIG_KPROBES=y
CONFIG_JUMP_LABEL=y

perfbook 中对 NBS 这一节的描述尚未完成。我来简单补充一下。

Non-Blocking 其实很简单,它和 Blocking 相反,也就是非阻塞的意思。阻塞大家都知道,典型的 mutex,semaphore 就是阻塞型的锁,如果其它线程持有锁,该线程会进入阻塞状态,直到它可以获取锁。

非阻塞型的锁就是竞争时不会阻塞的锁,最典型的就是家喻户晓的自旋锁了,在竞争的情况下它会一直自旋,即忙等(busy wait),而不是阻塞。所以 spin lock 是 NBS 的一种。

NBS 的另一类比较高级的形式是 lock-free,从名字上也很容易看出,就是无锁。判断 lock-free 的一个标准是:如果正在进行操作的线程休眠了,别的线程依旧可以进来照常操作。这样就直接把 spin lock 给排除在外了。lock-free 通常使用的是 RMW (read-modify-write)原语,由硬件来支持,它最常见的一种实现是 CAS,即 Compare And Swap,在 x86 上对应的是 cmpxchg 指令。Linux 内核中把 cmpxchg 给封装成了一个通用的函数。第一次看到这个函数感觉有些别扭,估计是出于性能的考虑最新的内核代码中的实现还不如之前的实现容易让人读懂,下面是 2.6.32 中它 generic 的实现:

[c]
static inline unsigned long __cmpxchg(volatile unsigned long *m,
unsigned long old, unsigned long new)
{
unsigned long retval;
unsigned long flags;

    local_irq_save(flags);
    retval = *m;
    if (retval == old)
            *m = new;
    local_irq_restore(flags);
    return retval;

}
[/c]

比较新的 gcc 中有和它对应的一个内置原子函数 __sync_bool_compare_and_swap()。cmpxchg 一个典型的用例是 lockless list 的实现,见其插入操作的实现:

[c]
static inline bool llist_add(struct llist_node new, struct llist_head head)
{
struct llist_node entry, old_entry;

    entry = head->first;
    for (;;) {
            old_entry = entry;
            new->next = entry;
            entry = cmpxchg(&head->first, old_entry, new);
            if (entry == old_entry)
                    break;
    }

    return old_entry == NULL;

}
[/c]

我们可以看出,它虽然是 lock-free 的,但是避免不了等待(wait),因为这种 cmpxchg 一般都套在一个循环中直到操作成功才会退出,不成功就会一直尝试。不运行的话你根本不会知道它到底会尝试多少次才能成功。

另外一种更加高级的 NBS 就是 wait-free 了,这个可以说是大家梦寐以求的东西,它不阻塞,不用忙等,连等待都不用,也可以避免竞争!减少了很多 CPU 的浪费!显然 wait-free 一定是 lock-free 的,它们之间最大的区别是:等待的次数是否有上限。Wait-free 要求每个线程都可以在有限的步骤之内完成操作,不管其它线程如何!无奈这个太难实现,而且一般都比较复杂,即使实现了它的 overhead 也未必就比等价的 lock-free 的小。网上有很多相关的资料,你可以自行搜索一下。

参考资料:

1. http://julien.benoist.name/lockfree.html
2. http://audidude.com/?p=363
3. http://burtleburtle.net/bob/hash/lockfree.html
4. http://en.wikipedia.org/wiki/Non-blocking_synchronization
5. http://en.wikipedia.org/wiki/Lock-free_and_wait-free_algorithms

无意间看了一下 Python heapq 模块的源代码,发现里面的源代码写得很棒,忍不住拿出来和大家分享一下。:)

heapq 的意思是 heap queue,也就是基于 heap 的 priority queue。说到 priority queue,忍不住吐槽几句。我上学的时候学优先级队列的时候就没有碰到像 wikipedia 上那样透彻的解释,priority queue 并不是具体的某一个数据结构,而是对一类数据结构的概括!比如栈就可以看作是后进入的优先级总是大于先进入的,而队列就可以看成是先进入的优先级一定高于后进来的!这还没完!如果我们是用一个无序的数组实现一个priority queue,对它进行出队操作尼玛不就是选择排序么!尼玛用有序数组实现入队操作尼玛不就是插入排序么!尼玛用堆实现(即本文要介绍的)就是堆排序啊!用二叉树实现就是二叉树排序啊!数据结构课学的这些东西基本上都出来了啊!T_T

heapq 的代码也是用 Python 写的,用到了一些其它 Python 模块,如果你对 itertools 不熟悉,在阅读下面的代码之前请先读文档。heapq 模块主要有5个函数:heappush(),把一个元素放入堆中;heappop(),从堆中取出一个元素;heapify(),把一个列表变成一个堆;nlargest() 和 nsmallest() 分别提供列表中最大或最小的N个元素。

先从简单的看起:

[python]
def heappush(heap, item):
“””Push item onto heap, maintaining the heap invariant.”””
heap.append(item)
_siftdown(heap, 0, len(heap)-1)

def heappop(heap):
“””Pop the smallest item off the heap, maintaining the heap invariant.”””
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
_siftup(heap, 0)
else:
returnitem = lastelt
return returnitem
[/python]

从源代码我们不难看出,这个堆也是用数组实现的,而且是最小堆,即 a[k] <= a[2*k+1] and a[k] startpos:
parentpos = (pos - 1) >> 1
parent = heap[parentpos]
if cmp_lt(newitem, parent):
heap[pos] = parent
pos = parentpos
continue
break
heap[pos] = newitem

def _siftup(heap, pos):
endpos = len(heap)
startpos = pos
newitem = heap[pos]

# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1    # leftmost child position
while childpos &lt; endpos:
    # Set childpos to index of smaller child.
    rightpos = childpos + 1
    if rightpos &lt; endpos and not cmp_lt(heap[childpos], heap[rightpos]):
        childpos = rightpos
    # Move the smaller child up.
    heap[pos] = heap[childpos]
    pos = childpos
    childpos = 2*pos + 1
# The leaf at pos is empty now.  Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)

[/python]

上面的代码加上注释很容易理解,不是吗?在此基础上实现 heapify() 就很容易了:

[python]
def heapify(x):
"""Transform list into a heap, in-place, in O(len(x)) time."""
n = len(x)

# Transform bottom-up.  The largest index there&#039;s any point to looking at
# is the largest with a child index in-range, so must have 2*i + 1 &lt; n,
# or i &lt; (n-1)/2\.  If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1\.  If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that&#039;s again n//2-1.
for i in reversed(xrange(n//2)):
    _siftup(x, i)

[/python]
这里用了一个技巧,正如注释中所说,其实只要 siftup 后面一半就可以了,前面的一半自然就是一个heap了。heappushpop() 也可以用它来实现了,而且用不着调用 heappush()+heappop():

[python]
def heappushpop(heap, item):
"""Fast version of a heappush followed by a heappop."""
if heap and cmp_lt(heap[0], item):
item, heap[0] = heap[0], item
_siftup(heap, 0)
return item
[/python]

第一眼看到这个函数可能觉得它放进去一个再取出来一个有什么意思嘛!仔细想想它很有用,尤其是在后面实现 nlargest() 的时候:

[python]
def nlargest(n, iterable):
"""Find the n largest elements in a dataset.

Equivalent to:  sorted(iterable, reverse=True)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
it = iter(iterable)
result = list(islice(it, n))
if not result:
    return result
heapify(result)
_heappushpop = heappushpop
for elem in it:
    _heappushpop(result, elem)
result.sort(reverse=True)
return result

[/python]

先从 list 中取出 N 个元素来,然后把这个 list 转化成 heap,把原先的 list 中的所有元素在此 heap 上进行 heappushpop() 操作,最后剩下的一定是最大的!因为你每次 push 进去的不一定是最大的,但你 pop 出来的一定是最小的啊!

但 nsmallest() 的实现就截然不同了:

[python]
def nsmallest(n, iterable):
"""Find the n smallest elements in a dataset.

Equivalent to:  sorted(iterable)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
if hasattr(iterable, &#039;__len__&#039;) and n * 10  Largest of the nsmallest
    for elem in it:
        if cmp_lt(elem, los):
            insort(result, elem)
            pop()
            los = result[-1]
    return result
# An alternative approach manifests the whole iterable in memory but
# saves comparisons by heapifying all at once.  Also, saves time
# over bisect.insort() which has O(n) data movement time for every
# insertion.  Finding the n smallest of an m length iterable requires
#    O(m) + O(n log m) comparisons.
h = list(iterable)
heapify(h)
return map(heappop, repeat(h, min(n, len(h))))

[/python]

这里做了一个优化,如果 N 小于 list 长度的1/10的话,就直接用插入排序(因为之前就是有序的,所以很快)。如果 N 比较大的话,就用 heap 来实现了,把整个 list 作成一个堆,然后 heappop() 出来的前 N 个就是最后的结果了!不得不说最后一行代码写得太精炼了!

偶然看 Python 代码,看到这么一个叫memoize decorator 的东西,你会发现它非常有用。

看下面的例子,我们最常见的 fibonacci 数列的递归实现。

~% python
>>> def fib(n):
...     if n in (0, 1):
...             return n
...     else:
...             return fib(n-1)+fib(n-2)
...
>>> import timeit
>>> timeit.timeit("fib(5)", "from __main__ import fib")
3.946546792984009
>>> timeit.timeit("fib(7)", "from __main__ import fib")
10.944302082061768

很慢,不是么?你当然可以把它的实现改成迭代,但是,如果前提是我们不对fib()函数本身做任何代码修改的话,我们还有别的方法去改进它。

我们注意到,fib(n-1) 和 fib(n-2) 实际上重复计算了很多结果,根本没必要,一个自然而然的想法就是把这些结果缓存起来,于是就有了下面的:

>>> def memoize(f):
...     memo = {}
...     def wrapper(x):
...         if x not in memo:
...             memo[x] = f(x)
...         return memo[x]
...     return wrapper
...
>>>
>>> def fib(n):
...     if n in (0, 1):
...         return n
...     else:
...         return fib(n-1) + fib(n-2)
...
>>> fib = memoize(fib)
>>> import timeit
>>> timeit.timeit("fib(7)", "from __main__ import fib")
0.35535502433776855

还不够,Python 还有自己的解决办法,那就是用 decorator

[python]
def memoize(function):
memo = {}
def wrapper(args):
if args in memo:
return memo[args]
else:
rv = function(
args)
memo[args] = rv
return rv
return wrapper

@memoize
def fib(n):
if n in (0, 1):
return n
return fib(n-1) + fib(n-2)
[/python]

为此,在 Python3 中还专门引入了一个 lru_cache

这个东西的牛逼之处在于,有了它你可以放心地去写递归的代码,不用为了效率去把它改成迭代形式,你也知道,有些时候递归代码比迭代代码更容易理解。这样可读性有了,效率也提升了,不用修改函数的实现,直接让它从次方级降到了线性级。

我以前一直以为 Linux 内核之所以不允许在中断中休眠是因为中断上下文中无法获取 thread_info,task_struct 等进程相关的结构体,从而无法调度。

今天又重新看了一下相关的代码,发现实际上不是。在最新的代码中,x86 32 使用的 irq stack 中也保存了thread_info,我们可以看 execute_on_irq_stack() 的定义:

[c]
union irqctx {
struct threadinfo tinfo;
u32 stack[THREAD_SIZE/sizeof(u32)];
} __attribute
((aligned(THREAD_SIZE)));

static inline int
execute_on_irq_stack(int overflow, struct irq_desc desc, int irq)
{
union irq_ctx
curctx, irqctx;
u32
isp, arg1, arg2;

    curctx = (union irq_ctx *) current_thread_info();
    irqctx = __this_cpu_read(hardirq_ctx);

    /*
     * this is where we switch to the IRQ stack. However, if we are
     * already using the IRQ stack (because we interrupted a hardirq
     * handler) we can't do that and just have to keep using the
     * current stack (which is the irq stack already after all)
     */
    if (unlikely(curctx == irqctx))
            return 0;

    /* build the stack frame on the IRQ stack */
    isp = (u32 *) ((char *)irqctx + sizeof(*irqctx));
    irqctx-&gt;tinfo.task = curctx-&gt;tinfo.task;
    irqctx-&gt;tinfo.previous_esp = current_stack_pointer;

    /* Copy the preempt_count so that the [soft]irq checks work. */
    irqctx-&gt;tinfo.preempt_count = curctx-&gt;tinfo.preempt_count;

    if (unlikely(overflow))
            call_on_stack(print_stack_overflow, isp);

    asm volatile("xchgl     %%ebx,%%esp     n"
                 "call      *%%edi          n"
                 "movl      %%ebx,%%esp     n"
                 : "=a" (arg1), "=d" (arg2), "=b" (isp)
                 :  "0" (irq),   "1" (desc),  "2" (isp),
                    "D" (desc-&gt;handle_irq)
                 : "memory", "cc", "ecx");
    return 1;

}
[/c]

(注:x86 都已经使用 irq stack 了,和进程的内核栈独立开了,这样即使 irq 处理函数中占用了很多栈也不会影响外面的了。)

所以这不是问题。也就是说,技术上我们完全可以做到在中断中休眠。Linux 内核之所以没有这么做是出于设计上的原因。

如果这还不足以说明问题的话,还有个例子:do_page_fault() 也是在中断上下文中调用的(注:此处更严格的讲是 trap,而非 interrupt,但无论哪个都肯定不是普通的进程上下文,相对而言还算是中断上下文),但是它是可能休眠的,至少它明显调用了可能休眠的函数 down_read() 。

为什么要这么设计呢?因为在 Linux 内核中能让 do_page_fault() 休眠的地方基本上只有copy_from_user(),copy_to_user()(其它地方触发 page fault 会导致 oops),它们在使用用户空间的页面时可能会因为对应的页面不在物理内存中而触发 swap 等,即 handle_mm_fault() 所做的。这种情况下休眠是合理的,因为调用 copy_from_user() 的进程本身就是需要等待这个资源。休眠不就是为了等待资源吗?

为什么其它中断(注:此处是指 IRQ,或者严格意义上的 interrupt,非 trap)不能休眠?假设 CPU 上正在执行的是某个高优先级的进程 A,它本身没有使用任何网络通信,突然网卡中断来了,它被中断而且被休眠了,那么,这看起来就像是进程 A 本身在等待某网络资源,而实际上根本不是。

换句话说,如果在 IRQ 中休眠,被打断的当前进程很可能不是需要等待的进程,更进一步讲,在中断中我们根本无法知道到底是哪个进程需要休眠。更何况,这里还没有考虑到在某个中断中休眠会不会影响下一个中断之类的更复杂的问题。由此可见,如果真允许在中断中休眠的话,那么设计将会是很复杂的。

因此,这完全是一个设计的问题。中断中休眠技术上可实现,但设计上不好。

另外,Linux 内核中很多会休眠的函数会调用 might_sleep(),它是用来检测是否在中断上下文中,如果是就是触发一个警告 “BUG: sleeping function called from invalid context”。可是为什么 do_page_fault() 中的函数没有呢?这是因为普通的 IRQ 在进入处理函数之前都会调用 irq_enter(),它里面改变了进程的 preempt 的值:

[c]

define __irq_enter()

    do {                                            
            account_system_vtime(current);          
            add_preempt_count(HARDIRQ_OFFSET);      
            trace_hardirq_enter();                  
    } while (0)

void irq_enter(void)
{
int cpu = smp_processor_id();

    rcu_irq_enter();
    if (is_idle_task(current) &amp;&amp; !in_interrupt()) {
            /*
             * Prevent raise_softirq from needlessly waking up ksoftirqd
             * here, as softirq will be serviced on return from interrupt.
             */
            local_bh_disable();
            tick_check_idle(cpu);
            _local_bh_enable();
    }

    __irq_enter();

}
[/c]

而 page fault 的话不会经过这个流程。这也从另一面证明了这是个设计的问题。

顺便说一句,比较新的 Linux 内核中已经支持中断线程化了:

[c]
int request_threaded_irq(unsigned int irq, irq_handler_t handler,
irq_handler_t thread_fn, unsigned long irqflags,
const char devname, void dev_id)
[/c]

所以一部分中断处理可以在进程上下文中完成了。

这是我今年去日本参加 LinuxCon 要讲的,主要是最近在做的一些和 dracut 相关的东西。

Slides 可以在这里下载:dracut.pdf

另外,今年日本 LinuxCon 的会议日程在这里。今年没有 Linus 大神了,他去年去参加果然是因为 Linux 20周年的原因。

抱怨一句:好羡慕 akong 他们部门,公司赞助参加 KVM Forum,光去听就可以。不像我这样,没有要讲的东西连去都去不了;就算有东西要讲,公司也不赞助,费用都是回来找 Linux Foudation 包销;开会三天还不能算工作日……唉,同一个公司的人出去开会差距怎么这么大呢……

Floyd’s cycle-finding algorithm 是一个经典的算法,用来检测一个链表中是否有环这种问题。因为它使用了两个指针,快的指针被称为兔子,慢的指针被称为乌龟,所以这个算法又被称为“龟兔算法”。

Emacs 源代码中有它的一个完美实现,我们可以看 src/eval.c 中的 signal_error()。

[c]
/ Signal `error’ with message S, and additional arg ARG.
If ARG is not a genuine list, make it a one-element list.
/

void
signal_error (s, arg)
char *s;
Lisp_Object arg;
{
Lisp_Object tortoise, hare;

hare = tortoise = arg;
while (CONSP (hare))
{
hare = XCDR (hare);
if (!CONSP (hare))
break;

  hare = XCDR (hare);
  tortoise = XCDR (tortoise);

  if (EQ (hare, tortoise))
    break;
}

if (!NILP (hare))
arg = Fcons (arg, Qnil); / Make it a list. /

xsignal (Qerror, Fcons (build_string (s), arg));
}
[/c]

上面有一些 lisp “词汇” 可能你看不懂,翻译成你看得懂的代码如下。

[c]

bool
list_has_cycle(NODE list)
{
NODE
tortoise, *hare;

hare = tortoise = list;
while (hare)
{
hare = hare->next;
if (!hare)
break;

  hare = hare-&gt;next;
  tortoise = tortoise-&gt;next;

  if (hare == tortoise)
    break;
}

if (hare)
return true;

return false;
}
[/c]

整数溢出大家都不陌生,可能陌生的是 gcc 对带符号整数溢出的处理。

先看标准怎么说,对于无符号整数,标准如此描述:

A computation involving unsigned operands can never overflow,
because a result that cannot be represented by the resulting unsigned
integer type is reduced modulo the number that is one greater than the
largest value that can be represented by the resulting type.

换句话说,无符号整数的溢出总是 wrap 回去的,但依旧在无符号整数的表示范围内。

对于带符号整数的溢出,标准规定是未定义行为:

If an exceptional condition occurs during the evaluation of an expression (that is, if the
result is not mathematically defined or not in the range of representable values for its
type), the behavior is undefined.

实际上(使用二进制补码),如果溢出的话,两个正数结果会变成负数,超出了正数本身的范围。wikipedia 中如此说:

Most computers distinguish between two kinds of overflow conditions. A carry occurs when the result of an addition or subtraction, considering the operands and result as unsigned numbers, does not fit in the result. Therefore, it is useful to check the carry flag after adding or subtracting numbers that are interpreted as unsigned values. An overflow proper occurs when the result does not have the sign that one would predict from the signs of the operands (e.g. a negative result when adding two positive numbers). Therefore, it is useful to check the overflow flag after adding or subtracting numbers that are represented in two’s complement form (i.e. they are considered signed numbers).

正是因为“未定义”,所以 gcc 会大胆地做出一些可能令你吃惊的优化。看下面的例子:

[c]

include

include

int wrap(int a) {
return (a + 1 > a);
}

int main(void) {
printf(“%sn”, wrap(INT_MAX) ? “no wrap” : “wrapped”);
}
[/c]

很好理解,可是 gcc 完全可以如此生成 wrap() 函数的代码:

00000000004004f0 :
4004f0: b8 01 00 00 00 mov $0x1,%eax
4004f5: c3 retq

因为 gcc 假设了带符号的整数从来不会溢出,所以”a + 1 > a” 总会是真,所以直接返回1!这么做完全符合标准,但不符合我们的期待。我们期望 wrap() 函数能够检测是否会溢出。

为此,gcc 引入了几个相关的命令行选项:-fwrapv,-fstrict-overflow/-fno-strict-overflow,-Wstrict-overflow。简单地说,-fstrict-overflow 就是告诉编译器,带符号整数溢出是未定义的,你可以假设它不会发生,从而继续做优化。而 -fwrapv 是说,带符号整数的溢出是定义好的,就是 wrap,你按照这个定义来编译。gcc 文档中提到:

Using -fwrapv means that integer signed overflow is fully defined: it wraps. When -fwrapv is used, there is no difference between -fstrict-overflow and -fno-strict-overflow for integers. With -fwrapv certain types of overflow are permitted. For example, if the compiler gets an overflow when doing arithmetic on constants, the overflowed value can still be used with fwrapv, but not otherwise.

我们加上 -fno-strict-overflow 之后再去编译上面的代码,结果明显不同:

00000000004004f0 :
4004f0: 8d 47 01 lea 0x1(%rdi),%eax
4004f3: 39 f8 cmp %edi,%eax
4004f5: 0f 9f c0 setg %al
4004f8: 0f b6 c0 movzbl %al,%eax
4004fb: c3 retq

而对于使用二进制补码的机器来说,-fwrapv 和 -fno-strict-overflow 只有细微的区别: -fno-strict-overflow 只是说不去优化,而-fwrapv 明确地定义了溢出的行为。

Linux 内核中曾经出现一个相关的 bug,是 Linus 大神出手搞定了,他说道:

It looks like ‘fwrapv’ generates more temporaries (possibly for the code
that treies to enforce the exact twos-complement behavior) that then all
get optimized back out again. The differences seem to be in the temporary
variable numbers etc, not in the actual code.

So fwrapv really is different from fno-strict-pverflow, and disturbs the
code generation more.

IOW, I’m convinced we should never use fwrapv. It’s clearly a buggy piece
of sh*t, as shown by our 4.1.x experiences. We should use
-fno-strict-overflow.

所以编译 Linux 内核时使用的是 -fno-strict-overflow。