Browse Source

dynamic stream buffer, sending and receiving of split packages

Kajetan Johannes Hammerle 5 years ago
parent
commit
638ea4ae39
8 changed files with 207 additions and 30 deletions
  1. 17 6
      Client.c
  2. 4 4
      Makefile
  3. 25 13
      Server.c
  4. 6 1
      ServerMain.c
  5. 74 0
      SocketUtils.c
  6. 10 0
      SocketUtils.h
  7. 65 2
      Stream.c
  8. 6 4
      Stream.h

+ 17 - 6
Client.c

@@ -8,6 +8,7 @@
 #include <unistd.h>
 #include "Stream.h"
 #include "Client.h"
+#include "SocketUtils.h"
 
 void clientInitDefaults(Client* c)
 {
@@ -42,10 +43,11 @@ int clientInit(Client* c, char* ip, short port)
     {
         printf("Connection with server (%s) established\n", inet_ntoa(socketData.sin_addr));
         Stream in;
-        streamInit(&in);
+        streamInit(&in, 1024);
         if(clientReceive(c, &in) == -1)
         {
             printf("Server did not respond");
+            streamRemove(&in);
             clientRemove(c);
             return -1;
         }
@@ -55,6 +57,7 @@ int clientInit(Client* c, char* ip, short port)
             if(streamGetChar(&in, &answer) == -1)
             {
                 printf("Server sent an invalid response");
+                streamRemove(&in);
                 clientRemove(c);
                 return -1;
             }
@@ -70,10 +73,12 @@ int clientInit(Client* c, char* ip, short port)
                 char error[100];
                 streamGetChars(&in, error, 100);
                 printf("%s\n", error);
+                streamRemove(&in);
                 clientRemove(c);
                 return -1;
             }
         }
