/* ** Krishna Puttaswamy // simple application using the dht layer to do some gets and puts */ #include #include #include #include #include #include #include "dht.h" #include "chimera.h" #include "log.h" #include "key.h" #include "route.h" #include "dtime.h" using namespace std; #define USAGE "dhttest [ -j bootstrap:port ] port key" #define OPTSTR "j:" #define RAND() (lrand48()) #define SEED(s) (srand48(s)) extern char *optarg; extern int optind; char mynodekey[200]; bool read_from_file(FILE *fp, char *block) { // assumption is that the file contains each entry of size entry_size if (fgets(block, ENTRY_SIZE, fp) == NULL) return false; int i = 0; // replace new line -- just to making the output pretty int len = strlen(block); for(i = 0; i < len; i++) if(block[i] == '\n') { block[i] = '\0'; break;; } return true; } int main (int argc, char **argv) { int opt; char *hn = NULL; int port, joinport; ChimeraHost *join = NULL; char tmp[256]; int i, j; Message *hello; bool isbootstrap = false; ChimeraState *state; Key key; int seq = -1; ChimeraHost *driver = NULL; while ((opt = getopt (argc, argv, OPTSTR)) != EOF) { switch ((char) opt) { case 'j': for (i = 0; optarg[i] != ':' && i < strlen (optarg); i++); optarg[i] = 0; hn = optarg; sscanf (optarg + (i + 1), "%d", &joinport); break; default: fprintf (stderr, "invalid option %c\n", (char) opt); fprintf (stderr, "usage: %s\n", USAGE); exit (1); } } if ((argc - optind) != 2) { fprintf (stderr, "usage: %s\n", USAGE); exit (1); } port = atoi (argv[optind]); str_to_key (argv[optind + 1], &key); state = chimera_init (port); chimera_setkey (state, key); strcpy(mynodekey, key.keystr); srand (time (NULL)); if (hn != NULL) join = host_get (state, hn, joinport); else isbootstrap = true; chimera_register (state, HELLO_MESSAGE, 1); //char cashmeredebug[256]; //sprintf (cashmeredebug, "logs/LOG_DATA%s", key.keystr); //FILE *cashmerefp = fopen (cashmeredebug, "w"); //log_direct (state->log, LOG_DATA, cashmerefp); //log_direct (state->log, LOG_WARN, stderr); //log_direct (state->log, LOG_ERROR, stderr); driver = host_get (state, "marrow", 11110); if (join != NULL) { chimera_join (state, join); sprintf (tmp, "%d %s joining with %s:%d", port, key.keystr, hn, joinport); } else { sprintf (tmp, "%d %s starting system", port, key.keystr, hn, joinport); } dhtlayer_init(state, mynodekey); hello = message_create (driver->key, HELLO_MESSAGE, strlen (tmp) + 1, tmp); while(!message_send (state, driver, hello, TRUE)); free (hello); if(isbootstrap){ dsleep(60); fprintf(stderr, "I am the bootstrap node, taking the control!!\n"); FILE *blockFP = fopen(BLOCKS_FILE, "r"); int i = 0; char block[100]; for (i = 0; i < 10; i++){ if (read_from_file(blockFP, block) == false) break; fprintf(stderr, "Storing %s \n", block); Key tempKey; str_to_key(block, &tempKey); DhtMessage *send = get_new_dhtmessage(block, block, mynodekey, NULL); //int size; //char* sendmsg = (char *)send->serialize(&size); //chimera_send(state, tempKey, DHT_PUT_ROOT, size, (char *)sendmsg); //free_dhtmessage(send); chimera_send(state, tempKey, DHT_PUT_ROOT, sizeof(DhtMessage), (char *)send); dsleep(.1); } fclose(blockFP); fprintf(stderr, "Inserted %d blocks into the DHT ...now looking them up\n", i); dsleep(120); fprintf(stderr, "now, should lookup \n"); blockFP = fopen(BLOCKS_FILE, "r"); for (i = 0; i < 10; i++){ if (read_from_file(blockFP, block) == false) break; fprintf(stderr, "Looking for %s\n", block); Key tempKey; str_to_key(block, &tempKey); DhtMessage *send = get_new_dhtmessage(block, block, mynodekey, NULL); //int size; //char* sendmsg = (char *)send->serialize(&size); //chimera_send(state, tempKey, DHT_GET_ROOT, size, (char *)sendmsg); //free_dhtmessage(send); chimera_send(state, tempKey, DHT_GET_ROOT, sizeof(DhtMessage), (char *)send); dsleep(.1); } fclose(blockFP); fprintf(stderr, "Lookedup %d blocks into the DHT \n", i); fflush(stderr); sleep(10); blockFP = fopen(HOSTS_FILE, "r"); int count = 0; while(read_from_file(blockFP, block)){ //fprintf(stderr, "block read is %s \n",block); Key tempKey; char host[100], temp[100]; int kkk; sscanf (block, "%s %d %s", host, &kkk, temp); str_to_key(temp, &tempKey); char str[10] = "test"; chimera_send(state, tempKey, DHT_DUMP_STATE, strlen(str)+1 , (char *)str); count++; dsleep(.1); } fclose(blockFP); fprintf(stderr, "Send %d dump messages \n", count); } while (1) { sleep (1000); } }