drogulus.dht

Contains a simple implementation of the Kademlia distributed hash table.

  • contact.py - defines a contact (another node) on the network.
  • datastore.py - contains basic data storage classes for storing k/v pairs.
  • kbucket.py - defines the “k-buckets” used to track contacts in the network.
  • node.py - defines the local node within the DHT network.
  • routingtable.py - defines the routing table abstraction that contains information about other nodes and their associated states on the DHT network.

drogulus.dht.contact

Defines a contact (another node) on the network.

class drogulus.dht.contact.Contact(id, address, port, version, last_seen=0)[source]

Represents another known node on the network.

__eq__(other)[source]

Override equals to work with a string representation of the contact’s id.

__init__(id, address, port, version, last_seen=0)[source]

Initialises the contact object with its unique id within the DHT, IP address, port, the Drogulus version the contact is running, and a timestamp of when the last connection was made with the contact (defaults to 0). The id, if passed in as a numeric value, will be converted into a hexadecimal string.
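
A minimal usage sketch; the address, port and version values below are hypothetical:

    from drogulus.dht.contact import Contact

    # A numeric id is converted to a hexadecimal string on initialisation.
    peer = Contact(12345, '192.168.0.10', 1908, '0.1', last_seen=0)
    print(peer.id)  # the hexadecimal string form of 12345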

__ne__(other)[source]

Override != to work with a string representation of the contact’s id.

__repr__()[source]

Returns a tuple containing information about this contact.

__str__()[source]

Override the string representation of the object to be something useful.

__weakref__

list of weak references to the object (if defined)

drogulus.dht.datastore

Contains class definitions that define the local data store for the node.

class drogulus.dht.datastore.DataStore[source]

Base class for implementations of the storage mechanism for the DHT.

__delitem__(key)[source]

Delete the specified key and associated value.

__getitem__(key)[source]

Get the value identified by “key”.

__setitem__(key, value)[source]

Convenience wrapper around set_item. Accepts a tuple of the format: (value, lastPublished, originallyPublished, originalPublisherID).

keys()[source]

Return a list of the keys in this data store.

last_updated(key)[source]

Get the time that a key/value pair identified by the key were last updated in this data store.

original_publish_time(key)[source]

Get the time that key was originally published.

original_publisher_id(key)[source]

Get the node ID of the original publisher of the key/value pair identified by “key”.

set_item(key, value)[source]

Set the value of the key/value pair identified by “key”; this should set the “last published” value for the key/value pair to the current time.

class drogulus.dht.datastore.DictDataStore[source]

A datastore using Python’s in-memory dictionary.

__delitem__(key)[source]

Delete the specified key (and its value).

__getitem__(key)[source]

Get the value identified by key.

keys()[source]

Return a list of the keys in this data store.

last_updated(key)[source]

Get the time the key/value pair identified by key was last updated in this data store.

original_publish_time(key)[source]

Get the time the key/value pair identified by key was originally published.

original_publisher_id(key)[source]

Get the node ID of the original publisher of the key/value pair identified by key.

set_item(key, value)[source]

Set the value of the key/value pair identified by key.
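
A short sketch of the dictionary-backed store in use, assuming the tuple layout described for DataStore.__setitem__ above and that __getitem__ returns just the value:

    import time
    from drogulus.dht.datastore import DictDataStore

    store = DictDataStore()
    now = time.time()
    # Tuple layout per __setitem__: (value, lastPublished,
    # originallyPublished, originalPublisherID).
    store['a_key'] = ('a_value', now, now, 'publisher_node_id')
    value = store['a_key']              # assumed to be 'a_value'
    when = store.last_updated('a_key')  # assumed to be ~now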

drogulus.dht.kbucket

Defines contact-related storage (the so-called k-buckets).

class drogulus.dht.kbucket.KBucket(range_min, range_max)[source]

A bucket to store contact information about other nodes in the network. From the original Kademlia paper:

“Kademlia nodes store contact information about each other to route query messages. For each 0 <= i < 160, every node keeps a list of <IP address, port, Node ID> triples for nodes of distance between 2^i and 2^(i+1) from itself. We call these lists k-buckets. Each k-bucket is kept sorted by time last seen – least-recently seen node at the head, most-recently seen at the tail. For small values of i, the k-buckets will generally be empty (as no appropriate nodes will exist). For large values of i, the lists can grow to size k, where k is a system-wide replication parameter. k is chosen such that any given k nodes are very unlikely to fail within an hour of each other (for example k = 20)”

Nota Bene: This implementation of Kademlia uses a 512 bit key space based upon SHA512 rather than the original 160 bit SHA1 implementation, so i will be < 512.
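
To make the distance ranges concrete, here is an illustrative (not from the source) calculation of the k-bucket index i for a peer, given the XOR distance between two 512-bit IDs:

    def kbucket_index(local_id, peer_id):
        # Illustrative only: returns the i for which
        # 2**i <= (local_id XOR peer_id) < 2**(i + 1).
        distance = local_id ^ peer_id
        return distance.bit_length() - 1  # 0 <= i < 512 for non-zero distance

    # A peer differing from us only in the lowest bit lands in bucket 0.
    assert kbucket_index(0b1010, 0b1011) == 0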

__init__(range_min, range_max)[source]

Initialises the object with the lower / upper bound limits of the k-bucket’s 512-bit ID space.

__len__()[source]

Returns the number of contacts stored in this k-bucket.

__weakref__

list of weak references to the object (if defined)

add_contact(contact)[source]

Adds a contact to the k-bucket. If this is a new contact then it will be appended to the _contacts list. If the contact is already in the k-bucket then it is moved to the end of the _contacts list. The most recently seen contact is always at the end of the _contacts list. If the size of the k-bucket exceeds the constant k then a KBucketFull exception is raised.
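
A hedged sketch of the behaviour described above, using a plain list to stand in for _contacts and an assumed value for the constant k:

    from drogulus.dht.kbucket import KBucketFull

    K = 20  # system-wide replication parameter (assumed value)

    def add_contact(contacts, contact):
        # Sketch only: the most recently seen contact lives at the tail.
        if contact in contacts:
            contacts.remove(contact)
            contacts.append(contact)  # existing contact moves to the tail
        elif len(contacts) < K:
            contacts.append(contact)  # brand new contact is appended
        else:
            raise KBucketFull()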

get_contact(id)[source]

Returns a contact stored in the k-bucket with the given id. Will raise a ValueError if the contact is not in the k-bucket (the default behaviour of calling index with a value that’s not in the list).

get_contacts(count=0, exclude_contact=None)[source]

Returns a list of up to “count” contacts from the k-bucket. If “count” is zero or less, all contacts are returned. If the k-bucket holds fewer than “count” contacts, all contacts are returned.

If “exclude_contact” is passed (as either a Contact instance or id str) then, if this is found within the list of returned values, it will be discarded before the result is returned.

key_in_range(key)[source]

Checks if a key is within the range covered by this k-bucket. Returns a boolean to indicate if a certain key should be placed within this k-bucket.
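
The check reduces to a bounds test; a one-line sketch (whether the upper bound is inclusive or exclusive is an assumption):

    def key_in_range(range_min, range_max, key):
        # Sketch: does "key" belong in the bucket covering [range_min, range_max)?
        return range_min <= key < range_max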

remove_contact(id)[source]

Removes a contact with the given id from the k-bucket.

exception drogulus.dht.kbucket.KBucketFull[source]

Raised when the bucket is full.

__weakref__

list of weak references to the object (if defined)

drogulus.dht.node

Contains code that defines the behaviour of the local node in the DHT network.

class drogulus.dht.node.Node(id, client_string='ssl:%s:%d')[source]

This class represents a single local node in the DHT encapsulating its presence in the network.

All interactions with the DHT network by a client application are performed via this class (or a subclass).

__init__(id, client_string='ssl:%s:%d')[source]

Initialises the object representing the node with the given id.

__weakref__

list of weak references to the object (if defined)

handle_error(message, protocol, sender)[source]

Handles an incoming Error message. Currently, this simply logs the error and closes the connection. In future this may remove the sender from the routing table (depending on the error).

handle_find_node(message, protocol)[source]

Handles an incoming FindNode message. Finds the details of up to K other nodes closer to the target key that this node knows about. Responds with a “Nodes” message containing the list of matching nodes.

handle_find_value(message, protocol)[source]

Handles an incoming FindValue message. If the local node contains the value associated with the requested key, it replies with an appropriate “Value” message. Otherwise, it responds with details of up to K other nodes closer to the target key that the local node knows about. In this case a “Nodes” message containing the list of matching nodes is sent to the caller.

handle_nodes(message)[source]

Handles an incoming Nodes message containing information about other nodes on the network that are close to a requested key.

handle_ping(message, protocol)[source]

Handles an incoming Ping message. Returns a Pong message using the referenced protocol object.

handle_pong(message)[source]

Handles an incoming Pong message.

handle_store(message, protocol, sender)[source]

Handles an incoming Store message. Checks the provenance and timeliness of the message before storing it locally. If there is a problem, removes the untrustworthy peer from the routing table. Otherwise, REPLICATE_INTERVAL seconds in the future, the local node will attempt to replicate the Store message elsewhere in the DHT if such time is <= the message’s expiry time.

Sends a Pong message if successful otherwise replies with an appropriate Error.

handle_value(message, sender)[source]

Handles an incoming Value message containing a value retrieved from another node on the DHT. Ensures the message is valid and calls the referenced deferred to signal the arrival of the value.

TODO: How to handle invalid messages and errback the deferred.

join(seed_nodes=None)[source]

Causes the Node to join the DHT network. This should be called before any other DHT operations. The seed_nodes argument must be a list of already known contacts describing existing nodes on the network.

message_received(message, protocol)[source]

Handles incoming messages.

replicate(public_key, name, value, timestamp, expires, meta, signature, duplicate)[source]

Replicates the given arguments to “duplicate” nodes in the distributed hash table. Returns a deferred that will fire with a list of send_store deferreds once the “duplicate” closest nodes have been identified.

The resulting list can be wrapped in a DeferredList that fires when all the store commands have completed.

Even if “duplicate” is greater than K, no more than K items will be contained in the resulting list.
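
A sketch of wrapping the result in Twisted’s DeferredList, as suggested above; the node and the arguments to replicate are assumed to be in scope:

    from twisted.internet.defer import DeferredList

    def on_nodes_identified(send_store_deferreds):
        # Fires once every send_store call has completed (or failed).
        return DeferredList(send_store_deferreds, consumeErrors=True)

    d = node.replicate(public_key, name, value, timestamp, expires, meta,
                       signature, duplicate=20)
    d.addCallback(on_nodes_identified)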

republish(message)[source]

Will check and republish a locally stored message to the wider network.

From the original Kademlia paper:

“To ensure the persistence of key-value pairs, nodes must periodically republish keys. Otherwise, two phenomena may cause lookups for valid keys to fail. First, some of the k nodes that initially get a key-value pair when it is published may leave the network. Second, new nodes may join the network with IDs closer to some published key than the nodes on which the key-value pair was originally published. In both cases, the nodes with a key-value pair must republish it so as once again to ensure it is available on the k nodes closest to the key.

To compensate for nodes leaving the network, Kademlia republishes each key-value pair once an hour. A naive implementation of this strategy would require many messages - each of up to k nodes storing a key-value pair would perform a node lookup followed by k - 1 STORE RPCs every hour. Fortunately, the republish process can be heavily optimized. First, when a node receives a STORE RPC for a given key-value pair, it assumes the RPC was also issued to the other k - 1 closest nodes, and thus the recipient will not republish the key-value pair in the next hour. This ensures that as long as republication intervals are not exactly synchronized, only one node will republish a given key-value pair every hour.

A second optimization avoids performing node lookups before republishing keys. As described in Section 2.4, to handle unbalanced trees, nodes split k-buckets as required to ensure they have complete knowledge of a surrounding subtree with at least k nodes. If, before republishing key-value pairs, a node u refreshes all k-buckets in this subtree of k nodes, it will automatically be able to figure out the k closest nodes to a given key. These bucket refreshes can be amortized over the republication of many keys.

To see why a node lookup is unnecessary after u refreshes buckets in the sub-tree of size >= k, it is necessary to consider two cases. If the key being republished falls in the ID range of the subtree, then since the subtree is of size at least k and u has complete knowledge of the subtree, clearly u must know the k closest nodes to the key. If, on the other hand, the key lies outside the subtree, yet u was one of the k closest nodes to the key, it must follow that u’s k-buckets for intervals closer to the key than the subtree all have fewer than k entries. Hence, u will know all nodes in these k-buckets, which together with knowledge of the subtree will include the k closest nodes to the key.

When a new node joins the system, it must store any key-value pair to which it is one of the k closest. Existing nodes, by similarly exploiting complete knowledge of their surrounding subtrees, will know which key-value pairs the new node should store. Any node learning of a new node therefore issues STORE RPCs to transfer relevant key-value pairs to the new node. To avoid redundant STORE RPCs, however, a node only transfers a key-value pair if it’s [sic] own ID is closer to the key than are the IDs of other nodes.”

Messages are only republished if the following requirements are met:

  • They still exist in the local data store.
  • They have not expired.
  • They have not been updated for REPLICATE_INTERVAL seconds.
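
A sketch of those three checks, assuming the constants module exposes REPLICATE_INTERVAL (in seconds) and using the DataStore interface described earlier:

    import time
    from drogulus import constants  # assumed import path

    def should_republish(data_store, key, expires):
        # Hedged sketch of the three requirements listed above.
        if key not in data_store.keys():
            return False  # no longer in the local data store
        if expires <= time.time():
            return False  # the message has expired
        age = time.time() - data_store.last_updated(key)
        return age >= constants.REPLICATE_INTERVAL
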
retrieve(key)[source]

Given a key, will try to retrieve the associated value from the distributed hash table. Returns a deferred that fires when the operation completes or fails.

As the original Kademlia explains:

“For caching purposes, once a lookup succeeds, the requesting node stores the <key, value> pair at the closest node it observed to the key that did not return the value.”

This method adds a callback to the NodeLookup to achieve this end.
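
A hedged sketch of such a callback; the attribute and call shapes are assumptions rather than the actual implementation:

    def cache_result(result, lookup, local_node):
        # Kademlia's caching rule (quoted above): store the <key, value>
        # pair at the closest observed node that did NOT return the value.
        closest_without_value = lookup.nearest_node  # assumed to be suitable
        if closest_without_value is not None:
            local_node.send_store(closest_without_value, *result)  # assumed shape
        return result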

send_find(contact, target, message_type)[source]

Sends a Find[Node|Value] message to the given contact with the intention of obtaining information at the given target key. The type of find message is specified by message_type.

send_message(contact, message)[source]

Sends a message to the specified contact, adds it to the _pending dictionary and ensures it times out after the correct period. If an error occurs the deferred’s errback is called.

send_ping(contact)[source]

Sends a ping request to the given contact and returns a deferred that is fired when the reply arrives or an error occurs.

send_store(contact, public_key, name, value, timestamp, expires, meta, signature)[source]

Sends a Store message to the given contact. The value contained within the message is stored against a key derived from the public_key and name. Furthermore, the message is cryptographically signed using the value, timestamp, expires, name and meta values.
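
A sketch of the key derivation, assuming the compound key is the SHA512 digest of the public_key and name (matching the 512-bit key space described in drogulus.dht.kbucket, though the exact construction is an assumption):

    from hashlib import sha512

    def derive_key(public_key, name):
        # Assumed construction: hash the public key and name together
        # to produce a 512-bit key within the DHT's key space.
        return sha512(public_key.encode() + name.encode()).hexdigest()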

trigger_deferred(message, error=False)[source]

Given a message, will attempt to retrieve the deferred and trigger it with the appropriate callback or errback.

class drogulus.dht.node.NodeLookup(target, message_type, local_node, timeout=600, canceller=None)[source]

Encapsulates a lookup in the DHT given a particular target key and message type. Will callback when a result is found or errback otherwise. If defined, will timeout with an errback.

From the original Kademlia paper:

“The most important procedure a Kademlia participant must perform is to locate the k closest nodes to some given node ID. We call this procedure a node lookup. Kademlia employs a recursive algorithm for node lookups. The lookup initiator starts by picking α nodes from its closest non-empty k-bucket (or, if that bucket has fewer than α entries, it just takes the α closest nodes it knows of). The initiator then sends parallel, asynchronous FIND NODE RPCs to the α nodes it has chosen. α is a system-wide concurrency parameter, such as 3.

In the recursive step, the initiator resends the FIND NODE to nodes it has learned about from previous RPCs. (This recursion can begin before all α of the previous RPCs have returned). Of the k nodes the initiator has heard of closest to the target, it picks α that it has not yet queried and resends the FIND NODE RPC to them. Nodes that fail to respond quickly are removed from consideration until and unless they do respond. If a round of FIND NODEs fails to return a node any closer than the closest already seen, the initiator resends the FIND NODE to all of the k closest nodes it has not already queried. The lookup terminates when the initiator has queried and gotten responses from the k closest nodes it has seen. When α = 1 the lookup algorithm resembles Chord’s in terms of message cost and the latency of detecting failed nodes. However, Kademlia can route for lower latency because it has the flexibility of choosing any one of k nodes to forward a request to.”

READ THIS CAREFULLY! Here’s how this implementation works:

  • self.target - the target key for the lookup.
  • self.message_type - the message class (either FindNode or FindValue).
  • self.local_node - the local node that created the NodeLookup.
  • self.shortlist - an ordered list containing nodes close to the target.
  • self.contacted - a set of nodes that have been contacted for this lookup.
  • self.nearest_node - the node nearest to the target so far.
  • self.pending_requests - a dictionary of currently pending requests.
  • constants.ALPHA - the number of concurrent asynchronous calls allowed.
  • constants.K - the number of closest nodes to return when complete.
  • constants.LOOKUP_TIMEOUT - the default maximum duration for a lookup.

  1. If “timeout” seconds elapse before the lookup is finished, cancel any pending requests and errback with an OutOfTime error. The “timeout” value can be overridden but defaults to constants.LOOKUP_TIMEOUT seconds.
  2. Locally known nodes from the routing table seed self.shortlist.
  3. The nearest node to the target in self.shortlist is set as self.nearest_node.
  4. No more than constants.ALPHA of the nearest nodes that are in self.shortlist but not in self.contacted are sent a message that is an instance of self.message_type. Each request is added to the self.pending_requests dictionary, whose length must never exceed constants.ALPHA (a sketch of this step follows the note below).
  5. As each node is contacted it is added to the self.contacted set.
  6. If a node doesn’t reply or an error is encountered it is removed from self.shortlist and self.pending_requests. Start from step 3 again.
  7. When a response to a request is returned successfully remove the request from self.pending_requests.
  8. If it’s a FindValue message and a suitable value is returned (see note at the end of these comments) cancel all the other pending calls in self.pending_requests and fire a callback with the returned value. If the value is invalid remove the node from self.shortlist and start from step 3 again without cancelling the other pending calls.
  9. If a list of closer nodes is returned by a peer add them to self.shortlist and sort - making sure nodes in self.contacted are not mistakenly re-added to the shortlist.
  10. If the nearest node in the newly sorted self.shortlist is closer to the target than self.nearest_node then set self.nearest_node to the new closer node and start from step 3 again.
  11. If self.nearest_node remains unchanged DO NOT start a new call.
  12. If there are no other requests in self.pending_requests then check that the constants.K nearest nodes in the self.contacted set are all closer than the nearest node in self.shortlist. If they are, and it’s a FindNode message call back with the constants.K nearest nodes in the self.contacted set. If the message is a FindValue, errback with a ValueNotFound error.
  13. If some nodes in self.shortlist are still nearer to the target than the constants.K nearest nodes in the self.contacted set, start again from step 3 (forcing the local node to contact the close nodes that have yet to be contacted).

Note on validating values: In the future there may be constraints added to the FindValue query (such as only accepting values created after time T).
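
A much-simplified sketch of steps 3 and 4, the inner loop that keeps at most constants.ALPHA requests in flight; the names, the return shape of send_find and the constants import are assumptions:

    def _lookup(self):
        # Step 3: self.shortlist is assumed sorted by distance to the target.
        self.nearest_node = self.shortlist[0]
        # Step 4: contact up to ALPHA of the nearest uncontacted nodes.
        for contact in self.shortlist:
            if len(self.pending_requests) >= constants.ALPHA:
                break  # never more than ALPHA requests in flight
            if contact not in self.contacted:
                uuid, deferred = self.local_node.send_find(
                    contact, self.target, self.message_type)  # assumed return
                self.pending_requests[uuid] = deferred
                self.contacted.add(contact)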

__init__(target, message_type, local_node, timeout=600, canceller=None)[source]

Sets up the lookup to search for a certain target key with a particular message_type using the DHT state found in the local_node. Will cancel after timeout seconds. See the documentation for twisted.internet.defer.Deferred for an explanation of canceller.

cancel()[source]

Cancels this lookup in a clean fashion. This function is dedicated to @terrycojones whose efforts at cancelling deferreds deserve some sort of tribute. ;-)

exception drogulus.dht.node.RoutingTableEmpty[source]

Fired when a lookup is attempted without any peers in the local node’s routing table.

__weakref__

list of weak references to the object (if defined)

exception drogulus.dht.node.ValueNotFound[source]

Fired when a NodeLookup cannot find a value associated with a specified key.

__weakref__

list of weak references to the object (if defined)

drogulus.dht.node.response_timeout(message, protocol, node)[source]

Called when a pending message (identified with a uuid) awaiting a response via a given protocol object times out. Closes the connection and removes the deferred from the “pending” dictionary.

drogulus.dht.routingtable

Contains code that represents Kademlia’s routing table tree structure.

class drogulus.dht.routingtable.RoutingTable(parent_node_id)[source]

From the original paper:

“The routing table is a binary tree whose leaves are k-buckets. Each k-bucket contains nodes with some common prefix in their ID. The prefix is the k-bucket’s position in the binary tree. Thus, each k-bucket covers some range of the ID space, and together the k-buckets cover the entire 512-bit ID space with no overlap.”

__init__(parent_node_id)[source]

The parent_node_id is the 512-bit ID of the node to which this routing table belongs.

__weakref__

list of weak references to the object (if defined)

add_contact(contact)[source]

Add the given contact to the correct k-bucket; if it already exists, its status will be updated.

blacklist(contact)[source]

Marks the referenced contact as blacklisted because it has misbehaved in some way. For example, it may have attempted to propagate an invalid value or replied to a node lookup with an incorrect response. Once blacklisted, a contact is never allowed in the routing table or replacement cache.

find_close_nodes(key, rpc_node_id=None)[source]

Finds up to “K” known nodes closest to the node/value with the specified key. If rpc_node_id is supplied, the referenced node will be excluded from the returned contacts.

The result is a list of “K” node contacts of type dht.contact.Contact. Will only return fewer than “K” contacts if not enough contacts are known.

The result is ordered from closest to furthest away from the target key.
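
The “closest to furthest” ordering is by XOR distance; an illustrative sort, assuming ids are hexadecimal strings as described in drogulus.dht.contact:

    def sort_by_distance(contacts, target_key):
        # Kademlia distance: XOR of the two ids, compared as unsigned integers.
        return sorted(contacts,
                      key=lambda c: int(c.id, 16) ^ int(target_key, 16))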

get_contact(contact_id)[source]

Returns the (known) contact with the specified node ID. Will raise a ValueError if no contact with the specified ID is known.

get_refresh_list(start_index=0, force=False)[source]

Finds all k-buckets that need refreshing, starting at the k-bucket with the specified index. This bucket and those further away from it will be refreshed. Returns node IDs to be searched for in order to refresh those k-buckets in the routing table. If the “force” parameter is True then all buckets within the specified range will be refreshed, regardless of the time they were last accessed.

remove_contact(contact_id, forced=False)[source]

Attempt to remove the contact with the specified contact_id from the routing table.

The operation will only succeed if either the number of failed RPCs made against the contact is >= constants.ALLOWED_RPC_FAILS or the ‘forced’ flag is set to True (defaults to False).

If there are any contacts in the replacement cache for the affected bucket then the most up-to-date contact in the replacement cache will be used as a replacement.

touch_kbucket(key)[source]

Update the lastAccessed timestamp of the k-bucket which covers the range containing the specified key string in the key/ID space.

The lastAccessed field is used to ensure a k-bucket doesn’t become stale.