shomanのブログ

ただの備忘録

修士課程を振り返って

修士課程を修了しました.以下は,主に自分で後に振り返るためのポエムです.

先日,東京工業大学情報理工学院情報工学情報工学コースを正式に修了し,修士(工学)を獲得した.学部も含めると5年間の学生生活だった.もっとも,小学生から含めると人生の大半を学生という身分で過ごしてきたわけで,それが今日終わると思うとなんだか不思議な気分である.

5年という期間は,大学生活として適切な長さであったように感じる.これより短いと物足りず,長くなると飽きてしまうような気がする.学部を3年で卒業し,修士は期間を延ばさす2年で修了するという選択は,我ながら幸運に恵まれたマジの最適解だった.

そもそも,修士課程に進学するという選択自体,総じるとポジティブになる.とりあえず,修士課程の中核を成す研究の話は一旦置いておくとして,修士課程の良かった点を振り返る.

まず,授業が良かった. 修士の授業内容は基本的に,学部の授業と比較して,実社会との結びつきが強い.そういった授業を好んで選んでいたこともあるが,自分の性格から,実社会との応用部分について学べる機会が与えられている環境は満足だった.広く使われている技術の裏にはどんな理論があるかを学べたり,チーム開発を体験できたことなどは良い記憶として残っている.実際に社会に出た方がよく学べるという意見はもっともであるが,それは次の点も考慮すべきである.

取れる行動の選択肢が多い. これは,大学院生は暇で時間的余裕があるという意味ではないし,大学(院)生の暇さ加減をアピって社会にネガキャン(?)する意図は全くもってない.むしろ,理系大学院生(文系については単に分からない)は基本的に授業や研究で忙しいし,定時が明確に設定されてないという点で多くの労働者よりも余暇は少ないかもしれない.

取れる行動の選択肢が多いとは,例えば,留学に行って海外で学んだり研究するという場所の選択や,ある程度の期間,様々な企業のインターンに行って複数の労働環境を経験できるという点で,選べるオプションが多いという意味である. 労働者は,あくまでも雇用主に対して価値を提供するという契約にある程度縛られるため,逆にお金を払って大学に所属している学生という身分ほどには,選択の自由度は低いと私は理解している.自分は,大学院生時代にスイスに研究留学することもできたし,色んな企業のインターンに行って(実際の業務に比べれば塵程かもしれないが)労働を体験することができた.

留学での一番の気付きは,(残念ながら研究内容ではなく)ご飯の大切さである.これは冗談ではなく,紛れもないマジで,自分にとって海外に住む上で一番重要な要素は飯だった.スイス人には申し訳ないが,スイスの料理はあまりおいしくなかった.おいしいと思ったのは半年の中で数えるほど.常にどこか満たされない感じがあって,ストレスが溜まった.日本から送ってもらったカップヌードルを,毎日食べるか否か,葛藤するほどには病んでいた.逆に,ご飯のおいしさがある程度担保されていれば,それ以外の壁はあるものの,それなりに生活できる気がする.留学での経験はポジティブなものもネガティブなものもたくさんあるが,それだけで1エントリになってしまうので,ここでは飯の重要性に触れるまでとする.

最後は,優秀な人の存在を多く感じられたことである. これはこの前の話にも繋がるが,研究室,留学先,インターン先などでマジで優秀な人に出会えたことが大きい.留学先の研究室のPh.D.の学生との議論では,頭の回転が早く過ぎて何言ってるか訳わからんことも多かったし,そもそも学生は勉強しまくっていた.インターン先では,経験に裏打ちされたゴリゴリの技術力を振りかざしている人もいた.そういった人の存在を認知できたことで,自らの未熟さに気がつくと同時に,学生生活へのモチベーションにもなった.

とりあえず列挙したところで,雑多な印象しかないが,これら3点が大きな価値だったように思える.そもそも修士に進学していない世界線を同時に体験し比較することはできないし,思い出を美化している点もあると思うが,後悔はない.

研究について

修士課程を振り返る上で避けては通れないトピックである「研究」. 最初に断っておくが,自分は大きな研究成果をあげたわけでもなければ,研究に100%全力投球できたわけでもない. その上でいくつか思う点があるので,つらつらと書いていく.

第一に,研究室の先生方がとても良い人だった. うちの研究室は週一回,小規模のミーティングがあり,そこで各々の進捗を報告するという形式であったのだが,疑問点を質問をすれば丁寧に答えてくれ,相談があればきちんと話を聞いてもらえる. 早期卒業することも留学に行くことも承諾してくれたし,基本的に学生の考えを尊重してくれたと感じている. 時に意見が合わないなと思ったときも,後々考えてみれば自分が間違っていたことがほとんどだった.理不尽なことや非合理的なことを要求されたこともない. これらを当たり前と思うかどうかは人それぞれだが,周りの話を聞く限り,当てはまらないことがしばしばあるらしいので,自分は恵まれていたのだろう.

第二に,これはネガティブな感想になるが,研究行為そのものにあまり興味を持てなかった. これは単に研究テーマが合致しなかったというありきたりな問題の帰結かもしれないし,自分の嗜好の問題かもしれない. もちろん,新しいアルゴリズムを思いついたときや,良い実験結果が出たときはそれなりに楽しさを感じたが,それ以外の時間の方が圧倒的に多かった. むしろ,その一時的な楽しさが,それ以外の諸々を打ち消すほどに,自分の中で大きくならなかったと言えるのかも.

自分は,コードを書いたり,実際に少しでも社会に役に立っていると実感できる何かを作っているときがもっとも楽しいし,そのためだけになるべく時間を使いたいと思っている. 研究をしているときでも,アルゴリズムをコードに落とし込んでいるときや,全体の設計を考えているときはワクワクしていた. 一方で,正直なところ,自分の研究内容がどれほど社会に還元されるかは未知数で,その確率はとても小さいように思えてしまった. 論文を読んでいても,時には学術的な面白みを感じることもあったが,その実現可能性に疑問を持たざるを得ないことも往々にしてあった.

