Building a distributed concurrent queue with Apache ZooKeeper _1
by Henry Robinson
May 28, 2009
http://www.cloudera.com/blog/2009/05/building-a-distributed-concurrent-queue-with-apache-zookeeper/
In my first few weeks here at Cloudera, I’ve been tasked with helping out with the Apache ZooKeeper system, part of the umbrella Hadoop project. ZooKeeper is a system for coordinating distributed processes. In a distributed environment, getting processes to act in any kind of synchrony is an extremely hard problem. For example, simply having a set of processes wait until they’ve all reached the same point in their execution – a kind of distributed barrier – is surprisingly difficult to do correctly. ZooKeeper offers an API to facilitate this sort of distributed coordination. For example, it is often used to serve locks to client processes – locks are just another kind of coordination primitive – in the form of small files that ZooKeeper tracks.
Cloudera での最初の数週間で、Apache ZooKeeper システムを Hadoop プロジェクトの体系に取り込むための作業に従事した。 ZooKeeper とは、分配プロセス間をを調整するためのシステムのことである。分散環境において、あらゆる同期に対応するプロセスを実現することは、きわめて困難な問題となる。たとえば、プロセス・セットの実行において、すべてのプロセスが同じポイントに到達するまで待つことは、分散という障壁の問題などにより、きわめて難しいとこになる。 ZooKeeper が提供する API により、この種類の分散調整が促進される。たとえば、クライアント・プロセスのロックをサポートするためにも使われるが、それは ZooKeeper が小規模ファイルをトラックする際の、プリミティブな調整機能に過ぎない。
In order to be useful, ZooKeeper must be both highly reliable and available as systems will rely upon it as a critical component. For example, if locks cannot be taken, processes cannot make progress and the whole system will grind to a halt. ZooKeeper is built on a suite of reliable distributed systems techniques and protocols, and is typically run on a cluster of machines so that if some should fail, the remaining ones can continue to provide service. Under the hood, ZooKeeper is responsible for ordering calls made by clients so that each request is processed atomically and in a fixed and firm order.
有用であるためには、対象となるシステムが重要なコンポーネントである ZooKeeper に依存するにつれて、その信頼性と可用性を高めなければならない。たとえば、ロックができないなら、プロセスを進めることができず、システム全体が停止してしまうだろう。 信頼できる分散システムのテクニックとプロトコルの組み合わせの上に ZooKeeper は構築され、また、一般的にはマシン・クラスタ上で実行し、何かが失敗した場合には、別の何かがサービスを提供し続けるようにする。こうした体系の下で、ZooKeeper はクライアントが作成したコールを順番に処理する責任を持ち、それぞれのリクエストが、決められた順初手厳格に自動処理されるようにする。
One of my first contributions to the project was a set of bindings to allow programs written in the Python language to act as clients to a ZooKeeper cluster. ZooKeeper was natively written in Java, and there are already C and Perl bindings. Adding Python bindings increases the number of people that can use the system, and brings the strengths of Python, such as rapid prototyping, to bear when designing distributed systems.
このプロジェクトに対する最初のコントリビューションは、Python で記述されたプログラムが、ZooKeeper クラスタに対するクライアントとして振舞うようにすることだった。 ZooKeeper はネイティブとして Java 記述されているが、すでに C および Perl とのバインディングが存在している。 Python バインディングを加えることで、このシステムを利用できる人を増やし、また、分散システムを設計するときに要する、迅速なプロトタイピングなどにおける Python を強化する。
The Python ZooKeeper bindings are available from the ZooKeeper SVN repository and should be part of the 3.2 release, planned for the next couple of weeks. To use the bindings now, you can either check out the latest version of the code from the SVN repository, or download a tarball containing a recent snapshot here. The zookeeper module exposes the ZooKeeper API to Python, so to get started all you need do is add import zookeeper to your Python script once the module is installed. Instructions on getting up and running are at the end of this post.
Python ZooKeeper バインディングは、ZooKeeper SVN リポジトリから入手可能であり、また、2~3 週間後に計画されている、リリース 3.2 の一部になるはずだ。 現時点では、このバインディングを用いるために、 SVN リポジトリ内のコードについて最近のバージョンをチェックするか、最近のスナップショットを含む tar ファイルをダウンロードすることができる。 この Zookeeper モジュールは、Python に対して ZooKeeper API を開示する。そのため、着手するために必要なすべてが、そのモジュールがインストールした直後から、ZooKeeper がら Python スクリプトに読み込まれる。そのためにインストラクションは、このポストの最後で提供する。
To illustrate some of the ZooKeeper API, I’ve written a distributed FIFO queue in Python – the source code is here – which I wanted to share. The combination of Python and Zookeeper meant that I was able to write the queue in just over 60 lines of code, and most of that deals with local coordination issues between two threads rather than any tricky issues trying to make remote processes behave correctly. I can only give a taste here of how programming with Python and ZooKeeper works. I hope there’s enough here to convince you that ZooKeeper might make a useful component for distributed systems that need a little herding.
いくつかの ZooKeeper API を例証するために、Python で分散 FIFO キューを記述し、そのソースコードがココにあるが、それは共有したいと望むものである。 Python と Zookeeper の組み合わせは、キューを記述することであり、60 行強のコードで実現される。そして、リモート・プロセスを正確に作用させようとする、トリッキーなものではなく、その大部分が、2つのスレッド間でのローカルな調整を取り扱うものとなる。ここで私が提供するのは、ZooKeeper を機能させるために Python を用いて行う、プログラム手法の入り口である。分配システムを取りまとめるために必要な、有用なコンポーネントを ZooKeeper で開発できることを、確信してもらえれば満足だ。
ZooKeeper
ZooKeeper provides a tree abstraction where every node in that tree (or znode, in ZooKeeper parlance) is a file on which a variety of simple operations can be performed. ZooKeeper orders operations on znodes so that they occur atomically. Therefore there is no need to use complex locking protocols to ensure that only one process can access a znode at a time. The tree represents a hierarchical namespace, so that many distinct distributed systems can use a single ZooKeeper instance without worrying about their files having the same name.
ZooKeeper は、ツリー上の全ノード(ZooKeeper 用語だと znode)でシンプルな操作を実現するために、ファイルで構成されるツリーの抽象概念を提供する。 そして、その処理が自動的に行われるよう、ZooKeeper は znodes にオペレーションを発行する。 そのため、複数のプロセスが znode に、一度にアクセスしないことを保証するための、複雑なロック・プロトコルを用いる必要がなくなる。このツリーでは、階層的なネームスペースが表現される。重複するファイル名を気にすることなく、多様な分散システムから ZooKeeper のシングル・インスタンスを使用できる。
Each znode has some associated data – up to a megabyte in current builds – that can be updated atomically. Every update to a znode increases its version number, which allows clients to perform compare-and-swap operations by reading the version and then updating a znode only if the version is still the one that was read.
それぞれの znode は、自動的なアップデートが可能な(現在のビルドでは1メガバイトまで)、いくつかの関連づけられたデータを持つ。 znode に対する全てのアップデートは、そのバージョン・ナンバーをインクリメントする。つまり、対象となるバージョンを読み込むことで、クライアントからの compare-and-swap 操作を実行し、続いて、読み込まれたバージョンと対象となるバージョンが一致する場合にのみ、そのバージョン・ナンバーをインクリメントする。
As a notification mechanism, ZooKeeper provides watches, which are callback methods that are called asynchronously when an event of interest occurs. Watches are attached, typically, to an individual znode. When that znode changes any watcher on the znode will be fired asynchronously on the client. Many methods of the ZooKeeper API have an optional watch argument. Some languages have to work hard to provide callable objects as parameters, but Python makes this easy as callables are first class language constructs. Simply pass any callable, like a method or a lambda expression, to the zookeeper module and when an event of interest occurs, the callable will be executed.
ZooKeeper は通知メカニズムとして、重要なイベントが生じたときに非同時で呼び出される、コールバック・メソッドの watches を提供する。 一般的に、この watches は個々の znode にアタッチされる。 対象となる znode が変更されるとき、その znode 上のあらゆる watcher が、クライアント上で非同時に発火することになる。 ZooKeeper API の多数のメソッドが、オプションとして watch 引数を持つ。いくつかの言語では、呼び出し可能なオブジェクトをパラメータとして提供することが困難だが、その呼び出を容易にする Python は、最適なクラス・コンストラクタとなる。 つまり、メソッドやラムダ式のように、zookeeper モジュールへの受け渡しを行い、重要なイベントが発生したときに、その呼び出し可能なオブジェクトを発火させる。
This call comes from a separate thread of execution, so great care must be taken to ensure that unexpected things do not happen due to your watcher being fired at an arbitrary point in the execution of your script. Normally you will use watchers to notify another thread of a state change. It will often be the case that the main thread will be waiting for the watcher to fire before it can continue. An example of this is in the __init__ method of our ZooKeeperQueue when we try to connect to the server.
このコールは、別個の実行スレッドから発行される。 そのため、スクリプトにおける任意の実行ポイントにおいて発火する watcher により、予期せぬことが起こならないことを保証するために、深い注意を払わなければならない。 一般的に watcher は、他のスレッドにおけるステート変更を通知するために用いられる。 大半のケースにおけるメイン・スレッドは、その処理の継続の前に、イベントを発火させる watcher を待つことになる。 その例として挙げられるのは、サーバーに接続しようとするときの、ZooKeeperQueue における __init__ メソッドである。
Compared to the time a script takes to execute, connections can take a long time to run. So it’s useful that the ZooKeeper API allows us to connect asynchronously, in case there were any work that we wanted to get done while we were waiting for the connection to be established. However, in our case, we just want to wait until the connection is successful, and so we need a mechanism to wait for the watcher to notify us.
スクリプトの実行時間と比較して、接続に要する時間が長くなることもある。したがって、接続の確立を待つ間に、その終了を望む処理が存在する場合に備えて、ZooKeeper API による非同期接続を許可することは有用である。しかし、私たちのケースでは、接続が成功することだけを単純に待つことにした。 つまり、watcher による通知を待つメカニズムを必要とした。
A useful tool for this inter-thread communication is the Condition object in Python, which represents a condition variable, a well-known concurrent programming abstraction. Condition objects may be acquired and released just like locks, but they also expose an API to wait for a notification from another thread and to fire that notification. While a thread is waiting on a Condition it goes to sleep, leaving the operating system with some free CPU to dedicate to other processes. Once a Condition is notified, a thread that is waiting on it is woken up and allowed to continue execution once the notifying thread has released the Condition.
スレッド間のコミュニケーションに有益なツールは、 広く知れ渡っているコンカレント・プログラミングの、抽象概念である条件変数を表現する、Python の Condition オブジェクトである。 Condition オブジェクトは、まさにロックのように取得され開放されるが、他のスレッドからの通知を待ち、通知を発行するための API を公開する。 スレッドがスリープ状態に入るための Condition を待っている間、他のプロセスのためのフリーな CPU と共に、オペレーティング・システムは待機する。 そして、何らかの Condition が通知された直後に、それを待っていたスレッドが起動し、対象となる Condition を有していたスレッドが通知を行った後に、継続した実行が実現される。
This leads to a simple pattern for communicating between watchers and the main thread. Here’s an excerpt from the connection code:
この方式が、watcher とメイン・スレッド間の通信パターン単純化していく。 ここに接続コードの抜粋を、以下に示す:
def watcher(handle,type,state,path):
print "Connected"
self.cv.acquire()
self.connected = True
self.cv.notify()
self.cv.release()
self.cv.acquire()
self.handle = zookeeper.init("localhost:2181", watcher, 10000, 0)
self.cv.wait(10.0)
First we define our watcher which takes four parameters (if you want to provide more parameters or local state to a watcher, one way to do it is to wrap a function call in a local lambda which captures the state). The next line acquires an exclusive lock on a condition variable cv. Why do this now? Once we set our watcher in place, it could be fired at any time – even before the main thread makes progress to the next line of code. If we don’t prevent it from sending a notification on the condition variable before we’re ready to look for it, the notification could get lost and we could wait forever. Notifications aren’t buffered – if no one is waiting on a condition variable, no one gets woken up.
私たちは最初の行で、4つのパラメータを取得する watcher を定義した(watcher に対して、さらに多くのパラメータやローカル・ステートを提供する場合には、ステートをキャプチャするローカル・ラムダ内に、機能をラップする方式がある)。次の行では、条件変数 cv における排他的なロックを取得している。 この段階で、なぜ、この処理を施すのか? 私たちは、watcher を適切な位置にセットした直後から、メイン・スレッドが次の行のコードを処理する前であっても、その発火がいつでも可能になる。 もし、対象となる通知を探す準備ができていなくても、条件変数における通知の送信を阻止しないなら、その通知は消滅して、いつまでも待つことが可能になる。通知はバッファに入れられない。 つまり、誰も条件変数を待っていないなら、何も起動しない。
Then the code initialises ZooKeeper. The zookeeper module gives us an integer handle which we can use to refer to our connection in the future (we can open many connections per client). The next line tells us to wait until we receive a notification on the condition variable that the connection has succeeded. The parameter is a timeout in seconds, after which if we are still not connected we presume that something is wrong and abort.
続いて、このコードがZooKeeper を初期化する。 その Zookeeper モジュールが、将来における接続を参照するために使用できる(クライアントごとに接続をオープンする)。次の行では、接続が成功している条件変数上で、通知を受信するまで待つことになる。接続が行われず、何らかの間違と推測できる場合には、このパラメータは数秒でタイム・アウトされ、abort される。
<続く>
ーーーーー
<関連>
Apache ZooKeeper による分散並列キューの構築 _1
Apache ZooKeeper による分散並列キューの構築 _2
Apache ZooKeeper による分散並列キューの構築 _3
Apache ZooKeeper による分散並列キューの構築 _4
Observers と ZooKeeper _1
Observers と ZooKeeper _2



























