開発^3

Web開発、宇宙開発、ゲーム開発の3種類についてつらつらと

caolan/asyncを使ったNode.jsのフロー制御

JavaScript Advent Calendar 2011 (Node.js/WebSocketsコース)の16日目です。

@koichikさんの13日目の記事にも書かれているように、Node.jsにて非同期処理を管理する方法には以下の2種類があります

  • 非同期処理の呼び出し時にコールバック関数を与える。処理が終わったらエラーの有無と、結果が通知される。
  • EventEmitterに対してon/onceでEventListenerをセットして非同期処理を実行する。非同期処理の実行によって発生したイベントやエラーはリスナーに通知される。

後者の説明はkoichikさんの記事にお任せして、この記事は前者を扱うライブラリcaolan/asyncについてみていきたいと思います。

caolan/asyncって?

https://github.com/caolan/async で公開されているフロー制御ライブラリです。
自分はNode.jsで使っていますが、ブラウザ上で動くJavaScriptを書く場合にも使用できるようです。

https://github.com/joyent/node/wiki/modules#wiki-async-flowに記載されているNode.js用フロー制御モジュールは既に50を超えていますが、試してみた中では一番使いやすいモジュールでした。
# さすがに50すべては試せていないです。githubのwatcher数1000オーバー、folk数62ということで少なくともフロー制御モジュールの最大手ではあるはず。

asyncが実現してくれるフロー制御の一部はまさに「非同期プログラミングの改善」のエッセンスの内容なので、先に読んでおくと理解が深まるかと思います。

何をしてくれる?

asyncがやってくれる処理には様々なものがありますが、まずはasync#seriesを例にして説明していきたいと思います。
hogeディレクトリを掘って、piyoディレクトリを掘って、その下にデータを書きだすコードをコールバックを使って書くと以下のようになります。
(ディレクトリ堀は再帰的にやってくれるモジュールがありますが見なかった方向で)

var fs = require('fs');
fs.mkdir('./hoge', function(err) {
  if(err) {
    // エラー処理
    return;
  }
  
  fs.mkdir('./hoge/piyo', function(err) {
    if(err) {
      // エラー処理
      return;
    }   

    fs.writeFile('./hoge/piyo/neko', 'Hello Node.js', function(err) {
      if(err) {
        // エラー処理
        return;
      }   
      console.log('success');
    }); 
  }); 
});

この例のように、非同期処理の終了を待ってから次の処理をさせる場合、ネストが深くなる、エラー処理があちこちに分散するといった問題点があります。
では、このコードをasyncを使った形で書き直してみましょう。

var fs = require('fs');
var async = require('async');

var tasks = []; 
tasks.push(function(next) { fs.mkdir('./hoge', next); }); 
tasks.push(function(next) { fs.mkdir('./hoge/piyo', next); }); 
tasks.push(function(next) { fs.writeFile('./hoge/piyo/neko', 'Hello Node.js', next); }); 

async.series(tasks, function(err, results) {
  if(err) {
    // エラー処理
    return;
  }
  console.log('success');
  console.log(results);     //  [undefined, undefined, undefined]
});

はい、エラー処理が一カ所にまとまりました。
ネストが深くなることもなく、処理もtasksにまとまっていて良い感じです。

async#seriesは第一引数で与えられたtasksの処理を順番に実行してくれるメソッドです。
tasksに格納された各処理は、自分の処理が終わった時点でnext()を呼び出し、「エラーの有無」と「自分の処理結果」を通知します。
エラーが発生しなかった場合には自動的に次の処理が呼び出され、 エラーが発生した場合には、それより先の処理は実行されずにasync.seriesの第二引数に渡された関数が呼び出されます。
errには発生したエラー(エラーなしの場合はnull)が入り、resultsには各処理の処理結果がまとめて入ります。
今回使ったfs#mkdirやfs#writeFileはエラーの有無のみを通知する為、resultsに関しては次項の例を見てもらった方が分かりやすいかと思います。*1

async#series, async#parallel

asyncモジュールにはseries以外にも様々なメソッドが用意されています。
ここからは、それらの処理の流れを図で表しながら簡単に説明していきたいと思います。

まずはasync#seriesです。
先ほど説明した通り、一つの処理が終わってから次の処理が行われ、それぞれの処理結果がendに集約されています。


凡例:
・青い四角:処理
・赤い四角:データ
・青線:処理の流れ
・赤線:データの流れ
・finish:async#****を呼び出すときに与えられるcallback関数
(エラー発生時か、全部の処理が終わった後に呼ばれる関数)

