From 8933b795003f8ad202fce6e553191be8932a37b6 Mon Sep 17 00:00:00 2001 From: Stijn Buys Date: Fri, 30 May 2008 19:56:10 +0000 Subject: zlib support --- src/core/netclient.cc | 62 +++++++++++++++++++++---------- src/core/netconnection.cc | 94 +++++++++++++++++++++++++++++++++++++---------- src/core/netconnection.h | 5 +++ src/core/stats.cc | 7 ++-- src/core/stats.h | 5 ++- 5 files changed, 128 insertions(+), 45 deletions(-) (limited to 'src/core') diff --git a/src/core/netclient.cc b/src/core/netclient.cc index b90f164..dc4a0bd 100644 --- a/src/core/netclient.cc +++ b/src/core/netclient.cc @@ -4,6 +4,8 @@ the terms of the GNU General Public License version 2 */ +#include + #include #include @@ -87,29 +89,23 @@ void NetClient::retreive(std::string & message) { // receive data and decode it into lines void NetClient::receive(char *data) { - std::string datablock; - datablock.assign(data); - - if (!datablock.size()) - return; + const char *c = data; - while(datablock.size() > 0 ) { - // scan the datablock for enters - if (datablock[0] == '\n' || datablock[0] == '\r') { - // TODO detect "begin binary block" message for zlib compression + while (*c) { + if (( *c == '\n') || (*c == '\r')) { if (messageblock.size() > 0 ) { recvq.push_back(messageblock); messageblock.clear(); } } else { if (messageblock.size() < FRAMESIZE) { - messageblock.append(datablock.substr(0,1)); + messageblock += *c; } else { con_warn << "Incoming message exceeds " << FRAMESIZE << " bytes!\n"; messageblock.clear(); } } - datablock.erase(0,1); + c++; } client_timeout = application()->time(); @@ -128,16 +124,41 @@ void NetClient::transmit(int serverfd) } else { return; } - } else if (sendq.size() >= FRAMESIZE) { - con_warn << "Outgoing message exceeds " << FRAMESIZE -1 << " bytes!\n"; - //sendq.clear(); - //return; + } else if (sendq.size() >= BLOCKSIZE - 16 ) { + con_warn << host() << ":" << port() << " outgoing data exceeds " << BLOCKSIZE - 16 << " bytes!\n"; + sendq.clear(); + return; } - ssize_t bytes_sent = 0; + char zbuf[BLOCKSIZE]; + const char *data = 0; + size_t compressed_size = BLOCKSIZE - 5; + size_t total_size = 0; + + memset(zbuf,0, sizeof(zbuf)); + + Stats::network_uncompressed_bytes_sent += sendq.size(); + + // zlib compress + int status = compress((Bytef*)(zbuf+4), &compressed_size, (Bytef*)sendq.c_str(), sendq.size()); - while (sendq.size() && !error()) { - bytes_sent = ::sendto(serverfd, sendq.c_str(), sendq.size()+1, 0, + if ((status == Z_OK) && (compressed_size + 4 < sendq.size())) { + // add a header to the compress packet + data = zbuf; + total_size = compressed_size + 4; + zbuf[0] = '\xff'; + zbuf[1] = '\xff'; + zbuf[2] = compressed_size % 256; + zbuf[3] = compressed_size >> 8; + } else { + data = sendq.c_str(); + total_size = sendq.size(); + } + + size_t total_sent = 0; + + while (total_sent < total_size && !error()) { + ssize_t bytes_sent = ::sendto(serverfd, data, total_size - total_sent, 0, (struct sockaddr *)&client_addr, sizeof(client_addr)); if (bytes_sent < 0) { @@ -145,11 +166,12 @@ void NetClient::transmit(int serverfd) return; } - sendq.erase(0, bytes_sent); + total_sent += bytes_sent; + data += bytes_sent; Stats::network_bytes_sent += bytes_sent; } - sendq.clear(); + sendq.clear(); client_keepalive = application()->time(); } diff --git a/src/core/netconnection.cc b/src/core/netconnection.cc index aa3db1a..6606a9f 100644 --- a/src/core/netconnection.cc +++ b/src/core/netconnection.cc @@ -4,6 +4,8 @@ the terms of the GNU General Public License version 2 */ +#include + #include #include "sys/sys.h" @@ -20,6 +22,10 @@ NetConnection::NetConnection() { connection_timeout = core::application()->time(); connection_state = Connecting; + + receive_compressed = false; + received_compressed_size = 0; + compressed_size = 0; } NetConnection::~NetConnection() @@ -135,13 +141,13 @@ void NetConnection::receive() ssize_t bytes_received; - memset(recvbuf, '\0', BLOCKSIZE); + memset(recvbuf, 0, BLOCKSIZE); bytes_received = ::recv(connection_fd, recvbuf, BLOCKSIZE-1, 0); Stats::network_bytes_received += bytes_received; connection_timeout = core::application()->time(); if (bytes_received == 0) { - con_print << "^BDisconnected."; + con_print << "^BDisconnected." << std::endl; abort(); return; } else if (bytes_received < 0) { @@ -151,26 +157,74 @@ void NetConnection::receive() return; } - std::string datablock; - datablock.assign(recvbuf); + const char *c = recvbuf; + + while (bytes_received) { + if (receive_compressed) { + zrecvbuf[received_compressed_size] = *c; + received_compressed_size++; + + if (received_compressed_size == compressed_size) { + // uncompress + char zunbuf[BLOCKSIZE]; + memset(zunbuf, 0, sizeof(zunbuf)); + size_t zunbuf_size = BLOCKSIZE - 1; + + int status = uncompress((Bytef *) zunbuf, &zunbuf_size, (Bytef *) zrecvbuf, compressed_size); + + if (status != Z_OK) { + con_warn << "zlib error " << status << " uncompressing incoming datablock!\n"; + } else { + const char *zc = zunbuf; + while (*zc) { + if (( *zc == '\n') || (*zc == '\r')) { + if (messageblock.size()) { + recvq.push_back(messageblock); + messageblock.clear(); + } + } else { + if (messageblock.size() < FRAMESIZE) { + messageblock += *zc; + } else { + con_warn << "Incoming uncompressed message exceeds " << FRAMESIZE << " bytes!\n"; + messageblock.clear(); + } + } + zc++; + } + } - while (datablock.size()) { - // scan the datablock for enters - if (datablock[0] == '\n' || datablock[0] == '\r') { - if (messageblock.size() >= FRAMESIZE) { - con_warn << "Incoming message exceeds " << FRAMESIZE << " bytes!\n"; - messageblock.clear(); - } else if (messageblock.size()) { - recvq.push_back(messageblock); - //con_debug << "Incoming message '" << messageblock << "'" << std::endl; + // reset + receive_compressed = false; + received_compressed_size = 0; + compressed_size = 0; + } + } else if (!messageblock.size() && (bytes_received > 3 ) && ( *c == '\xff') && (*(c+1) == '\xff')) { + + receive_compressed = true; + received_compressed_size = 0; + compressed_size = *(c+2) + (*(c+3) << 8); + c += 3; + bytes_received -= 3; + memset(zrecvbuf, 0, sizeof(zrecvbuf)); + + } else if (( *c == '\n') || (*c == '\r')) { + if (messageblock.size()) { + recvq.push_back(messageblock); messageblock.clear(); } } else { - messageblock.append(datablock.substr(0,1)); + if (messageblock.size() < FRAMESIZE) { + messageblock += *c; + } else { + con_warn << "Incoming message exceeds " << FRAMESIZE << " bytes!\n"; + messageblock.clear(); + } } - datablock.erase(0,1); + c++; + bytes_received--; } - datablock.clear(); + } void NetConnection::frame(float seconds) @@ -232,10 +286,10 @@ void NetConnection::transmit() } else { return; } - } else if (sendq.size() > FRAMESIZE) { - con_warn << "Outgoing message exceeds " << FRAMESIZE << " bytes!\n"; - //sendq.clear(); - //return; + } else if (sendq.size() >= BLOCKSIZE - 16) { + con_warn << "Outgoing data exceeds " << BLOCKSIZE - 16 << " bytes!\n"; + sendq.clear(); + return; } ssize_t bytes_sent = 0; diff --git a/src/core/netconnection.h b/src/core/netconnection.h index 3cc28db..ddc70f0 100644 --- a/src/core/netconnection.h +++ b/src/core/netconnection.h @@ -104,6 +104,11 @@ private: int connection_port; struct sockaddr_in server_addr; char recvbuf[BLOCKSIZE]; + + bool receive_compressed; + size_t received_compressed_size; + size_t compressed_size; + char zrecvbuf[BLOCKSIZE]; }; } diff --git a/src/core/stats.cc b/src/core/stats.cc index 0e6a15b..36e942f 100644 --- a/src/core/stats.cc +++ b/src/core/stats.cc @@ -8,13 +8,14 @@ namespace core { -unsigned int Stats::network_bytes_sent = 0; -unsigned int Stats::network_bytes_received = 0; - +unsigned long Stats::network_bytes_sent = 0; +unsigned long Stats::network_bytes_received = 0; +unsigned long Stats::network_uncompressed_bytes_sent = 0; void Stats::clear() { network_bytes_sent = 0; network_bytes_received = 0; + network_uncompressed_bytes_sent = 0; }; } diff --git a/src/core/stats.h b/src/core/stats.h index 9d90fdc..7bcad0d 100644 --- a/src/core/stats.h +++ b/src/core/stats.h @@ -16,8 +16,9 @@ public: /// clear statistics static void clear(); - static unsigned int network_bytes_sent; - static unsigned int network_bytes_received; + static unsigned long network_bytes_sent; + static unsigned long network_uncompressed_bytes_sent; + static unsigned long network_bytes_received; }; } -- cgit v1.2.3