#include "socket.h" #include #ifdef TCP_DEBUG #include "debug.h" #endif extern int globalInit; extern SockIdList globalSockets; /*--------------TCP Interface Procedures -----------------------*/ /* Allocates and returns new socket descriptor. Also creates new * RMP process and adds a new sd_Table entry. Calls socket() to * reserve that descriptor number. */ int RMPSocket() { SockIdList sock = (SockIdList)malloc(sizeof(struct _sockIdList)); if (globalInit == 0) { RMPInit(); } if((sock->sd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { free(sock); return -1; } sock->group = NULL; sock->ttl = 1; sock->qos = QOS_TOTALLY_ORDERED; sock->messages = NULL; sock->state = NOT_CONNECTED; sock->mode = BLOCKING; CRITICALSTART(); sock->next = globalSockets; globalSockets = sock; CRITICALSTOP(); return sock->sd; } /* Connects to a group somewhere, or creates one, by calling RMP::join() * returns: -1 - fail to connect, timeout * 0 - sockId doesn't exist * 1 - connection established */ int RMPConnect(int sockId, char *groupName) { mRMP *sockEntryProcess; SockIdList sockEntry = GetEntryBySID(sockId); int ttlval; if(!sockEntry) return 0; if(sockEntry->group != NULL) return 0; if((ttlval=sockEntry->ttl) > 1) { sockEntry->group = mRMP::join(groupName,0,0,ttlval); if (sockEntry->group == NULL) { DeleteEntryBySID(sockId); return -1; } } else { if ((sockEntry->group = mRMP::join(groupName, NULL, 1)) == NULL){ DeleteEntryBySID(sockId); return -1; } } return 1; } /* Allows the user to set socket options. */ int RMPSetSockOpt(int sockId, int option, void *value, int len) { SockIdList sockEntry = GetEntryBySID(sockId); if (sockEntry == NULL) { return -1; } switch(option) { case QOS_OPT: sockEntry->qos = *(QOS *)value; return 1; case TTL_OPT: sockEntry->ttl = *(unsigned char *)value; return 1; case MODE_OPT: sockEntry->mode = *(MODE *)value; return 1; default: return -1; } } /* Allows user to query socket options. */ int RMPGetSockOpt(int sockId, int option, void *value, int *len) { SockIdList sockEntry = GetEntryBySID(sockId); if (sockEntry == NULL) { return -1; } switch(option) { case QOS_OPT: *(QOS *)value = sockEntry->qos; *len = sizeof(QOS); return 1; case TTL_OPT: *(unsigned char*)value= sockEntry->ttl; *len = sizeof(unsigned char); return 1; case MODE_OPT: *(MODE *)value = sockEntry->mode; *len = sizeof(MODE); return 1; default: return -1; } } /* Event driven socket selection * Return values: -1 -- failure, * 0 -- timeout, * sd -- success. * * struct timeval *timeout, numSockIds, SockId, SockId, ... */ int RMPSelect (int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) { int found = 0, i; fd_set myreadfds, mywritefds; struct timeval start, zero; struct timezone tz; SockIdList sockEntry; mRMPEvent *e; FlowControlStatus flow; zero.tv_sec=0; zero.tv_usec=0; if ((readfds == NULL) || (nfds == 0)) { return found; } memcpy((char *)&myreadfds, (char *)readfds, sizeof(fd_set)); if (writefds != NULL) memcpy((char *)&mywritefds, (char *)writefds, sizeof(fd_set)); gettimeofday(&start, (void*)&tz); do { e = mRMP::mRMPLoop(nfds, readfds, writefds, exceptfds, &zero); if(e != NULL) { handleMessage(e); e->release(); } for (i=0; i<=nfds; i++) { if((timeout != NULL) && !NotTimeOut(*timeout, start)) { return found; } if (FD_ISSET(i, &myreadfds)) { sockEntry = GetEntryBySID(i); if ((sockEntry != NULL) && (sockEntry->messages != NULL)) { FD_CLR(i, &myreadfds); FD_SET(i, readfds); found++; } else { FD_CLR(i, readfds); } } if ((writefds != NULL) && (FD_ISSET(i, &mywritefds))) { sockEntry = GetEntryBySID(i); if (sockEntry != NULL && sockEntry->group != NULL) { flow = sockEntry->group->queryFlowControl(1); if ((flow == FLOW_CONTROL_OK) && (FD_ISSET(i, &mywritefds))) { FD_CLR(i, &mywritefds); FD_SET(i, writefds); found++; } else { FD_CLR(i, writefds); } } else { if(FD_ISSET(i, &mywritefds)) { FD_CLR(i, writefds); } } } } } while ((found == 0) && (timeout == NULL || NotTimeOut(*timeout, start))); return found; } /* Reads nbytes bytes from socket sockId to buffer buf. * returns: -1 - sockId doesn't exist * 0 - * n - # of bytes read */ int RMPRead(int sockId, char *buf, int nbytes) { SockIdList sockEntry = GetEntryBySID(sockId); MessageData *message; int bytesread, len; fd_set readfds; MODE value; if ((sockEntry == NULL) || (buf == NULL) || (nbytes < 0)) { return -1; } if (sockEntry->group == NULL) return 0; FD_SET(sockId, &readfds); while(1) { if (sockEntry->messages != NULL) { if (sockEntry->messages->length <= nbytes) { memcpy(buf, sockEntry->messages->data, sockEntry->messages->length); message = sockEntry->messages; bytesread = sockEntry->messages->length; sockEntry->messages = sockEntry->messages->next; free(message->data); free(message); return bytesread; } else { memcpy(buf, sockEntry->messages->data, nbytes); memcpy(sockEntry->messages->data, sockEntry->messages->data + nbytes, sockEntry->messages->length - nbytes); sockEntry->messages->length -= nbytes; return nbytes; } } else { RMPSelect(sockId, &readfds, NULL, NULL, NULL); RMPGetSockOpt(sockId, MODE_OPT, &value, &len); if(value == NON_BLOCKING) return 0; } } } /* Writs nbytes bytes from buffer buf to socket sockId. */ int RMPWrite(int sockId, char *buf, int nbytes) { SockIdList sockEntry = GetEntryBySID(sockId); mRMPGroup *grp; int qosval, len; FlowControlStatus flow; MessageData *msgs, *newMessage; MODE value; mRMPEvent *event; struct timeval time = {0,0}; if(sockEntry == NULL) return -1; if(sockEntry->group == NULL) return 0; qosval = sockEntry->qos; grp = sockEntry->group; RMPGetSockOpt(sockId, MODE_OPT, &value, &len); if(value == BLOCKING) while((flow=grp->queryFlowControl(1)) != FLOW_CONTROL_OK) { event = mRMP::mRMPLoop(0, NULL, NULL, NULL, &time); if (event != NULL) { handleMessage(event); event->release(); } } else flow=grp->queryFlowControl(1); if(flow !=FLOW_CONTROL_FULL) { /* send the message */ switch(grp->send((void*)buf, nbytes, qosval, 0, FALSE, 0, FALSE)) { case DATA_BUFFERED: case FLOW_CONTROL_OK: case FLOW_CONTROL_WARNING: break; case FLOW_CONTROL_FULL: case BAD_TOKEN_RING_ERROR: case ARGUMENT_ERROR: case SYSTEM_ERROR: DeleteEntryBySID(sockId); return -1; break; } } else { return -1; } CRITICALSTART(); sockEntry = globalSockets; while(sockEntry) { if((sockEntry->group != NULL) && !strcmp(sockEntry->group->returnRMPGroup()->returnGroupName(), grp->returnRMPGroup()->returnGroupName()) && sockEntry->sd != sockId) { newMessage = (MessageData *)malloc(sizeof(MessageData)); newMessage->length = nbytes; newMessage->data = (char*)malloc(sizeof(char)*newMessage->length); newMessage->next = NULL; memcpy(newMessage->data, buf, newMessage->length); msgs = sockEntry->messages; if (msgs == NULL) sockEntry->messages = newMessage; else { while (msgs->next != NULL) msgs = msgs->next; msgs->next = newMessage; } } sockEntry = sockEntry->next; } CRITICALSTOP(); return nbytes; } /* close the socket id, delete the entry from the list * * (RMP process) * */ int RMPClose(int sockId) { SockIdList sockEntry; mRMPEvent *event; struct timeval time = {0,0}; sockEntry = GetEntryBySID(sockId); if((sockEntry == NULL) || (sockEntry->group == NULL)) { return -1; } /* really? only if it is the *last* socket(member) of the group*/ if(LastSocketGroup(sockId)) { while(sockEntry->group->queryFlowControl(1) != FLOW_CONTROL_OK) { event = mRMP::mRMPLoop(0, NULL, NULL, NULL, &time); if (event != NULL) { handleMessage(event); event->release(); } } mRMP::leave(sockEntry->group, NULL, 0); } while(sockEntry->group != NULL) { event = mRMP::mRMPLoop(0, NULL, NULL, NULL, &time); if (event != NULL) { handleMessage(event); event->release(); } } close(sockEntry->sd); return DeleteEntryBySID(sockId); }