Programming

libnl 项目需要你的帮助……

libnl 是用户空间对 netlink 包装的一个库,有了它使用 netlink 变得很方便了。你熟悉的 NetworkManager 就使用了它。它的官方网站是:http://www.infradead.org/~tgr/libnl/

对比内核已经实现的 TC classifier 和 TC action,你会发现 libnl 实现了不到其中一半,所以还有不少东西需要做。如果非要我写一个 list 的话,下面几个是优先级比较高的:

1) 添加 skbedit action 的支持;
2) 添加 cgroup classifier 的支持;
3) 添加 tun/tap 设备的支持;
4) 添加 gre tunnel 设备的支持。

通过 libnl 你可以深入了解 netlink,以及内核中的这些 netlink 接口设计,当然了还有 traffic control 的一些东西。感兴趣的话可以联系我:xiyou.wangcong <AT> gmail.com。如果你能直接去贡献补丁,那当然是再好不过了。:)

为什么计数应该从零开始?

众所周知,C语言数组下标是从0开始,其它很多语言皆如此。而 FORTRAN 则是数组下标从1开始的典范。所以就有数组下标是从1开始好还是从0开始好之争。连《C专家编程》中都如此调侃:

数组的下标应该是从0还是从1开始?我提议的妥协方案是0.5,可惜他们未予认真考虑便一口回绝。—— Stan Kelly-Bootle
仔细思考一下这个问题很有意思,建议你不妨自己思考一下再继续往下看。

其实这个问题早在 1982 年就已经由计算机科学领域的大师 Edsger Dijkstra 研究过并得出了一个结论。他手写了一篇名为《Why numbering should start at zero》的论文,关键部分截图和大体翻译如下:

表示一个自然数子序列,比如 2, 3, …, 12,不用中间那三个点,有四种方式可供我们选择:

a) 2<= i < 13

b) 1 < i <= 12

c) 2<= i <= 12

d) 1 < i < 13

有什么道理选其中一种而不选别的吗?是的,的确有。观察到 a) 和 b) 的优势是两边的边界值之差正好是子序列的长度。作为一个推论,下面的观察也成立:在它们两个当中,两个子序列是邻接的就意味着一个的上界等于另外一个的下界。这些观察并没有让我们从a) 和 b) 之中做出选择,所以我们从头开始。

一定存在一个最小的自然数。排除掉下界——就像 b) 和 d) 那样——就会迫使一个从最小的自然数开始的子序列的下界进入非自然数领域。这很难看,所以对于下界我们更喜欢<=,就像 a) 和 c) 那样。现在,考虑一下从最小的自然数开始的子序列:包含上界就会迫后者不那么像自然数(译者注:作者的意思是自然数是一个有下界没有上界的集合),当序列缩小成空序列时。这很难看,所以对于上界我们更喜欢<,就像 a) 和 d) 那样。我们得出结论, a) 是我们最喜欢的。

当处理长度为 N 的序列时,我们期望通过下标来区分它的元素。下一个令人烦恼的问题就是,我们该给它的第一个元素赋予什么样下标值?坚持 a) 的方式,当下标从1开始时,下标范围为 1<= i < N+1;当下标从0开始时则是更好看的 0<= i < N。所以,让我们的序数从0开始:一个元素的序数(下标)等于序列中在它前面的元素个数。这个故事的寓意是我们最好尊重0最为一个最自然的数——过了这么多个世纪!


顺便说一句,Python 选择了 a),所以 range(0,3) 返回的是序列是 0,1,2。

What every programmer should know...

What Every Computer Scientist Should Know About Floating-Point Arithmetic

What Every Programmer Should Know About Memory

What every programmer absolutely, positively needs to know about encodings and character sets to work with text

Latency Numbers Every Programmer Should Know

What every programmer should know about time

What Every Programmer Should Know About Races

What Every C Programmer Should Know About Undefined Behavior

What every data programmer needs to know about disks

发送 IGMP 包的 Perl 脚本

