#include "socket.h"
/* NOTE(review): the header name on this #include was lost in the source as
 * received; <iostream.h> is assumed because the code uses unqualified
 * cout/hex/dec/endl. Confirm against the original file. */
#include <iostream.h>
#ifdef TCP_DEBUG
#include "debug.h"
#endif

extern SockIdList globalSockets;

/*-----------------Event Handler--------------------------- */

/*
 * Allocate a MessageData node together with a payload buffer of dataBytes
 * bytes. The node's next pointer is cleared; length is left for the caller
 * to fill in. Returns NULL (with nothing leaked) if either allocation fails.
 */
static MessageData *allocMessage(size_t dataBytes)
{
    MessageData *node = (MessageData *)malloc(sizeof(MessageData));
    if (node == NULL)
        return NULL;

    /* Request at least one byte so a zero-length payload is not mistaken
     * for an allocation failure (malloc(0) may legally return NULL). */
    node->data = (char *)malloc(dataBytes ? dataBytes : 1);
    if (node->data == NULL) {
        free(node);
        return NULL;
    }
    node->next = NULL;
    return node;
}

/* Append node to the tail of entry's singly linked pending-message list. */
static void enqueueMessage(SockIdList entry, MessageData *node)
{
    MessageData *tail = entry->messages;

    if (tail == NULL) {
        entry->messages = node;
        return;
    }
    while (tail->next != NULL)
        tail = tail->next;
    tail->next = node;
}

/*
 * Dispatch one RMP event.
 *
 * MESSAGE / SUB_MESSAGE: copy the payload into a freshly allocated
 *   MessageData node and append it to the message list of every entry in
 *   globalSockets whose group name matches the event's group.
 * FORMED_OWN_GROUP: print the group list and mark the matching socket entry
 *   as WAITING.
 * LEFT_GROUP: release the group and clear the matching entry's group pointer.
 * JOINED_GROUP: print the group list and mark every matching entry CONNECTED.
 * MEMBERSHIP_CHANGE / FAILURE_RECOVERY / ATOMICITY_VIOLATED: print the group
 *   list only.
 * FAILED_RECOVERY / FAILURE_ABORT: print a message and terminate the process.
 *
 * in: the event to handle; must not be NULL.
 */
