Agile Cat — Azure & Hadoop — Talking Book

September 25, 2009

The Anatomy of Hadoop I/O Pipeline _1

Filed under: Hadoop I/O Pipeline — Agile Cat @ 8:14 am
Tags: , , , , ,

 

YDN Hadoop and Distributed Computing at Yahoo!

Hadoop and Distributed Computing at Yahoo!

August 27, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

今日から何回かに分けて、Hadoop I/O Pipeline に関する対訳を掲載していきます。この領域に関するドキュメントは初めてのものらしく、読んでいても解らないことばかりです。 訳について、問題などありましたら、また、こう読むべきみたいなアドバイスがありましたら、ぜひコメントをつけてください。 Windows Azure + Dryad につながる知識が共有できればと思います。 --- A.C.

Introduction

In a typical Hadoop MapReduce job, input files are read from HDFS. Data are usually compressed to reduce the file sizes. After decompression, serialized bytes are transformed into Java objects before being passed to a user-defined map() function. Conversely, output records are serialized, compressed, and eventually pushed back to HDFS. This seemingly simple, two-way process is in fact much more complicated due to a few reasons:

Hadoop MapReduce における一般的なジョブでは、入力ファイルは HDFS からが読み込まれる。 そのファイル・サイズを低減するために、通常ではデータの圧縮と解凍が行われた後に、シリアライズさたバイト列が Java オブジェクトに変換され、ユーザー定義された map () 関数に引き渡される。 その反対に、出力レコードにもシリアライズと圧縮が行われ、最終的に HDFS の中にプッシュバックされる。この双方向のプロセスはシンプルに見えるが、いくつかの理由により、実際には複雑なものとなる:

  • Compression and decompression are typically done through native library code.
  • End-to-end CRC32 checksum is always verified or calculated during reading or writing.
  • Buffer management is complicated due to various interface restrictions.
  • 一般的には、ネイティブのライブラリ・コードを介して実行される圧縮と解凍
  • 一般的には、Read/Write の際に常に検証/計算される End-to-End CRC32 チェックサム
  • 多様な制約事項により、複雑化していくバッファ管理

In this blog post, I attempt to decompose and analyze the Hadoop I/O pipeline in detail, and explore possible optimizations. To keep the discussion concrete, I am going to use the ubiquitous example of reading and writing line records from/to gzip-compressed text files. I will not get into details on the DataNode side of the pipeline, and instead mainly focus on the client-side (the map/reduce task processes). Finally, all descriptions are based on Hadoop 0.21 trunk at the time of this writing, which means you may see things differently if you are using older or newer versions of Hadoop.

このブログ・ポストで試みるのは、Hadoop I/O パイプラインに関する詳細な分解と分析であり、また、実現可能な最適化の追求である。 その説明を具体的にしていくために、gzip 圧縮されたテキスト・ファイルを対象として、ライン・レコードを Read/Write するサンプルを、随所で利用していく。 また、パイプラインの DataNode 側の詳細には入らない代わりに、主としてクライアント側(map/reduce タスク・プロセス)に焦点を合わせる。この執筆は Hadoop 0.21 の主要部分に基づいているため、あなたの利用しているバージョンとの間で、若干の違いが生じるかもしれない。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

September 24, 2009

The Anatomy of Hadoop I/O Pipeline _2

Filed under: Hadoop I/O Pipeline — Agile Cat @ 8:14 am
Tags: , , , , ,

From <http://developer.yahoo.net/blogs/hadoop/>

Reading Inputs

Figure 1 illustrates the I/O pipeline when reading line records from a gzipped text file using TextInputFormat. The figure is divided in two sides separated by a thin gap. The left side shows the DataNode process, and the right side the application process (namely, the Map task). From bottom to top, there are three zones where buffers are allocated or manipulated: kernel space, native code space, and JVM space. For the application process, from left to right, there are the software layers that a data block needs to traverse through. Boxes with different colors are buffers of various sorts. An arrow between two boxes represents a data transfer or buffer-copy. The weight of an arrow indicates the amount of data being transferred. The label in each box shows the rough location of the buffer (either the variable that references to the buffer, or the module where the buffer is allocated). If available, the size of a buffer is described in square brackets. If the buffer size is configurable, then both the configuration property and the default size are shown. I tag each data transfer with the numeric step where the transfer happens:

Figure 1 が示すのは、gzip されたテキスト・ファイルから、TextInputFormat を使ってラインレコードを読み込むときの I/O パイプラインである。 この図は、左右の 2つのパートに分割されている。 左側は DataNode プロセスを示し、右側はアプリケーション・プロセス(すなわち Mapタスク)を示す。ボトムらトップへ向けて、Kernel/Native/JVM の 3つのゾーンがあり、それぞれにバッファが割り当てられ操作される。 アプリケーション・プロセスに関しては、左から右へ向けて、データ・ブロックが横断しなくてはならないソフトウェア・レイヤが存在する。それぞれのカラーで色づけされたボックスは、各種のソートに用いるバッファである。ボックス間を結ぶ矢印ラインは、データ転送あるいはバッファコピーを示す。矢印ラインの太さは、それぞれのデータ量を示す。それぞれのボックス内のラベルは、バッファの大まかなロケーション(バッファに参照する変数あるいは、バッファが割り当てられるモジュール)を示す。記載が可能な場合には、バッファ・サイズをカッコ内に記している。バッファ・サイズをコンフィグレーションできる場合には、そのプロパティとデフォルト・サイズの双方を示す。データが転送される際の、それぞれのステップに対して、以下のタグを付けている:

Hadoop IO_1


Figure 1: Reading line records from gzipped text files.

  1. Data transferred from DataNode to MapTask process. DBlk is the file data block; CBlk is the file checksum block. File data are transferred to the client through Java nio transferTo (aka UNIX sendfile syscall). Checksum data are first fetched to DataNode JVM buffer, and then pushed to the client (details are not shown). Both file data and checksum data are bundled in an HDFS packet (typically 64KB) in the format of: {packet header | checksum bytes | data bytes}.
  2. Data received from the socket are buffered in a BufferedInputStream, presumably for the purpose of reducing the number of syscalls to the kernel. This actually involves two buffer-copies: first, data are copied from kernel buffers into a temporary direct buffer in JDK code; second, data are copied from the temporary direct buffer to the byte[] buffer owned by the BufferedInputStream. The size of the byte[] in BufferedInputStream is controlled by configuration property “io.file.buffer.size”, and is default to 4K. In our production environment, this parameter is customized to 128K.
  3. Through the BufferedInputStream, the checksum bytes are saved into an internal ByteBuffer (whose size is roughly (PacketSize / 512 * 4) or 512B), and file bytes (compressed data) are deposited into the byte[] buffer supplied by the decompression layer. Since the checksum calculation requires a full 512 byte chunk while a user’s request may not be aligned with a chunk boundary, a 512B byte[] buffer is used to align the input before copying partial chunks into user-supplied byte[] buffer. Also note that data are copied to the buffer in 512-byte pieces (as required by FSInputChecker API). Finally, all checksum bytes are copied to a 4-byte array for FSInputChecker to perform checksum verification. Overall, this step involves an extra buffer-copy.
  4. The decompression layer uses a byte[] buffer to receive data from the DFSClient layer. The DecompressorStream copies the data from the byte[] buffer to a 64K direct buffer, calls the native library code to decompress the data and stores the uncompressed bytes in another 64K direct buffer. This step involves two buffer-copies.
  5. LineReader maintains an internal buffer to absorb data from the downstream. From the buffer, line separators are discovered and line bytes are copied to form Text objects. This step requires two buffer-copies.

 

  1. DataNode から MapTask プロセスへ向けた、データの転送。 DBlk はファイルのデータ・ブロックであり、CBlk はファイルのチェックサム・ブロックである。ファイル・データは、Java nio transferTo (UNIX sendfile syscall のこと)を介してクライアントに転送される。最初にチェックサム・データが DataNode JVM バッファにフェッチされ、続いてクライアント(細部は示していない)にプッシュされる。 ファイルとチェックサムのデータが {packet header | checksum bytes | data bytes} のフォーマットにより、HDFS パケット(一般は 64kb)にまとめられる。
  2. ソケットから受信されたデータは、おそらく、カーネルに対する syscalls 数を減らすことを目的として、 BufferedInputStream 内のバッファに入れられる。 実際には、2つのバッファを用いたコピーが行われる。データは最初に、JDK コードによるカーネル・バッファから、テンポラリ・バッファへダイレクトにコピーされる。続いて、テンポラリ・バッファ内のデータは、BufferedInputStream が管理する byte [] バッファにコピーされる。 BufferedInputStream の byte [] のサイズは、コンフィグレーション・プロパティである “io.file.buffer.size” により制御されるが、デフォルトは 4K となっている。 私たちの実運用環境では、このパラメータを128K にカスタマイズした。
  3. BufferedInputStream を介することで、チェックサム・バイトは(その大まかなサイズは(PacketSize / 512 * 4)あるいは 512 B )内部の ByteBuffer 内に保存される。そして、ファイル・バイト(圧縮されたデータ)は、解凍レイヤが提供する byte [] バッファ内に堆積していく。 ユーザー・リクエストはチャンクの境界線をそろえないかもしれないが、チェックサムの計算ではフルの 512 byte チャンクが必要とされるため、ユーザーが提供する byte [] バッファ内に、部分的なチャンクをコピーしてしまう前に、512 byte [] バッファが使用される。 さらに、(FSInputChecker API が必要とする場合には)512 byte 単位のバッファに、データがコピーされることに注意すべきだ。 最終的に、FSInputChecker によるチェックサム検査を実現するために、すべてのチェックサム・バイトは 4 byte 配列内にコピーされる。 全体的に、このステップでは、余分のバッファ・コピーが生じる。
  4. 解凍レイヤでは、DFSClient レイヤからのデータを受信するために、byte [] バッファが使用される。 DecompressorStream では、byte [] バッファから 64K のダイレクト・バッファにデータをコピーし、データを解凍するためにネイティブ・ライブラリ・コードをコールし、解凍されたデータを別の 64K ダイレクト・バッファにストアする。 このステップでは、2つのバッファ・コピーが生じる。
  5. LineReader はダウンストリームからのデータを取り入れるために、内部バッファを保持する。 バッファから、ライン・セパレータが発見され、ライン・バイトが Text オブジェクトを形成するためにコピーされる。このステップでは、2つのバッファ・コピーが必要となる。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

