A Distributed Document Repository

Without Serializability

In this report, we describe an implementation of a Distributed Document Repository that uses two well-known algorithms for maintaining distributed dictionaries. We also describe a method for determining the global state.

This was written along with Chiranjeeb Buragohain in 2002 as part of a class assignment at UCSB's department of computer science.


Introduction

In this report we describe our implementation of a Distributed Document Repository that uses two well-known algorithms for maintaining distributed dictionaries, as well as a method for determining the global state. Performance tests were conducted to compare the two algorithms in terms of the size and number of messages required to bring all nodes up to date after a number of modifications. The results are recorded and analyzed in the Performance section.

Architecture

Our implementation of the Distributed Document Repository was written in Java and may be run on any networked workstation running a compatible version of the Sun Java Virtual Machine. We used the java.net.* suite of network connection primitives to achieve communication from any node to any other node in the cluster. The graphical user interface was written using the javax.swing.* graphical component libraries. In order to view the actual files, external calls to emacs and ghostview were made.

Connection Management

The document repository is started with a single integer argument: the upper bound on the number of nodes expected to participate. It finds a free port in the 8000-9000 range and contacts an existing name server, assumed to be running at a known IP address on a known port. The node registers its presence and participation in the repository by giving the name server its IP address and port number. The name server then returns a globally unique identifier by which the node will be known. Along with this ID, the name server returns a list of all nodes that have connected to the repository at any point between the initialization of the name server and the registration of the node's services. In order to keep all nodes informed about other nodes in the repository, an updated list is re-sent to all nodes each time a new node connects.

Although this scheme requires \small{n^2} messages during initial startup, it is needed in order to allow any node to join at any time and interact with all existing nodes without delay. We prefer our name server implementation to one that requires a common configuration file to notify nodes of each other's presence. Our scheme is, of course, more applicable to real-world situations where the locations or IP addresses of participants are often not known ahead of time. Assigning a unique ID to each of the nodes allows for a semi-fault-tolerant system in which any node may disconnect and reconnect at any time without loss of identity. Under the view-based scheme, the node will be updated with a current view of the repository and will recover 100% of the information lost. Additional modifications would allow failures to be recognized by other nodes so that, under any update scheme, the node will be given the information needed to recover from a failure.
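The bookkeeping done by the name server in this section can be sketched in memory, without any sockets. This is a minimal illustration under stated assumptions, not the original code: the class name NameRegistry, the Endpoint type and the reconnect method are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the name server's bookkeeping (assumed design, no networking). */
final class NameRegistry {
    /** Address a node registered under. */
    static final class Endpoint {
        final String host;
        final int port;
        Endpoint(String host, int port) { this.host = host; this.port = port; }
    }

    private final Map<Integer, Endpoint> nodes = new LinkedHashMap<Integer, Endpoint>();
    private int nextId = 0;

    /** Registers a new node and returns the globally unique ID assigned to it. */
    synchronized int register(String host, int port) {
        int id = nextId++;
        nodes.put(id, new Endpoint(host, port));
        return id;
    }

    /** Re-registers a returning node under its old ID, possibly at a new address. */
    synchronized void reconnect(int id, String host, int port) {
        if (!nodes.containsKey(id)) throw new IllegalArgumentException("unknown node " + id);
        nodes.put(id, new Endpoint(host, port));
    }

    /** Copy of every node seen so far; this is the list that would be re-sent to all nodes. */
    synchronized Map<Integer, Endpoint> directory() {
        return new LinkedHashMap<Integer, Endpoint>(nodes);
    }
}
```

Because a node keeps its ID across a reconnect, other nodes can keep addressing it by that ID even if its port changes.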
 

Communication

High-level algorithms in the repository and UI classes need only know the unique ID of the node with which they wish to communicate. Our implementation of the send and receive primitives handles the lookup of the IP address and port required to communicate with the node known by that unique ID. Communication in the system is stateless, and each interaction requires only a single send and receive. This eliminates the need to maintain connections and an open dialog with a server that may become occupied with another task while communicating. It also allows all other operations to resume if a message send or receive fails at the TCP/IP level.

Three threads were used to keep each node from being tied up performing updates or accepting messages. The main thread performed all of the repository maintenance functions as well as the interaction with the user. Any communication with other nodes beyond the initial service registration was performed with explicit calls and callbacks to the other two threads.

The server thread was bound to an unused port in the range 8000 to 9000 and waited for incoming messages sent to that port. When a message was received by the server, an identifying field was examined and the message was dispatched to the appropriate class or thread depending solely on this identifier. Messages containing updates to the log or view of other nodes were sent through a third thread, which was scheduled to wake up every 10 seconds and send such messages if necessary. The update scheme is explained in the section on the log-based repository.
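Dispatching on a single identifying field, as the server thread does, might look like the following sketch. The message kinds NODE_LIST, UPDATE and SNAPSHOT and the Handler interface are hypothetical; the report does not list the actual identifiers.

```java
import java.util.EnumMap;
import java.util.Map;

final class Dispatcher {
    /** Hypothetical message kinds; the real identifiers are not given in the report. */
    enum Kind { NODE_LIST, UPDATE, SNAPSHOT }

    /** Whatever class or thread wants to consume messages of one kind. */
    interface Handler { void handle(String payload); }

    private final Map<Kind, Handler> handlers = new EnumMap<Kind, Handler>(Kind.class);

    void on(Kind kind, Handler handler) { handlers.put(kind, handler); }

    /** Routes a message using only its identifying field; returns false if nobody handles it. */
    boolean dispatch(Kind kind, String payload) {
        Handler h = handlers.get(kind);
        if (h == null) return false;
        h.handle(payload);
        return true;
    }
}
```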

Repository

Our repository offers the user three operations: insert, delete and list, whose functions are obvious from their names. There are two restrictions on the allowed operations:

  1. There is at most one occurrence of the operation insert(x) for any object x in the view.
  2. delete(x) at node j is legal if and only if x is currently in j’s view.

The first restriction is satisfied by tagging every inserted item with the node number and a time stamp. Thus two identical items, even if inserted at the same node, differ by virtue of having different time stamps. Since our repository is not serialized (there is no global mutual exclusion), users can make arbitrary concurrent modifications to the repository. Synchronization between different nodes is achieved through periodic message exchanges. This means that at any point in time, it is not guaranteed that all nodes will show identical views to their users. The repository also doesn't require synchronized physical clocks; we use logical clocks in each repository to create time stamps.
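The tagging scheme can be illustrated with a small sketch. The names LogicalClock and ItemTag are hypothetical, and the clock shown only counts local events, which is all the uniqueness argument needs.

```java
/** A per-node counter used in place of a physical clock (assumed minimal form). */
final class LogicalClock {
    private int time = 0;

    /** Advances the clock for a local event and returns the new time stamp. */
    synchronized int tick() { return ++time; }
}

/** Identity of an inserted item: the pair (node, time) is unique even for equal contents. */
final class ItemTag {
    final int node;
    final int time;
    final String name;

    ItemTag(int node, int time, String name) {
        this.node = node;
        this.time = time;
        this.name = name;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof ItemTag)) return false;
        ItemTag t = (ItemTag) o;
        return node == t.node && time == t.time;   // the name plays no part in identity
    }

    @Override public int hashCode() { return 31 * node + time; }
}
```

Inserting the same file name twice at one node yields two distinct tags, so restriction 1 holds by construction.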

We implement two different algorithms to achieve synchronization.

Simple Repository

In the first algorithm, proposed by Fischer and Michael [1], the nodes exchange messages which contain the complete view along with time stamps. In addition to the view, every node i maintains a vector of time stamps \small{T_i[j]}. At any time, node i knows about events in node j up to the time \small{T_i[j]} (according to the clock in node j). Whenever i sends a message to j, j updates its view and time vector according to the time vector \small{T_i} sent with the message. The algorithm is simple, but the message size grows in direct proportion to the size of the repository.
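One plausible reading of the receive step is sketched below. It is an assumption-laden illustration rather than the authors' code or a transcription of [1]: the class ViewNode, the array known (standing in for \small{T_i}) and the exact merge rules are introduced here. The rules are: drop a local item if the sender had heard of it but no longer holds it, adopt a sender's item only if it is newer than anything this node has heard from its origin, then take the element-wise maximum of the two time vectors.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class ViewNode {
    /** An item is identified by the node that inserted it and that node's clock value. */
    static final class Item {
        final int node; final int time; final String name;
        Item(int node, int time, String name) { this.node = node; this.time = time; this.name = name; }
        @Override public boolean equals(Object o) {
            return o instanceof Item && ((Item) o).node == node && ((Item) o).time == time;
        }
        @Override public int hashCode() { return 31 * node + time; }
    }

    final int id;
    final int[] known;                        // known[j] plays the role of T_i[j]
    final Set<Item> view = new HashSet<Item>();

    ViewNode(int id, int nodeCount) { this.id = id; this.known = new int[nodeCount]; }

    Item insert(String name) {
        Item item = new Item(id, ++known[id], name);
        view.add(item);
        return item;
    }

    void delete(Item item) { view.remove(item); }

    /** Applies a message carrying the sender's complete view and time vector. */
    void receive(Set<Item> senderView, int[] senderKnown) {
        // Drop items the sender had already heard of but no longer holds: they were deleted.
        for (Iterator<Item> it = view.iterator(); it.hasNext(); ) {
            Item x = it.next();
            if (!senderView.contains(x) && x.time <= senderKnown[x.node]) it.remove();
        }
        // Adopt items that are new to this node; older ones were already deleted here.
        for (Item x : senderView) {
            if (x.time > known[x.node]) view.add(x);
        }
        for (int k = 0; k < known.length; k++) known[k] = Math.max(known[k], senderKnown[k]);
    }
}
```

Note that the whole view travels in every message, which is the cost the text attributes to this algorithm.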

Log-based Repository

In the second algorithm, proposed by Wuu and Bernstein [2], the messages contain only a log of the inserts and deletes that have happened, so this solution scales better as the size of the repository grows. Every node i maintains a log \small{L_i} which records each insert and delete event as a triplet of event type, node number and time. Also maintained is a generalization of the time vector, the time table \small{T_i[j][k]}. If \small{T_i[j][k] = t}, then node i knows for sure that node j has learned about all events in node k up to time t (by node k's clock). The message exchanged between nodes consists of a part of the log maintained at the sender's node, and the sender's time table. When i decides to send a message to j, it checks its log for events which happened at k. If an event at k recorded in the log happened no later than time \small{T_i[j][k]}, then j already knows about it and that entry need not be sent. Thus the size of the messages can be cut down drastically. Also, as messages propagate through the system, eventually all nodes get to know about events at k up to some time, say t. At this point the nodes can delete from their logs the records of all events which happened at k up to time t.
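The log filtering, time table merge and log pruning described above can be sketched as follows. This is a simplified illustration: the class LogNode and its method names are hypothetical, and applying the received events to the view is left out so that only the log and time table logic remains.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class LogNode {
    /** One log record: the event type plus the node and local time at which it happened. */
    static final class Event {
        final boolean insert; final int node; final int time; final String name;
        Event(boolean insert, int node, int time, String name) {
            this.insert = insert; this.node = node; this.time = time; this.name = name;
        }
    }

    final int id;
    final int[][] table;                       // table[j][k] plays the role of T_i[j][k]
    final List<Event> log = new ArrayList<Event>();

    LogNode(int id, int n) { this.id = id; this.table = new int[n][n]; }

    /** Records a local insert or delete, stamped with this node's own clock. */
    Event record(boolean insert, String name) {
        Event e = new Event(insert, id, ++table[id][id], name);
        log.add(e);
        return e;
    }

    /** True if, as far as this node can tell, node j has already learned of event e. */
    boolean hasRecord(Event e, int j) { return table[j][e.node] >= e.time; }

    /** The part of the log worth sending to node j; the time table travels alongside it. */
    List<Event> messageFor(int j) {
        List<Event> part = new ArrayList<Event>();
        for (Event e : log) if (!hasRecord(e, j)) part.add(e);
        return part;
    }

    /** Merges a message from sender, then discards records every node is known to have. */
    void receive(int sender, List<Event> part, int[][] senderTable) {
        for (Event e : part) if (!hasRecord(e, id)) log.add(e);
        int n = table.length;
        // Everything the sender knew directly, this node now knows too.
        for (int k = 0; k < n; k++) table[id][k] = Math.max(table[id][k], senderTable[sender][k]);
        // Second-hand knowledge: element-wise maximum of the two tables.
        for (int j = 0; j < n; j++)
            for (int k = 0; k < n; k++) table[j][k] = Math.max(table[j][k], senderTable[j][k]);
        for (Iterator<Event> it = log.iterator(); it.hasNext(); ) {
            Event e = it.next();
            boolean everyoneKnows = true;
            for (int j = 0; j < n; j++) everyoneKnows &= hasRecord(e, j);
            if (everyoneKnows) it.remove();
        }
    }
}
```

With three nodes, after node 1 hears of an event from node 0 it sends nothing about that event back to node 0, but still includes it in a message to node 2.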

Repository Architecture

The repository communicates with the rest of the system via a well-defined message passing interface. For the user, the repository supports the methods insert, delete and list. The implementation details (logged vs. non-logged repository) are completely hidden from the rest of the system. For the use of the system, it supports two additional methods: getMessage(i) and putMessage(m). getMessage constructs and returns a message m for node i, while putMessage simply updates the repository from a message m received from some other node. Access to the repository by all three threads in the process was synchronized via the native Java synchronization primitives.
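The five methods named above suggest an interface along the following lines. The signatures are guesses, since the report gives only the method names. CoarseRepository is a placeholder whose "message" is just the full list of names; it exists only to show every method taking the object's lock via synchronized, and does not implement either update algorithm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** The operations named in the text; M is whatever message type an implementation uses. */
interface Repository<M> {
    void insert(String name);
    void delete(String name);
    List<String> list();
    M getMessage(int nodeId);     // build an update for the given node
    void putMessage(M message);   // apply an update received from another node
}

/** Placeholder implementation: every method locks the whole object. */
final class CoarseRepository implements Repository<List<String>> {
    private final TreeSet<String> names = new TreeSet<String>();

    public synchronized void insert(String name) { names.add(name); }
    public synchronized void delete(String name) { names.remove(name); }
    public synchronized List<String> list() { return new ArrayList<String>(names); }

    // Toy message: the full list of names. The real algorithms send a view or a log.
    public synchronized List<String> getMessage(int nodeId) { return list(); }
    public synchronized void putMessage(List<String> message) { names.addAll(message); }
}
```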

Global Snapshots

We allow any node to take a global snapshot at any point in the computation. This is achieved by following a solution proposed by Chandy and Lamport [3]. In order to obtain a consistent state of the system, the node requesting the snapshot broadcasts a message to all other nodes instructing them to take a snapshot of their view and begin logging all communication that they receive. Then each node receiving the broadcast message from the originator broadcasts a separate message to all other nodes, requesting that the snapshot be sent to the originator and that logging cease. This second message flushes all communication channels, and the state of each channel is recorded in the log of the receiving node. After receiving this second message, each node sends its snapshot along with its message logs to the originating node. The originator then sorts each message log according to sender and displays each node's snapshot along with the state of the communication channels in a pop-up window.
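The per-node side of this protocol can be sketched as a small state machine. The sketch assumes that messages from any single sender arrive in the order they were sent, and that the caller passes in the set of peers whose second-round message is still expected. SnapshotRecorder and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SnapshotRecorder {
    private final Set<Integer> open = new HashSet<Integer>();      // channels still being logged
    private final Set<Integer> flushed = new HashSet<Integer>();   // peers whose flush has arrived
    private final Map<Integer, List<String>> channelLog = new HashMap<Integer, List<String>>();
    private List<String> savedView;                                // null until recording starts

    /** Called when this node is told to take a snapshot (or starts one itself). */
    void start(List<String> currentView, Set<Integer> awaitedPeers) {
        if (savedView != null) return;                             // already recording
        savedView = new ArrayList<String>(currentView);
        for (int peer : awaitedPeers) {
            channelLog.put(peer, new ArrayList<String>());
            if (!flushed.contains(peer)) open.add(peer);           // a flush may have arrived early
        }
    }

    /** Ordinary repository traffic: logged only while the sender's channel is still open. */
    void onMessage(int from, String message) {
        if (open.contains(from)) channelLog.get(from).add(message);
    }

    /** Second-round message from a peer: everything it sent earlier has now arrived. */
    void onFlush(int from) {
        flushed.add(from);
        open.remove(from);
    }

    boolean complete() { return savedView != null && open.isEmpty(); }
    List<String> view() { return savedView; }
    List<String> channelState(int from) { return channelLog.get(from); }
}
```

Once complete() returns true, the saved view and the per-sender logs are what the node would send to the originator.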

Performance

Performance tests were conducted using a variable number of nodes ranging from 10 to 80. For each test the repository was pre-loaded with 1000 entries before any insertions or deletions were performed. 40% of the nodes performed 5 random insertions or deletions each, while the rest were passive listeners. After all nodes had synchronized their views of the repository, we collected data on the size and number of messages that were necessary. The average size of the messages used in the update is summarized in Figure 1. The average number of update messages sent per node is summarized in Figure 2.

We saw that the view-based repository required a large message size regardless of the number of nodes present. With 20 nodes, the messages sent by the log-based approach were less than 5% of the size of those sent by the view-based approach. The message size of the log-based approach grew as \small{O(n^2)} with the number of nodes, but remained 18% smaller than the messages sent in the view-based approach even for the largest collection of nodes. The quadratic increase was due to the \small{n^2} growth in the size of the time table sent with each message. For an 80-node simulation, the size of the time table was 25.6 KB (\small{80^2 = 6400} entries at 4 bytes each). For smaller repositories and larger collections of nodes, the overhead of a quadratically increasing message size eliminates any savings gained by transferring only the log of modified entries.


Figure 1. Size of messages sent as a function of number of clients. The lines are fits to constant and quadratic functions.

The number of messages each node sent in order to propagate its changes increased roughly linearly with the number of nodes. Obviously this is not a scalable solution. This linear increase is a side effect of a truly randomized selection of the destination for each update message. 80% of the nodes became up to date far sooner, and with fewer messages, than it took to bring all 100% up to date, because some messages were needlessly sent to nodes which were already mostly up to date.
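A uniformly random choice of destination, as used for the update messages, could be written as below. The sketch assumes node IDs run from 0 to n-1; RandomGossip and pickDestination are hypothetical names.

```java
import java.util.Random;

final class RandomGossip {
    /** Picks a destination uniformly among the other nodes; IDs are assumed to be 0..n-1. */
    static int pickDestination(int self, int nodeCount, Random rng) {
        if (nodeCount < 2) throw new IllegalArgumentException("no other node to send to");
        int d = rng.nextInt(nodeCount - 1);   // one of the n-1 other nodes
        return d >= self ? d + 1 : d;         // skip over our own ID
    }
}
```

Nothing in this choice looks at how stale the destination is, which is why some messages go to nodes that are already nearly up to date.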


Figure 2. Number of messages sent as a function of number of clients.

Conclusion

The largest open area for improvement is the scheme that determines the frequency and the destination of the update messages sent from each node. Our randomized algorithm worked better than intermediate attempts that greedily examined the known differences in other nodes' repositories. This intermediate solution seemed to create small subgroups of communicating nodes that would not talk to each other without some forced non-deterministic updates that broke from the greedy strategy.
 
The number of messages could be reduced if a token-ring-like structure were implemented and used for propagating updates. However, this has the disadvantage of requiring \small{O(n^2)} time to update all nodes. This could be optimized with \small{O(\log n)} shortcuts in the ring, allowing faster propagation of updates.
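One way to picture the shortcut idea is to give every node links to the nodes 1, 2, 4, ... positions ahead of it on the ring. The following sketch uses that particular layout, which is an assumption and not something the report specifies, and counts how many forwarding rounds an update needs to reach every node.

```java
import java.util.ArrayList;
import java.util.List;

final class RingShortcuts {
    /** Successors of node at distances 1, 2, 4, ... around a ring of n nodes. */
    static List<Integer> targets(int node, int n) {
        List<Integer> out = new ArrayList<Integer>();
        for (int step = 1; step < n; step *= 2) out.add((node + step) % n);
        return out;
    }

    /** Rounds until an update from origin reaches all nodes, if every informed node forwards each round. */
    static int roundsToReachAll(int origin, int n) {
        boolean[] informed = new boolean[n];
        informed[origin] = true;
        int count = 1;
        int rounds = 0;
        while (count < n) {
            boolean[] next = informed.clone();
            for (int v = 0; v < n; v++) {
                if (!informed[v]) continue;
                for (int t : targets(v, n)) {
                    if (!next[t]) { next[t] = true; count++; }
                }
            }
            informed = next;
            rounds++;
        }
        return rounds;
    }
}
```

For a ring of 8 nodes, node 0 links to nodes 1, 2 and 4, and an update from it reaches everyone in 3 rounds.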
 
Synchronization for the repository was implemented very crudely: every thread that accessed the repository locked the whole repository. Fine-grained locking, as well as reader/writer locks, would definitely improve performance.
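A reader/writer lock of the kind suggested here is available in later Java versions as java.util.concurrent.locks.ReentrantReadWriteLock. The sketch below shows how list could proceed concurrently with other readers while insert and delete remain exclusive. ReadWriteRepository is a hypothetical class holding only file names, not the original repository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ReadWriteRepository {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<String> names = new TreeSet<String>();

    void insert(String name) {
        lock.writeLock().lock();              // writers exclude everyone else
        try { names.add(name); } finally { lock.writeLock().unlock(); }
    }

    void delete(String name) {
        lock.writeLock().lock();
        try { names.remove(name); } finally { lock.writeLock().unlock(); }
    }

    List<String> list() {
        lock.readLock().lock();               // many readers may hold this at once
        try { return new ArrayList<String>(names); } finally { lock.readLock().unlock(); }
    }
}
```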

References

  1. M. J. Fischer and A. Michael, Sacrificing Serializability to Attain High Availability of Data in an Unreliable Network, Proceedings of the ACM SIGACT-SIGMOD Symposium on Principles of Database Systems, 1982, pp. 70-75.
  2. G. T. Wuu and A. J. Bernstein, Efficient Solutions to the Replicated Log and Dictionary Problems, Proceedings of the Third Annual ACM Symposium on Principles of Distributed Computing, 1984, pp. 233-242.
  3. K. M. Chandy and L. Lamport, Distributed Snapshots: Determining Global States of Distributed Systems, ACM Transactions on Computer Systems (TOCS), vol. 3, 1985, pp. 63-75.

Comments

Usage of consistent hashing

Since your article was written in 2002, a lot has happened in the area of distributed computing. There is a good chance that the writer already knows this, but for those who are not yet aware of it: the document system described above would nowadays be designed using consistent hashing. Consistent hashing is the basic ingredient of distributed hash tables (DHTs). Well-known examples of DHTs are the file sharing networks we all know.

A consistent hash of a document is a code that any node in the network can compute. Every node in the network is also responsible for a certain set of hashes. By randomly assigning the hash codes to nodes we can distribute documents evenly over an unlimited set of nodes. Messages can be routed efficiently through such a network as well, creating the perfect basis for a distributed document system!
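As a rough illustration of the idea in this comment, a hash ring can be kept in a sorted map: each node sits at the position given by its hash, and a document belongs to the first node at or after the document's own position. The class HashRing and the bit-mixing in position are hypothetical choices made for this sketch; a real DHT would use a stronger hash and usually several positions per node.

```java
import java.util.Map;
import java.util.TreeMap;

final class HashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<Integer, String>();

    /** Spreads the bits of String.hashCode(); a real system would use a stronger hash. */
    static int position(String key) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return h & 0x7fffffff;                // keep positions non-negative
    }

    void addNode(String node) { ring.put(position(node), node); }
    void removeNode(String node) { ring.remove(position(node)); }

    /** The node responsible for a document: first node at or after its position, wrapping around. */
    String nodeFor(String document) {
        if (ring.isEmpty()) throw new IllegalStateException("no nodes");
        Map.Entry<Integer, String> e = ring.ceilingEntry(position(document));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

When a node leaves, only the documents it was responsible for move to another node; every other document keeps its place.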

Flaws in the system above that would be solved by a distributed hash table mainly involve the central name server. Such a central server puts a limit on the scalability. The need to send out huge lists of nodes in the network is also removed, since every node will only need to know a few neighbors to route messages through the network. This also increases the reliability. Failure of one or more nodes can be dealt with in most of the cases.

Last edited Jul 27, 2008 10:40 PM
William Strathearn
Software Engineer at Google
Mountain View, CA