/* ** $Id: cashmere.c,v 1.1 2008/02/01 20:14:43 krishnap Exp $ ** Krishna Puttaswamy ** description: */ #include #include #include #include #include "chimera.h" #include "cashmere.h" #include "log.h" #include "route.h" #include "key.h" #include "dtime.h" #define USAGE "cashmere [ -j bootstrap:port ] port key" #define OPTSTR "j:" #define DEBUG 0 extern char *optarg; extern int optind; #define HELLO_MESSAGE 51 #define SEND_MESSAGE 52 ChimeraHost *driver = NULL; ChimeraState *state; Key key; int seq = -1; Key randomKeys[TEST_NETWORK_SIZE]; int randomKeysCount = 0; int kkk = 1; // start sending from some random node void send_handler (Key * key, Message * msg) { // some randome key and message for now if (msg->type == SEND_MESSAGE) { fprintf(stderr, "GOT send messsage\n"); log_message (state->log, LOG_CASHMERE, "calling cashmere_Send in test file\n"); ///* int i = 0; log_message(state->log, LOG_CASHMERE, "before The randomKeysCount is %d i is %d\n", randomKeysCount, i); i = getRandomNumber()%randomKeysCount; log_message(state->log, LOG_CASHMERE, "The randomKeysCount is %d i is %d\n", randomKeysCount, i); log_message (state->log, LOG_CASHMERE, "sending message to %s\n", randomKeys[i].keystr); cashmere_send (state, randomKeys[i], msg->size, msg->payload); //*/ /* int i = 0; char payload[EVAL_MESSAGE_SIZE]; for( i = 0; i < EVAL_MESSAGE_SIZE - 2; i++) payload[i] = 'x'; payload[i] = '\0'; int kkk = 0; kkk = rand() % randomKeysCount; fprintf(stderr, "Sending cashmere message to %s \n", randomKeys[kkk].keystr); for(i = 0; i < EVAL_MESSAGE_NUMBER + 50; i++) { cashmere_send(state, randomKeys[kkk], EVAL_MESSAGE_SIZE, payload); } */ fflush(stderr); } else if (msg->type == CASHMERE_MESSAGE || msg->type == CASHMERE_REPLY_MESSAGE) { log_message (state->log, LOG_CASHMERE, "calling process_cashmere_message in test file\n"); process_cashmere_message (state, msg, msg->type); } else if (msg->type == CASHMERE_GROUP_BROADCAST_MESSAGE || msg->type == CASHMERE_REPLY_GROUP_BROADCAST_MESSAGE) { log_message (state->log, LOG_CASHMERE, "calling group_broadcast in test file\n"); process_group_broadcast_message (state, msg, msg->type); } } void read_hosts_entries(char *filename) { FILE *fp = fopen ("./hosts", "r"); if (fp == NULL) { perror (filename); exit (1); } char hn[200], keyinput[200]; int port = 0; char s[1000]; randomKeysCount = 0; while (fgets (s, 256, fp) != NULL) { if (randomKeysCount == TEST_NETWORK_SIZE-1) break; sscanf (s, "%s %d %s", hn, &port, keyinput); str_to_key (keyinput, &randomKeys[randomKeysCount]); randomKeysCount++; } fclose (fp); } int main (int argc, char **argv) { int opt; char *hn = NULL; int port = 0, joinport = 0; ChimeraHost *join = NULL; char tmp[256]; int i, j; Message *hello; char *configfile = "./hosts"; while ((opt = getopt (argc, argv, OPTSTR)) != EOF) { switch ((char) opt) { case 'j': for (i = 0; optarg[i] != ':' && i < strlen (optarg); i++); optarg[i] = 0; hn = optarg; sscanf (optarg + (i + 1), "%d", &joinport); break; default: fprintf (stderr, "invalid option %c\n", (char) opt); fprintf (stderr, "usage: %s\n", USAGE); exit (1); } } if ((argc - optind) != 2) { fprintf (stderr, "usage: %s\n", USAGE); exit (1); } port = atoi (argv[optind]); str_to_key (argv[optind + 1], &key); state = chimera_init (port); chimera_setkey (state, key); srand (time (NULL)); if (hn != NULL) { join = host_get (state, hn, joinport); } chimera_register (state, HELLO_MESSAGE, 1); chimera_register (state, SEND_MESSAGE, 1); chimera_register (state, CASHMERE_MESSAGE, 1); chimera_register (state, CASHMERE_REPLY_MESSAGE, 1); chimera_register (state, CASHMERE_GROUP_BROADCAST_MESSAGE, 1); chimera_register (state, CASHMERE_REPLY_GROUP_BROADCAST_MESSAGE, 1); char networkdebug[256], routingdebug[256]; // sprintf(networkdebug, "LOG_NETWORKDEBUG_%s", keyinput.keystr); //sprintf (routingdebug, "LOG_ROUTING_%s", key.keystr); // FILE *networklogfp= fopen(networkdebug, "w"); //FILE *routingfp = fopen (routingdebug, "w"); //log_direct (state->log, LOG_ROUTING, routingfp); // log_direct(state->log, LOG_NETWORKDEBUG, networklogfp); char cashmeredebug[256]; sprintf (cashmeredebug, "logs/LOG_CASHMERE%s", key.keystr); // FILE *cashmerefp = fopen (cashmeredebug, "w"); // log_direct (state->log, LOG_CASHMERE, cashmerefp); //log_direct (state->log, LOG_WARN, stderr); //log_direct (state->log, LOG_ERROR, stderr); cashmere_init (state); read_hosts_entries(configfile); // message_handler (state, SEND_MESSAGE, send_handler, 1); chimera_deliver (state, send_handler); // log_direct(state->log, LOG_ROUTING, stderr); driver = host_get (state, "marrow", 11110); if (join != NULL) { chimera_join (state, join); sprintf (tmp, "%d %s joining with %s:%d", port, key.keystr, hn, joinport); } else { sprintf (tmp, "%d %s starting system", port, key.keystr, hn, joinport); } hello = message_create (driver->key, HELLO_MESSAGE, strlen (tmp) + 1, tmp); while(!message_send (state, driver, hello, TRUE)); free(hello->payload); free (hello); calc_cycles_persec(state); while (1) { sleep (3000); } }