無論,世の中は無数の研究成果によって成り立っているわけで,私たちはその巨人の肩の上を借りなければ,豊かな世界を観ることはできない. しかし,自分にはもっと異なるアプローチが適していると思った.

研究成果をあげている人たちは本当に尊敬するし,研究に没頭している人たちを見ると,自分には敵わないなと感じる. そもそも,研究という行為に合う人の割合は,同世代の1%くらいなんじゃないかと思ってる.それくらい自分には難しかった.

まとめ

これらの様々を経験できた修士課程を含め,学生生活は楽しかった.良き友人にも出会えたし,ここまで支えてくれた家族にはとても感謝している.

明日からは,ソフトウェアエンジニアとして企業で働く. 中学生・高校生の頃は漠然と,大学生になってからは割と明確に,ソフトウェアを作る人になりたいと思っていたので,とても楽しみにしている.

メルカリインターンでGoの関数の複雑度を測るツールを作った話

こんにちは.先週、メルカリのオンラインインターンに参加してきました!

インターンの概要はここに載っています. mercan.mercari.com

最近いつも研究がやばいと言いながら、参加したい気持ちを抑えられませんでした…

概要

5日間でGoの静的解析を学び、ツールを作るという内容のオンラインインターンでした.

内2日はGoの静的解析についての講義、残りの3日間は開発というスケジュールです.

講義資料はここで公開されており、講義の録画も後日公開される予定みたいです.

講義も分かりやすいく、そもそもGo自体に静的解析周りの環境が充実していたこともあり、実装に集中できたと思います.

skeletonという静的解析のコードの雛形を作ってくれるツールはとても便利です.

開発はたまたま最初のビデオチャットで同じ部屋になった大学の友達のsff1019とチーム開発することにしました.

複雑度計測ツール

例えば、プルリクエスト(PR)をレビューしてる際に、そのソースコードの難易度がどれくらいなのかの目安を知りたいと思ったことはありませんか?

ソースコードの難易度については様々なメトリクスがありますが、今回はその中でもCyclomatic Complexity (循環的複雑度)とHalstead Complexity (日本語訳分からん) を計測する静的解析ツールを作りました. (複雑度の詳細については後述)

また、実際にGithub Actionsを利用することでPRの際に静的解析を走らせ、reviewdogというツールを使うことでPRのdiffに対しアノテーションをつける機能も実装しました.

https://user-images.githubusercontent.com/32924835/92326807-d7d0db00-f08f-11ea-940e-bdb9d6e81546.png

リポジトリは以下です (ツール自体とGithub Actionsのaction).

GitHub - shoooooman/go-complexity-analysis: Calculate complexities of golang functions with static analysis

GitHub - shoooooman/go-complexity-analysis-action: Show complexities of golang functions on pull requests with reviewdog

使い方

インストール

$ go get github.com/shoooooman/go-complexity-analysis/cmd/complexity

実行

$ go vet -vettool=$(which complexity) .

Github Actionsとの連携

PRに対し実行したい場合は、Githubリポジトリ.github/workflows/ 以下に下のようなyamlファイルを作成してください.

on: pull_request

jobs:
  reviewdog:
    runs-on: ubuntu-latest
    name: complexity analysis
    steps:
    - uses: actions/checkout@v2
    - uses: shoooooman/go-complexity-analysis-action@v1
      with:
        github_token: ${{ secrets.GITHUB_TOKEN }}

これでPRの際にactionが実行されアノテーションが表示されるはずです.

デフォルトでは Cyclomatic Complexity > 10 の関数のみに出力される設定になっていますが、これは変更することができます.

詳細な説明はリポジトリのREADMEをお読みください.

複雑度とは

そもそもCyclomatic ComplexityとかHalstead Complexityとか何?という話です.

Cyclomatic Complexity (循環的複雑度)

Wikipediaによるとプログラムの可読性に関係する指標で、以下のように定義されます.

M = E − N + 2P
ただし、
 M = 循環的複雑度
 E = グラフのエッジ数
 N = グラフのノード数
 P = 連結されたコンポーネントの数

こちらの記事も分かりやすかったです.

gocycloを使ってgo言語のプロダクトをシンプルに維持する|moli9ma|note

しかし、実際にソースコードの循環的複雑度を計測する場合には if, if else, for, switch などの分岐の数を数え計算することが多いようです.

今回の実装も関数定義の初期値を1とし、ブロック中の if, else if, for, case, ||, && の数を足すといった方法をとりました.

例えば以下のコードの場合、Cyclomatic Complexityは3になります.

func f(v int) {
    if v == 0 {      // 1つ目
        ...
    } else if v == 1 // 2つ目
        ...
    } else {
        ...
    }
}

こちらを参考に、閾値を設定できます.

循環的複雑度 複雑さの状態 バグ混入確率
10以下 非常に良い構造 25%
30以上 構造的なリスクあり 40%
50以上 テスト不可能 70%
75以上 いかなる変更も誤修正を生む 98%

Halstead Complexity

こちらはソースコードの難易度を表す指標です.

オペランドとオペレーターの数や種類によって計算されます.こちらのページを参考にしました.

Halsteadの計測の実装はチームメンバーのsff1019がやってくれました.

Maintainability Index (おまけ)

複雑度とは少し違いますが、保守性を表すMaintainability Indexというものが、上のCyclomatic ComplexityとHalstead Complexityを用いて計算できます.

せっかくなので、ツールにはこちらの指標も出力するように実装しました.