void handleMessage(mRMPEvent *in)
{
    mMessage *mmsg;
    MessageData *newMessage;
    RMPGroup *grp;
    char *groupName, *ptr;
    SockIdList sockEntry;
    int i;

    switch (in->returnRMPEventType()) {
    case MESSAGE:
#ifdef TCP_DEBUG
        cout << " Message ID (" << in->returnData()->returnMessageID() << "),";
        cout << " Message Length (" << in->returnMessageLength();
        cout << "), Handler (0x";
        cout << hex << in->returnMessageHandler() << dec << "), QOS (";
        cout << (u_long)in->returnMessageQOS() << ")" << endl;
        cout << " Message Source (" << in->returnMessageSourceIP();
        cout << ")(" << returnTextAddress(in->returnMessageSourceIP());
        cout << "), Port (" << in->returnMessageSourcePort() << ")" << endl;
        cout << " Message: Number of Buffers("
             << in->returnData()->returnNumOfBuffers()
             << "), Reply requested (" << in->replyRequested() << ")" << endl;
        cout << " Is a reply (" << in->isAReply()
             << "), Reply to message (" << in->replyToMessage() << ")" << endl;
#endif
        grp = in->returnRMPGroupForEvent();
        /* NOTE(review): no matching end-of-critical-section call is visible
         * in this function - confirm where the section is closed. */
        CRITICALSTART();
        groupName = grp->returnGroupName();

        for (sockEntry = globalSockets; sockEntry; sockEntry = sockEntry->next) {
            if (sockEntry->group == NULL ||
                strcmp(groupName,
                       sockEntry->group->returnRMPGroup()->returnGroupName()) != 0)
                continue;

            mmsg = in->returnData();
            newMessage = allocMessage(mmsg->returnMessageLength());
            if (newMessage == NULL) {
                cout << " Out of memory: message dropped." << endl;
                continue;
            }
            newMessage->length = mmsg->returnMessageLength();

            /* Flatten all buffers of the message into one contiguous block.
             * NOTE(review): if more than one socket entry matches, this
             * walks the message's buffers again - confirm the
             * returnNextMessage() cursor restarts as expected. */
            ptr = newMessage->data;
            for (i = 0; i < mmsg->returnNumOfBuffers(); i++) {
                memcpy(ptr, mmsg->returnNextMessage(),
                       mmsg->returnNextMessageLength());
                ptr += mmsg->returnNextMessageLength();
            }
            enqueueMessage(sockEntry, newMessage);
        }
        break;

    case SUB_MESSAGE:
        grp = in->returnRMPGroupForEvent();
        /* NOTE(review): as above, the critical section is not closed here. */
        CRITICALSTART();
        groupName = grp->returnGroupName();

        for (sockEntry = globalSockets; sockEntry; sockEntry = sockEntry->next) {
            if (sockEntry->group == NULL ||
                strcmp(groupName,
                       sockEntry->group->returnRMPGroup()->returnGroupName()) != 0)
                continue;

            mmsg = in->returnData();
            newMessage = allocMessage(mmsg->returnNextMessageLength());
            if (newMessage == NULL) {
                cout << " Out of memory: message dropped." << endl;
                continue;
            }
            /* NOTE(review): length records the whole-message length while
             * the buffer holds only the next sub-message - confirm intended. */
            newMessage->length = mmsg->returnMessageLength();

            /* Copy into the node's own buffer. The previous code copied
             * through 'ptr', which is never initialised in this case. */
            memcpy(newMessage->data, mmsg->returnNextMessage(),
                   mmsg->returnNextMessageLength());
            enqueueMessage(sockEntry, newMessage);
        }
        break;

    case FORMED_OWN_GROUP:
        /* Event is a FORMED_OWN_GROUP: print group list */
        cout << " Formed own group." << endl;
        printGroupList(in);
        grp = in->returnRMPGroupForEvent();
        groupName = grp->returnGroupName();
        sockEntry = GetEntryByGroup(groupName);
        if (sockEntry != NULL)      /* no entry for this group: nothing to mark */
            sockEntry->state = WAITING;
        break;

    case LEFT_GROUP:
        /* Event is a LEFT_GROUP event. Release group */
        cout << " Left group." << endl;
        grp = in->returnRMPGroupForEvent();
        groupName = grp->returnGroupName();
        /* Look the entry up before releasing the group, as the original did. */
        sockEntry = GetEntryByGroup(groupName);
        in->returnRMPGroupForEvent()->release();
        if (sockEntry != NULL)
            sockEntry->group = NULL;
        break;

    case MEMBERSHIP_CHANGE:
        /* Event is a MEMBERSHIP_CHANGE: print group list */
        cout << " Membership Change." << endl;
        printGroupList(in);
        break;

    case JOINED_GROUP:
        /* Event is a JOINED_GROUP: print group list */
        cout << " Joined Group." << endl;
        printGroupList(in);
        grp = in->returnRMPGroupForEvent();
        groupName = grp->returnGroupName();
        for (sockEntry = globalSockets; sockEntry; sockEntry = sockEntry->next) {
            if ((sockEntry->group != NULL) &&
                !strcmp(sockEntry->group->returnRMPGroup()->returnGroupName(),
                        groupName)) {
                sockEntry->state = CONNECTED;
            }
        }
        break;

    case FAILURE_RECOVERY:
        /* Event is a FAILURE_RECOVERY. Print group list */
        cout << " Successful Reformation." << endl;
        printGroupList(in);
        break;

    case ATOMICITY_VIOLATED:
        /* Event is a ATOMICITY_VIOLATED. Print group list */
        cout << " Successful Reformation with Possible Atomicity Violations.";
        cout << endl;
        printGroupList(in);
        break;

    case FAILED_RECOVERY:
        /* Event is a FAILED_RECOVERY. */
        cout << " Failed Reformation. Exiting" << endl;
        /* NOTE(review): exits with status 0 on a failure path - confirm. */
        exit(0);
        break;

    case FAILURE_ABORT:
        /* Event is a FAILURE_ABORT. Reformation is not active */
        cout << " Failure detected. Left group." << endl;
        /* NOTE(review): exits with status 0 on a failure path - confirm. */
        exit(0);
        break;

    default:
        break;
    }
}