Source code for drogulus.dht.node

# -*- coding: utf-8 -*-
"""
Contains code that defines the behaviour of the local node in the DHT network.
"""

# Copyright (C) 2012-2013 Nicholas H.Tollervey.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.python import log
from twisted.internet import reactor, defer
from twisted.internet.endpoints import clientFromString
import time
from uuid import uuid4

from drogulus import constants
from drogulus.utils import sort_contacts
from drogulus.net.messages import (Error, Ping, Pong, Store, FindNode, Nodes,
                                   FindValue, Value)
from drogulus.net.protocol import DHTFactory
from routingtable import RoutingTable
from datastore import DictDataStore
from contact import Contact
from drogulus.crypto import validate_message, construct_key
from drogulus.version import get_version


class RoutingTableEmpty(Exception):
    """
    Fired when a lookup is attempted without any peers in the local node's
    routing table.
    """
    pass

class ValueNotFound(Exception):
    """
    Fired when a NodeLookup cannot find a value associated with a specified
    key.
    """
    pass

def response_timeout(message, protocol, node):
    """
    Called when a pending message (identified with a uuid) awaiting a
    response via a given protocol object times-out. Closes the connection
    and removes the deferred from the "pending" dictionary.
    """
    uuid = message.uuid
    pending = node._pending
    if uuid in pending:
        pending[uuid].cancel()
        del pending[uuid]
        protocol.transport.abortConnection()
        node._routing_table.remove_contact(message.node)

class NodeLookup(defer.Deferred):
    """
    Encapsulates a lookup in the DHT given a particular target key and
    message type. Will callback when a result is found or errback otherwise.
    If defined, will timeout with an errback.

    From the original Kademlia paper:

    "The most important procedure a Kademlia participant must perform is to
    locate the k closest nodes to some given node ID. We call this procedure
    a node lookup. Kademlia employs a recursive algorithm for node lookups.
    The lookup initiator starts by picking α nodes from its closest non-empty
    k-bucket (or, if that bucket has fewer than α entries, it just takes the
    α closest nodes it knows of). The initiator then sends parallel,
    asynchronous FIND NODE RPCs to the α nodes it has chosen. α is a
    system-wide concurrency parameter, such as 3.

    In the recursive step, the initiator resends the FIND NODE to nodes it
    has learned about from previous RPCs. (This recursion can begin before
    all α of the previous RPCs have returned). Of the k nodes the initiator
    has heard of closest to the target, it picks α that it has not yet
    queried and resends the FIND NODE RPC to them. Nodes that fail to
    respond quickly are removed from consideration until and unless they do
    respond. If a round of FIND NODEs fails to return a node any closer than
    the closest already seen, the initiator resends the FIND NODE to all of
    the k closest nodes it has not already queried. The lookup terminates
    when the initiator has queried and gotten responses from the k closest
    nodes it has seen. When α = 1 the lookup algorithm resembles Chord's in
    terms of message cost and the latency of detecting failed nodes.
    However, Kademlia can route for lower latency because it has the
    flexibility of choosing any one of k nodes to forward a request to."

    READ THIS CAREFULLY! Here's how this implementation works:

    self.target - the target key for the lookup.
    self.message_type - the message class (either FindNode or FindValue).
    self.local_node - the local node that created the NodeLookup.
    self.shortlist - an ordered list containing nodes close to the target.
    self.contacted - a set of nodes that have been contacted for this lookup.
    self.nearest_node - the node nearest to the target so far.
    self.pending_requests - a dictionary of currently pending requests.
    constants.ALPHA - the number of concurrent asynchronous calls allowed.
    constants.K - the number of closest nodes to return when complete.
    constants.LOOKUP_TIMEOUT - the default maximum duration for a lookup.

    0. If "timeout" number of seconds elapse before the lookup is finished
       then cancel any pending requests and errback with an OutOfTime error.
       The "timeout" value can be overridden but defaults to
       constants.LOOKUP_TIMEOUT seconds.

    1. Locally known nodes from the routing table seed self.shortlist.

    2. The nearest node to the target in self.shortlist is set as
       self.nearest_node.

    3. No more than constants.ALPHA nearest nodes that are in self.shortlist
       but not in self.contacted are sent a message that is an instance of
       self.message_type. Each request is added to the self.pending_requests
       list. The length of self.pending_requests must never be more than
       constants.ALPHA.

    4. As each node is contacted it is added to the self.contacted set.

    5. If a node doesn't reply or an error is encountered it is removed from
       self.shortlist and self.pending_requests. Start from step 3 again.

    6. When a response to a request is returned successfully remove the
       request from self.pending_requests.

    7. If it's a FindValue message and a suitable value is returned (see
       note at the end of these comments) cancel all the other pending calls
       in self.pending_requests and fire a callback with the returned value.
       If the value is invalid remove the node from self.shortlist and start
       from step 3 again without cancelling the other pending calls.

    8. If a list of closer nodes is returned by a peer add them to
       self.shortlist and sort - making sure nodes in self.contacted are not
       mistakenly re-added to the shortlist.

    9. If the nearest node in the newly sorted self.shortlist is closer to
       the target than self.nearest_node then set self.nearest_node to the
       new closer node and start from step 3 again.

    10. If self.nearest_node remains unchanged DO NOT start a new call.

    11. If there are no other requests in self.pending_requests then check
        that the constants.K nearest nodes in the self.contacted set are all
        closer than the nearest node in self.shortlist. If they are, and it's
        a FindNode message call back with the constants.K nearest nodes in
        the self.contacted set. If the message is a FindValue, errback with a
        ValueNotFound error.

    12. If there are still nearer nodes in self.shortlist to some of those in
        the constants.K nearest nodes in the self.contacted set then start
        from step 3 again (forcing the local node to contact the close nodes
        that have yet to be contacted).

    Note on validating values: In the future there may be constraints added
    to the FindValue query (such as only accepting values created after
    time T).
    """

    def __init__(self, target, message_type, local_node,
                 timeout=constants.LOOKUP_TIMEOUT, canceller=None):
        """
        Sets up the lookup to search for a certain target key with a
        particular message_type using the DHT state found in the local_node.
        Will cancel after timeout seconds. See the documentation for
        twisted.internet.defer.Deferred for explanation of canceller.
        """
        defer.Deferred.__init__(self, canceller)
        self.target = target
        self.message_type = message_type
        self.local_node = local_node
        # A set of nodes that have been contacted for this lookup.
        self.contacted = set()
        # Holds currently pending requests.
        self.pending_requests = {}
        if timeout:
            reactor.callLater(timeout, self.cancel)
        # To hold peers in the DHT that are known to the local node that are
        # possibly close to the target key. Closest nodes come first.
        self.shortlist = self.local_node._routing_table.\
            find_close_nodes(target)
        if self.target != self.local_node.id:
            # Update the last_accessed attribute of the affected k-bucket.
            self.local_node._routing_table.touch_kbucket(target)
        if not self.shortlist:
            # The node knows of no other nodes within the DHT.
            self.errback(RoutingTableEmpty())
            return
        # Holds the currently closest node to the target.
        self.nearest_node = self.shortlist[0]
        # Start the lookup process
        self._lookup()

    def _cancel_pending_requests(self):
        """
        Causes the deferreds waiting on pending requests to be cancelled in
        a clean fashion.
        """
        requests = self.pending_requests.values()
        for request in requests:
            request.cancel()
        self.pending_requests = {}

    def cancel(self):
        """
        Cancels this lookup in a clean fashion. This function is dedicated
        to @terrycojones whose efforts at cancelling deferreds deserve some
        sort of tribute. ;-)
        """
        self._cancel_pending_requests()
        defer.Deferred.cancel(self)

    def _handle_error(self, uuid, contact, error):
        """
        Callback to handle error conditions. If a node doesn't reply or an
        error is encountered it is removed from self.shortlist and
        self.pending_requests. Start the _lookup again.
        """
        if contact in self.shortlist:
            self.shortlist.remove(contact)
        if uuid in self.pending_requests:
            del self.pending_requests[uuid]
        log.msg('Error during interaction with %r' % contact)
        log.msg(error)
        self._lookup()

    def _blacklist(self, contact):
        """
        Removes a contact from the shortlist and routing table while adding
        it to the global blacklist of misbehaving peers.
        """
        if contact in self.shortlist:
            self.shortlist.remove(contact)
        self.local_node._routing_table.blacklist(contact)
        log.msg('Blacklisting %r' % contact)

    def _handle_response(self, uuid, contact, response):
        """
        Callback to handle expected responses (unexpected responses result
        in the remote node being blacklisted and a TypeError being thrown).

        When a response to a request is returned successfully remove the
        request from self.pending_requests.

        If it's a FindValue message and a suitable value is returned (see
        note at the end of these comments) cancel all the other pending
        calls in self.pending_requests and fire a callback with the returned
        value. If the value is invalid blacklist the node, remove it from
        self.shortlist and start from step 3 again without cancelling the
        other pending calls.

        If a list of closer nodes is returned by a peer add them to
        self.shortlist and sort - making sure nodes in self.contacted are
        not mistakenly re-added to the shortlist.

        If the nearest node in the newly sorted self.shortlist is closer to
        the target than self.nearest_node then set self.nearest_node to the
        new closer node and start from step 3 again.

        If self.nearest_node remains unchanged DO NOT start a new lookup
        call.

        If there are no other requests in self.pending_requests then check
        that the constants.K nearest nodes in the self.contacted set are all
        closer than the nearest node in self.shortlist. If they are, and
        it's a FindNode message call back with the constants.K nearest nodes
        found in the self.contacted set. If the message is a FindValue,
        errback with a ValueNotFound error.

        If there are still nearer nodes in self.shortlist to some of those
        in the constants.K nearest nodes in the self.contacted set then
        start from step 3 again (forcing the local node to contact the close
        nodes that have yet to be contacted).

        Note on validating values: In the future there may be constraints
        added to the FindValue query (such as only accepting values created
        after time T).
        """
        # Remove originating request from pending requests.
        del self.pending_requests[uuid]
        # Ensure the response is of the expected type[s].
        if not ((isinstance(response, Value) and
                 self.message_type == FindValue) or
                isinstance(response, Nodes)):
            # Blacklist the problem contact from the routing table (since it
            # doesn't behave).
            self._blacklist(contact)
            raise TypeError("Unexpected response type from %r" % contact)
        # Is the response the expected Value we're looking for..?
        if isinstance(response, Value):
            # Check if it's a suitable value (the key matches)
            if response.key == self.target:
                # Ensure the Value has not expired.
                if response.expires < time.time():
                    # Do not blacklist expired nodes but simply remove them
                    # from the shortlist (handled by the errback).
                    raise ValueError("Expired value returned by %r" % contact)
                # Cancel outstanding requests.
                self._cancel_pending_requests()
                # Ensure the returning contact is removed from the shortlist
                # (so it's possible to discern the closest non-returning node)
                if contact in self.shortlist:
                    self.shortlist.remove(contact)
                # Success! The correct Value has been found. Fire the
                # instance with the result.
                self.callback(response)
            else:
                # Blacklist the problem contact from the routing table since
                # it's not behaving properly.
                self._blacklist(contact)
                raise ValueError("Value with wrong key returned by %r" %
                                 contact)
        else:
            # Otherwise it must be a Nodes message containing closer nodes.
            # Add the returned nodes to the shortlist. Sort the shortlist in
            # order of closeness to the target and ensure the shortlist never
            # gets longer than K.
            candidate_contacts = [candidate for candidate in response.nodes
                                  if candidate not in self.shortlist]
            self.shortlist = sort_contacts(candidate_contacts +
                                           self.shortlist, self.target)
            # Check if the nearest_node remains unchanged.
            if self.nearest_node == self.shortlist[0]:
                # Check for remaining pending requests.
                if not self.pending_requests:
                    # Check all the candidates in the shortlist have been
                    # contacted.
                    candidates = [candidate for candidate in self.shortlist
                                  if candidate in self.contacted]
                    if len(candidates) == len(self.shortlist):
                        # There is a result.
                        if self.message_type == FindValue:
                            # Can't find a value at the key.
                            msg = ("Unable to find value for key: %r" %
                                   self.target)
                            self.errback(ValueNotFound(msg))
                        else:
                            # Success! Found nodes close to the specified
                            # target key.
                            self.callback(self.shortlist)
                    else:
                        # There are still un-contacted peers in the shortlist
                        # so restart the lookup in order to check them.
                        self._lookup()
                else:
                    # There are still pending requests to complete but do not
                    # restart the lookup
                    pass
            else:
                # There is a new nearest node.
                self.nearest_node = self.shortlist[0]
                # Restart the lookup given the newly found nodes in the
                # shortlist.
                self._lookup()

    def _lookup(self):
        """
        Sends parallel lookup messages to the self.shortlist of contacts.

        No more than constants.ALPHA nearest nodes that are in
        self.shortlist but not in self.contacted are sent a message that is
        an instance of self.message_type. Each request is added to the
        self.pending_requests list. The length of self.pending_requests must
        never be more than constants.ALPHA.

        As each node is contacted it is added to the self.contacted set.
        """
        for contact in self.shortlist:
            if contact not in self.contacted:
                # Guard to ensure only ALPHA requests are ever active at any
                # one time
                if len(self.pending_requests) >= constants.ALPHA:
                    break
                uuid, deferred = self.local_node.send_find(contact,
                                                           self.target,
                                                           self.message_type)
                self.pending_requests[uuid] = deferred
                self.contacted.add(contact)

                # Bind uuid and contact as default arguments so each closure
                # refers to the request that created it rather than to the
                # values left over from the final loop iteration.
                def callback(result, uuid=uuid, contact=contact):
                    """
                    Passes the result to the NodeLookup instance to handle.
                    """
                    self._handle_response(uuid, contact, result)

                def errback(error, uuid=uuid, contact=contact):
                    """
                    Passes the error to the NodeLookup instance to handle.
                    """
                    self._handle_error(uuid, contact, error)

                deferred.addCallback(callback)
                deferred.addErrback(errback)

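# Example (illustrative sketch): how a NodeLookup, being a Deferred, might be
# consumed by calling code. It assumes "local_node" is a Node instance (see
# below) whose routing table already contains peers, "target_key" is a valid
# DHT key and a Twisted reactor is running.
def example_find_value(local_node, target_key):
    """
    Illustrative sketch only: start a FindValue lookup for target_key via
    the given local_node and attach simple logging callbacks.
    """
    lookup = NodeLookup(target_key, FindValue, local_node)

    def on_found(value):
        # Fired with the Value message when the lookup succeeds.
        log.msg('Lookup succeeded: %r' % value)
        return value

    def on_failed(error):
        # Fired on RoutingTableEmpty, ValueNotFound, timeout or cancellation.
        log.msg('Lookup failed: %r' % error)
        return error

    lookup.addCallback(on_found)
    lookup.addErrback(on_failed)
    return lookup
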
class Node(object):
    """
    This class represents a single local node in the DHT encapsulating its
    presence in the network.

    All interactions with the DHT network by a client application are
    performed via this class (or a subclass).
    """

    def __init__(self, id, client_string='ssl:%s:%d'):
        """
        Initialises the object representing the node with the given id.
        """
        # The node's ID within the distributed hash table.
        self.id = id
        # The routing table stores information about other nodes on the DHT.
        self._routing_table = RoutingTable(id)
        # The local key/value store containing data held by this node.
        self._data_store = DictDataStore()
        # A dictionary of IDs for messages pending a response and associated
        # deferreds to be fired when a response is completed.
        self._pending = {}
        # The template string to use when initiating a connection to another
        # node on the network.
        self._client_string = client_string
        # The version of Drogulus that this node implements.
        self.version = get_version()
        log.msg('Initialised node with id: %r' % self.id)

    def join(self, seed_nodes=None):
        """
        Causes the Node to join the DHT network. This should be called
        before any other DHT operations. The seed_nodes argument must be a
        list of already known contacts describing existing nodes on the
        network.
        """
        if not seed_nodes:
            raise ValueError('Seed nodes required for node to join network')
        for contact in seed_nodes:
            self._routing_table.add_contact(contact)
        # Looking up the node's ID on the network will populate the routing
        # table with fresh nodes as well as tell us who our nearest
        # neighbours are.
        # TODO: Add callback to kick off refresh of k-buckets in future..?
        raise Exception('FIX ME!')
        # Ensure the refresh of k-buckets is set up properly.
        return NodeLookup(self.id, FindNode, self)

    def message_received(self, message, protocol):
        """
        Handles incoming messages.
        """
        # Update the routing table.
        peer = protocol.transport.getPeer()
        other_node = Contact(message.node, peer.host, peer.port,
                             message.version, time.time())
        log.msg('Message received from %s' % other_node)
        log.msg(message)
        self._routing_table.add_contact(other_node)
        # Sort on message type and pass to handler method. Explicit >
        # implicit.
        if isinstance(message, Ping):
            self.handle_ping(message, protocol)
        elif isinstance(message, Pong):
            self.handle_pong(message)
        elif isinstance(message, Store):
            self.handle_store(message, protocol, other_node)
        elif isinstance(message, FindNode):
            self.handle_find_node(message, protocol)
        elif isinstance(message, FindValue):
            self.handle_find_value(message, protocol)
        elif isinstance(message, Error):
            self.handle_error(message, protocol, other_node)
        elif isinstance(message, Value):
            self.handle_value(message, other_node)
        elif isinstance(message, Nodes):
            self.handle_nodes(message)

    def send_message(self, contact, message):
        """
        Sends a message to the specified contact, adds it to the _pending
        dictionary and ensures it times-out after the correct period. If an
        error occurs the deferred's errback is called.
        """
        d = defer.Deferred()
        # open network call.
        client_string = self._client_string % (contact.address, contact.port)
        client = clientFromString(reactor, client_string)
        connection = client.connect(DHTFactory(self))
        # Ensure the connection will potentially time out.
        connection_timeout = reactor.callLater(constants.RPC_TIMEOUT,
                                               connection.cancel)

        def on_connect(protocol):
            # Cancel pending connection_timeout if it's still active.
            if connection_timeout.active():
                connection_timeout.cancel()
            # Send the message and add a timeout for the response.
            protocol.sendMessage(message)
            self._pending[message.uuid] = d
            reactor.callLater(constants.RESPONSE_TIMEOUT, response_timeout,
                              message, protocol, self)

        def on_error(error):
            log.msg('***** ERROR ***** interacting with %s' % contact)
            log.msg(error)
            self._routing_table.remove_contact(message.node)
            if message.uuid in self._pending:
                del self._pending[message.uuid]
            d.errback(error)

        connection.addCallback(on_connect)
        connection.addErrback(on_error)
        return d

    def trigger_deferred(self, message, error=False):
        """
        Given a message, will attempt to retrieve the deferred and trigger
        it with the appropriate callback or errback.
        """
        if message.uuid in self._pending:
            deferred = self._pending[message.uuid]
            if error:
                error.message = message
                deferred.errback(error)
            else:
                deferred.callback(message)
            # Remove the called deferred from the _pending dictionary.
            del self._pending[message.uuid]

    def handle_ping(self, message, protocol):
        """
        Handles an incoming Ping message. Returns a Pong message using the
        referenced protocol object.
        """
        pong = Pong(message.uuid, self.id, self.version)
        protocol.sendMessage(pong, True)

    def handle_pong(self, message):
        """
        Handles an incoming Pong message.
        """
        self.trigger_deferred(message)

    def handle_store(self, message, protocol, sender):
        """
        Handles an incoming Store message. Checks the provenance and
        timeliness of the message before storing locally. If there is a
        problem, removes the untrustworthy peer from the routing table.
        Otherwise, at REPLICATE_INTERVAL seconds in the future, the local
        node will attempt to replicate the Store message elsewhere in the
        DHT if such time is <= the message's expiry time. Sends a Pong
        message if successful otherwise replies with an appropriate Error.
        """
        # Check provenance
        is_valid, err_code = validate_message(message)
        if is_valid:
            # Ensure the node doesn't already have a more up-to-date version
            # of the value.
            current = self._data_store.get(message.key, False)
            if current and (message.timestamp < current.timestamp):
                # The node already has a later version of the value so
                # return an error.
                details = {
                    'new_timestamp': '%d' % current.timestamp
                }
                raise ValueError(8, constants.ERRORS[8], details,
                                 message.uuid)
            # Good to go, so store value.
            self._data_store.set_item(message.key, message)
            # Reply with a pong so the other end updates its routing table.
            pong = Pong(message.uuid, self.id, self.version)
            protocol.sendMessage(pong, True)
            # At some future time attempt to replicate the Store message
            # around the network IF it is within the message's expiry time.
            raise Exception("FIX ME!")
            # Need to check that callLater is called as part of the tests.
            reactor.callLater(constants.REPLICATE_INTERVAL, self.republish,
                              message)
        else:
            # Remove from the routing table.
            log.msg('Problem with Store command: %d - %s' %
                    (err_code, constants.ERRORS[err_code]))
            self._routing_table.blacklist(sender)
            # Return an error.
            details = {
                'message': 'You have been blacklisted.'
            }
            raise ValueError(err_code, constants.ERRORS[err_code], details,
                             message.uuid)

    def handle_find_node(self, message, protocol):
        """
        Handles an incoming FindNode message. Finds the details of up to K
        other nodes closer to the target key that *this* node knows about.
        Responds with a "Nodes" message containing the list of matching
        nodes.
        """
        target_key = message.key
        # List containing tuples of information about the matching contacts.
        other_nodes = [(n.id, n.address, n.port, n.version) for n in
                       self._routing_table.find_close_nodes(target_key)]
        result = Nodes(message.uuid, self.id, other_nodes, self.version)
        protocol.sendMessage(result, True)

    def handle_find_value(self, message, protocol):
        """
        Handles an incoming FindValue message. If the local node contains
        the value associated with the requested key replies with an
        appropriate "Value" message. Otherwise, responds with details of up
        to K other nodes closer to the target key that the local node knows
        about. In this case a "Nodes" message containing the list of
        matching nodes is sent to the caller.
        """
        match = self._data_store.get(message.key, False)
        if match:
            result = Value(message.uuid, self.id, match.key, match.value,
                           match.timestamp, match.expires, match.public_key,
                           match.name, match.meta, match.sig, match.version)
            protocol.sendMessage(result, True)
        else:
            self.handle_find_node(message, protocol)

    def handle_error(self, message, protocol, sender):
        """
        Handles an incoming Error message. Currently, this simply logs the
        error and closes the connection. In future this *may* remove the
        sender from the routing table (depending on the error).
        """
        # TODO: Handle error 8 (out of date data)
        log.msg('***** ERROR ***** from %s' % sender)
        log.msg(message)

    def handle_value(self, message, sender):
        """
        Handles an incoming Value message containing a value retrieved from
        another node on the DHT. Ensures the message is valid and calls the
        referenced deferred to signal the arrival of the value.

        TODO: How to handle invalid messages and errback the deferred.
        """
        # Check provenance
        is_valid, err_code = validate_message(message)
        if is_valid:
            self.trigger_deferred(message)
        else:
            log.msg('Problem with incoming Value: %d - %s' %
                    (err_code, constants.ERRORS[err_code]))
            log.msg(message)
            # Remove the remote node from the routing table.
            self._routing_table.remove_contact(sender.id, True)
            error = ValueError(constants.ERRORS[err_code])
            self.trigger_deferred(message, error)

    def handle_nodes(self, message):
        """
        Handles an incoming Nodes message containing information about other
        nodes on the network that are close to a requested key.
        """
        self.trigger_deferred(message)

    def send_ping(self, contact):
        """
        Sends a ping request to the given contact and returns a deferred
        that is fired when the reply arrives or an error occurs.
        """
        new_uuid = str(uuid4())
        ping = Ping(new_uuid, self.id, self.version)
        return self.send_message(contact, ping)

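    # Example (illustrative sketch): consuming the deferred returned by
    # send_ping. The callback receives the Pong message passed on by
    # handle_pong; the errback fires on connection errors or when
    # response_timeout cancels the pending request.
    def example_ping(self, contact):
        """
        Illustrative sketch only: ping a contact and log the outcome.
        """
        deferred = self.send_ping(contact)

        def on_pong(message):
            log.msg('Received pong from %r' % contact)
            return message

        def on_failure(error):
            log.msg('No pong from %r: %r' % (contact, error))
            return error

        deferred.addCallback(on_pong)
        deferred.addErrback(on_failure)
        return deferred
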
    def send_store(self, contact, public_key, name, value, timestamp,
                   expires, meta, signature):
        """
        Sends a Store message to the given contact. The value contained
        within the message is stored against a key derived from the
        public_key and name. Furthermore, the message is cryptographically
        signed using the value, timestamp, expires, name and meta values.
        """
        uuid = str(uuid4())
        compound_key = construct_key(public_key, name)
        store = Store(uuid, self.id, compound_key, value, timestamp, expires,
                      public_key, name, meta, signature, self.version)
        return self.send_message(contact, store)

    def send_find(self, contact, target, message_type):
        """
        Sends a Find[Node|Value] message to the given contact with the
        intention of obtaining information at the given target key. The type
        of find message is specified by message_type.
        """
        new_uuid = str(uuid4())
        find_message = message_type(new_uuid, self.id, target, self.version)
        deferred = self.send_message(contact, find_message)
        return (new_uuid, deferred)

    def _process_lookup_result(self, nearest_nodes, public_key, name, value,
                               timestamp, expires, meta, signature, length):
        """
        Given a list of nearest nodes will return a list of send_store based
        deferreds for the item to be stored in the DHT. The list will
        contain up to "length" number of deferreds.
        """
        list_of_deferreds = []
        for contact in nearest_nodes[:length]:
            deferred = self.send_store(contact, public_key, name, value,
                                       timestamp, expires, meta, signature)
            list_of_deferreds.append(deferred)
        return list_of_deferreds

    def replicate(self, public_key, name, value, timestamp, expires, meta,
                  signature, duplicate):
        """
        Will replicate args to "duplicate" number of nodes in the
        distributed hash table. Returns a deferred that will fire with a
        list of send_store deferreds when "duplicate" number of closest
        nodes have been identified. Obviously, the list can be turned into a
        DeferredList to fire when the store commands have completed.

        Even if "duplicate" is > K no more than K items will be contained
        within the list result.
        """
        if duplicate < 1:
            # Guard to ensure meaningful duplication count.
            raise ValueError('Duplication count may not be less than 1')
        result = defer.Deferred()
        compound_key = construct_key(public_key, name)
        lookup = NodeLookup(compound_key, FindNode, self)

        def on_success(nodes):
            """
            A list of close nodes have been found so send store messages to
            the "duplicate" closest number of them and fire the "result"
            deferred with the resulting list of pending deferreds.
            """
            deferreds = self._process_lookup_result(nodes, public_key, name,
                                                    value, timestamp,
                                                    expires, meta, signature,
                                                    duplicate)
            result.callback(deferreds)

        def on_error(error):
            """
            Catch all for errors during the lookup phase. Simply pass them
            on via the "result" deferred.
            """
            result.errback(error)

        lookup.addCallback(on_success)
        lookup.addErrback(on_error)
        return result

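    # Example (illustrative sketch): turning the list of send_store
    # deferreds produced by replicate() into a single DeferredList, as
    # suggested in the docstring above. Assumes the arguments describe an
    # already signed value.
    def example_replicate_and_wait(self, public_key, name, value, timestamp,
                                   expires, meta, signature, duplicate):
        """
        Illustrative sketch only: replicate a value and return a deferred
        that fires once all of the resulting store commands have completed.
        """
        replication = self.replicate(public_key, name, value, timestamp,
                                     expires, meta, signature, duplicate)

        def gather(deferreds):
            # Wrap the pending send_store deferreds so the caller gets a
            # single result when they have all fired.
            return defer.DeferredList(deferreds, consumeErrors=True)

        replication.addCallback(gather)
        return replication
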
    def retrieve(self, key):
        """
        Given a key, will try to retrieve associated value from the
        distributed hash table. Returns a deferred that will fire when the
        operation is complete or failed.

        As the original Kademlia paper explains:

        "For caching purposes, once a lookup succeeds, the requesting node
        stores the <key, value> pair at the closest node it observed to the
        key that did not return the value."

        This method adds a callback to the NodeLookup to achieve this end.
        """
        lookup = NodeLookup(key, FindValue, self)

        def cache(result):
            """
            Called once the lookup succeeds in order to store the item at
            the node closest to the key that did not return the value.
            """
            caching_contact = None
            for candidate in lookup.shortlist:
                if candidate in lookup.contacted:
                    caching_contact = candidate
                    break
            if caching_contact:
                log.msg("Caching to %r" % caching_contact)
                self.send_store(caching_contact, result.public_key,
                                result.name, result.value, result.timestamp,
                                result.expires, result.meta, result.sig)
            return result

        lookup.addCallback(cache)
        return lookup

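    # Example (illustrative sketch): retrieving a value given the public_key
    # and name it was stored under, using construct_key to derive the
    # compound key in the same way as send_store and replicate do.
    def example_retrieve_by_name(self, public_key, name):
        """
        Illustrative sketch only: derive the compound key for public_key and
        name, then attach a logging callback to the resulting retrieve()
        lookup.
        """
        compound_key = construct_key(public_key, name)
        lookup = self.retrieve(compound_key)

        def on_value(value):
            # Fired with the Value message once the lookup succeeds.
            log.msg('Retrieved value for %r: %r' % (compound_key, value))
            return value

        lookup.addCallback(on_value)
        return lookup
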
    def republish(self, message):
        """
        Will check and republish a locally stored message to the wider
        network.

        From the original Kademlia paper:

        "To ensure the persistence of key-value pairs, nodes must
        periodically republish keys. Otherwise, two phenomena may cause
        lookups for valid keys to fail. First, some of the k nodes that
        initially get a key-value pair when it is published may leave the
        network. Second, new nodes may join the network with IDs closer to
        some published key than the nodes on which the key-value pair was
        originally published. In both cases, the nodes with a key-value pair
        must republish it so as once again to ensure it is available on the
        k nodes closest to the key.

        To compensate for nodes leaving the network, Kademlia republishes
        each key-value pair once an hour. A naive implementation of this
        strategy would require many messages - each of up to k nodes storing
        a key-value pair would perform a node lookup followed by k - 1 STORE
        RPCs every hour. Fortunately, the republish process can be heavily
        optimized. First, when a node receives a STORE RPC for a given
        key-value pair, it assumes the RPC was also issued to the other
        k - 1 closest nodes, and thus the recipient will not republish the
        key-value pair in the next hour. This ensures that as long as
        republication intervals are not exactly synchronized, only one node
        will republish a given key-value pair every hour.

        A second optimization avoids performing node lookups before
        republishing keys. As described in Section 2.4, to handle unbalanced
        trees, nodes split k-buckets as required to ensure they have
        complete knowledge of a surrounding subtree with at least k nodes.
        If, before republishing key-value pairs, a node u refreshes all
        k-buckets in this subtree of k nodes, it will automatically be able
        to figure out the k closest nodes to a given key. These bucket
        refreshes can be amortized over the republication of many keys.

        To see why a node lookup is unnecessary after u refreshes buckets in
        the sub-tree of size >= k, it is necessary to consider two cases. If
        the key being republished falls in the ID range of the subtree, then
        since the subtree is of size at least k and u has complete knowledge
        of the subtree, clearly u must know the k closest nodes to the key.
        If, on the other hand, the key lies outside the subtree, yet u was
        one of the k closest nodes to the key, it must follow that u's
        k-buckets for intervals closer to the key than the subtree all have
        fewer than k entries. Hence, u will know all nodes in these
        k-buckets, which together with knowledge of the subtree will include
        the k closest nodes to the key.

        When a new node joins the system, it must store any key-value pair
        to which it is one of the k closest. Existing nodes, by similarly
        exploiting complete knowledge of their surrounding subtrees, will
        know which key-value pairs the new node should store. Any node
        learning of a new node therefore issues STORE RPCs to transfer
        relevant key-value pairs to the new node. To avoid redundant STORE
        RPCs, however, a node only transfers a key-value pair if it's [sic]
        own ID is closer to the key than are the IDs of other nodes."

        Messages are only republished if the following requirements are met:

        * They still exist in the local data store.
        * They have not expired.
        * They have not been updated for REPLICATE_INTERVAL seconds.
        """
        pass

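    # Example (illustrative sketch, not the project's implementation): one
    # possible shape for the republish checks described in the docstring
    # above. It assumes the stored item is the original Store message (as
    # saved by handle_store) and that re-publication can re-use replicate().
    def example_republish_check(self, message):
        """
        Illustrative sketch only: republish "message" if it still exists in
        the local data store, has not expired and has not been updated for
        REPLICATE_INTERVAL seconds.
        """
        now = time.time()
        current = self._data_store.get(message.key, False)
        if not current:
            # The item no longer exists in the local data store.
            return False
        if current.expires < now:
            # The item has expired.
            return False
        if (now - current.timestamp) < constants.REPLICATE_INTERVAL:
            # The item was updated too recently to need republishing.
            return False
        # Push the value back out to K close nodes via replicate().
        self.replicate(current.public_key, current.name, current.value,
                       current.timestamp, current.expires, current.meta,
                       current.sig, constants.K)
        return True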