オリジナルの計算式をMicrosoft正規化したものを使用しました.

Normalized Maintainability Index = MAX(0,(171 - 5.2 * ln(Halstead Volume) - 0.23 * (Cyclomatic Complexity) - 16.2 * ln(Lines of Code))*100 / 171)

Lines of Code (コードの行数) もコードの静的解析を行うことで取得できます.

改善すべき点

現時点ではPRを出した際に、diffのあるファイルに含まれる全ての関数についてCyclomatic Complexityを表示します.

本来は、diffを含む関数のみに出力すべきですが、範囲が広くなってしまっています.

diffの位置を取得しそれを静的解析ツールに渡せれば、その中でdiffを含む関数を調べ、その定義部分に出力するといったことができるのですが、良い方法が分かっていません.

何か良い案があれば教えていただけると嬉しいです.

ちなみにアノテーションではなくコメントで複雑度を表示しようとすると、Githubの仕様上、diffの前後数行までしかコメントをつけることができないため、厳しいです.

感想

5日間という短い間でしたが、インターン前と比べ静的解析やGoの仕様への (加えてGithub Actionsも) 理解が深まったと思います.

そして何より静的解析が楽しかったです.インターン後も時間があるときに何かツールを開発してみようと考えています.

CI連携について詰まった部分について話すと長くなりそうなので、また別の記事にでも書ければと思います.

関係者のみなさん、ありがとうございました!

Goのcontainer/heapのソースコードを読んでいく

container/heapとは

Goの標準ライブラリには、データ構造の一種であるヒープがcontainer/heapによって提供されています。

github.com

heapはヒープソート(heap sort)や優先度付きキュー(priority queue)の実装に使われる重要なデータ構造のうちの一つです。

ただ、container/heapはヒープに関する処理を実装したインターフェースを提供しており、プログラマは自分の用意した型にインターフェースを実装することで実際にその型をヒープとして使用できるようになるという、一見変わった構造をしているため外から見ると中で何をしているかよく分かりません。

例えば、ドキュメントに例としても載っている、intを格納するヒープは以下のように実装できます。

package main

import (
    "container/heap"
    "fmt"
)

type IntHeap []int

// heap.InterfaceをIntHeapに実装
func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }

func (h *IntHeap) Push(e interface{}) {
    *h = append(*h, e.(int))
}

func (h *IntHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func main() {
    q := &IntHeap{1, 3, 4}
    heap.Init(q)
    heap.Push(q, 5)
    heap.Push(q, 2)
    fmt.Println(*q) // [1, 2, 4, 5, 3]
    for len(*q) > 0 {
        fmt.Printf("%v ", heap.Pop(q)) // 1, 2, 3, 4, 5
    }
}

用意した型IntHeapをヒープとして使用するためには、Len, Swap, Less, Push, Popというメソッド(heap.Interfaceのメソッド群)を実装してあげる必要があります。

IntHeap.Lessで何やら順序付けを定義しているようです。 また、IntHeap.Pushは末尾に要素を追加しているだけ、IntHeap.Popは末尾の要素を削除しそれを返しているだけ。

なぜこれでIntHeapがヒープとして使えるようになるのか、よく分かりません。

そして、実際にIntHeapをヒープとして使うためにはheap.Initという関数を最初に呼ぶ必要があります。

その後、heap.Pushという関数でヒープに要素を追加し、heap.Popという関数を呼ぶことで最小値が順番に返ってくることが分かります。

さらに、heap.Pushで要素を追加したあとのqの中身を見てみると、昇順に要素がソートされているのではなく、[1, 2, 4, 5, 3]という謎の順番になっていることが分かります。

目標

今回はcontainer/heapのソースコードを読んでいくことで、中で何が行われているのかを理解することを目標とします。

(そしてコードを読むことは、一般にヒープというデータ構造がどのようなアルゴリズムを含んでいるのかを知る、とても良い機会になると思います。)

ヒープとは

まず、ソースコードを読む前にヒープがどのような性質を持つデータ構造であるかを説明します。

(二分木)ヒープは図のような、最も下のレベル以外が完全に埋まっている二分木と見なすことのできる構造をしています。

f:id:shoman_panda:20200516192731p:plain
ヒープの二分木表現

ヒープには2種類あり、それぞれminヒープ、maxヒープと呼ばれます。 どちらもヒープ条件(heap property)というものを満たしますが、条件はminヒープとmaxヒープで異なります。

minヒープの場合、任意のノードについて、

親ノードの値 \leq 子ノードの値

となります。 したがって、minヒープは根に最小の要素を格納し、あるノードを根とする部分木が含む値は、そのノード自身の値以上になると言えます。

maxヒープは逆で、根の値が最大になります。

親ノードの値 \geq 子ノードの値

よくヒープは木構造で表されることが多く、これはこのヒープ条件を視覚化する上で分かりやすいと思います。

ただ、木構造配列で表すことができ、実際の実装では配列を使用します1

f:id:shoman_panda:20200516203546p:plain
ヒープの配列表現

親ノードと子ノードはそれぞれ配列の添字を使うことで互いにアクセスできます。

配列の添字を0-origin(最初の要素の添字が0)としたとき、


\begin{eqnarray}
parent_i &=& (i-1)/2\\
left_i &=& 2*i+1\\
right_i &=& 2*i+2
\end{eqnarray}

が成り立ちます。

ここまで確認したところで、実際にcontainer/heapの実装を見ていきましょう!

container/heapのソースコード

まず、Interfaceという名前のinterfaceが定義されています。

type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

PushxLen()番目の要素(つまり末尾)として加えるメソッド、 PopLen()-1番目の要素(つまり最後の要素)を削除し返すメソッド と書かれています。

つまりは、このInterfaceを使う側はそのような処理をそれぞれPushPopとして 実装することを期待されています。

sort.Interfaceが埋め込まれていますが、これはsortパッケージに以下のように定義されています。

type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

ここで、heap.Interfaceコメントにとても重要なことが書かれています。

このインターフェースを実装した全ての型は、以下の不変式を満たすminヒープとして使える
!h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() -- (★)

(★)はどういう意味でしょうか。 先ほどのIntHeapの例のように

func Less(i, j int) { return h[i] < h[j] }

と定義した場合を考えてみます。

すると、 !h.Less(j, i) <=> h[j] >= h[i] となり、これが 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() を満たす全ての(i, j)で成り立つと言っています。

2*i+1 <= j <= 2*i+2は先ほどの親ノードと子ノードの添字の関係を見ると、jiの子ノードに対応していることが分かります。

つまりこの場合、(★)は全てのノードについて、子ノードの値は親ノード以上であるというminヒープ条件を表しているのです!

順序関係を逆にして、

func Less(i, j int) { return h[i] > h[j] }

とした場合は、(★)はmaxヒープ条件を表すことになります。

すなわち、このheap.Interfaceを実装することで、常にminヒープ条件、またはmaxヒープ条件が満たされ、実装した型をヒープとして使える、と主張しているわけです。

ここまででheap.Interfaceの意味することは分かったのですが、それぞれのメソッドがどう使われているかはまだ謎です。 読み進めていきましょう。

次にInitという関数が定義されています。 これはIntHeapの例で見たように、最初に呼ばれるべき関数です。

func Init(h Interface) {
    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        down(h, i, n)
    }
}

