shomanのブログ

ただの備忘録

Goのcontainer/heapのソースコードを読んでいく

container/heapとは

Goの標準ライブラリには、データ構造の一種であるヒープがcontainer/heapによって提供されています。

github.com

heapはヒープソート(heap sort)や優先度付きキュー(priority queue)の実装に使われる重要なデータ構造のうちの一つです。

ただ、container/heapはヒープに関する処理を実装したインターフェースを提供しており、プログラマは自分の用意した型にインターフェースを実装することで実際にその型をヒープとして使用できるようになるという、一見変わった構造をしているため外から見ると中で何をしているかよく分かりません。

例えば、ドキュメントに例としても載っている、intを格納するヒープは以下のように実装できます。

package main

import (
    "container/heap"
    "fmt"
)

type IntHeap []int

// heap.InterfaceをIntHeapに実装
func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }

func (h *IntHeap) Push(e interface{}) {
    *h = append(*h, e.(int))
}

func (h *IntHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func main() {
    q := &IntHeap{1, 3, 4}
    heap.Init(q)
    heap.Push(q, 5)
    heap.Push(q, 2)
    fmt.Println(*q) // [1, 2, 4, 5, 3]
    for len(*q) > 0 {
        fmt.Printf("%v ", heap.Pop(q)) // 1, 2, 3, 4, 5
    }
}

用意した型IntHeapをヒープとして使用するためには、Len, Swap, Less, Push, Popというメソッド(heap.Interfaceのメソッド群)を実装してあげる必要があります。

IntHeap.Lessで何やら順序付けを定義しているようです。 また、IntHeap.Pushは末尾に要素を追加しているだけ、IntHeap.Popは末尾の要素を削除しそれを返しているだけ。

なぜこれでIntHeapがヒープとして使えるようになるのか、よく分かりません。

そして、実際にIntHeapをヒープとして使うためにはheap.Initという関数を最初に呼ぶ必要があります。

その後、heap.Pushという関数でヒープに要素を追加し、heap.Popという関数を呼ぶことで最小値が順番に返ってくることが分かります。

さらに、heap.Pushで要素を追加したあとのqの中身を見てみると、昇順に要素がソートされているのではなく、[1, 2, 4, 5, 3]という謎の順番になっていることが分かります。

目標

今回はcontainer/heapのソースコードを読んでいくことで、中で何が行われているのかを理解することを目標とします。

(そしてコードを読むことは、一般にヒープというデータ構造がどのようなアルゴリズムを含んでいるのかを知る、とても良い機会になると思います。)

ヒープとは

まず、ソースコードを読む前にヒープがどのような性質を持つデータ構造であるかを説明します。

(二分木)ヒープは図のような、最も下のレベル以外が完全に埋まっている二分木と見なすことのできる構造をしています。

f:id:shoman_panda:20200516192731p:plain
ヒープの二分木表現

ヒープには2種類あり、それぞれminヒープ、maxヒープと呼ばれます。 どちらもヒープ条件(heap property)というものを満たしますが、条件はminヒープとmaxヒープで異なります。

minヒープの場合、任意のノードについて、

親ノードの値 \leq 子ノードの値

となります。 したがって、minヒープは根に最小の要素を格納し、あるノードを根とする部分木が含む値は、そのノード自身の値以上になると言えます。

maxヒープは逆で、根の値が最大になります。

親ノードの値 \geq 子ノードの値

よくヒープは木構造で表されることが多く、これはこのヒープ条件を視覚化する上で分かりやすいと思います。

ただ、木構造配列で表すことができ、実際の実装では配列を使用します1

f:id:shoman_panda:20200516203546p:plain
ヒープの配列表現

親ノードと子ノードはそれぞれ配列の添字を使うことで互いにアクセスできます。

配列の添字を0-origin(最初の要素の添字が0)としたとき、


\begin{eqnarray}
parent_i &=& (i-1)/2\\
left_i &=& 2*i+1\\
right_i &=& 2*i+2
\end{eqnarray}

が成り立ちます。

ここまで確認したところで、実際にcontainer/heapの実装を見ていきましょう!

container/heapのソースコード

まず、Interfaceという名前のinterfaceが定義されています。

type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

PushxLen()番目の要素(つまり末尾)として加えるメソッド、 PopLen()-1番目の要素(つまり最後の要素)を削除し返すメソッド と書かれています。

つまりは、このInterfaceを使う側はそのような処理をそれぞれPushPopとして 実装することを期待されています。

sort.Interfaceが埋め込まれていますが、これはsortパッケージに以下のように定義されています。

type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

ここで、heap.Interfaceコメントにとても重要なことが書かれています。

このインターフェースを実装した全ての型は、以下の不変式を満たすminヒープとして使える
!h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() -- (★)

(★)はどういう意味でしょうか。 先ほどのIntHeapの例のように

func Less(i, j int) { return h[i] < h[j] }

と定義した場合を考えてみます。

すると、 !h.Less(j, i) <=> h[j] >= h[i] となり、これが 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() を満たす全ての(i, j)で成り立つと言っています。

2*i+1 <= j <= 2*i+2は先ほどの親ノードと子ノードの添字の関係を見ると、jiの子ノードに対応していることが分かります。

つまりこの場合、(★)は全てのノードについて、子ノードの値は親ノード以上であるというminヒープ条件を表しているのです!

順序関係を逆にして、

func Less(i, j int) { return h[i] > h[j] }

とした場合は、(★)はmaxヒープ条件を表すことになります。

すなわち、このheap.Interfaceを実装することで、常にminヒープ条件、またはmaxヒープ条件が満たされ、実装した型をヒープとして使える、と主張しているわけです。

ここまででheap.Interfaceの意味することは分かったのですが、それぞれのメソッドがどう使われているかはまだ謎です。 読み進めていきましょう。

次にInitという関数が定義されています。 これはIntHeapの例で見たように、最初に呼ばれるべき関数です。

func Init(h Interface) {
    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        down(h, i, n)
    }
}