因为 Fedora 上的 scapy (2.2.0)还不支持 IGMP,所以我用 Perl 写了一个发送 IGMP 的脚本。最初代码来自这里,原脚本只能发送 IGMP query,我添加了 IGMP leave 和一些命令行选项。用法如下:

% ./igmp.pl -q 192.168.10.2 224.0.0.22
% ./igmp.pl -l 224.8.8.8 192.168.10.2 224.0.0.22

具体代码如下:

[perl]

! /usr/bin/perl -w

Adapted from : http://code.google.com/p/perl-igmp-querier/

use strict;
use POSIX;
use Socket qw(inet_pton AF_INET SOCK_RAW);
use Getopt::Long qw(:config no_auto_abbrev);
my $leave;
my $leave_addr;
my $query = 1;
my $help;

sub forgepkt {

my $src_host = shift;
my $dst_host = shift;

my $zero_cksum = 0;
my $igmp_proto = 2;
my $igmp_type = $query? ‘11’: ‘17’; #11 query, 17 leave#
my $igmp_mrt = ‘64’;
my $igmp_pay = $query? “” : $leave_addr; # 0 for query
my $igmp_chk = 0;
my $igmp_len = 0;

my ($igmp_pseudo) = pack(‘H2H2va4’,$igmp_type,$igmp_mrt,$igmp_chk,$igmp_pay);
$igmp_chk = &checksum($igmp_pseudo);
$igmp_pseudo = pack(‘H2H2va4’,$igmp_type,$igmp_mrt,$igmp_chk,$igmp_pay);
$igmp_len = length($igmp_pseudo);

my $ip_ver = 4;
my $ip_len = 6;
my $ip_ver_len = $ip_ver . $ip_len;
my $ip_tos = 00;
my ($ip_tot_len) = $igmp_len + 20 + 4;
my $ip_frag_id = 11243;
my $ip_frag_flag = “010”;
my $ip_frag_oset = “0000000000000”;
my $ip_fl_fr = $ip_frag_flag . $ip_frag_oset;
my $ip_ttl = 1;
my $ip_opts = ‘94040000’; # router alert

my ($head) = pack(‘H2H2nnB16C2n’,
$ip_ver_len,$ip_tos,$ip_tot_len,$ip_frag_id,
$ip_fl_fr,$ip_ttl,$igmp_proto);
my ($addresses) = pack(‘a4a4’,$src_host,$dst_host);
my ($pkt) = pack(‘aaH8a*’,$head,$addresses,$ip_opts,$igmp_pseudo);

return $pkt;
}

sub checksum {
my ($msg) = @_;
my ($len_msg,$num_short,$short,$chk);
$len_msg = length($msg);
$num_short = $len_msg / 2;
$chk = 0;
foreach $short (unpack(“S$num_short”, $msg)) {
$chk += $short;
}
$chk += unpack(“C”, substr($msg, $len_msg - 1, 1)) if $len_msg % 2;
$chk = ($chk >> 16) + ($chk & 0xffff);
return(~(($chk >> 16) + $chk) & 0xffff);
}

sub help {
my ($exitcode) = @_;

    print &lt; $leave,
'q|query'       =&gt; $query,
'h|help'        =&gt; $help,

) or help(0);