何やらdownという関数を約半数の要素について呼び出しているようです。

一旦飛ばして次を見てみます。

次にはPushという関数が定義されています。

func Push(h Interface, x interface{}) {
    h.Push(x)
    up(h, h.Len()-1)
}

ここではまず、Interface型とinterface{}型の引数を受け取っています。

若干ややこしいですが、hは先ほど定義してあったInterfaceという名前の型の変数であり、 xinterface{}型の変数です。

interface{}型の変数はどのような型も取り得る型で、組み込みで定義されているものです。

さて、Pushではまずh.Push(x)が実行されています。 h.Push(x)は配列の最後に値を追加することを想定したメソッドです。

その後、up(h, h.Len()-1)を呼び出しています。

upはパッケージプライベートな関数で、以下のように定義されています。

func up(h Interface, j int) {
    for {
        i := (j - 1) / 2 // parent
        if i == j || !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        j = i
    }
}

実は、upはヒープのアルゴリズムを理解する上で非常に重要な役割を担っています。

まず、jの親ノードの添字をiに代入しています。

次のif文の条件には、i == j || !h.Less(j, i)とあります。

i == jは子ノードと親ノードの添字が等しいとき、つまりjが根であるときに満たされます。

そして2つ目の条件の!h.Less(j, i)ですが、これはまさに先ほど見たヒープ条件を表している式です。

つまり、jが根ノード、もしくはjが親ノードとヒープ条件を満たしている場合にbreakし実行を終了します。

逆にヒープ条件を満たさない場合はその次のh.Swap(i, j)が実行され、 i番目の要素とj番目の要素が交換されます。

これはつまり、jから見たときに自身と親ノードを交換することを意味します。

それにより、元々のjの位置にあるノードは親ノードとヒープ条件を満たすことができます。 しかし、元々のiの位置にあるノードはその親ノードとヒープ条件を満たすかは不明です。

f:id:shoman_panda:20200516215924p:plain
upの挙動

よって、j = iとすることで、元々iの位置にあったノードについて再度、ヒープ条件が成り立つかを確認していきます。

まとめると、upは引数のj番目のノードが親ノードとヒープ条件を満たしているかを確認し、 満たしていない場合は要素を交換する。そして、交換した場合は再帰的に親ノードとヒープ条件を満たしているかを確認していく、といった処理をしています。

Pushのコードに戻ってみましょう。

func Push(h Interface, x interface{}) {
    h.Push(x)
    up(h, h.Len()-1)
}

ここでは、up(h, h.Len()-1)が呼ばれています。

h.Len()-1は配列の最後の添字を表すので、jはたった今追加されたxを格納する場所を指します。

つまり、Pushは要素を追加し、その要素が親ノードとヒープ条件を満たすまで交換していくことで、 xがヒープ条件的に正しい位置に来るような処理をする関数ということが分かりました。

f:id:shoman_panda:20200516220550p:plain
Push(h, 2)のときのヒープの様子

次に出てくる関数はPopです。

func Pop(h Interface) interface{} {
    n := h.Len() - 1
    h.Swap(0, n)
    down(h, 0, n)
    return h.Pop()
}

この関数はヒープの根の要素を取り除き、その値を返す関数だと期待できます。

まず、n := h.Len() - 1によりnに「根を取り除く前の要素数-1」が代入されています。

その後、h.Swap(0, n)で根の要素と最後の要素(= 木構造の一番右の葉)を交換しています。

次にdown(h, 0, n)となっています。このdownInitでも出てきました。

そして最後にh.Popが呼ばれていることから、配列の最後の要素が削除され、返り値となることが分かります。

今、h.Swap(0, n)により、元々根にあった要素が配列の最後に移動しているため、 minヒープの場合は最小値が返ることになります。

逆に、元々最後にあった要素は根に移動したため、ほとんどの場合、 根とその子ノード間でヒープ条件が満たされてはいません。

f:id:shoman_panda:20200516221849p:plain
h.Swap(0, n)後のヒープ

それゆえにdownによって、ヒープ条件が再度成り立つよう、配列が整えられていると予想できます。

downupと同様に、ヒープを実装する上でとても重要な関数です。 コードを見てみましょう。