何やらdownという関数を約半数の要素について呼び出しているようです。

一旦飛ばして次を見てみます。

次にはPushという関数が定義されています。

func Push(h Interface, x interface{}) {
    h.Push(x)
    up(h, h.Len()-1)
}

ここではまず、Interface型とinterface{}型の引数を受け取っています。

若干ややこしいですが、hは先ほど定義してあったInterfaceという名前の型の変数であり、 xinterface{}型の変数です。

interface{}型の変数はどのような型も取り得る型で、組み込みで定義されているものです。

さて、Pushではまずh.Push(x)が実行されています。 h.Push(x)は配列の最後に値を追加することを想定したメソッドです。

その後、up(h, h.Len()-1)を呼び出しています。

upはパッケージプライベートな関数で、以下のように定義されています。

func up(h Interface, j int) {
    for {
        i := (j - 1) / 2 // parent
        if i == j || !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        j = i
    }
}

実は、upはヒープのアルゴリズムを理解する上で非常に重要な役割を担っています。

まず、jの親ノードの添字をiに代入しています。

次のif文の条件には、i == j || !h.Less(j, i)とあります。

i == jは子ノードと親ノードの添字が等しいとき、つまりjが根であるときに満たされます。

そして2つ目の条件の!h.Less(j, i)ですが、これはまさに先ほど見たヒープ条件を表している式です。

つまり、jが根ノード、もしくはjが親ノードとヒープ条件を満たしている場合にbreakし実行を終了します。

逆にヒープ条件を満たさない場合はその次のh.Swap(i, j)が実行され、 i番目の要素とj番目の要素が交換されます。

これはつまり、jから見たときに自身と親ノードを交換することを意味します。

それにより、元々のjの位置にあるノードは親ノードとヒープ条件を満たすことができます。 しかし、元々のiの位置にあるノードはその親ノードとヒープ条件を満たすかは不明です。

f:id:shoman_panda:20200516215924p:plain
upの挙動

よって、j = iとすることで、元々iの位置にあったノードについて再度、ヒープ条件が成り立つかを確認していきます。

まとめると、upは引数のj番目のノードが親ノードとヒープ条件を満たしているかを確認し、 満たしていない場合は要素を交換する。そして、交換した場合は再帰的に親ノードとヒープ条件を満たしているかを確認していく、といった処理をしています。

Pushのコードに戻ってみましょう。

func Push(h Interface, x interface{}) {
    h.Push(x)
    up(h, h.Len()-1)
}

ここでは、up(h, h.Len()-1)が呼ばれています。

h.Len()-1は配列の最後の添字を表すので、jはたった今追加されたxを格納する場所を指します。

つまり、Pushは要素を追加し、その要素が親ノードとヒープ条件を満たすまで交換していくことで、 xがヒープ条件的に正しい位置に来るような処理をする関数ということが分かりました。

f:id:shoman_panda:20200516220550p:plain
Push(h, 2)のときのヒープの様子

次に出てくる関数はPopです。

func Pop(h Interface) interface{} {
    n := h.Len() - 1
    h.Swap(0, n)
    down(h, 0, n)
    return h.Pop()
}

この関数はヒープの根の要素を取り除き、その値を返す関数だと期待できます。

まず、n := h.Len() - 1によりnに「根を取り除く前の要素数-1」が代入されています。

その後、h.Swap(0, n)で根の要素と最後の要素(= 木構造の一番右の葉)を交換しています。

次にdown(h, 0, n)となっています。このdownInitでも出てきました。

そして最後にh.Popが呼ばれていることから、配列の最後の要素が削除され、返り値となることが分かります。