+        streamRemove(&in);
     }
     else
     {
@@ -87,10 +92,9 @@ int clientInit(Client* c, char* ip, short port)
 
 int clientReceive(Client* c, Stream* in)
 {
-    int size = recv(c->socket, in->data, in->size, 0);
+    int size = receiveAll(c->socket, in);
     if(size > 0)
     {
-        in->size = size;
         return 0;
     }
     return -1;
@@ -148,7 +152,7 @@ void clientWaitForData(Client* c)
         else if(type >= 0 && type < 4)
         {
             Stream out;
-            streamInit(&out);
+            streamInit(&out, 16);
             streamWriteChar(&out, type);
             
             switch(type)
@@ -170,14 +174,19 @@ void clientWaitForData(Client* c)
                     streamWriteChars(&out, buffer);
                     break;
                 case 2:
-                    // empty
+                    for(int i = 0; i < 260; i++)
+                    {
+                        streamWriteInt(&out, i);
+                    }
                     break;
             }
             
             if(clientSendData(c, &out) == -1)
             {
+                streamRemove(&out);
                 break;
             }
+            streamRemove(&out);
         }
         else
         {
@@ -188,10 +197,12 @@ void clientWaitForData(Client* c)
 
 int clientSendData(Client* c, Stream* s)
 {
-    if(send(c->socket, s->data, s->index, MSG_NOSIGNAL) == -1)
+    int bytes = sendAll(c->socket, s);
+    if(bytes == -1)
     {
         perror("Cannot send data");
         return -1;
     }
+    //printf("%d Bytes from %d sent\n", bytes, s->index);
     return 0;
 }

+ 4 - 4
Makefile

@@ -7,15 +7,15 @@ run_all: all
 	./server
 	./client 127.0.0.1
 
-server: ServerMain.c Server.c Server.h Stream.c Stream.h
-	gcc $(FLAGS) -o $@ ServerMain.c Server.c Stream.c $(SERVER_LINKER)
+server: ServerMain.c Server.c Server.h Stream.c Stream.h SocketUtils.c SocketUtils.h
+	gcc $(FLAGS) -o $@ ServerMain.c Server.c Stream.c SocketUtils.c $(SERVER_LINKER)
 
 run_server: server
 	./server
 
 
-client: ClientMain.c Client.c Client.h Stream.c Stream.h
-	gcc $(FLAGS) -o $@ ClientMain.c Client.c Stream.c
+client: ClientMain.c Client.c Client.h Stream.c Stream.h SocketUtils.c SocketUtils.h
+	gcc $(FLAGS) -o $@ ClientMain.c Client.c Stream.c SocketUtils.c
 
 run_client: client
 	./client 127.0.0.1

+ 25 - 13
Server.c

@@ -8,6 +8,7 @@
 #include <arpa/inet.h>
 #include <sys/types.h>  
 #include "Stream.h" 
+#include "SocketUtils.h"
 
 void serverInitDefaults(Server* s)
 {
@@ -156,33 +157,41 @@ void* clientHandler(void* data)
     Stream in;
     while(1)
     {
-        streamInit(&in);
-        int size = recv(s->clientSockets[id], in.data, in.size, 0);
+        streamInit(&in, 1024);
+        int size = receiveAll(s->clientSockets[id], &in);
         if(size > 0)
         {
-            int package = ((int) in.data[0]);
-            if(package >= 0 && package < s->hIndex)
+            char package;
+            if(streamGetChar(&in, &package) == -1)
             {
-                printf("Received package %d from %d\n", package, id);
-                in.index = 1;
-                in.size = size;
-                s->handlers[package](&in);
+                printf("Invalid package %d from %d\n", (int) package, id);
             }
             else
             {
-                printf("Invalid package %d from %d\n", package, id);
+                if(package >= 0 && package < s->hIndex)
+                {
+                    printf("Received package %d from %d\n", (int) package, id);
+                    s->handlers[(int) package](&in);
+                }
+                else
+                {
+                    printf("Invalid package %d from %d\n", (int) package, id);
+                }
             }
         }
         else if(size == 0)
         {
             printf("Client %d closed remote socket\n", id);
+            streamRemove(&in);
             break;
         }   
         else
         {
             perror("recv error");
+            streamRemove(&in);
             break;
         }
+        streamRemove(&in);
     }
     
     if(close(s->clientSockets[id]) != 0)
@@ -201,7 +210,7 @@ void* clientHandler(void* data)
 
 int serverSend(int clientSocket, Stream* out)
 {
-    if(send(clientSocket, out->data, out->index, 0) == -1)
+    if(sendAll(clientSocket, out) == -1)
     {
         perror("Cannot send data");
         return -1;
@@ -240,10 +249,11 @@ void serverWaitForConnection(Server* s)
                         perror("Cannot create thread");  
                         
                         Stream out;
-                        streamInit(&out);
+                        streamInit(&out, 16);
                         streamWriteChar(&out, -1);
                         streamWriteChars(&out, "Cannot create thread\n");
                         serverSend(clientSocket, &out);
+                        streamRemove(&out);
                         close(clientSocket);
                     }
                     else
@@ -251,7 +261,7 @@ void serverWaitForConnection(Server* s)
                         s->clientSockets[i] = clientSocket;
                         
                         Stream out;
-                        streamInit(&out);
+                        streamInit(&out, 64);
                         streamWriteChar(&out, 1);
                         streamWriteChars(&out, "Welcome to the server, please enter your command:\n");
                         if(serverSend(clientSocket, &out) == -1)
@@ -260,6 +270,7 @@ void serverWaitForConnection(Server* s)
                             close(clientSocket);
                             s->clientSockets[i] = -1;
                         }
+                        streamRemove(&out);
                     }
                     break;
                 }
@@ -270,10 +281,11 @@ void serverWaitForConnection(Server* s)
                     printf("max clients reached\n");    
                     
                     Stream out;
-                    streamInit(&out);
+                    streamInit(&out, 16);
                     streamWriteChar(&out, -1);
                     streamWriteChars(&out, "the server is full\n");
                     serverSend(clientSocket, &out);
+                    streamRemove(&out);
                     close(clientSocket);
                     break;
                 }

+ 6 - 1
ServerMain.c

@@ -43,7 +43,12 @@ void package1(Stream* in)
 
 void package2(Stream* in)
 {
-    printf("I'm not containing any data.\n");
+    int data;
+    while(streamGetInt(in, &data) != -1)
+    {
+        printf("%d ", data);
+    }
+    printf("\n");
 }
 
 int main()

+ 74 - 0
SocketUtils.c

@@ -0,0 +1,74 @@
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdio.h>
+#include "SocketUtils.h"
+#include "Stream.h"
+
+int sendAll(int socket, Stream* out)
+{
+    char* data = out->data;
+    int left = out->index;
+    streamUpdateIndex(out);
+    while(left > 0)
+    {
+        int bytes = send(socket, data, left, MSG_NOSIGNAL);
+        if(bytes == -1)
+        {
+            perror("Cannot send data");
+            return -1;
+        }
+        left -= bytes;
+        data += bytes;
+    }
+    return out->index;
+}
+
+int receiveAll(int socket, Stream* in)
+{   
+    int size = recv(socket, in->data, in->size, 0);
+    if(size > 0)
+    {
+        int dataLength = streamGetIndex(in);
+        if(size == dataLength)
+        {
+            in->size = dataLength;
+            return dataLength;
+        }
+        streamEnsureCapacity(in, dataLength);
+        in->size = dataLength;
+        
+        char* data = in->data + size;
+        int left = dataLength - size;
+        while(left > 0)
+        {
+            size = recv(socket, data, left, 0);
+            if(size > 0)
+            {
+                data += size;
+                left -= size;
+            }
+            else if(size == 0)
+            {
+                printf("Remote Socket closed\n");
+                return -1;
+            }   
+            else
+            {
+                perror("recv error");
+                return -1;
+            }
+        }
+        return dataLength;
+    }
+    else if(size == 0)
+    {
+        printf("Remote Socket closed\n");
+        return 0;
+    }   
+    else
+    {
+        perror("recv error");
+        return -1;
+    }
+    return -1;
+}

