Introduction
In this report we describe our implementation of a Distributed Document Repository that uses two well-known algorithms for maintaining distributed dictionaries, as well as a method for determining the global state. Performance tests, recorded and analyzed in the last section, compare the two algorithms in terms of the size and number of messages required to bring all nodes up to date after a number of modifications.
Architecture
Our implementation of the Distributed Document Repository was written in Java and can run on any networked workstation with a compatible version of the Sun Java Virtual Machine. We used the java.net.* networking primitives to let any node in the cluster communicate with any other node. The graphical user interface was written using the javax.swing.* component libraries. To view the actual files, the program makes external calls to emacs and ghostview.
Connection Management
Passed only a single integer argument, the upper bound on the number of nodes expected to participate in the repository, the document repository finds a free port in the 8000-9000 range and contacts an existing name server, which is assumed to be running at a known IP address on a known port. The node registers its presence in the repository by sending the name server its IP address and port number. The name server then returns a globally unique identifier by which the node will be known. Along with this ID, the name server returns a list of all nodes that have connected to the repository at any point between the initialization of the name server and the registration of this node's services. To keep all nodes informed about the other nodes in the repository, an updated list is re-sent to every node each time a new node connects. Although this scheme requires
Communication
High-level algorithms in the repository and UI classes need only know the unique ID of the node they wish to communicate with. Our send and receive primitives handle the lookup of the IP address and port belonging to that ID. Communication in the system is stateless: each interaction requires only a single send and receive. This eliminates the need to maintain connections or an open dialog with a server that may become occupied with another task mid-conversation. It also allows all other operations to continue if a send or receive fails at the TCP/IP level.
Three threads were used to keep each node from being tied up while performing updates or accepting messages. The main thread performed all repository maintenance functions as well as all interaction with the user. Any communication with other nodes beyond the initial service registration was performed through explicit calls and callbacks to the other two threads.
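The stateless exchange described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the report's code: the class `Messenger`, its methods, the one-line text message format, and the timeout are all hypothetical. A directory maps node IDs to addresses, and every exchange opens a fresh TCP connection, sends one line, reads one reply line, and closes the connection. Any failure is reported as `null` so that the caller can carry on.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: resolves node IDs to addresses and performs one-shot exchanges. */
class Messenger {
    // Node ID -> (IP, port); refreshed whenever the name server sends a new node list.
    private final Map<Integer, InetSocketAddress> directory = new ConcurrentHashMap<>();
    private final int timeoutMillis;

    Messenger(int timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void updateDirectory(int nodeId, InetSocketAddress address) {
        directory.put(nodeId, address);
    }

    /**
     * Sends one line to the node known by nodeId and waits for a single reply line.
     * A new connection is opened and closed for every exchange, so no state survives
     * between calls. Returns null if the ID is unknown or the exchange fails.
     */
    String exchange(int nodeId, String message) {
        InetSocketAddress address = directory.get(nodeId);
        if (address == null) {
            return null;
        }
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMillis);
            socket.setSoTimeout(timeoutMillis); // bound the wait for the reply
            PrintWriter out = new PrintWriter(
                    new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            out.println(message);
            return in.readLine();
        } catch (IOException e) {
            return null; // TCP/IP-level failure: report it and let the caller carry on
        }
    }
}
```

Because no connection outlives a single exchange, a node that is busy or unreachable costs the caller at most one timeout.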
The server thread was bound to an unused port in the range 8000 to 9000 and waited for incoming messages on that port. When the server received a message, it examined an identifying field and dispatched the message to the appropriate class or thread based solely on that identifier. Messages containing updates to the log or view of other nodes were sent through a third thread, which was scheduled to wake up every 10 seconds and send such messages if necessary. The update scheme is explained in the section on the log-based repository.
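A minimal sketch of the server thread's two jobs, finding a free port in the 8000-9000 range and dispatching on an identifying field, might look like this. The `Message` type, the string type tags, and the handler registry are hypothetical stand-ins for whatever the implementation actually used.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical wire message: an identifying type tag plus a payload. */
final class Message {
    final String type;
    final String payload;

    Message(String type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

/** Hypothetical server-side helpers for port selection and dispatch. */
class ServerSide {
    private final Map<String, Consumer<Message>> handlers = new ConcurrentHashMap<>();

    /** Tries each port in [low, high] and returns the first socket that binds. */
    static ServerSocket bindInRange(int low, int high) throws IOException {
        for (int port = low; port <= high; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException portUnavailable) {
                // Port already in use (or not permitted); try the next one.
            }
        }
        throw new IOException("no free port in range " + low + "-" + high);
    }

    /** Registers the class or thread that handles one message type. */
    void register(String type, Consumer<Message> handler) {
        handlers.put(type, handler);
    }

    /** Routes a message based solely on its type tag; returns false if nobody handles it. */
    boolean dispatch(Message message) {
        Consumer<Message> handler = handlers.get(message.type);
        if (handler == null) {
            return false;
        }
        handler.accept(message);
        return true;
    }
}
```

A server loop would call `accept()` on the returned socket, parse each incoming message, and pass it to `dispatch`; the periodic update thread could be driven by a `ScheduledExecutorService` with a 10-second period.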
Repository
Our repository offers the user three operations, insert, delete, and list, whose functions are evident from their names. There are two restrictions on the allowed operations:
- There is at most one occurrence of the operation insert(x) for any object x in the view.
- delete(x) at node j is legal if and only if x is currently in j’s view.
The first restriction is satisfied by tagging every inserted item with the node number and a time stamp. Thus two otherwise identical items, even when inserted at the same node, differ by virtue of their time stamps. Since our repository is not serialized (there is no global mutual exclusion), users can make arbitrary concurrent modifications to it. Synchronization between nodes is achieved through periodic message exchanges, so at any given time it is not guaranteed that all nodes present identical views to their users. The repository also does not require synchronized physical clocks; each repository uses a logical clock to create time stamps.
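The tagging scheme and both restrictions can be illustrated with a small sketch. The report only says that logical clocks are used; this sketch assumes a Lamport-style clock (increment on each local insert, and advance past the sender's clock when a message arrives). The names `Tag`, `LocalView`, and `observe` are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Identity of an inserted item: originating node plus that node's logical time (hypothetical). */
final class Tag {
    final int node;
    final long time;

    Tag(int node, long time) {
        this.node = node;
        this.time = time;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Tag)) {
            return false;
        }
        Tag other = (Tag) o;
        return node == other.node && time == other.time;
    }

    @Override
    public int hashCode() {
        return Objects.hash(node, time);
    }
}

/** One node's view, stamped by an assumed Lamport-style logical clock (names are hypothetical). */
class LocalView {
    private final int nodeId;
    private long clock = 0;
    private final Map<Tag, String> view = new HashMap<>();

    LocalView(int nodeId) {
        this.nodeId = nodeId;
    }

    /** insert(x): a local event, so the clock advances and the new tag is unique. */
    Tag insert(String document) {
        clock++;
        Tag tag = new Tag(nodeId, clock);
        view.put(tag, document);
        return tag;
    }

    /** delete(x) is legal only if x is currently in this node's view. */
    boolean delete(Tag tag) {
        if (!view.containsKey(tag)) {
            return false; // illegal: x is not in this view
        }
        view.remove(tag);
        return true;
    }

    /** On receiving a message stamped with the sender's clock, move past it. */
    void observe(long senderClock) {
        clock = Math.max(clock, senderClock) + 1;
    }

    Set<Tag> list() {
        return new HashSet<>(view.keySet());
    }
}
```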
We implement two different algorithms to achieve synchronization.
Simple Repository
In the first algorithm, proposed by Fischer and Michael [1], the nodes exchange messages that contain the complete view along with time stamps. In addition to the view, every node i maintains a vector of time stamps.
Log-based Repository
In the second algorithm, proposed by Wuu and Bernstein [2], the messages contain only a log of the inserts and deletes that have happened. This makes the solution more scalable as the repository grows, since a message carries only events the recipient is not yet known to have rather than the whole view. Every node i maintains a log of these events. In addition, it maintains a time table, a generalization of the time vector used by the simple repository.
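The following is a sketch of the Wuu-Bernstein scheme as it is commonly described, not the report's implementation; `Event`, `Message`, `LoggedReplica`, and the use of unique strings as item identifiers are assumptions. Node i's time table entry `tt[k][j]` records the latest time of node j's events that i knows node k has seen, so row i plays the role of the time vector. The predicate `hasrec(e, k)` ("node k has received event e") is `tt[k][e.node] >= e.time`. A message to node k carries only the events for which `hasrec` is false, plus the time table, and events that every node is known to have are discarded from the log.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** One logged operation, stamped with its originating node and that node's clock. */
final class Event {
    final boolean insert;
    final String item; // assumed unique per insert, e.g. tagged with node and time
    final int node;
    final long time;

    Event(boolean insert, String item, int node, long time) {
        this.insert = insert;
        this.item = item;
        this.node = node;
        this.time = time;
    }
}

/** What one node sends another: the missing events plus a copy of its time table. */
final class Message {
    final int from;
    final List<Event> events;
    final long[][] tt;

    Message(int from, List<Event> events, long[][] tt) {
        this.from = from;
        this.events = events;
        this.tt = tt;
    }
}

/** Sketch of a Wuu-Bernstein replica among n nodes (names are hypothetical). */
class LoggedReplica {
    final int id;
    final long[][] tt; // tt[k][j] >= t: this node knows node k has seen j's events up to time t
    final List<Event> log = new ArrayList<>();
    final Set<String> view = new HashSet<>();

    LoggedReplica(int id, int n) {
        this.id = id;
        this.tt = new long[n][n];
    }

    private boolean hasrec(Event e, int k) {
        return tt[k][e.node] >= e.time;
    }

    /** A local insert or delete; tt[id][id] doubles as this node's clock. */
    void local(boolean insert, String item) {
        Event e = new Event(insert, item, id, ++tt[id][id]);
        if (insert) view.add(item); else view.remove(item);
        log.add(e);
    }

    /** Only the events node k is not known to have, plus a copy of the time table. */
    Message messageFor(int k) {
        List<Event> missing = new ArrayList<>();
        for (Event e : log) {
            if (!hasrec(e, k)) missing.add(e);
        }
        long[][] copy = new long[tt.length][];
        for (int r = 0; r < tt.length; r++) copy[r] = tt[r].clone();
        return new Message(id, missing, copy);
    }

    void receive(Message m) {
        List<Event> fresh = new ArrayList<>();
        for (Event e : m.events) {
            if (!hasrec(e, id)) fresh.add(e);
        }
        // New view: (old view plus new inserts) minus new deletes.
        for (Event e : fresh) if (e.insert) view.add(e.item);
        for (Event e : fresh) if (!e.insert) view.remove(e.item);
        log.addAll(fresh);
        int n = tt.length;
        for (int j = 0; j < n; j++) tt[id][j] = Math.max(tt[id][j], m.tt[m.from][j]);
        for (int k = 0; k < n; k++) {
            for (int j = 0; j < n; j++) tt[k][j] = Math.max(tt[k][j], m.tt[k][j]);
        }
        // Discard events that every node is known to have received.
        log.removeIf(e -> {
            for (int k = 0; k < n; k++) {
                if (!hasrec(e, k)) return false;
            }
            return true;
        });
    }
}
```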
Repository Architecture
The repository communicates with the rest of the system through a well-defined message-passing interface. For the user, the repository supports the methods insert, delete, and list. The implementation details (logged versus non-logged repository) are completely hidden from the rest of the system. For internal use by the system, it supports two additional methods: getMessage(i) and putMessage(m). getMessage constructs and returns a message m for node i, while putMessage updates the repository from a message m received from some other node. Access to the repository by all three threads in the process was synchronized using Java's built-in synchronization primitives.
Global Snapshots
We allow any node to take a global snapshot at any point in the computation, following the solution proposed by Chandy and Lamport [3]. To obtain a consistent global state, the node requesting the snapshot broadcasts a message to all other nodes instructing them to take a snapshot of their view and begin logging all communication they receive. Each node that receives this broadcast from the originator then broadcasts a separate message to all other nodes, requesting that the snapshot be sent to the originator and that communication logging cease. This second message flushes all communication channels, and the state of each channel is recorded in the log of the receiving node. After receiving this second message, each node sends its snapshot along with its message log to the originating node. The originator then sorts each message log by sender and displays each node's snapshot, along with the state of the communication channels, in a pop-up window.
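The protocol above is phrased in terms of broadcasts. For comparison, the per-node rule of Chandy and Lamport's algorithm [3] can be sketched as below; this is the textbook marker rule, not necessarily the exact variant implemented here, and `SnapshotRecorder` and its methods are hypothetical. Like the original algorithm, it assumes FIFO channels. On the first marker, a node records its local state and starts recording every incoming channel; each channel stops being recorded when a marker arrives on it, so a channel's recorded state is exactly the messages that were in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Snapshot bookkeeping at one node under the Chandy-Lamport marker rule (hypothetical names). */
class SnapshotRecorder<S, M> {
    private final Set<Integer> incoming;                     // nodes with a channel into this one
    private final Map<Integer, List<M>> channelState = new HashMap<>();
    private final Set<Integer> recording = new HashSet<>();  // channels still being recorded
    private S localState;                                    // null until recorded

    SnapshotRecorder(Set<Integer> incoming) {
        this.incoming = new HashSet<>(incoming);
    }

    private void record(S state) {
        localState = state;
        for (int c : incoming) {
            channelState.put(c, new ArrayList<>());
            recording.add(c);
        }
    }

    /** Initiator: record local state; the caller then sends markers on every outgoing channel. */
    void initiate(S state) {
        if (localState == null) record(state);
    }

    /**
     * A marker arrived from node `from`. Returns true if it was the first marker,
     * in which case the caller must send markers on all outgoing channels.
     */
    boolean onMarker(int from, S currentState) {
        boolean first = localState == null;
        if (first) record(currentState);
        recording.remove(from); // channel `from` is closed; empty if this was the first marker
        return first;
    }

    /** An ordinary message from `from`: part of the channel state only while recording it. */
    void onMessage(int from, M message) {
        if (recording.contains(from)) channelState.get(from).add(message);
    }

    boolean complete() {
        return localState != null && recording.isEmpty();
    }

    S state() {
        return localState;
    }

    List<M> channel(int from) {
        return channelState.getOrDefault(from, Collections.emptyList());
    }
}
```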
Performance
Performance tests were conducted with a variable number of nodes, ranging from 10 to 80. For each test, the repository was preloaded with 1000 entries before any insertions or deletions were performed. 40% of the nodes performed 5 random insertions or deletions each, while the rest were passive listeners. After all nodes had synchronized their views of the repository, we collected data on the size and number of messages that had been necessary. The average size of the update messages is summarized in Figure 1, and the average number of update messages sent per node in Figure 2.
We saw that the view-based repository required a large message size regardless of the number of nodes present. For tests with 20 nodes, the log-based approach sent messages less than 5% of the size of those sent by the view-based approach. The log-based approach's message size increased
Figure 1. Size of messages sent as a function of number of clients. The lines are fits to constant and quadratic functions.
The number of messages each node sent to propagate its changes increased roughly linearly with the number of nodes, so this is clearly not a scalable solution. This linear increase is a side effect of choosing update-message destinations truly at random: 80% of the nodes became up to date far sooner, and with fewer messages, than all 100% did, because some messages were needlessly sent to nodes that were already mostly up to date.
Figure 2. Number of messages sent as a function of number of clients.
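The tail effect described above, where the last few nodes cost a disproportionate number of messages, can be reproduced with a toy model. The sketch below is a hypothetical push-gossip simulation, much simpler than the measured system: a single update starts at node 0, and in each round every up-to-date node sends it to one destination chosen uniformly at random. It reports how many messages were needed to reach 80% of the nodes and to reach all of them.

```java
import java.util.Random;

/** Toy push-gossip model, hypothetical and much simpler than the measured system. */
class GossipSim {
    /** Returns {messages until 80% of nodes are up to date, messages until all are}; needs n >= 2. */
    static long[] run(int n, long seed) {
        Random random = new Random(seed);
        boolean[] upToDate = new boolean[n];
        upToDate[0] = true;                       // the update starts at node 0
        int count = 1;
        int threshold = (4 * n + 4) / 5;          // ceil(0.8 * n) in integer arithmetic
        long messages = 0;
        long at80 = -1;
        while (count < n) {
            boolean[] senders = upToDate.clone(); // nodes up to date at the start of the round
            for (int i = 0; i < n && count < n; i++) {
                if (!senders[i]) continue;
                int target = random.nextInt(n - 1);
                if (target >= i) target++;        // uniform over the other n - 1 nodes
                messages++;
                if (!upToDate[target]) {
                    upToDate[target] = true;
                    count++;
                }
                if (at80 < 0 && count >= threshold) at80 = messages;
            }
        }
        return new long[] {at80, messages};
    }
}
```

Comparing the two counts over several seeds and node counts shows how much of the total traffic is spent finding the last few out-of-date nodes, which is the effect the measurements above point to.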
Conclusion
The largest open area for improvement is the scheme that determines the frequency and the destination of the update messages sent from each node. Our randomized algorithm worked better than intermediate attempts that greedily examined the known differences in other nodes' repositories. Those intermediate solutions tended to create small subgroups of communicating nodes that would not talk to each other without some forced non-deterministic updates that broke from the greedy strategy.
The number of messages could be reduced if a token-ring-like structure were implemented and used for propagating updates. However, this has the disadvantage of requiring
Synchronization for the repository was implemented very crudely: every thread that accessed the repository locked the entire repository. Fine-grained locking, as well as reader/writer locks, would improve performance.
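As an illustration of the reader/writer alternative, the sketch below wraps a repository in a `java.util.concurrent.locks.ReentrantReadWriteLock`, so any number of `list()` calls can run concurrently while `insert` and `delete` remain exclusive. The `Repository` interface and both classes are hypothetical and cover only the three user operations; by the same reasoning, getMessage(i) would presumably take the read lock if it only reads state, and putMessage(m) the write lock.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** The three user operations from the text; these signatures are hypothetical. */
interface Repository {
    void insert(String item);
    boolean delete(String item);
    List<String> list();
}

/** A plain in-memory repository; not thread-safe on its own. */
class SimpleRepository implements Repository {
    private final Set<String> items = new LinkedHashSet<>();

    @Override public void insert(String item) { items.add(item); }
    @Override public boolean delete(String item) { return items.remove(item); }
    @Override public List<String> list() { return new ArrayList<>(items); }
}

/** Lets list() calls proceed in parallel while insert and delete remain exclusive. */
class ReadWriteRepository implements Repository {
    private final Repository inner;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    ReadWriteRepository(Repository inner) { this.inner = inner; }

    @Override
    public void insert(String item) {
        lock.writeLock().lock();
        try {
            inner.insert(item);
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public boolean delete(String item) {
        lock.writeLock().lock();
        try {
            return inner.delete(item);
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public List<String> list() {
        lock.readLock().lock();
        try {
            return inner.list(); // safe to share only because the inner list() does not mutate
        } finally {
            lock.readLock().unlock();
        }
    }
}
```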
References
- M. J. Fischer and A. Michael, Sacrificing Serializability to Attain High Availability of Data in an Unreliable Network, ACM SIGACT-SIGMOD Symposium on Principles of Database Systems, 1982, pp. 70-75.
- G. T. Wuu and A. J. Bernstein, Efficient Solutions to the Replicated Log and Dictionary Problems, Proceedings of the Third Annual ACM Symposium on Principles of Distributed Computing, 1984, pp. 233-242.
- K. M. Chandy and L. Lamport, Distributed Snapshots: Determining Global States of Distributed Systems, ACM Transactions on Computer Systems (TOCS), v. 3, 1985, pp. 63-75.






Erik-Jan
Usage of consistent hashing
A consistent hash of a document is a code that any node in the network can compute. Every node in the network is also responsible for a certain set of hashes. By randomly assigning the hash codes to nodes, we can distribute documents evenly over an arbitrarily large set of nodes. Messages can also be routed efficiently through such a network, creating the perfect basis for a distributed document system!
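One common way to realize this is a consistent-hash ring, sketched below. The class name, the use of SHA-256 truncated to 64 bits, and the number of virtual nodes per physical node are illustrative assumptions, not part of the system described above. Each node is placed at several pseudo-random points on the ring, and a document belongs to the first node found clockwise from the document's own hash, so adding or removing a node only moves the documents in the arcs that node takes over or gives up.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Consistent-hash ring with virtual nodes (class name and parameters are illustrative). */
class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    /** First 64 bits of SHA-256: any node computes the same value for the same key. */
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFFL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
        }
    }

    /** Places the node at several pseudo-random points on the ring. */
    void addNode(String node) {
        for (int v = 0; v < virtualNodes; v++) {
            ring.put(hash(node + "#" + v), node);
        }
    }

    void removeNode(String node) {
        for (int v = 0; v < virtualNodes; v++) {
            ring.remove(hash(node + "#" + v));
        }
    }

    /** The node responsible for a document: the first ring point at or after its hash, wrapping around. */
    String nodeFor(String documentName) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(documentName));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```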
The flaws in the system above that a distributed hash table would solve mainly involve the central name server. Such a central server limits scalability. A distributed hash table also removes the need to send out huge lists of all nodes in the network, since every node only needs to know a few neighbors to route messages through the network. This also increases reliability: in most cases, the failure of one or more nodes can be tolerated.
In the approach described in the report, each node learns the entire state of the repository. This is useful in environments where fast scanning of every item in the repository is prioritized over single-item lookups. In both approaches, no centralized component is necessary: both are fully distributed, true peer-to-peer algorithms in which each node has the same responsibility as any other.