help(1) if ($#ARGV < 1);

if ($leave) {
$query = 0;
$leave_addr = inet_pton(AF_INET, $leave);
}

my $src = $ARGV[0];
my $dst = $ARGV[1]; # 224.0.0.22, igmp all-hosts

socket(RAW, AF_INET, SOCK_RAW, 255) || die $!;
setsockopt(RAW, 0, 1, 1);

my $src_host = (gethostbyname($src))[4];
my $dst_host = (gethostbyname($dst))[4];
my ($packet) = forgepkt($src_host,$dst_host);
my ($dest) = pack('Sna4x8', AF_INET, 0, $dst_host);

Send general query packet twice for reliability

send(RAW,$packet,0,$dest);
send(RAW,$packet,0,$dest);

[/perl]

C binary literals

众所周知,C 语言中只有十进制、十六进制和八进制的常数表示(literal),没有二进制的常数表示。有时候我们很想用二进制的常数,因为它设置了哪些位一目了然。当然了,十六进制和八进制在某种程度上也行,对于熟练的人或许一眼就能看出,但是有二进制的常数表示不是更好嘛?

这里有人就给出了一个解决方法,构造一个宏接受二进制的“常量”,这样我们就可以通过它来直接使用二进制常量了。其实代码很容易理解,见下:

[c]
/ Binary constant generator macro
By Tom Torfs - donated to the public domain
/

/ All macro’s evaluate to compile-time constants /

/ ** helper macros */

/ turn a numeric literal into a hex constant
(avoids problems with leading zeroes)
8-bit constants max value 0x11111111, always fits in unsigned long
/

define HEX__(n) 0x##n##LU

/ 8-bit conversion function /

define B8__(x) ((x&0x0000000FLU)?1:0)

    +((x&amp;0x000000F0LU)?2:0) 
    +((x&amp;0x00000F00LU)?4:0) 
    +((x&amp;0x0000F000LU)?8:0) 
    +((x&amp;0x000F0000LU)?16:0) 
    +((x&amp;0x00F00000LU)?32:0) 
    +((x&amp;0x0F000000LU)?64:0) 
    +((x&amp;0xF0000000LU)?128:0)

/ ** user macros */

/ for upto 8-bit binary constants /

define B8(d) ((unsigned char)B8(HEX(d)))

/ for upto 16-bit binary constants, MSB first /

define B16(dmsb,dlsb) (((unsigned short)B8(dmsb)<<8)

        + B8(dlsb))

/ for upto 32-bit binary constants, MSB first /

define B32(dmsb,db2,db3,dlsb) (((unsigned long)B8(dmsb)<<24)

            + ((unsigned long)B8(db2)&lt;&lt;16) 
            + ((unsigned long)B8(db3)&lt;&lt;8) 
            + B8(dlsb))

/ Sample usage:
B8(01010101) = 85
B16(10101010,01010101) = 43605
B32(10000000,11111111,10101010,01010101) = 2164238933
/

[/c]

把这段代码放到头文件中,使用的时候 #include 进去就行了。当然了,这最好还是编译器支持才行,幸运的是,gcc 从4.3 开始就已经支持二进制常数了!以0b或者0B为前缀即可,比如:0b01010101。

示例代码如下:

[c]

include

int main(void)
{
assert(B8(01010101) == 85);
assert(B8(01010101) == 0b01010101); //gcc extension!!
assert(B8(11111111) == 0xff);
assert(B8(00111111) == 077);
assert(B16(10101010,01010101) == 43605);
assert(B32(10000000,11111111,10101010,01010101) == 2164238933);
return 0;
}
[/c]

顺便说一句,在C++中你可以使用 std::bitset

Parallelism != Concurrency

Parallelism,中文一般翻译成“并行”;Concurrency,中文一般翻译成“并发”。这两个词往往被混淆使用,即使在英文资料中,中文翻译使得这种情况变得更糟糕。其实这两个词的含义是完全不同的,这里并不是在咬文嚼字,而是想说明它们本质上确实有不同。为了表达方便,下文一律使用英文中的原词。

Wikipedia 上对 Concurrency 的定义如下:

Concurrent computing is a form of computing in which programs are designed as collections of interacting computational processes that may be executed in parallel. Concurrent programs (processes or threads) can be executed on a single processor by interleaving the execution steps of each in a time-slicing way, or can be executed in parallel by assigning each computational process to one of a set of processors that may be close or distributed across a network.
可见,concurrency 并不意味着一定是 parallelism,它是把多个互相作用的计算过程(注:上文第一句中的 processes 不是进程的意思!)组合起来的一种计算形式。为了完成一个大的任务,我们可以把它切割成几个独立的几个步骤,然后把每个步骤交给单独的线程去完成,那么在 UP 上 concurrency 看起来就像是这样:

Parallelism 的定义如下:

Parallel computing is a form of computation in which many calculations are carried out simultaneously, operating on the principle that large problems can often be divided into smaller ones, which are then solved concurrently (“in parallel”).
也就是多个计算同时执行,在 SMP 上如下图所示:

这里需要注意的是:广义上 parallelism 并不总是指 SMP,广义的 parallelism 也可以发生在指令上,比如支持 SIMD 的 CPU,像 Intel 的 SSE 指令;也可以发生在 CPU 内部的指令 pipeline 上。请结合上下文去理解。

考虑到现在流行的操作系统都是多任务的,那么 UP 也可以达到 SMP 的效果,对于用户空间的程序来说,多任务操作系统即使运行在 UP 上也是和 SMP 上一样的。因此下文中我们将不再区分 UP 和 SMP,并且假设操作系统就是多任务的,即所有多线程的程序都是并行运行的。

这时候,另一个问题随之而来:有 parallelism 的程序一定是 concurrent 的么?未必!根据上面 concurrency 的定义,不同的处理步骤之间需要是互相作用的(interacting),我们完全可以写一个多线程程序,线程之间没有任何互相的作用:比如我们要处理 [1 ~ 40000] 这个数组,我们可以启动4个线程,让它们分别处理 [1 ~ 10000]、[10000 ~ 20000]、[20000 ~ 30000]、[30000 ~ 40000],最后主线程把 4 个结果汇总成一个。这个多线程程序显然可以做到 parallelism,但它没有 concurrency!

有人这么认为,说 concurrency 是程序的属性;而 parallelism 程序执行时机器的属性。现在我们可以知道,这是不严格的。只有当 parallel 的程序的线程之间有了互相作用之后才会有 concurrency!concurrency 比 parallelism 要难得多。

Rob Pike 大牛是这么概括它们的区别的:

Concurrency is a way to structure a program by breaking it into pieces that can be executed independently. Communication is the means to coordinate the independent executions.
所以,concurrency 和 parallelism 的本质区别是,多个线程/进程(或程序的多个部分)之间有没有互相作用和交流。这种交流分两种:一种是共享内存,一种是消息传递。前者我们很熟悉,我们最常用的锁(Lock))或者信号量(Semaphore)就属于共享内存的一种。