+ 10 - 0
SocketUtils.h

@@ -0,0 +1,10 @@
+#ifndef SOCKETUTILS_H
+#define SOCKETUTILS_H
+
+#include "Stream.h"
+
+int sendAll(int socket, Stream* out);
+int receiveAll(int socket, Stream* in);
+
+#endif
+

+ 65 - 2
Stream.c

@@ -1,10 +1,70 @@
 #include "Stream.h"
+#include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 
-void streamInit(Stream* s)
+void streamInit(Stream* s, int size)
+{
+    s->index = 4;
+    s->size = size;
+    if(size > 0)
+    {
+        s->data = malloc(sizeof(char) * size);
+    }
+    else
+    {
+        s->data = NULL;
+    }
+}
+
+void streamRemove(Stream* s)
 {
     s->index = 0;
-    s->size = BUFFER_SIZE;
+    s->size = 0;
+    free(s->data);
+    s->data = NULL;
+}
+
+void streamUpdateIndex(Stream* s)
+{
+    if(s->size >= 4)
+    {
+        s->data[0] = s->index & 0xFF;
+        s->data[1] = (s->index >> 8) & 0xFF;
+        s->data[2] = (s->index >> 16) & 0xFF;
+        s->data[3] = (s->index >> 24) & 0xFF;
+    }    
+}
+
+int streamGetIndex(Stream* s)
+{
+    if(s->size >= 4)
+    {
+        return (s->data[0] & 0xFF) | ((s->data[1] & 0xFF) << 8) | ((s->data[2] & 0xFF) << 16) | ((s->data[3] & 0xFF) << 24);
+    }
+    return -1;
+}
+
+void streamEnsureCapacity(Stream* s, int length)
+{
+    if(length >= s->size)
+    {
+        int newSize = s->size;
+        if(newSize >= 0)
+        {
+            newSize = 1;
+        }
+        while(length >= newSize)
+        {
+            newSize *= 2;
+        }
+        char* newData = malloc(sizeof(char) * newSize);
+        memcpy(newData, s->data, s->size);
+        free(s->data);
+        s->data = newData;
+        //printf("RESIZE FROM %d to %d\n", s->size, newSize);
+        s->size = newSize;
+    }
 }
 
 int streamGetChar(Stream* in, char* c)
@@ -65,6 +125,7 @@ int streamGetInt(Stream* in, int* i)
 
 int streamWriteChar(Stream* out, char c)
 {
+    streamEnsureCapacity(out, out->index);
     if(out->index < out->size)
     {
         out->data[out->index] = c;
@@ -90,6 +151,7 @@ int streamWriteChars(Stream* out, char* c)
 
 int streamWriteShort(Stream* out, short s)
 {
+    streamEnsureCapacity(out, out->index + 1);
     if(out->index + 1 < out->size)
     {
         out->data[out->index] = s & 0xFF;
@@ -102,6 +164,7 @@ int streamWriteShort(Stream* out, short s)
 
 int streamWriteInt(Stream* out, int i)
 {
+    streamEnsureCapacity(out, out->index + 3);
     if(out->index + 3 < out->size)
     {
         out->data[out->index] = i & 0xFF;

+ 6 - 4
Stream.h

@@ -1,16 +1,18 @@
 #ifndef STREAM_H
 #define STREAM_H
 
-#define BUFFER_SIZE 1024
-
 typedef struct Stream
 {
     int size;
     int index;
-    char data[BUFFER_SIZE];
+    char* data;
 } Stream;
 
-void streamInit(Stream* s);
+void streamInit(Stream* s, int size);
+void streamRemove(Stream* s);
+void streamUpdateIndex(Stream* s);
+int streamGetIndex(Stream* s);
+void streamEnsureCapacity(Stream* s, int length);
 
 typedef void (*StreamFunction) (Stream*);