func down(h Interface, i0, n int) bool {
    i := i0
    for {
        j1 := 2*i + 1
        if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
            break
        }
        j := j1 // left child
        if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
            j = j2 // = 2*i + 2  // right child
        }
        if !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        i = j
    }
    return i > i0
}

まず、iが引数のi0で初期化され、ループに入ります。

j1 := 2*i+1と定義されていますが、これはj1iの左側の子ノードを表していると分かります。

その後のif文の条件式で、j1 >= n || j1 < 0となっていますが、 j1 < 0はコメントからもオーバーフローが起きた場合だと分かります。

j1 >= nについては一旦飛ばします。

次に、j2 := j1 + 1となっており、j2iの右側の子ノードを表すことが分かります。

j2 < nj1 >= nと同様に一度飛ばすとして、h.Less(j2, j1)ならばjj2となります。

jはその前の行でj1で初期化されているため、結果的にjにはminヒープの場合には小さい方の値が、 maxヒープの場合には大きい方の値が入ることになります。

そして、そのjiがヒープ条件を満たすかを!h.Less(j, i)によって確認しており、 もしも満たしている場合(つまり!h.Less(j, i)がtrueになる場合)にはbreakします。

もし、minヒープの場合に、子ノードの大きい方と交換すると仮定すると、 状況によってはヒープ条件を満たせない場合があるため、必ず小さい方と交換します。 maxヒープの場合についても同様です。

f:id:shoman_panda:20200516225239p:plain
downの挙動

jiがヒープ条件を満たしていない場合は、h.Swap(i, j)で要素を交換し、次のijとします。

つまり、自身と子ノードを交換し、再帰的にその子ノードと子ノードの子ノードとの間でヒープ条件が成り立つかを確認していきます。

upが自身と親ノードのヒープ条件の成立を確認する処理をしていたのに対し、 downは自身と子ノードのヒープ条件が成り立つかを確認します。

では、最初のif文の条件である、j1 >= nとはどういう意味でしょうか。

nPop内の呼び出しではh.Len() - 1でした。

downは自身と子ノードがヒープ条件を満たすまで再帰的に実行を続けますが、 この処理は削除される最後の要素には適応すべきではありません。

つまり、j1 >= nbreakすることは、ヒープ条件がn-1番目の要素まで成立すること保証しています。

f:id:shoman_panda:20200516223310p:plain
Pop(h)のときのヒープの様子

ここでnはヒープサイズと呼ばれます。 ヒープサイズをh-sizeとすると、h-size <= h.Len()が成り立ち、 h[0]..h[h-size-1]の要素がヒープ条件を満たすことを表します。

downは返り値としてi > i0を返してますが、これはiが更新されたとき、 つまりi0とその子ノードの交換が起きたときにtrueとなることが分かります。

これらより、Popは根の要素を返し、その後ヒープ条件を満たすように配列を整える処理をすることが分かりました。

downの処理が分かったところで、Initに戻ってみます。

func Init(h Interface) {
    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        down(h, i, n)
    }
}

この関数は実は、与えられた配列からヒープを構築する処理をしています。

n/2 - 1番目から0番目の要素について、自身と子ノードがヒープ条件を満たすよう調整をするため、 down(h, i, n)を呼んでいます。

ここで、nh.Len()となっていることから、ヒープサイズは配列のサイズと等しくなります。

ここで、n/2 - 1番目の要素は子ノードを持つ最も右側のノードを表します。

なぜなら、n/2以降の要素は、2*i+1, 2*i+2に対応する要素がなく、子ノードのない葉ノードになっているからです。

つまり、子ノードを持つ最も右側(木構造で表すと最も右下)のノードからヒープ条件を満たすよう、 整えていく処理をしていることが分かります。

f:id:shoman_panda:20200516235203p:plain
Init(h)のときのヒープの様子

それを根まで順番に行うことで、配列全体がヒープ条件を満たすことになります。

次に、 Removeを見ていきます。

func Remove(h Interface, i int) interface{} {
    n := h.Len() - 1
    if n != i {
        h.Swap(i, n)
        if !down(h, i, n) {
            up(h, i)
        }
    }
    return h.Pop()
}

まずnh.Len() - 1としていますが、Popのときと同様、要素を削除するためヒープサイズを配列のサイズ-1としています。

n == iのときは削除する要素が最後の要素であるため、そのままh.Pop()を呼べば良いです。

そうでない場合は、まずh.Swap(i, n)によってi番目の要素とn番目の要素(つまり最後の要素)を交換します。

そして次の行ですが、down(h, i, n)を呼んでおり、これがfalseの場合はup(h, i)の実行に移ります。

downi番目の要素とその子ノードのヒープ条件を整える関数でしたが、 すでに整っている場合はfalseが返ってくるのでした。

つまり、子ノードとヒープ条件が満たされている場合はupを呼ぶことで今度は親ノードとのヒープ条件が成立しているかを確認しているのです。

このようにdown -> upの順番で確認するため、downboolの返り値を返していたのです。 (up -> downの順で常に確認するのであれば、upが返り値を返すようにすれば問題なさそうです。)

いずれにせよ、元々最後の要素であった値は、downまたはupによってi番目からヒープ条件的に適切な位置へと移動することができます。

そして、h.Pop()によって最後の要素(元々i番目にあった要素)を削除し、それを返します。

この挙動からもわかるように、Pop(h)Remove(h, 0)と等しいです。

最後の関数として、Fixがあります。

func Fix(h Interface, i int) {
    if !down(h, i, h.Len()) {
        up(h, i)
    }
}

この関数は、i番目の要素の値が変更された後に呼ばれるべきものです。

ここまで読んでいただけた人であれば、Fixを理解するのはそんなに難しくないはずです。

更新した値がヒープ条件的に適切な位置に移動するよう、downまたはupを呼んでいます。