另外,需要说明的是,并不是所有的 paraellel 的程序都使用多线程/进程,它们也可以使用 coroutine,使用 events。一言以蔽之,parallelism 是建立在同步(synchronous )代码的基础上,同步代码同时运行;而把一个程序分成几个同步的部分,它们之间允许进行 communication 就有 concurrency 了!

借用 Rob Pike 使用的图来说明一下就是:

Original Program

Parallel Program

Concurrent Program

从图中我们可以一目了然地看出两者之间并没有直接的关系。最后,在《Parallelism is not concurrency》一文中对此有更精辟的概括:

Concurrency is concerned with nondeterministic composition of programs (or their components). Parallelism is concerned with asymptotic efficiency of programs with deterministic behavior. Concurrency is all about managing the unmanageable[… ] Parallelism, on the other hand, is all about dependencies among the subcomputations of a deterministic computation.
参考资料:

1. http://stackoverflow.com/questions/1050222/concurrency-vs-parallelism-what-is-the-difference
2. http://existentialtype.wordpress.com/2011/03/17/parallelism-is-not-concurrency/
3. http://ghcmutterings.wordpress.com/2009/10/06/parallelism-concurrency/
4. http://stackoverflow.com/questions/1897993/difference-between-concurrent-programming-and-parallel-programming
5. http://concur.rspace.googlecode.com/hg/talk/concur.html#title-slide

treap

treap 是一个很有意思的数据结构,从名字也能看得出来,它是 tree 和 heap 的混合产物。为什么会有这么一个数据结构还得从二叉树(BST)说起。