今、h.Swap(0, n)により、元々根にあった要素が配列の最後に移動しているため、 minヒープの場合は最小値が返ることになります。

逆に、元々最後にあった要素は根に移動したため、ほとんどの場合、 根とその子ノード間でヒープ条件が満たされてはいません。

f:id:shoman_panda:20200516221849p:plain
h.Swap(0, n)後のヒープ

それゆえにdownによって、ヒープ条件が再度成り立つよう、配列が整えられていると予想できます。

downupと同様に、ヒープを実装する上でとても重要な関数です。 コードを見てみましょう。

func down(h Interface, i0, n int) bool {
    i := i0
    for {
        j1 := 2*i + 1
        if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
            break
        }
        j := j1 // left child
        if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
            j = j2 // = 2*i + 2  // right child
        }
        if !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        i = j
    }
    return i > i0
}

まず、iが引数のi0で初期化され、ループに入ります。

j1 := 2*i+1と定義されていますが、これはj1iの左側の子ノードを表していると分かります。

その後のif文の条件式で、j1 >= n || j1 < 0となっていますが、 j1 < 0はコメントからもオーバーフローが起きた場合だと分かります。

j1 >= nについては一旦飛ばします。

次に、j2 := j1 + 1となっており、j2iの右側の子ノードを表すことが分かります。

j2 < nj1 >= nと同様に一度飛ばすとして、h.Less(j2, j1)ならばjj2となります。

jはその前の行でj1で初期化されているため、結果的にjにはminヒープの場合には小さい方の値が、 maxヒープの場合には大きい方の値が入ることになります。

そして、そのjiがヒープ条件を満たすかを!h.Less(j, i)によって確認しており、 もしも満たしている場合(つまり!h.Less(j, i)がtrueになる場合)にはbreakします。

もし、minヒープの場合に、子ノードの大きい方と交換すると仮定すると、 状況によってはヒープ条件を満たせない場合があるため、必ず小さい方と交換します。 maxヒープの場合についても同様です。

f:id:shoman_panda:20200516225239p:plain
downの挙動

jiがヒープ条件を満たしていない場合は、h.Swap(i, j)で要素を交換し、次のijとします。

つまり、自身と子ノードを交換し、再帰的にその子ノードと子ノードの子ノードとの間でヒープ条件が成り立つかを確認していきます。

upが自身と親ノードのヒープ条件の成立を確認する処理をしていたのに対し、 downは自身と子ノードのヒープ条件が成り立つかを確認します。

では、最初のif文の条件である、j1 >= nとはどういう意味でしょうか。

nPop内の呼び出しではh.Len() - 1でした。

downは自身と子ノードがヒープ条件を満たすまで再帰的に実行を続けますが、 この処理は削除される最後の要素には適応すべきではありません。

つまり、j1 >= nbreakすることは、ヒープ条件がn-1番目の要素まで成立すること保証しています。

f:id:shoman_panda:20200516223310p:plain
Pop(h)のときのヒープの様子

ここでnはヒープサイズと呼ばれます。 ヒープサイズをh-sizeとすると、h-size <= h.Len()が成り立ち、 h[0]..h[h-size-1]の要素がヒープ条件を満たすことを表します。

downは返り値としてi > i0を返してますが、これはiが更新されたとき、 つまりi0とその子ノードの交換が起きたときにtrueとなることが分かります。

これらより、Popは根の要素を返し、その後ヒープ条件を満たすように配列を整える処理をすることが分かりました。

downの処理が分かったところで、Initに戻ってみます。

func Init(h Interface) {
    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        down(h, i, n)
    }
}

この関数は実は、与えられた配列からヒープを構築する処理をしています。

n/2 - 1番目から0番目の要素について、自身と子ノードがヒープ条件を満たすよう調整をするため、 down(h, i, n)を呼んでいます。

ここで、nh.Len()となっていることから、ヒープサイズは配列のサイズと等しくなります。

ここで、n/2 - 1番目の要素は子ノードを持つ最も右側のノードを表します。

なぜなら、n/2以降の要素は、2*i+1, 2*i+2に対応する要素がなく、子ノードのない葉ノードになっているからです。

つまり、子ノードを持つ最も右側(木構造で表すと最も右下)のノードからヒープ条件を満たすよう、 整えていく処理をしていることが分かります。

f:id:shoman_panda:20200516235203p:plain
Init(h)のときのヒープの様子

それを根まで順番に行うことで、配列全体がヒープ条件を満たすことになります。

次に、 Removeを見ていきます。

func Remove(h Interface, i int) interface{} {
    n := h.Len() - 1
    if n != i {
        h.Swap(i, n)
        if !down(h, i, n) {
            up(h, i)
        }
    }
    return h.Pop()
}