# 図はCacooにて作成。Excelは図を描くツールではないのです。

つづいて、async#parallel。こちらは全部の処理が同時に実行され、すべてが終わった時点でendが呼ばれます。
この場合もseriesと同じく、それぞれの処理結果はendに集約されています。

async#seriesとasync#parallelの違いを見るために「delayミリ秒待ってからtextの内容を出力し、処理結果としてresultを返す関数」を作って試してみます。

function delayedPrint(delay, text, result) {
  return function(next) {
    setTimeout(function() {
      util.log(text);
      next(null, result);
    }, delay);
  }
}

var tasks = []; 
tasks.push(delayedPrint(3000, 'test1', 'result1'));
tasks.push(delayedPrint(2000, 'test2', 'result2'));
tasks.push(delayedPrint(1000, 'test3', 'result3'));
async#seriesの場合
util.log('start');
async.series(tasks, function(err, results) {
  util.log('true end');
  console.log(results);
});

util.log('end?  ... NO. while processing.');

実行結果:

16 Dec 01:39:13 - start
16 Dec 01:39:13 - end?  ... NO. while processing.
16 Dec 01:39:16 - test1
16 Dec 01:39:18 - test2
16 Dec 01:39:19 - test3
16 Dec 01:39:19 - true end
[ 'result1', 'result2', 'result3' ]
async#parallelの場合
util.log('start');
async.series(tasks, function(err, results) {
  util.log('true end');
  console.log(results);
});

util.log('end?  ... NO. while processing.');

実行結果:

16 Dec 01:40:15 - start
16 Dec 01:40:15 - end?  ... NO. while processing.
16 Dec 01:40:16 - test3
16 Dec 01:40:17 - test2
16 Dec 01:40:18 - test1
16 Dec 01:40:18 - true end
[ 'result1', 'result2', 'result3' ]

これらの結果から以下のことが分かります。

  • end?の時点ではまだ処理が終わっていない。全部の処理が終了したのはtrue endの時点。
  • startからendまでの時間がseriesでは直列実行の為、3 + 2 + 1 = 6秒
  • startからendまでの時間がparallelでは並列実行の為で、max(3, 2, 1) = 3秒
  • parallelの例を見るとわかるようにresultsに入ってくるのは処理の完了順ではなくtasksの順

async#waterfall

async#waterfallを図に表すと以下のようになります。

処理の流れ自体はseriesと同等ですが、seriesと違い、処理結果が次の処理に与えられます。
便利なメソッドであることは確かなのですが、このメソッドに関しては気になっていることがある為、後日改めてエントリを起こそうと思います。

async#forEach, async#forEachSeries

前項で紹介したseries, parallel, waterfallは複数の処理の流れを制御する形でしたが、配列の各要素に対して同じ処理を実行することも良くあります。
同期処理であればArray#forEachを使えばよいのですが、その中に非同期処理が含まれている場合、Array#forEachから処理が返った時点では配列の各要素に対する処理が終わっていません。
asyncではこういった場合に使えるメソッドとしてforEach, forEachSeriesというメソッドを用意しています。

名前からもわかるようにforEachは並列処理を行い、forEachSeriesは直列に処理を行います。



async#map, async#mapSeries

forEachやforEachSeriesは配列の各要素に対して処理を行いますが、処理結果を返す機能は持っていません。
配列の要素を変換したい場合などにはArray#mapと同じく、async#mapを用います。
これもforEachと同じくasync#mapとasync#mapSeriesの二つが用意されているため、変換ロジックの重さに応じて好きな方を選ぶことができます。



ほかの関数

今回は紹介しきれませんでしたが、caolan/asyncには他にも以下のようなメソッドを提供しています。

  • makeファイルのように各タスクに前提条件を指定すると、自動的に順番を組み立ててくれるauto
  • forEach/mapのようにArrayが提供していた処理をやってくれるfilter, filterSeries, reject, rejectSeries, reduce, reduceRight, concat, concatSeries, some, any, every, all
  • 条件を満たす間 or 満たすまで処理を続けるwhilst, until
  • 並列で実行する最大数を指定して大量のタスクを処理させられるqueue
  • 重い処理の処理結果を自動的にキャッシュして二回目以降の呼び出しを軽くするmemoize

これらのメソッドとEventEmitterをうまく使うことで処理の流れを整理し、処理の流れが分かりやすいコードを目指しましょう!

明日は@n_matsuiさんです。

*1:例選びに失敗した感がひしひしと