我们都知道,普通的二叉树是不平衡的,在某些情况下进行插入删除操作可以导致时间复杂度从 O(logN) 下降到 O(N)。为了解决平衡的问题,有很多新的二叉树被引入,比如大家熟知的一些:Linux 内核中 CFS 使用到的红黑树(Red-black tree),数据结构课上都会讲到的 AVL 树。这些树都因为要进行复杂的旋转操作而不容易实现。

那么有没有一个实现容易的平衡二叉树呢?Treap 就是一个。普通的二叉树节点只有 key,而 treap 又多了一个 priority,这里的 priority 就是 heap (优先队列)中的 priority。这样, treap 既可以利用二叉树的特性,又可以利用 heap 的特性了。

看它是怎么利用的,以 Perl 的 Tree::Treap 模块为例。

1) 对于搜索,使用二叉树的 key 即可,和普通二叉树没有区别:

[perl]
sub _get_node {
my $self = shift;
my $key = shift;
while(!$self->_is_empty() and $self->ne($key)){
$self = $self->{$self->lt($key)?”left”:”right”}
}
return $self->_is_empty() ? 0 : $self;
}
[/perl]

2) 插入一个新的节点 key=x 时,随机一个整数值 y 作为 priority,利用二叉树搜索 x,在它应该出现的位置创建一个新的节点,只要 x 不是根节点而且优先级高于它的父节点,那么旋转这个节点使其和其父节点交换位置。

[perl]
sub insert {
my $self = shift;
my $key = shift;
my $data = shift;
$data = defined($data)? $data : $key;
my $priority = shift() || rand();

if($self-&gt;_is_empty()) {
    $self-&gt;{priority} = $priority,
    $self-&gt;{key}      = $key;
    $self-&gt;{data}     = $data;
    $self-&gt;{left}     = $self-&gt;new($self-&gt;{cmp});
    $self-&gt;{right}    = $self-&gt;new($self-&gt;{cmp});
    return $self;
}

if($self-&gt;gt($key)){
    $self-&gt;{right}-&gt;insert($key,$data,$priority);
    if($self-&gt;{right}-&gt;{priority} &gt; $self-&gt;{priority}){
        $self-&gt;_rotate_left();
    }
}elsif($self-&gt;lt($key)){
    $self-&gt;{left}-&gt;insert($key,$data,$priority);
    if($self-&gt;{left}-&gt;{priority} &gt; $self-&gt;{priority}){
        $self-&gt;_rotate_right();
    }

}else{
    $self-&gt;_delete_node();
    $self-&gt;insert($key,$data,$priority);
}
return $self;

}
[/perl]

从代码可以看出,旋转就是为了保持 heap 的属性,优先级高的节点在上层。

3) 删除一个节点相对比较麻烦:如果要删除的节点 x 是一个叶子,直接删掉即可;如果 x 有一个孩子节点 z,把 x 删掉,然后把 z 作为 x 父亲的孩子;如果 x 有两个孩子节点,则把 x 和它的下一个节点(successor)交换,然后进行相应的旋转。实现是递归实现的,很容易理解。注意:这里实现删除时并没有进行实质的删除操作,而只是把优先级设为了最低的 -100,这也使得代码变得比上面的理论更简单。

[perl]
sub delete {
my $self = shift;
my $key = shift;
return 0 unless $self = $self->_get_node($key);
$self->_delete_node();
}

sub _delete_node {
my $self = shift;
if($self->_is_leaf()) {
%$self = (priority => -100, cmp => $self->{cmp});
} elsif ($self->{left}->{priority} > $self->{right}->{priority}) {
$self->_rotate_right();
$self->{right}->_delete_node();
} else {
$self->_rotate_left();
$self->{left}->_delete_node();
}
}
[/perl]

这里的两个旋转操作很容易实现:

[perl]
sub _clone_node {
my $self = shift;
my $other = shift;
%$self = %$other;
}

sub _rotate_left {
my $self = shift;
my $tmp = $self->new($self->{cmp});
$tmp->_clone_node($self);
$self->_clone_node($self->{right});
$tmp->{right} = $self->{left};
$self->{left} = $tmp;

}

