M. Ahmed

Notes on Zookeeper

Following are my notes from the zookeeper paper

Introduction

  • Used for distributed process coordination.
  • Simplest form of coordination is static/dynamic config of nodes used for leader election, node roles, etc
  • Other Approaches: Different primitives for different form of coordination i.e. SQS for queues, Chubby for locking.
  • Zookeeper’s Approach: Exposes kernel API that helps build primitives.
  • It does not use blocking primitives (locks) because slow nodes can block fast nodes and implementation is complex because it involves failure detection.
  • It uses wait-free data objects organized in a file-like hierarchy.
  • It uses FIFO client order & linearizable writes for efficient implementation which can also be used for consensus.
  • The service has multiple servers with replication for availability and latency.
  • FIFO Client order allows for async operation submission by clients.
  • For linearizability, atomic broadcast protocol called Zab is used for writes.
  • Reads are processed locally and don’t use atomic broadcast.
  • Clients can cache reads and zookeeper can notify them on revalidation using the watch mechanism.
  • In contrast, chubby manages client cache and blocks updates until cache is invalidated for every client that has it (Some clients might be slow), There’s a lease that has an upper bound but zookeeper avoids it all together by using the watch functionality.

Architecture

  • Clients connect to Zookeeper using an API which manages network connection and exposes methods.
  • Node Types:
    • Client: End-user
    • Server: Zookeeper Node
    • Znode: In-memory data object in data tree

Znodes

  • Use UNIX file-like paths i.e. /A/B/C
  • Can have data.
  • Can have children except ephemeral nodes.
  • Types:
    • Regular: Created and deleted by clients.
    • Ephemeral: Created by client and deleted either by client or automatically when session ends (deliberately or in case of failure).
  • Sequential Flags:
    • Client can set an optional flag that adds a monotonically increasing number to node’s name.
    • When a sequential flag is set then the value of the flag in name is greater than the sibling’s flag value.

Watch

  • Clients can opt into watch when reading a value.
  • The callback is triggered only once.
  • Callback also receives connection-loss events to let the client know that updates might be delayed.

Data Model

  • A simple file system model with full-data R/W
  • Or a KV table with hierarchical keys.
  • Hierarchy is used for:
    • Divided sub-trees to different applications.
    • ACL for sub-trees.
  • Should not be used for general data storage.
  • But can be used for store meta-data/config per znode.
  • For example, writing leaderId in a leader election system to a known node.

Sessions

  • A series of transactions on a client
  • Has a timeout
  • Ends when ended by client or timeout occurs (Server thinks client is faulty)
  • Can persist across multiple servers

Client API

Here’s a few methods from the client API:

create(path, data, flags)
delete(path, version)
exists(path, watch)
getData(path, watch)
setData(path, data, version)
getChildren(path, watch)
sync(path)
  • All methods have sync & async variants.
  • Client guarantees callbacks are called in-order.
  • Passing version numbers allow for conditional updates.
  • Passing -1 as version number bypasses conditional updates.

Guarantees

  • Types:
    • Linearizable writes
    • FIFO Client order
  • Linearizability: Writes are A-Linearizable. This means that writes are processed in order but a replica can have stale writes because it’s not a part of the quorum that commit the log or it’s a part of the network partition.
  • FIFO Client Order: Operations are processed at every replica allowing for linear scalability. For example, local writes are processed before reads. This concept is used in the sync operation where write a null causes a replica to write a value (& read every value before the write was written) and then read the next value (latest state).
  • The linearizable writes along with FIFO client order prevents inconsistent reads.
  • A write followed by sync followed by read flushes the write by using utilizing the FIFO Client order guarantee.
  • Service is up if majority servers are active & communicating.
  • If a request is committed then it’s durable as long as a quorum can be eventually achieved.