値を更新しただけなので、ヒープサイズは変わらずh.Len()となっていることに注意してください。

さて、これでcontainer/heapを全て読み終えることができました!

コードを読んでいくことで、Goのライブラリ実装だけではなく、一般にヒープがどういう動きをするかについても理解が深まったかと思います。

一見奇妙に見えたインターフェースも、中身の挙動を知ることで理にかなっていると感じます。

最初に出てきたIntHeapの例でなぜ、q[1, 2, 4, 5, 3]という順になっているか、今はよく分かると思います。

そして、Lessの実装を変えることで、minヒープとmaxヒープが同じコードで実装できる設計は美しいです2

ただ、なぜPushPopを指定されたように実装しなければならないのかを理解するには、 ヒープの動作を理解していないとよく分からないのではないかと思います。

この点は、ジェネリクスがないGoの辛い部分を表している一例かもしれません。

ヒープソートの実装

container/heapにはヒープソートは実装されていませんが、downを使うことで簡単に実装できます。

まず、maxヒープをcontainer/heapで実装することを考えます。

この場合、根にはヒープの最大値が含まれます。

それを最後の要素と交換し、新たな根の要素がヒープ条件を満たすようdownで調整します。 その際、ヒープサイズを1減らすことで、今度は2番目に大きい要素が根に来るようになります。

これを繰り返すことで、要素が昇順でソートされます。 このソートはその場で(in-place)で行われるため、空間計算量はO(1)となります3
実際のコードは以下のようになります。

func Sort(h Heap) {
    for i := h.Len() - 1; i > 0; i-- {
        h.Swap(0, i)
        down(h, 0, i-1)
    }
}

降順でソートしたい場合は、minヒープとなるようLessを変更すれば良いです。

up, down再帰関数表現

上で解説したように、up, downはそれぞれ親ノード、子ノードと自身のヒープ条件を満たすかを再帰的に確認しています。

実際の実装ではfor文によるループで実装されていますが、これらを再帰関数の形に変形することもできます。 変数名は適宜変更していますが、アルゴリズムは同じです。

func up(h Heap, i int) {
    parent := (i - 1) / 2
    if i == parent || !h.Less(i, parent) {
        return
    }
    h.Swap(parent, i)
    up(h, parent)
}

func down(h Heap, i, n int) bool {
    left := 2*i + 1
    if left >= n || left < 0 {
        return false
    }
    smaller := left
    if right := left + 1; right < n && h.Less(right, left) {
        smaller = right
    }
    if h.Less(smaller, i) {
        h.Swap(i, smaller)
        down(h, smaller, n)
        return true
    }
    return false
}

次回はヒープのそれぞれの処理の計算量を見積もっていきたいと思います。


  1. 構造体とポインタを使った実装も可能ですが、少なくともGoでは配列を使って実装する方がシンプルだと思います

  2. おそらく多くの言語のヒープ実装は同じようになっているかと思いますが。

  3. minヒープを作り、Popで取り出した要素を別のメモリに順番に追加していく方法でも昇順ソートは可能ですが、maxヒープを使う方法だとin-placeで昇順ソートが可能です。

AtCoder ABC165 C - Many Requirements

以下の条件を満たす数列Aを全て生成します。

  • Aは、長さNの正整数列である。
  • 1 ≤ A_1 ≤ A_2 ≤ \cdots ≤ A_N ≤ M

その後、それぞれに対しQ個ある制約を当てはめスコアを計算し、最大値を求めます。

atcoder.jp

上の条件を満たす数列の生成は深さ優先探索(DFS)でできます。

Goでの実装は以下です。

// 条件を満たす数列を全て生成
func dfs(seq []int, rst *[][]int) {
    if len(seq) == n {
        *rst = append(*rst, seq)
        return
    }

    // 1つ前の要素の数 (最初の要素の場合は1)
    var last int
    if len(seq) == 0 {
        last = 1
    } else {
        last = seq[len(seq)-1]
    }
    // last以上M以下の数を追加
    for i := last; i <= m; i++ {
        dfs(append(seq, i), rst)
    }
}

var n, m int

func main() {
    seq := make([]int, 0)
    rst := make([][]int, 0)
    dfs(seq, &rst)
    fmt.Println(rst)
    // n=m=3の場合: [[1 1 1] [1 1 2] [1 1 3] [1 2 2] [1 2 3] [1 3 3] [2 2 2] [2 2 3] [2 3 3] [3 3 3]]
}

あとはそれぞれの数列に対し、Q個の制約を当てはめ、スコアを計算していくだけです。

解答例の全体は以下のようになります。

type item struct {
    A, B, C, D int
}

var n, m int

// 条件を満たす数列を全て生成
func dfs(seq []int, rst *[][]int) {
    if len(seq) == n {
        *rst = append(*rst, seq)
        return
    }

    // 1つ前の要素の数 (最初の要素の場合は1)
    var last int
    if len(seq) == 0 {
        last = 1
    } else {
        last = seq[len(seq)-1]
    }
    // last以上M以下の数を追加
    for i := last; i <= m; i++ {
        dfs(append(seq, i), rst)
    }
}

// 数列の得点を制約より計算
func calcPoint(seq []int, items []item) int {
    point := 0
    for _, it := range items {
        if seq[it.B]-seq[it.A] == it.C {
            point += it.D
        }
    }
    return point
}

// 全ての数列の中から最大の得点を計算
func findMaxPoint(seqs [][]int, items []item) int {
    max := 0
    for _, seq := range seqs {
        if v := calcPoint(seq, items); v > max {
            max = v
        }
    }
    return max
}

func solve(items []item) int {
    seq := make([]int, 0)
    rst := make([][]int, 0)
    dfs(seq, &rst)
    return findMaxPoint(rst, items)
}