sub _rotate_right {
my $self = shift;
my $tmp = $self->new($self->{cmp});
$tmp->_clone_node($self);
$self->_clone_node($self->{left});
$tmp->{left} = $self->{right};
$self->{right} = $tmp;
}
[/perl]

或者借助于这个图来理解右旋:

      B            A
     /           / 
    A   2   -->  0   B
   /               / 
  0   1            1   2

参考:
1. http://acs.lbl.gov/~aragon/treaps.html
2. http://www.perlmonks.org/?node_id=289584
3. C 语言实现:http://www.canonware.com/trp/
4. Python 实现:http://stromberg.dnsalias.org/~strombrg/python-tree-and-heap-comparison/

perfbook 读书笔记:Non-Blocking Synchronization

perfbook 中对 NBS 这一节的描述尚未完成。我来简单补充一下。

Non-Blocking 其实很简单,它和 Blocking 相反,也就是非阻塞的意思。阻塞大家都知道,典型的 mutex,semaphore 就是阻塞型的锁,如果其它线程持有锁,该线程会进入阻塞状态,直到它可以获取锁。

非阻塞型的锁就是竞争时不会阻塞的锁,最典型的就是家喻户晓的自旋锁了,在竞争的情况下它会一直自旋,即忙等(busy wait),而不是阻塞。所以 spin lock 是 NBS 的一种。

NBS 的另一类比较高级的形式是 lock-free,从名字上也很容易看出,就是无锁。判断 lock-free 的一个标准是:如果正在进行操作的线程休眠了,别的线程依旧可以进来照常操作。这样就直接把 spin lock 给排除在外了。lock-free 通常使用的是 RMW (read-modify-write)原语,由硬件来支持,它最常见的一种实现是 CAS,即 Compare And Swap,在 x86 上对应的是 cmpxchg 指令。Linux 内核中把 cmpxchg 给封装成了一个通用的函数。第一次看到这个函数感觉有些别扭,估计是出于性能的考虑最新的内核代码中的实现还不如之前的实现容易让人读懂,下面是 2.6.32 中它 generic 的实现:

[c]
static inline unsigned long __cmpxchg(volatile unsigned long *m,
unsigned long old, unsigned long new)
{
unsigned long retval;
unsigned long flags;

    local_irq_save(flags);
    retval = *m;
    if (retval == old)
            *m = new;
    local_irq_restore(flags);
    return retval;

}
[/c]

比较新的 gcc 中有和它对应的一个内置原子函数 __sync_bool_compare_and_swap()。cmpxchg 一个典型的用例是 lockless list 的实现,见其插入操作的实现:

[c]
static inline bool llist_add(struct llist_node new, struct llist_head head)
{
struct llist_node entry, old_entry;

    entry = head-&gt;first;
    for (;;) {
            old_entry = entry;
            new-&gt;next = entry;
            entry = cmpxchg(&amp;head-&gt;first, old_entry, new);
            if (entry == old_entry)
                    break;
    }

    return old_entry == NULL;

}
[/c]

我们可以看出,它虽然是 lock-free 的,但是避免不了等待(wait),因为这种 cmpxchg 一般都套在一个循环中直到操作成功才会退出,不成功就会一直尝试。不运行的话你根本不会知道它到底会尝试多少次才能成功。

另外一种更加高级的 NBS 就是 wait-free 了,这个可以说是大家梦寐以求的东西,它不阻塞,不用忙等,连等待都不用,也可以避免竞争!减少了很多 CPU 的浪费!显然 wait-free 一定是 lock-free 的,它们之间最大的区别是:等待的次数是否有上限。Wait-free 要求每个线程都可以在有限的步骤之内完成操作,不管其它线程如何!无奈这个太难实现,而且一般都比较复杂,即使实现了它的 overhead 也未必就比等价的 lock-free 的小。网上有很多相关的资料,你可以自行搜索一下。

参考资料:

1. http://julien.benoist.name/lockfree.html
2. http://audidude.com/?p=363
3. http://burtleburtle.net/bob/hash/lockfree.html
4. http://en.wikipedia.org/wiki/Non-blocking_synchronization
5. http://en.wikipedia.org/wiki/Lock-free_and_wait-free_algorithms

Python heapq 模块的实现

无意间看了一下 Python heapq 模块的源代码,发现里面的源代码写得很棒,忍不住拿出来和大家分享一下。:)

heapq 的意思是 heap queue,也就是基于 heap 的 priority queue。说到 priority queue,忍不住吐槽几句。我上学的时候学优先级队列的时候就没有碰到像 wikipedia 上那样透彻的解释,priority queue 并不是具体的某一个数据结构,而是对一类数据结构的概括!比如栈就可以看作是后进入的优先级总是大于先进入的,而队列就可以看成是先进入的优先级一定高于后进来的!这还没完!如果我们是用一个无序的数组实现一个priority queue,对它进行出队操作尼玛不就是选择排序么!尼玛用有序数组实现入队操作尼玛不就是插入排序么!尼玛用堆实现(即本文要介绍的)就是堆排序啊!用二叉树实现就是二叉树排序啊!数据结构课学的这些东西基本上都出来了啊!T_T

heapq 的代码也是用 Python 写的,用到了一些其它 Python 模块,如果你对 itertools 不熟悉,在阅读下面的代码之前请先读文档。heapq 模块主要有5个函数:heappush(),把一个元素放入堆中;heappop(),从堆中取出一个元素;heapify(),把一个列表变成一个堆;nlargest() 和 nsmallest() 分别提供列表中最大或最小的N个元素。

先从简单的看起:

[python]
def heappush(heap, item):
“””Push item onto heap, maintaining the heap invariant.”””
heap.append(item)
_siftdown(heap, 0, len(heap)-1)

def heappop(heap):
“””Pop the smallest item off the heap, maintaining the heap invariant.”””
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
_siftup(heap, 0)
else:
returnitem = lastelt
return returnitem
[/python]

从源代码我们不难看出,这个堆也是用数组实现的,而且是最小堆,即 a[k] <= a[2*k+1] and a[k] startpos:
parentpos = (pos - 1) >> 1
parent = heap[parentpos]
if cmp_lt(newitem, parent):
heap[pos] = parent
pos = parentpos
continue
break
heap[pos] = newitem

def _siftup(heap, pos):
endpos = len(heap)
startpos = pos
newitem = heap[pos]

# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1    # leftmost child position
while childpos &lt; endpos:
    # Set childpos to index of smaller child.
    rightpos = childpos + 1
    if rightpos &lt; endpos and not cmp_lt(heap[childpos], heap[rightpos]):
        childpos = rightpos
    # Move the smaller child up.
    heap[pos] = heap[childpos]
    pos = childpos
    childpos = 2*pos + 1
# The leaf at pos is empty now.  Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)

[/python]

上面的代码加上注释很容易理解,不是吗?在此基础上实现 heapify() 就很容易了:

[python]
def heapify(x):
"""Transform list into a heap, in-place, in O(len(x)) time."""
n = len(x)