September 23, 2009

The Anatomy of Hadoop I/O Pipeline _3

Filed under: Hadoop I/O Pipeline — Agile Cat @ 8:13 am
Tags: , , , , ,

From <http://developer.yahoo.net/blogs/hadoop/>

Optimizing Input Pipeline

Adding everything up, including a “copy” for decompressing bytes, the whole read pipeline involves seven buffer-copies to deliver a record to MapTask’s map() function since data are received in the process’s kernel buffer. There are a couple of things that could be improved in the above process:

バイト列を解凍するための ”Copy” も含めて、すべてを合算すると、データがプロセスのカーネル・バッファに受信されてから、MapTask の map () 関数にレコードを受け渡すまでに、Read パイプラインの全体で7つのバッファ・コピーを伴うことになる。 上記のプロセスにおいて、改善が可能な2つのものがある:

  • Many buffer-copies are needed simply to convert between direct buffer and byte[] buffer.
  • Checksum calculation can be done in bulk instead of one chunk at a time.
  • 大量のバッファ・コピーが、ダイレクト・バッファと byte[] バッファ間での、シンプルな変換を必要とする。
  • チェックサムの計算は、チャンクごとではなく、バルクでの一括処理が可能である。

Hadoop IO_2

Figure 2: Optimizing input pipeline.

Figure 2 shows the post-optimization view where the total number of buffer copies is reduced from seven to three:

Figure 2 が示すのは、バッファ・コピーの回数を 7回から 3回に減らす場合の、ポスト・オプティマイゼーションの様子である:

  1. An input packet is decomposed into the checksum part and the data part, which are scattered into two direct buffers: an internal one for checksum bytes, and the direct buffer owned by the decompression layer to hold compressed bytes. The FSInputChecker accesses both buffers directly.
  2. The decompression layer deflates the uncompressed bytes to a direct buffer owned by the LineReader.
  3. LineReader scans the bytes in the direct buffer, finds the line separators from the buffer, and constructs Text objects.

 

  1. 入力パケットは、チェックサム・バイトのためのインターなものと、圧縮されたバイト列を保持する解凍レイヤが管理するダイレクト・バッファで構成される、2つのダイレクト・バッファの中で、チェックサム部分とデータ部分に分解される。 FSInputChecker は、双方のバッファに対して、ダイレクトにアクセスする。
  2. この解凍レイヤは、LineReader が管理するダイレクト・バッファへ向けて、圧縮されていないバイト列を収縮させる。
  3. LineReader は、ダイレクト・バッファ内のバイト列をスキャンし、バッファからライン・セパレータを探し出し、Text オブジェクトを構成する。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

September 22, 2009

The Anatomy of Hadoop I/O Pipeline _4

Filed under: Hadoop I/O Pipeline — Agile Cat @ 8:12 am
Tags: , , , , ,

From <http://developer.yahoo.net/blogs/hadoop/>

Writing Outputs

Now let’s shift gears and look at the write-side of the story. Figure 3 illustrates the I/O pipeline when a ReduceTask writes line records into a gzipped text file using TextOutputFormat. Similar to Figure 1, each data transfer is tagged with the numeric step where the transfer occurs:

ここで視点を変えて、Write 側からのストーリーを追いかけていく。 Figure 3 が示すのは、ReduceTask が TextOutputFormat を用いて、gzip されたテキスト・ファイルにライン・レコードを書き込む様子である。 Figure 1 のように、それぞれのデータ転送について、そのステップ順を示すタグを付けている:

Hadoop IO_3 Figure 3: Writing line records into gzipped text files.

  1. TextOutputFormat’s RecordWriter is unbuffered. When a user emits a line record, the bytes of the Text object are copied straight into a 64KB direct buffer owned by the compression layer. For a very long line, it will be copied to this buffer 64KB at a time for multiple times.
  2. Every time the compression layer receives a line (or part of a very long line), the native compression code is called, and compressed bytes are stored into another 64KB direct buffer. Data are then copied from that direct buffer to an internal byte[] buffer owned by the compression layer before pushing down to the DFSClient layer because the DFSClient layer only accepts byte[] buffer as input. The size of this buffer is again controlled by configuration property “io.file.buffer.size”. This step involves two buffer-copies.
  3. FSOutputSummer calculates the CRC32 checksum from the byte[] buffer from the compression layer, and deposits both data bytes and checksum bytes into a byte[] buffer in a Packet object. Again, checksum calculation must be done on whole 512B chunks, and an internal 512B byte[] buffer is used to hold partial chunks that may result from compressed data not aligned with chunk boundaries. Checksums are first calculated and stored in a 4B byte[] buffer before being copied to the packet. This step involves one buffer-copy.
  4. When a packet is full, the packet is pushed to a queue whose length is limited to 80. The size of the packet is controlled by configuration property “dfs.write.packet.size” and is default to 64KB. This step involves no buffer-copy.
  5. A DataStreamer thread waits on the queue and sends the packet to the socket whenever it receives one. The socket is wrapped with a BufferedOutputStream. But the byte[] buffer is very small (no more than 512B) and it is usually bypassed. The data, however, still needs to be copied to a temporary direct buffer owned by JDK code. This step requires two data copies.
  6. Data are sent from the ReduceTask’s kernel buffer to the DataNode’s kernel buffer. Before the data are stored in Block files and checksum files, there are a few buffer-copies in DataNode side. Unlike the case of DFS read, both file data and checksum data will traverse out of kernel, and into JVM land. The details of this process are beyond the discussion here and are not shown in the figure.

 

  1. TextOutputFormat の RecordWriter はバッファリングされていない。 ユーザーがライン・レコードを発行するとき、Text オブジェクトのバイト列は、圧縮レイヤが管理する 64KB ダイレクト・バッファ内に、そのままコピーされる。きわめて長いラインの場合には、何回かに分けて、64KB ずつバッファにコピーされるだろう。
  2. 圧縮レイヤがライン(あるいは長いラインの一部)を受信するときには必ず、ネイティブの圧縮コードがコールされ、そして、圧縮されたバイト列は別の 64KB ダイレクト・バッファにストアされる。 DFSClient レイヤは、byte [] バッファを単なる入力として受け入れるため、データが DFSClient レイヤにプッシュされる前に、圧縮レイヤが管理する内部の byte [] バッファへ向けたコピーが実行される。 このバッファのサイズは、コンフィグレーション・プロパティである “io.file.buffer.size” により制御される。このステップは、2つのバッファコピーを伴う。
  3. FSOutputSummer は、圧縮レイヤから得られる byte [] バッファに基づいてCRC32 チェックサムを計算し、また、チェックサム・バイトとデータ・バイトの双方を Packet オブジェクト内の byte [] バッファに蓄積する。 チェックサム計算を、512B チャンク全体に対して再実施しななくてはならない。そして、チャンクの境界線が不揃いな圧縮データに起因するかもしれない、部分的なチャンクを保持するために、内部の 512B byte [] バッファが使用される。 チェックサムが最初に計算され、パケットにコピーされる前に、4B byte [] バッファにストアされる。 このステップで、1つのバッファ・コピーが生じる。
  4. パケットがあふれるときには、長さが 80 に制限されたキューに、パケット自体がプッシュされる。 パケット・サイズはコンフィグレーション・プロパティ “dfs.write.packet.size” により制御されるが、そのデフォルトは 64KB となる。このステップは、バッファコピーを伴わない。
  5. DataStreamer スレッドは対象となるキューの支配下にあり、何らかのデータを受信するときには必ず、ソケットへ向けてパケットを送信する。 このソケットは、BufferedOutputStream に取り込まれている。しかし、byte [] バッファはきわめて小さいため(512B 以上にはならない)、通常ではバイパスされる。 ただ、JDK コードによるい管理されるテンポラリなダイレクト・バッファに、データをコピーする必要が依然として残っている。 このステップでは、2つのデータ・コピーが必要となる。
  6. ReduceTask のカーネル・バッファから DataNode のカーネル・バッファまで、データは送信される。 データが Block ファイルとチェックサム・ファイルにストアされる前に、 DataNode サイドで何回かのバッファ・コピーが行われる。 DFS Read のケースとは異なり、ファイル・データとチェックサム・データの双方が、カーネルを横断してJVM にたどり着く。 そのプロセスの詳細については、この図に示していない。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