Primitive Examples

  • Configuration Management: Every node can have their config stored at z(c) and set watch to true so whenever the config changes it can listen to the changes but the changes are triggered only once unless the next read also sets the watch flag to true.
  • Rendezvous: used to share state not known beforehand. One process can write the state into the znode and the other can watch for changes to read the state. If the znode is ephemeral then the nodes can kill their process or do cleanup when the znode is deleted.
  • Group Memberships: Every process creates an ephemeral znode under a set node say: z(g), which the unique process id or uses the sequential flag. It also adds it’s metadata on load. When session dies for the process it is automatically removed. Processes can lookup membership by getting children for z(g) and even watch for changes in membership.
  • Simple Locks: Create an ephemeral znode z(i). It represents the lock. When client dies or deletes the lock it is released. Other nodes can check if the znode exists and watch for changes to obtain the lock. Issue is that multiple nodes can compete for locks even tho only one lock is available and only exclusive locking is implemented.
  • Simple Locks Without Crowding: Create sequential ephemeral nodes under a prefix. znode with the lowest sequential flag value gets the lock. Second lowest waits for watch. Repeat until lock acquired.
  • Read & Write Locks: Write locks are same as above. Read locks check if there’s any write znode with sequential number lower than read if not then it gets the lock. Multiple nodes can get the read lock as that’s not an issue.
  • Double Barrier: Need a certain number of nodes to enter. Every node creates an ephemeral node under z(b). The node at threshold creates a ready znode which starts a computation. To exit nodes wait on a particular znode to be deleted before checking the exit condition.

Applications

  • Fetching Service: Yahoo’s FS uses zookeeper for configuration management of fetching servers which allows for fault tolerance. It also uses Zookeeper for leader-election.
  • Katta: A distributed indexer uses Zookeeper for group membership of worker processes and leader-election.
  • Yahoo Message Broker: Pub-sub system uses multiple primary-backup servers for shards with a share-nothing architecture. Uses Zookeeper for distributing topics (config management), failure detection & group-membership.

Implementation

  • Uses replication across servers for high availability.
  • Write requests are atomically broadcasted and then sent to the replicated database.
  • Read requests go to replicated database directly.
  • Replicated database is an in-memory database containing the entire tree.
  • znodes can have maximum 1MB of data by default.
  • All writes are written to disk for recoverability using a WAL (Write-Ahead Log).
  • Writes are sent to a leader from followers. Followers receive proposals about state updates from leaders.

Request Processor

  • Requests are transformed into end txns using the desired state after txn is done.
  • All txns are idempotent once they pass through the request processor.

Atomic Broadcast

  • Requests are forwarded to the leader.
  • Leader broadcasts using atomic broadcast (called Zab)
  • The server that received the request replies back once it delivers the state change.
  • Zab uses majority quorums (2f+1 can withstand f faulty nodes)
  • Zookeeper processes thousands of requests in parallel. To keep it consistent it makes sure that requests are processed in order and changes from previous leaders are delivered to new leaders before it starts broadcasting it’s changes.
  • Logs are used for proposals and WAL to avoid writing to disk multiple times per request.
  • Zab might redeliver messages but that’s okay because txns are idempotent as long as they’re in order.

Replicated Database

  • Replaying the log on boot can take a long time so we take periodic snapshots and only send broadcasts after the snapshots.
  • We use fuzzy snapshots (which do not lock db state) so it might have duplicate txns but that’s okay because they’re idempotent.

Client-Server Interactions

  • Server sends out notifications on writes if any watch paths meet the znode. Servers process writes sequentially. Only the connected server tracks watches for the session.
  • Reads are done locally using using the replicated database. The database might return stale writes. Use sync to flush the writes before the reads.
  • On connecting to a different server, it uses the zxid to see how up to date a server is, And does not respond until it’s caught up with the leader.
  • Every client session has a timeout, leader determines if client is faulty after no server has heard from a client in that timeout. If client sends regular requests no heartbeat is sent but if the time between requests is large then the client sends heartbeats every 3 ms, and switches to a different server if it has not heard from a server for 2-3 ms.