/* ** Krishna Puttaswamy ** A simple DHT abstraction on top of Chimera ** supports put/get with replications; the put/get is only for blockids, this is not a complete dht that stores blocks */ ///////////////////// RULE OF THUMB -- to be followed always in this dht app // never ever claim a lock and then send out any application-level message; // this might very well lead to a deadlock if the message comes back to you, especially under churn this is more likely // so, all the code here never does a send while holding a lock. instead, uses a temporary queue to build up out going // messages and send them after the lock is released #include #include #include #include #include #include #include "dht.h" #include "chimera.h" #include "message.h" #include "log.h" #include "key.h" #include "jrb.h" #include "jval.h" #include "dtime.h" #include "route.h" using namespace std; #define RAND() (lrand48()) #define SEED(s) (srand48(s)) ChimeraState *localstate; char mykey[200]; int my_put_count = 0, my_get_count = 0; int ignore_dht_get_nonroot = 0, ignore_dht_get_root = 0, ignore_dht_put_nonroot = 0, ignore_dht_put_root = 0; int found_blocks = 0, missing_blocks = 0; bool dumped = false; //HashTableEntry hashTable[HASH_TABLE_SIZE]; //int hashTableSize = 0; HashMap localHashMap; static Key **closestReplicas; static Key **backupClosestReplicas; pthread_mutex_t repset_lock; pthread_mutex_t hashtable_lock; typedef struct tempstruct{ DhtMessage *dhtmsg; Key *key; }TStruct; // the blockid is the id of the block stored // putpoint is the random point chosen for storing the id in the ring -- the object should be maintained on DHT_REPLICATION number of nodes around this point // source is the source sending the put/get/dht messages DhtMessage* get_new_dhtmessage(char *blockId, char *putpoint, char *source, char *value) { DhtMessage *msg = (DhtMessage *) malloc(sizeof(DhtMessage)); msg->hops = 0; msg->root = false; strcpy(msg->blockId, blockId); strcpy(msg->source, source); if (putpoint == NULL) { msg->putPoint[0] = '\0'; // fprintf(stderr, "blockid: %s copiedblock: %s \n", blockId, msg->blockId); } else { strcpy(msg->putPoint, putpoint); // fprintf(stderr, "blockid: %s putpoint %s copiedblock: %s copiedputpoint: %s \n", blockId, putpoint, msg->blockId, msg->putPoint); } memset((void*)(&msg->value), 0, sizeof(HintValue)); if (value != NULL){ HintValue *val = (HintValue*)value; memcpy(msg->value.symmetricKey, val->symmetricKey, SYMMETRIC_KEY_LENGTH); memcpy(msg->value.symmetricIV, val->symmetricIV, IV_LENGTH); //strcpy(msg->value.connectionID, val->connectionID); strcpy(msg->value.entryCID, val->entryCID); strcpy(msg->value.exitCID, val->exitCID); strcpy(msg->value.nextHop, val->nextHop); msg->value.hintCommand = val->hintCommand; msg->value.connectionHint = val->connectionHint; msg->value.endOfReversePath = val->endOfReversePath; } return msg; } void free_dhtmessage(DhtMessage *msg){ free(msg); } // refresh the block in the dht; void dht_keepalive(ChimeraState* state, char *block) { log_message (state->log, LOG_DATA, "lock 95: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 95: locked\n"); string blockString(block); HashMap::iterator it = localHashMap.find(blockString); // if the entry is not found, then insert an entry if (it == localHashMap.end()) { fprintf(stderr, "ERROR! no entry to refresh on me\n"); } else{ double t = it->second->insertTime; it->second->insertTime = dtime(); HashMap::iterator itt = localHashMap.find(blockString); //fprintf(stderr, "refreshed the block on me from %lf to %lf \n", t, itt->second->insertTime); } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 95: released\n"); } // put the block in the dht; it is assumed that the key is verified to not already exist in the table void dht_put(ChimeraState* state, char *block, bool root, char *putpoint, char* value) { log_message (state->log, LOG_DATA, "lock 58: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 58: locked\n"); string blockString(block); HashMap::iterator it = localHashMap.find(blockString); // if the entry is not found, then insert an entry if (it == localHashMap.end()) { HashTableEntry *entry = (HashTableEntry *)malloc(sizeof(HashTableEntry)); memset(entry, 0, sizeof(HashTableEntry)); strcpy(entry->key, block); strcpy(entry->putPoint, putpoint); entry->root = root; entry->insertTime = dtime(); // get the time when the object was inserted into the local hashtable // copy the value contents if (value != NULL){ HintValue *temp = (HintValue*) value; memcpy(entry->value.symmetricKey, temp->symmetricKey, SYMMETRIC_KEY_LENGTH); memcpy(entry->value.symmetricIV, temp->symmetricIV, IV_LENGTH); strcpy(entry->value.entryCID, temp->entryCID); strcpy(entry->value.exitCID, temp->exitCID); strcpy(entry->value.nextHop, temp->nextHop); entry->value.hintCommand = temp->hintCommand; entry->value.connectionHint = temp->connectionHint; entry->value.endOfReversePath = temp->endOfReversePath; //fprintf(stderr, "the hintcommand in the dht_put is %d\n", temp->hintCommand); } else{ fprintf(stderr, "unfortunately, the value is zero \n"); memset((void*)(&entry->value), 0, sizeof(HintValue)); } localHashMap.insert(pair (blockString, entry)); log_message (state->log, LOG_DATA, "I %s am adding %s to my hashtable size; %d\n", mykey, block, localHashMap.size()); //fprintf(stderr, "inserting connid:%s with randompoint:%s into the hashtable of %s\n", block, putpoint, mykey); } //fprintf(stderr, "putting %s on %s\n", block, mykey); //fprintf(stderr, "%s has one more non-root entry in hashtable hashtablesize: %d\n", mykey, hashTableSize); pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 58: released\n"); } // get is not supported as of now, as we are storing the private/public keys here char* dht_get(ChimeraState *state, char *blockId) { log_message (state->log, LOG_DATA, "lock 131: attempting\n"); pthread_mutex_lock(&hashtable_lock); char *returnstring = NULL; string keyString = string(blockId); HashMap::iterator it = localHashMap.find(keyString); // if the entry is not found, then insert an entry if (it != localHashMap.end()) { returnstring = (char *)(&it->second->value); } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 131: released\n"); return returnstring; } bool is_present(ChimeraState* state, char *key){ int i = 0; bool ret = false; log_message (state->log, LOG_DATA, "lock 88: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 88: locked\n"); string keyString = string(key); HashMap::iterator it = localHashMap.find(keyString); // if the entry is not found, then insert an entry if (it != localHashMap.end()) { ret = true; } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 88: released\n"); return ret; } void dhtlayer_forward_handler (Key ** kp, Message ** mp, ChimeraHost ** hp) { Message *m = *mp; if (m->type == DHT_GET_ROOT || m->type == DHT_PUT_ROOT || m->type == DHT_GET_NONROOT || m->type == DHT_PUT_NONROOT) { DhtMessage *dhtmsg = (DhtMessage *) (m->payload); dhtmsg->hops++; } } /** sort_keys: */ void sort_keys (ChimeraState* state, Key ** hosts, Key key, int size) { int i, j; Key *tmp; Key dif1; Key dif2; int pmatch1 = 0; int pmatch2 = 0; for (i = 0; i < size; i++) { for (j = i + 1; j < size; j++) { if (hosts[i] != NULL && hosts[j] != NULL) { pmatch1 = key_index (state, key, *hosts[i]); pmatch2 = key_index (state, key, *hosts[j]); if (pmatch2 > pmatch1) { tmp = hosts[i]; hosts[i] = hosts[j]; hosts[j] = tmp; } else if (pmatch1 == pmatch2) { key_distance (state, &dif2, hosts[j], &key); if (key_comp (&dif2, &dif1) < 0) { tmp = hosts[i]; hosts[i] = hosts[j]; hosts[j] = tmp; } } } } } } Key** get_closest_nodes(ChimeraState *state, int *rsize) { int i = 0, j =0; Key **leafSetKeys; *rsize = DHT_REPLICATION; int totalsize = 0; RouteGlobal *routeglob = (RouteGlobal *) state->route; int right_leafset_size = 0, left_leafset_size = 0; leafSetKeys = (Key **)malloc((2*DHT_REPLICATION+1) * sizeof(Key *)); log_message (state->log, LOG_DATA, "lock 208: attempting\n"); pthread_mutex_lock (&routeglob->lock); log_message (state->log, LOG_DATA, "lock 208: locked\n"); right_leafset_size = leafset_size (routeglob->rightleafset); totalsize += right_leafset_size; for (i = 0; i < right_leafset_size; i++) { leafSetKeys[i] = (Key *) malloc(sizeof(Key)); key_assign(leafSetKeys[i], routeglob->rightleafset[i]->key); } left_leafset_size = leafset_size (routeglob->leftleafset); totalsize += left_leafset_size; for (j = 0; j < left_leafset_size; j++, i++) { leafSetKeys[i] = (Key *) malloc(sizeof(Key)); key_assign(leafSetKeys[i], routeglob->leftleafset[j]->key); } pthread_mutex_unlock (&routeglob->lock); log_message (state->log, LOG_DATA, "lock 208: released\n"); // fprintf(stderr, "rsize = %d, right_leafset_size = %d, left_leafset_size = %d, totalsize = %d, i = %d \n", *rsize, right_leafset_size, left_leafset_size, totalsize, i); leafSetKeys[i] = NULL; sort_keys (state, leafSetKeys, routeglob->me->key, i); Key **temp; log_message (state->log, LOG_DATA, "lock 235: attempting\n"); pthread_mutex_lock(&repset_lock); log_message (state->log, LOG_DATA, "lock 235: locked\n"); temp = closestReplicas; closestReplicas = backupClosestReplicas; backupClosestReplicas = temp; /* fprintf(stderr, "my: %s replica set is... ", mykey); for(i = 0; i<2*DHT_REPLICATION; i++){ fprintf(stderr, " %s ", leafSetKeys[i]->keystr); } fprintf(stderr, "\n"); */ // its possible that totalsize is < DHT_REPLICATION in the very early stage of building the network for(i = 0; ilog, LOG_DATA, "lock 235: released\n"); for (i = 0; i < totalsize; i++) free(leafSetKeys[i]); free(leafSetKeys); return closestReplicas; } void broadcast_to_replicas(ChimeraState* state, int msg_type, int size, char *payload) { // send it to the DHT_REPLICATION number of ppl closest to you RouteGlobal *routeglob = (RouteGlobal *) state->route; int rsize = DHT_REPLICATION; //replica set size //Key **closestleafsetkeys = get_right_leafset(state, &rsize); // this is to store only on the right side neighbors // by this time, the closestreplicas should be calculated by the other thread; directly access the pointer std::vector outgoingMessages; log_message (state->log, LOG_DATA, "lock 255: attempting\n"); pthread_mutex_lock(&repset_lock); log_message (state->log, LOG_DATA, "lock 255: locked -- repset_lock\n"); int i = 0; if (msg_type == DHT_GET_NONROOT || msg_type == DHT_PUT_NONROOT){ char temp[1000] = ""; char str[100] = ""; if (msg_type == DHT_GET_NONROOT) sprintf(str, "broadcasting get_nonroot msg from: %s to: ", mykey); else if (msg_type == DHT_PUT_NONROOT) sprintf(str, "broadcasting put_nonroot msg from: %s to: ", mykey); strcat(temp, str); for (i = 0; i < rsize; i++) { sprintf(str, " %s ", closestReplicas[i]->keystr); strcat(temp, str); } //fprintf(stderr, "%s\n", temp); } for (i = 0; i < rsize; i++) { Key *tempKey = (Key *)malloc(sizeof(Key)); key_assign(tempKey, *closestReplicas[i]); TStruct *tempst = (TStruct*)malloc(sizeof(TStruct)); tempst->dhtmsg = (DhtMessage *)payload; tempst->key = tempKey; outgoingMessages.push_back(tempst); } pthread_mutex_unlock(&repset_lock); log_message (state->log, LOG_DATA, "lock 255: released\n"); for (i = 0; i < rsize; i++) { TStruct *tempst = outgoingMessages.at(i); log_message (state->log, LOG_DATA, "sending to %d out of %d to %s\n", i+1, rsize, tempst->key->keystr); if (msg_type == DHT_PUT_NONROOT) { chimera_send (state, *(tempst->key), DHT_PUT_NONROOT, size, (char *)tempst->dhtmsg); } else if (msg_type == DHT_GET_NONROOT) { chimera_send (state, *(tempst->key), DHT_GET_NONROOT, size, (char *)tempst->dhtmsg); } else if (msg_type == DHT_REMOVE_ENTRY) { chimera_send (state, *(tempst->key), DHT_REMOVE_ENTRY, size, (char *)tempst->dhtmsg); } else if (msg_type == DHT_KEEPALIVE_NONROOT) { chimera_send (state, *(tempst->key), DHT_KEEPALIVE_NONROOT, size, (char *)tempst->dhtmsg); } else { fprintf(stderr, "ERROR: wrong message type in broadcast to replicas \n"); } log_message (state->log, LOG_DATA, "sent %s\n",tempst->key->keystr); free(tempst->key); free(tempst); } outgoingMessages.clear(); } void printResult(ChimeraState* state, char *blockId, int hops) { if (is_present(state, blockId) == true) { fprintf(stderr, "Found the block %s in %d hops on %s\n", blockId, hops, mykey); found_blocks++; } else { fprintf(stderr, "Block %s is missing even after %d hops on %s\n", blockId, hops, mykey); missing_blocks++; } } bool present_in_replicaset(ChimeraState *state, char *sourceid) { int i = 0; bool ret = false; log_message (state->log, LOG_DATA, "lock 442: attempting\n"); pthread_mutex_lock(&repset_lock); log_message (state->log, LOG_DATA, "lock 442: locked\n"); for (i = 0; i < DHT_REPLICATION; i++) { if (strcmp(closestReplicas[i]->keystr, sourceid) == 0) { ret = true; break; } } pthread_mutex_unlock(&repset_lock); log_message (state->log, LOG_DATA, "lock 442: released\n"); return ret; } void process_dht_message(ChimeraState* state, char *payload, int size, int type) { if(type == DHT_GET_ROOT || type == DHT_GET_NONROOT) { my_get_count++; } if(type == DHT_PUT_ROOT || type == DHT_PUT_NONROOT) { my_put_count++; } Key newKey; if (type == PUT_COMMAND) { DhtMessage *send = get_new_dhtmessage(payload, payload, mykey, NULL); str_to_key(payload, &newKey); chimera_send(state, newKey, DHT_PUT_ROOT, sizeof(DhtMessage), (char *)send); //fprintf(stderr, "sending putpoint is %s orig: %s\n", send->putPoint, payload); free_dhtmessage(send); } else if (type == GET_COMMAND) { //fprintf(stderr, "got a get command for random point %s at %s\n", payload, mykey); DhtMessage *send = get_new_dhtmessage(payload, NULL, mykey, NULL); str_to_key(payload, &newKey); chimera_send(state, newKey, DHT_GET_ROOT, sizeof(DhtMessage), (char *)send); free_dhtmessage(send); } else if (type == DHT_KEEPALIVE_ROOT){ broadcast_to_replicas(state, DHT_KEEPALIVE_NONROOT, size, payload); DhtMessage *dhtmsg = (DhtMessage *) payload; dht_keepalive(state, dhtmsg->blockId); } else if (type == DHT_KEEPALIVE_NONROOT){ DhtMessage *dhtmsg = (DhtMessage *) payload; dht_keepalive(state, dhtmsg->blockId); } else if (type == DHT_PUT_ROOT) { broadcast_to_replicas(state, DHT_PUT_NONROOT, size, payload); DhtMessage *dhtmsg = (DhtMessage *) payload; dht_put(state, dhtmsg->blockId, true, dhtmsg->putPoint, (char *)(&dhtmsg->value)); //fprintf(stderr, "putting %s on %s; the putpoint is %s\n", dhtmsg->blockId, mykey, dhtmsg->putPoint); } else if (type == DHT_PUT_NONROOT) { DhtMessage *dhtmsg = (DhtMessage *) payload; dht_put(state, dhtmsg->blockId, false, dhtmsg->putPoint, (char *)(&dhtmsg->value)); //fprintf(stderr, "inserting %s into the hashtable of %s\n", dhtmsg->putPoint, mykey); } else if (type == DHT_GET_ROOT) { DhtMessage *dhtmsg = (DhtMessage *) payload; //fprintf(stderr, "got a get message at %s for %s \n", mykey, dhtmsg->blockId); broadcast_to_replicas(state, DHT_GET_NONROOT, size, payload); printResult(state, dhtmsg->blockId, dhtmsg->hops); } else if (type == DHT_GET_NONROOT) { DhtMessage *dhtmsg = (DhtMessage *) payload; printResult(state, dhtmsg->blockId, dhtmsg->hops); } else if (type == DHT_DUMP_STATE) { if (!dumped){ dumped = true; fprintf(stderr, "%s has %d entries in its hashtable \n", mykey, localHashMap.size()); fprintf(stderr, "my_get_count: %d my_put_count: %d \n", my_get_count, my_put_count); fprintf(stderr, "ignored counters dht_get_root: %d dht_get_nonroot: %d dht_put_root: %d dht_put_nonroot: %d \n", ignore_dht_get_root, ignore_dht_get_nonroot, ignore_dht_put_root, ignore_dht_put_nonroot); fprintf(stderr, "found_blocks: %d missing_blocks: %d \n", found_blocks, missing_blocks); fflush(stderr); } } else if (type == DHT_REPORT_TO_ROOT){ //check if i am the root of this key //ignore this message, if so //otherwise, become the root and insert this on all your replicas DhtMessage *dhtmsg = (DhtMessage *) payload; int i = 0; bool found = false; bool broadcast = false; log_message (state->log, LOG_DATA, "lock 365: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 365: locked\n"); string keyString = string(dhtmsg->blockId); HashMap::iterator it = localHashMap.find(keyString); // if the entry is not found, then insert an entry if (it != localHashMap.end()) { HashTableEntry* entry = it->second; found = true; if (entry->root == false) { entry->root = true; broadcast = true; fprintf(stderr, "I %s was not the root of %s, but now iam \n", mykey, dhtmsg->blockId); } } // there was no entry, but i am the new root now if (found == false){ broadcast = true; } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 365: released\n"); if (broadcast == true){ //fprintf(stderr, "broadcasting ... \n"); broadcast_to_replicas(state, DHT_PUT_NONROOT, sizeof(DhtMessage), (char *)dhtmsg); if (found == false){ dht_put(state, dhtmsg->blockId, true, dhtmsg->putPoint, (char *)(&dhtmsg->value)); } } //check if the message came from someone not in your replica set; //in that case, send them a delete message if (!present_in_replicaset(localstate, dhtmsg->source)){ fprintf(stderr, "%s is not present in the repset of %s, but reporting key %s; so, sending delete message\n", dhtmsg->source, mykey, dhtmsg->blockId); DhtMessage *send = get_new_dhtmessage(dhtmsg->blockId, NULL, mykey, NULL); Key tempkey; str_to_key(dhtmsg->source, &tempkey); chimera_send (localstate, tempkey, DHT_REMOVE_ENTRY, sizeof(DhtMessage), (char *)send); free(send); } } else if (type == DHT_REMOVE_ENTRY){ DhtMessage *dhtmsg = (DhtMessage *) payload; string keyString = string(dhtmsg->blockId); log_message (state->log, LOG_DATA, "lock 455: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 455: locked\n"); HashMap::iterator it = localHashMap.find(keyString); if (it != localHashMap.end()) { free(it->second); localHashMap.erase(it); fprintf(stderr, "deleting a dht entry that i had\n"); } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 455: released\n"); } } // take care of the messages delivered to a node void dhtlayer_delivery(ChimeraState* state, Message *msg) { if (msg->type == GET_COMMAND){ fprintf(stderr, "%s iam in charge of a get command now \n",mykey); } process_dht_message(localstate, msg->payload, msg->size, msg->type); } bool has_replicaset_changed(ChimeraState *state) { int i = 0; bool ret = false; log_message (state->log, LOG_DATA, "lock 420: attempting\n"); pthread_mutex_lock(&repset_lock); log_message (state->log, LOG_DATA, "lock 420: locked\n"); for (i = 0; i < DHT_REPLICATION; i++) { if (closestReplicas[i] == NULL || backupClosestReplicas[i] == NULL){ fprintf(stderr, "one of them is null\n"); ret = true; break; } if (!key_equal(*(closestReplicas[i]), *(backupClosestReplicas[i]))){ int j = 0; /* fprintf(stderr, "myrepset: %s new:", mykey); for(j = 0; j < DHT_REPLICATION; j++) fprintf(stderr, " %s ", closestReplicas[j]->keystr); fprintf(stderr, "\n"); fprintf(stderr, "myrepset: %s old:", mykey); for(j = 0; j < DHT_REPLICATION; j++) fprintf(stderr, " %s ", backupClosestReplicas[j]->keystr); fprintf(stderr, "\n"); */ ret = true; break; } } pthread_mutex_unlock(&repset_lock); log_message (state->log, LOG_DATA, "lock 420: released\n"); return ret; } void* delete_dead_dht_entires(void *chstate) { ChimeraState *state = (ChimeraState *) chstate; ChimeraGlobal *chglob = (ChimeraGlobal *) state->chimera; while(1){ std::vector deadKeyQueue; std::vector broadcastQueue; log_message (state->log, LOG_DATA, "lock 588: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 588: locked -- hashtable_lock\n"); double now = dtime(); if (localHashMap.size()> 0){ HashMap::iterator it = localHashMap.begin(); for (; it != localHashMap.end(); it++){ if ((now - it->second->insertTime) >= DEFAULT_DHT_ENTRY_LIFETIME){ deadKeyQueue.push_back(it->second->key); if (it->second->root == true){ DhtMessage *msg = get_new_dhtmessage(it->second->key, it->second->putPoint, mykey, (char *)(&it->second->value)); broadcastQueue.push_back(msg); } } } } // delete all the keys that must be deleted int i = 0; for(i = 0; i < deadKeyQueue.size(); i++){ char *delkey = (char *)deadKeyQueue.at(i); HashMap::iterator it = localHashMap.find(delkey); if (it != localHashMap.end()) { free(it->second); localHashMap.erase(it); fprintf(stderr, "deleting a dht entry that i had\n"); } } deadKeyQueue.clear(); pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 588: released\n"); for (i = 0; i < broadcastQueue.size(); i++){ DhtMessage* temp = broadcastQueue.at(i); broadcast_to_replicas(state, DHT_REMOVE_ENTRY, sizeof(DhtMessage), (char *)temp); //fprintf(stderr, "%s removing the key %s for which i am the root\n", mykey, temp->blockId); free_dhtmessage(temp); } broadcastQueue.clear(); sleep(DHT_REMOVE_DEADENTRY_PERIOD); } } // periodically check if you need to update the replicas; report to the root replica; void* maintain_dhtlayer(void *chstate) { ChimeraState *state = (ChimeraState *) chstate; ChimeraGlobal *chglob = (ChimeraGlobal *) state->chimera; int replicaset_size = 0; // no need to store the return, as its global get_closest_nodes(state, &replicaset_size); sleep(DHT_MAINTAIN_PERIOD); // i am doing this to ensure that both the closetReplicas and the backupClosestReplicas are set get_closest_nodes(state, &replicaset_size); sleep(DHT_MAINTAIN_PERIOD); int i = 0; while(1){ TStruct tmpstruct[10]; std::vector outgoingQueue; int nonroot_count = 0; std::vector broadcastQueue; // get the latest update get_closest_nodes(state, &replicaset_size); bool changed = has_replicaset_changed(state); log_message (state->log, LOG_DATA, "lock 465: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 465: locked -- hashtable_lock\n"); int root_count = 0; if (localHashMap.size()> 0){ //if things have changed, reinsert the keys for which i am the root if(changed){ //fprintf(stderr, "The replica_set of %s has changed in the last period... reinserting the replicas\n", mykey); log_message (state->log, LOG_DATA, "lock 465: has changes\n"); HashMap::iterator it = localHashMap.begin(); for (; it != localHashMap.end(); it++){ if (it->second->root == true){ DhtMessage *msg = get_new_dhtmessage(it->second->key, it->second->putPoint, mykey, (char *)(&it->second->value)); broadcastQueue.push_back(msg); root_count++; } } if (root_count > 0) fprintf(stderr, "repset changed: %s should reinsert %d keys that i am the root for \n", mykey, root_count); } } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 465: released\n"); for (i = 0; i < broadcastQueue.size(); i++){ DhtMessage* temp = broadcastQueue.at(i); broadcast_to_replicas(state, DHT_PUT_NONROOT, sizeof(DhtMessage), (char *)temp); fprintf(stderr, "%s reinserting the key %s for which i am the root\n", mykey, temp->blockId); free_dhtmessage(temp); } broadcastQueue.clear(); log_message (state->log, LOG_DATA, "lock 595: attempting\n"); pthread_mutex_lock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 595: locked -- hashtable_lock\n"); if (localHashMap.size()> 0){ //ping the roots of all the other nonroot_count = 0; int root_count = 0; log_message (state->log, LOG_DATA, "lock 595: root reports\n"); HashMap::iterator it = localHashMap.begin(); for (; it != localHashMap.end(); it++){ if (it->second->root == false){ DhtMessage *msg = get_new_dhtmessage(it->second->key, it->second->putPoint, mykey, (char *)(&it->second->value)); Key *tempKey = (Key *)malloc(sizeof(Key)); str_to_key(it->second->putPoint, tempKey); TStruct *tempst = (TStruct *)malloc(sizeof(TStruct)); tempst->dhtmsg = msg; tempst->key = tempKey; outgoingQueue.push_back(tempst); nonroot_count++; } else{ root_count++; log_message (state->log, LOG_DATA, "lock 595: not sending %d out of %d\n", i, localHashMap.size()); } } //fprintf(stderr, "reported %d keys that i am the non-root for, and hashtablesize is %d root_count %d\n", nonroot_count, hashTableSize, root_count); } pthread_mutex_unlock(&hashtable_lock); log_message (state->log, LOG_DATA, "lock 595: released\n"); for(i = 0; i < outgoingQueue.size(); i++) { TStruct *tempst = outgoingQueue.at(i); //chimera_send (state, *(tmpstruct[i].key), DHT_REPORT_TO_ROOT, sizeof(DhtMessage), (char *)tmpstruct[i].dhtmsg); chimera_send (state, *(tempst->key), DHT_REPORT_TO_ROOT, sizeof(DhtMessage), (char *)tempst->dhtmsg); free(tempst->key); free_dhtmessage(tempst->dhtmsg); free(tempst); } outgoingQueue.clear(); sleep(DHT_MAINTAIN_PERIOD); } } // should be called after chimera_init, before any dht calls int dhtlayer_init(ChimeraState * state, char *mykeyin) { chimera_register (state, PUT_COMMAND, 1); chimera_register (state, GET_COMMAND, 1); chimera_register (state, DHT_GET_ROOT, 1); chimera_register (state, DHT_GET_NONROOT, 1); chimera_register (state, DHT_PUT_NONROOT, 1); chimera_register (state, DHT_PUT_ROOT, 1); chimera_register (state, DHT_KEEPALIVE_ROOT, 1); chimera_register (state, DHT_KEEPALIVE_NONROOT, 1); chimera_register (state, DHT_DUMP_STATE, 1); chimera_register (state, DHT_REPORT_TO_ROOT, 1); chimera_register (state, DHT_REMOVE_ENTRY, 1); chimera_register (state, RANDOMWALK_PUT_ROOT, 1); chimera_register (state, RANDOMWALK_KEEPALIVE_ROOT, 1); chimera_register_delivery_handler(state, PUT_COMMAND, dhtlayer_delivery); chimera_register_delivery_handler(state, GET_COMMAND, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_PUT_ROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_KEEPALIVE_ROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_KEEPALIVE_NONROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_GET_ROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_GET_NONROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_PUT_NONROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_DUMP_STATE, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_REPORT_TO_ROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, DHT_REMOVE_ENTRY, dhtlayer_delivery); chimera_register_delivery_handler(state, RANDOMWALK_PUT_ROOT, dhtlayer_delivery); chimera_register_delivery_handler(state, RANDOMWALK_KEEPALIVE_ROOT, dhtlayer_delivery); // chimera_deliver (state, dhtlayer_delivery); // chimera_forward (state, dhtlayer_forward_handler); pthread_mutex_init (&repset_lock, NULL); pthread_mutex_init (&hashtable_lock, NULL); localstate = state; strcpy(mykey, mykeyin); // allocate memory for the repliaset and its backup // +1 is just to put a null in the end so that i know when things go wrong (through seg fault) // rather than accessing some random shit and seg faulting without knowing the reason closestReplicas = (Key **)malloc((DHT_REPLICATION+1) * sizeof(Key *)); backupClosestReplicas = (Key **)malloc((DHT_REPLICATION+1) * sizeof(Key *)); int i = 0; for(i = 0; i< DHT_REPLICATION; i++){ closestReplicas[i] = (Key *) malloc(sizeof(Key)); backupClosestReplicas[i] = (Key *) malloc(sizeof(Key)); } closestReplicas[i] = NULL; backupClosestReplicas[i] = NULL; // sart a thread to maintain the dht pthread_attr_t attr; pthread_attr_t deadattr; pthread_t tid; if (pthread_attr_init (&attr) != 0) { fprintf(stderr, "Error initializing the dhtlayer_thread\n"); return (0); } if (pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM) != 0) goto out; if (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED) != 0) goto out; if (pthread_create (&tid, &attr, maintain_dhtlayer, (void *) state) != 0) goto out; // sart a thread to delete dead entries from the dht pthread_t deadid; if (pthread_attr_init (&deadattr) != 0) { fprintf(stderr, "Error initializing the delete_dhtlayer_thread\n"); goto out; } if (pthread_attr_setscope (&deadattr, PTHREAD_SCOPE_SYSTEM) != 0) goto out; if (pthread_attr_setdetachstate (&deadattr, PTHREAD_CREATE_DETACHED) != 0) goto out; if (pthread_create (&deadid, &deadattr, delete_dead_dht_entires, (void *) state) != 0) goto out; return (1); out: pthread_attr_destroy (&attr); pthread_attr_destroy (&deadattr); fprintf(stderr, "Error initializing the dhtlayer_thread\n"); return (0); }