September 21, 2009

The Anatomy of Hadoop I/O Pipeline _5

Filed under: Hadoop I/O Pipeline — Agile Cat @ 8:12 am
Tags: , , , , ,

From <http://developer.yahoo.net/blogs/hadoop/>

Optimizing Output Pipeline

Overall, including the “copy” for compressing bytes, the process described above requires six buffer-copies for an output line record to reach ReduceTask’s kernel buffer. What could we do to optimize the write pipeline?

出力ライン・レコードが ReduceTask のカーネル・バッファにたどり着くためには、バイトを圧縮する “Copy” も含めた全体として、6つのバッファ・コピーが必要となる。この Write パイプラインを最適化するために、何ができるだろうか?

  • We can probably reduce a few buffer-copies.
  • The native compression code may be called less frequently if we call it only after the input buffer is full (block compression codecs like LZO already do this).
  • Checksum calculations can be done in bulk instead of one chunk at a time.
  • おそらく、いくつかのバッファ・コピーを省略できる。
  • 入力バッファがフルになったときだけに、ネイティブのコードをコールするようにすれば(LZO などの圧縮コーデックでは実現済み)、その頻度を低減できる。
  • チェックサムの計算は、チャンクごとではなく、バルクでの一括処理が可能である。

Hadoop IO_4

Figure 4: Optimizing output pipeline.

Figure 4 shows how it looks like after these optimizations, where a total of four buffer-copies are necessary:

Figure 4 が示すのは、それらの最適化を行った後の、4回のバッファ・コピーが必要な場合の様子である:

  1. Bytes from a user’s Text object are copied to a direct buffer owned by the TextOutputFormat layer.
  2. Once this buffer is full, native compression code is called and compressed data is deposited to a direct buffer owned by the compression layer.
  3. FSOutputSummer computes the checksum for bytes in the direct buffer from the compression layer and saves both data bytes and checksum bytes into a packet’s direct buffer.
  4. A full packet will be pushed into a queue, and, in background, the DataStreamer thread sends the packet through the socket, which copies the bytes to be copied to kernel buffers.

 

  1. ユーザーの Text オブジェクトから得られるバイト列は、TextOutputFormat レイヤが管理するダイレクト・バッファにコピーされる。
  2. このバッファがフルになった後にネイティブの圧縮コードがコールされ、圧縮レイヤが管理するダイレクト・バッファに圧縮されたデータが蓄積される。
  3. FSOutputSummer が、圧縮レイヤから得られたダイレクト・バッファ内のバイト列に対して、チェックサムを計算し、データ・バイトとチェックサム・バイトの双方を、パケットのダイレクト・バッファ内に保存する。
  4. いっぱいになったパケットはキューにプッシュされ、カーネル・バッファにコピーされるべきバイト列をコピーするために、DataStreamer はバック・グラウンドにおいて、ソケットを介してパケットを送信する。

Conclusion

This blog post came out of an afternoon spent asking ourselves specific questions about Hadoop’s I/O and validating the answers in the code. It turns out, after combing through class after class, that the pipeline is more complex than we originally thought. While each of us is familiar with one or more components, we found the preceding, comprehensive picture of Hadoop I/O elucidating, and we hope other developers and users will, too. Effecting the optimizations outlined above will be a daunting task, and this is the first step toward a more performant Hadoop.

このブログ・ポストは、私たち自身で Hadoop の I/O に関する議論を行い、コードにおける妥当性を検査した結果である。クラスごとに綿密に調査した後に、このパイプ・ラインは、これまでに考えてきたよりも、さらに複雑であることが分かった。私たちの一人一人は、1つあるいは複数のコンポーネントに精通しているが、Hadoop I/O を説明するための包括的な図の重要性を認識した。他のデベロッパーやユーザーも、そう考えてくれると期待している。前述の効果的な最適化の概要は、困難なタスクになるだろう。そして、このことが、効果的な Hadoop へ向けた最初のステップとなる。

– Hong Tang

From <http://developer.yahoo.net/blogs/hadoop/>

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

Blog at WordPress.com.