# Transform bottom-up.  The largest index there&#039;s any point to looking at
# is the largest with a child index in-range, so must have 2*i + 1 &lt; n,
# or i &lt; (n-1)/2\.  If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1\.  If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that&#039;s again n//2-1.
for i in reversed(xrange(n//2)):
    _siftup(x, i)

[/python]
这里用了一个技巧,正如注释中所说,其实只要 siftup 后面一半就可以了,前面的一半自然就是一个heap了。heappushpop() 也可以用它来实现了,而且用不着调用 heappush()+heappop():

[python]
def heappushpop(heap, item):
"""Fast version of a heappush followed by a heappop."""
if heap and cmp_lt(heap[0], item):
item, heap[0] = heap[0], item
_siftup(heap, 0)
return item
[/python]

第一眼看到这个函数可能觉得它放进去一个再取出来一个有什么意思嘛!仔细想想它很有用,尤其是在后面实现 nlargest() 的时候:

[python]
def nlargest(n, iterable):
"""Find the n largest elements in a dataset.

Equivalent to:  sorted(iterable, reverse=True)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
it = iter(iterable)
result = list(islice(it, n))
if not result:
    return result
heapify(result)
_heappushpop = heappushpop
for elem in it:
    _heappushpop(result, elem)
result.sort(reverse=True)
return result

[/python]

先从 list 中取出 N 个元素来,然后把这个 list 转化成 heap,把原先的 list 中的所有元素在此 heap 上进行 heappushpop() 操作,最后剩下的一定是最大的!因为你每次 push 进去的不一定是最大的,但你 pop 出来的一定是最小的啊!

但 nsmallest() 的实现就截然不同了:

[python]
def nsmallest(n, iterable):
"""Find the n smallest elements in a dataset.

Equivalent to:  sorted(iterable)[:n]
&quot;&quot;&quot;
if n &lt; 0:
    return []
if hasattr(iterable, &#039;__len__&#039;) and n * 10  Largest of the nsmallest
    for elem in it:
        if cmp_lt(elem, los):
            insort(result, elem)
            pop()
            los = result[-1]
    return result
# An alternative approach manifests the whole iterable in memory but
# saves comparisons by heapifying all at once.  Also, saves time
# over bisect.insort() which has O(n) data movement time for every
# insertion.  Finding the n smallest of an m length iterable requires
#    O(m) + O(n log m) comparisons.
h = list(iterable)
heapify(h)
return map(heappop, repeat(h, min(n, len(h))))

[/python]

这里做了一个优化,如果 N 小于 list 长度的1/10的话,就直接用插入排序(因为之前就是有序的,所以很快)。如果 N 比较大的话,就用 heap 来实现了,把整个 list 作成一个堆,然后 heappop() 出来的前 N 个就是最后的结果了!不得不说最后一行代码写得太精炼了!

Python memoize decorator

偶然看 Python 代码,看到这么一个叫memoize decorator 的东西,你会发现它非常有用。

看下面的例子,我们最常见的 fibonacci 数列的递归实现。

~% python
>>> def fib(n):
...     if n in (0, 1):
...             return n
...     else:
...             return fib(n-1)+fib(n-2)
...
>>> import timeit
>>> timeit.timeit("fib(5)", "from __main__ import fib")
3.946546792984009
>>> timeit.timeit("fib(7)", "from __main__ import fib")
10.944302082061768

很慢,不是么?你当然可以把它的实现改成迭代,但是,如果前提是我们不对fib()函数本身做任何代码修改的话,我们还有别的方法去改进它。

我们注意到,fib(n-1) 和 fib(n-2) 实际上重复计算了很多结果,根本没必要,一个自然而然的想法就是把这些结果缓存起来,于是就有了下面的:

>>> def memoize(f):
...     memo = {}
...     def wrapper(x):
...         if x not in memo:
...             memo[x] = f(x)
...         return memo[x]
...     return wrapper
...
>>>
>>> def fib(n):
...     if n in (0, 1):
...         return n
...     else:
...         return fib(n-1) + fib(n-2)
...
>>> fib = memoize(fib)
>>> import timeit
>>> timeit.timeit("fib(7)", "from __main__ import fib")
0.35535502433776855

还不够,Python 还有自己的解决办法,那就是用 decorator

[python]
def memoize(function):
memo = {}
def wrapper(args):
if args in memo:
return memo[args]
else:
rv = function(
args)
memo[args] = rv
return rv
return wrapper

@memoize
def fib(n):
if n in (0, 1):
return n
return fib(n-1) + fib(n-2)
[/python]

为此,在 Python3 中还专门引入了一个 lru_cache

这个东西的牛逼之处在于,有了它你可以放心地去写递归的代码,不用为了效率去把它改成迭代形式,你也知道,有些时候递归代码比迭代代码更容易理解。这样可读性有了,效率也提升了,不用修改函数的实现,直接让它从次方级降到了线性级。