まずnh.Len() - 1としていますが、Popのときと同様、要素を削除するためヒープサイズを配列のサイズ-1としています。

n == iのときは削除する要素が最後の要素であるため、そのままh.Pop()を呼べば良いです。

そうでない場合は、まずh.Swap(i, n)によってi番目の要素とn番目の要素(つまり最後の要素)を交換します。

そして次の行ですが、down(h, i, n)を呼んでおり、これがfalseの場合はup(h, i)の実行に移ります。

downi番目の要素とその子ノードのヒープ条件を整える関数でしたが、 すでに整っている場合はfalseが返ってくるのでした。

つまり、子ノードとヒープ条件が満たされている場合はupを呼ぶことで今度は親ノードとのヒープ条件が成立しているかを確認しているのです。

このようにdown -> upの順番で確認するため、downboolの返り値を返していたのです。 (up -> downの順で常に確認するのであれば、upが返り値を返すようにすれば問題なさそうです。)

いずれにせよ、元々最後の要素であった値は、downまたはupによってi番目からヒープ条件的に適切な位置へと移動することができます。

そして、h.Pop()によって最後の要素(元々i番目にあった要素)を削除し、それを返します。

この挙動からもわかるように、Pop(h)Remove(h, 0)と等しいです。

最後の関数として、Fixがあります。

func Fix(h Interface, i int) {
    if !down(h, i, h.Len()) {
        up(h, i)
    }
}

この関数は、i番目の要素の値が変更された後に呼ばれるべきものです。

ここまで読んでいただけた人であれば、Fixを理解するのはそんなに難しくないはずです。

更新した値がヒープ条件的に適切な位置に移動するよう、downまたはupを呼んでいます。

値を更新しただけなので、ヒープサイズは変わらずh.Len()となっていることに注意してください。

さて、これでcontainer/heapを全て読み終えることができました!

コードを読んでいくことで、Goのライブラリ実装だけではなく、一般にヒープがどういう動きをするかについても理解が深まったかと思います。

一見奇妙に見えたインターフェースも、中身の挙動を知ることで理にかなっていると感じます。

最初に出てきたIntHeapの例でなぜ、q[1, 2, 4, 5, 3]という順になっているか、今はよく分かると思います。

そして、Lessの実装を変えることで、minヒープとmaxヒープが同じコードで実装できる設計は美しいです2

ただ、なぜPushPopを指定されたように実装しなければならないのかを理解するには、 ヒープの動作を理解していないとよく分からないのではないかと思います。

この点は、ジェネリクスがないGoの辛い部分を表している一例かもしれません。

ヒープソートの実装

container/heapにはヒープソートは実装されていませんが、downを使うことで簡単に実装できます。

まず、maxヒープをcontainer/heapで実装することを考えます。

この場合、根にはヒープの最大値が含まれます。

それを最後の要素と交換し、新たな根の要素がヒープ条件を満たすようdownで調整します。 その際、ヒープサイズを1減らすことで、今度は2番目に大きい要素が根に来るようになります。

これを繰り返すことで、要素が昇順でソートされます。 このソートはその場で(in-place)で行われるため、空間計算量はO(1)となります3
実際のコードは以下のようになります。

func Sort(h Heap) {
    for i := h.Len() - 1; i > 0; i-- {
        h.Swap(0, i)
        down(h, 0, i-1)
    }
}

降順でソートしたい場合は、minヒープとなるようLessを変更すれば良いです。

up, down再帰関数表現

上で解説したように、up, downはそれぞれ親ノード、子ノードと自身のヒープ条件を満たすかを再帰的に確認しています。

実際の実装ではfor文によるループで実装されていますが、これらを再帰関数の形に変形することもできます。 変数名は適宜変更していますが、アルゴリズムは同じです。

func up(h Heap, i int) {
    parent := (i - 1) / 2
    if i == parent || !h.Less(i, parent) {
        return
    }
    h.Swap(parent, i)
    up(h, parent)
}

func down(h Heap, i, n int) bool {
    left := 2*i + 1
    if left >= n || left < 0 {
        return false
    }
    smaller := left
    if right := left + 1; right < n && h.Less(right, left) {
        smaller = right
    }
    if h.Less(smaller, i) {
        h.Swap(i, smaller)
        down(h, smaller, n)
        return true
    }
    return false
}

次回はヒープのそれぞれの処理の計算量を見積もっていきたいと思います。


  1. 構造体とポインタを使った実装も可能ですが、少なくともGoでは配列を使って実装する方がシンプルだと思います

  2. おそらく多くの言語のヒープ実装は同じようになっているかと思いますが。

  3. minヒープを作り、Popで取り出した要素を別のメモリに順番に追加していく方法でも昇順ソートは可能ですが、maxヒープを使う方法だとin-placeで昇順ソートが可能です。