func main() {
    n, m = nextInt(), nextInt()
    q := nextInt()
    items := make([]item, q)
    for i := 0; i < q; i++ {
        items[i] = item{nextInt() - 1, nextInt() - 1, nextInt(), nextInt()}
    }
    ans := solve(items)
    fmt.Println(ans)
}

この問題は制約側を先に処理しようとするとハマると思います…

Golangのjson.UnmarshalでJSONをデコードできない

問題

以下のコードで、Person構造体のp.nameに"hoge"が入ることを期待してました。

package main

import (
    "encoding/json"
    "fmt"
)

type Person struct {
    name string `json:"name"`
}

func main() {
    str := `{"name": "hoge"}`
    var p Person
    if err := json.Unmarshal([]byte(str), &p); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(p.name)
}

しかし、実際にはp.nameは空となってしまいます。

結論

フィールドの先頭を大文字にしよう

type Person struct {
    Name string `json:"name"`
}

これでp.Nameにちゃんと"hoge"が入ります。

当たり前すぎて……ただ、json.Unmarshalは対応するフィールドが見つからなくてもエラーは出さないため、時間を取られてしまいました。

AtCoder ABC157 D - Friend Suggestions BFSとUnionFind

問題概要

SNSN人の人がいます。 このN人の間には、 M組の「友達関係」と、 K組の「ブロック関係」が存在します(いずれも双方向)。 各人において、友達の友達の…とたどって行き着く人のうち、直接の友達でもブロック関係にもない人(友達候補)の数を求めてください。

atcoder.jp

前置き

この問題はUnionFindを使う解法が最も多そうです。 UnionFindについてはこれがとても分かりやすかったです。

qiita.com

ただ自分はUnionFindを知らなかったため、幅優先探索(BFS)で解きました

(UnionFindでの実装は最後に載せてあります。)

各連結成分を求めてあげて、その中で自分と友達でもなく、ブロック関係でもない人の数を求めれば良いので、

  • (人iの友達候補の数) = (人 i の連結成分のサイズ) − (人i と人j が同じ連結成分に含まれて、人 i と人 j が友達関係もしくはブロック関係にあるような j の数) −1 (自分自身)

となります。

解法概要

まず、友達関係の入力から無向グラフを作成します。

その後、ノードiの連結成分を求めるため、ノードiを始点としたBFSによって隣接しているノードを探索していきます。 もしも自分が含まれている連結成分が、同じ連結成分の他のノードを始点としたBFSによってすでに求まっている場合は、それを再利用します。

データ構造としては、 sets map[*Node]map[*Node]bool という、ハッシュを入れ子にした形を使います。 sets[people[i]] とすると、iの連結成分の要素をハッシュとして取り出せます。

また、人iの直接の友達は、ノードiの隣接ノードなので、その数はすぐ求まります。

さらに、ブロック関係については、人iと同じ連結成分に含まれているとき(sets[people[i]]の要素であるとき)のみ、数をカウントしていきます。

以上によって必要な要素は求まります。

コード

言語はGoです。Goにはキューが標準ライブラリに用意されていないため、自分で実装しています(とは言っても簡単にできます)。

findSetsという関数によって、グラフの連結成分の集合(sets)を求めている部分が今回の肝です。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
)

type Queue struct {
    data []interface{}
}

func (q *Queue) Push(v interface{}) {
    q.data = append(q.data, v)
}

func (q *Queue) Pop() (interface{}, error) {
    if q.Empty() {
        return nil, fmt.Errorf("queue is empty")
    }
    v := q.data[0]
    q.data = q.data[1:]
    return v, nil
}

func (q *Queue) Empty() bool {
    return len(q.data) == 0
}

func NewQueue() *Queue {
    return &Queue{make([]interface{}, 0)}
}

type Graph struct {
    Nodes []*Node
}

type Node struct {
    Val    interface{}
    Neighs []*Node
}

func dfs(g *Graph, node *Node, groups map[*Node]*Node) map[*Node]bool {
    set := make(map[*Node]bool)
    q := NewQueue()
    q.Push(node)
    set[node] = true
    for !q.Empty() {
        src, _ := q.Pop()
        // Queueの要素がinterface{}なので型キャストが必要
        if v, ok := src.(*Node); ok {
            for _, neigh := range v.Neighs {
                if !set[neigh] {
                    q.Push(neigh)
                    set[neigh] = true
                    groups[neigh] = node
                }
            }
        }
    }
    return set
}

func findSets(g *Graph) map[*Node]map[*Node]bool {
    sets := make(map[*Node]map[*Node]bool)
    groups := make(map[*Node]*Node)
    nodes := g.Nodes
    for _, node := range nodes {
        // 同じ連結成分の他のノードによって求まっている場合、それを再利用
        if leader, ok := groups[node]; ok {
            sets[node] = sets[leader]
            continue
        }
        sets[node] = dfs(g, node, groups)
    }
    return sets
}

var sc = bufio.NewScanner(os.Stdin)

func nextInt() int {
    sc.Scan()
    i, e := strconv.Atoi(sc.Text())
    if e != nil {
        panic(e)
    }
    return i
}

func main() {
    sc.Split(bufio.ScanWords)
    n, m, k := nextInt(), nextInt(), nextInt()

    people := make([]*Node, n)
    for i := 0; i < n; i++ {
        people[i] = &Node{i, make([]*Node, 0)}
    }

    for i := 0; i < m; i++ {
        a, b := nextInt()-1, nextInt()-1

        people[a].Neighs = append(people[a].Neighs, people[b])
        people[b].Neighs = append(people[b].Neighs, people[a])
    }

    g := &Graph{people}

    // 連結成分を求める
    sets := findSets(g)

    blocked := make(map[*Node]int)
    for i := 0; i < k; i++ {
        c, d := nextInt()-1, nextInt()-1

        setc := sets[people[c]]
        if setc[people[d]] {
            // 同じ連結成分に含まれるときだけカウント
            blocked[people[c]]++
        }

        setd := sets[people[d]]
        if setd[people[c]] {
            // 同じ連結成分に含まれるときだけカウント
            blocked[people[d]]++
        }
    }

    for i := 0; i < n; i++ {
        pi := people[i]
        set := sets[pi]
        // 上の式そのまま
        fmt.Printf("%v ", len(set)-len(pi.Neighs)-blocked[pi]-1)
    }
    fmt.Println()
}

感想

  • BFSによって連結成分を求める、という部分は他の問題にも応用できそう?
  • 最初、入力をfmt.Scan()で受けていたら、そこがボトルネックになってTLEになっていたので注意…ちゃんとbufioを使おう
  • グラフとBFSの良い練習になりました

おまけ

勉強がてらUnionFindでも実装してみたので、載せておきます。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
)

type UnionFind struct {
    Par   []int
    Ranks []int
    Sizes []int
}

func NewUnionFind(n int) *UnionFind {
    uf := &UnionFind{make([]int, n), make([]int, n), make([]int, n)}
    for i := range uf.Par {
        uf.Par[i] = i
        uf.Ranks[i] = 0
        uf.Sizes[i] = 1
    }
    return uf
}

func (uf *UnionFind) Root(x int) int {
    if x == uf.Par[x] {
        return x
    }
    uf.Par[x] = uf.Root(uf.Par[x]) // 経路圧縮
    return uf.Par[x]
}

func (uf *UnionFind) Unite(x, y int) {
    rx := uf.Root(x)
    ry := uf.Root(y)
    if rx == ry {
        return
    }
    if uf.Ranks[rx] < uf.Ranks[ry] {
        uf.Par[rx] = ry
        uf.Sizes[ry] += uf.Sizes[rx]
    } else {
        uf.Par[ry] = rx
        uf.Sizes[rx] += uf.Sizes[ry]
        if uf.Ranks[rx] == uf.Ranks[ry] {
            uf.Ranks[rx]++
        }
    }
}

func (uf *UnionFind) Same(x, y int) bool {
    return uf.Root(x) == uf.Root(y)
}

func (uf *UnionFind) Size(x int) int {
    return uf.Sizes[uf.Root(x)]
}

var sc = bufio.NewScanner(os.Stdin)

func nextInt() int {
    sc.Scan()
    i, e := strconv.Atoi(sc.Text())
    if e != nil {
        panic(e)
    }
    return i
}

func main() {
    sc.Split(bufio.ScanWords)
    n, m, k := nextInt(), nextInt(), nextInt()
    uf := NewUnionFind(n)

    friends := make(map[int]int)
    for i := 0; i < m; i++ {
        a, b := nextInt()-1, nextInt()-1
        uf.Unite(a, b)
        friends[a]++
        friends[b]++
    }

    // Sizesをufに持たせない場合
    // sizes := make([]int, n)
    // for i := range sizes {
    //     r := uf.Root(uf.Par[i])
    //     sizes[r]++
    // }

    blocks := make([]int, n)
    for i := 0; i < k; i++ {
        c, d := nextInt()-1, nextInt()-1
        if uf.Same(c, d) {
            blocks[c]++
            blocks[d]++
        }
    }

    for i := 0; i < n; i++ {
        ans := uf.Size(i) - friends[i] - blocks[i] - 1
        fmt.Printf("%v ", ans)
    }
    fmt.Println()
}

最小値をO(1)で返すスタックの実装

概要

前の記事でGoによるスタックの実装を紹介しました。

shoman.hatenablog.com

そんな中でCracking the Coding Interviewの3.2の問題で以下のようなものがありました。

問題

pushとpopに加えて、最小の要素を返す関数minを持つスタックをどのようにデザインしますか?ただしpush, pop, min関数はすべてO(1)の実行時間になるようにしてください。

世界で闘うプログラミング力を鍛える本 ~コーディング面接189問とその解法~

世界で闘うプログラミング力を鍛える本 ~コーディング面接189問とその解法~

pushまたはpopする際に毎回最小値を探していると、実行時間はO(n)になってしまい条件に合いません。

O(1)で最小値を得るためには、スタックの各要素に、その要素以下における最小値を保存しておけば良いです。

解法

前回実装したスタックを少し修正します。

type Stack 修正前

type Stack []int

type Stack 修正後

type data struct {
    val int
    min int
}

スタックに格納するintとともに、その要素以下の最小値minをスタックに積みます。

Push 修正前

func (s *Stack) Push(v int) {
    *s = append(*s, v)
}

Push 修正後

func (s *Stack) Push(v int) {
    min, err := s.Min()
    if err == nil && v < min {
        min = v
    }
    d := data{v, min}
    *s = append(*s, d)
}

要素をpushする際に、push前のトップの要素に含まれる最小値と、新たにpushする値を比較します。 新たな値の方が小さい場合はそれを新たな要素のminとし、 新たな値の方が大きい場合はpush前のトップのminを新たな要素のminとします。

Min 追加

func (s *Stack) Min() (int, error) {
    if s.Empty() {
        return 0, fmt.Errorf("stack is empty")
    }

    return (*s)[len(*s)-1].min, nil
}

トップ要素のminを返します。

実行例

func main() {
    s := GenStack()

    s.Push(0)
    s.Push(-1)
    s.Push(1)
    fmt.Println(*s) // [{0 0} {-1 -1} {1 -1}]

    min, _ := s.Min()
    fmt.Println(min) // -1
}

コメント

  • 最小値を保存しておく別のスタックを用意し、更新されたときだけpush, popする解法もあります