Compressing Data Sent Over a Socket

The java.util.zip package, introduced in JDK 1.3, provides a Java interface to the widely used ZLIB compression algorithms. The core of the package is a pair of classes, Inflator and Deflator, that wrap a native library that compresses and decompresses blocks of data. Two pairs of stream classes, ZipInputStream/ZipOutputStream and GZIPInputStream/GZIPOutputStream, use Inflator and Deflator to read and write compressed data in the ubiquitous zip and gzip formats.

The stream classes in java.util.zip work well for working with compressed files and similarly bounded data. They are, however, less useful for compressing continuous transmissions over a socket connection. This is unfortunate, as the tradeoff between processor cycles (needed for compressing and decompressing) and bandwidth is often such that compression can have a considerable impact on performance of network applications. Applications that make use of XML data or XML-based protocols are prime examples.

At the root of the problem is an unavoidable characteristic of compression algorithms: Data is compressed by finding patterns and redundancies that can be represented in a more compact way. Without a priori knowledge of the nature of the data, patterns can only be detected in finite blocks of data. This presents a conflict for the “one byte at a time” model of Java streams, at least in the case where communication is continuous and open-ended. For transactional communication (e.g., bounded operations in which a connection is opened, data is transmitted, and the connection closed), the standard stream classes work quite well. For example, GZIPOutputStream can be used in a servlet filter to automatically compress data sent to a browser.

Streams, of course, need not process data one byte at a time. Many of the standard Java stream classes perform buffering to read or write larger chunks of data. Compression simply needs a similar buffering mechanism, reading data until a suitable block is available for compression, then compressing it and sending it across the wire.

On the sending side, GZIPOutputStream almost provides a solution. The class includes a finish method that compresses previously written data and transmits it without closing the underlying stream. Once finish has been called, no further data can be written to the GZIPOutputStream. One could simply discard the GZIPOutputStream after calling finish and create a new one for the next block of data, but a related problem happens in the GZIPInputStream on the other end of the connection. GZIPInputStream reads data in chunks of a fixed size. If a block of compressed data ends in the middle of such a chunk, extra hoops must be (proverbially) jumped through to create a new GZIPInputStream that begins reading at the start of the next block.

To be fair, the ZipInputStream and ZipOutputStream classes are somewhat more adaptable to continous transmission of data across a socket. The zip format is designed to support multiple files within an archive, so each block of data can be treated as a separate file. The closeEntry and putNextEntry methods in ZipOutputStream can be used to separate entries, while the getNextEntry method in ZipInputStream can be used to read one entry at a time on the receiving side. This approach still has several problems. It requires extra work by the code which uses the zip streams: They can no longer simply be wrapped around the socket’s stream and accessed via the normal write, flush, read, and close methods, but must determine which a new “file” must be created in the stream. The zip format’s meta-data (encapsulated in the ZipEntry class) also adds a small amount of overhead to every compressed block of data. Neither of these issues are insurmountable, but they do suggest that a simpler design might be possible.

An alternate approach is to create stream classes that use the Deflator and Inflator classes directly. The CompressedBlockOutputStream and CompressedBlockInputStream classes shown below were designed with the following goals in mind:


  1. The streams’ compression functionality should be invisible to the application. In other words, it should be possible to simply wrap them around the socket’s input and output streams, then write to and read from them without having to know when a block of data should be compressed or decompressed.
  2. A compressed block of data should be sent whenever a given number of bytes have been written. This will prevent OutOfMemoryErrors from happening if a very large block of data is written to the stream.
  3. The flush method in CompressedBlockOutputStream should cause any buffered input to be immediately compressed and transmitted, even if the size threshold has not been reached yet.
  4. Stream meta-data (beyond any header and trailer information used by ZLIB) should be kept to a minimum. In practice, the implementation uses 8 bytes of header information: One 4-byte integer that indicates the size of the block of the compressed data and another 4-byte integer that indicates the size of the uncompressed data.

The implementation of these classes is shown in Figures 1 and 2, with a simple demonstration program given in Figure 3.



/**
 * Output stream that compresses data. A compressed block 
 * is generated and transmitted once a given number of bytes 
 * have been written, or when the flush method is invoked.
 * 
 * Copyright 2005 - Philip Isenhour - http://javatechniques.com/
 * 
 * This software is provided 'as-is', without any express or 
 * implied warranty. In no event will the authors be held liable 
 * for any damages arising from the use of this software.
 *
 * Permission is granted to anyone to use this software for any 
 * purpose, including commercial applications, and to alter it and 
 * redistribute it freely, subject to the following restrictions:
 *
 *  1. The origin of this software must not be misrepresented; you 
 *     must not claim that you wrote the original software. If you 
 *     use this software in a product, an acknowledgment in the 
 *     product documentation would be appreciated but is not required.
 *
 *  2. Altered source versions must be plainly marked as such, and 
 *     must not be misrepresented as being the original software.
 *
 *  3. This notice may not be removed or altered from any source 
 *     distribution.
 *
 * $Id:  1.1 2005/10/26 17:19:05 isenhour Exp $
 */
import java.io.*;
import java.util.zip.Deflater;

public class CompressedBlockOutputStream extends FilterOutputStream {
    /**
     * Buffer for input data
     */
    private byte[] inBuf = null;

    /**
     * Buffer for compressed data to be written
     */
    private byte[] outBuf = null;

    /**
     * Number of bytes in the buffer
     */
    private int len = 0;

    /**
     * Deflater for compressing data
     */
    private Deflater deflater = null;

    /**
     * Constructs a CompressedBlockOutputStream that writes to 
     * the given underlying output stream 'os' and sends a compressed 
     * block once 'size' byte have been written. The default 
     * compression strategy and level are used.
     */
    public CompressedBlockOutputStream(OutputStream os, int size) 
        throws IOException {
        this(os, size, 
            Deflater.DEFAULT_COMPRESSION, Deflater.DEFAULT_STRATEGY);
    }

    /**
     * Constructs a CompressedBlockOutputStream that writes to the 
     * given underlying output stream 'os' and sends a compressed 
     * block once 'size' byte have been written. The compression 
     * level and strategy should be specified using the constants 
     * defined in java.util.zip.Deflator.
     */
    public CompressedBlockOutputStream(OutputStream os, int size, 
        int level, int strategy) throws IOException {
        super(os);
        this.inBuf = new byte[size];
        this.outBuf = new byte[size + 64];
        this.deflater = new Deflater(level);
        this.deflater.setStrategy(strategy);
    }

    protected void compressAndSend() throws IOException {
        if (len > 0) {
            deflater.setInput(inBuf, 0, len);
            deflater.finish();
            int size = deflater.deflate(outBuf);

            // Write the size of the compressed data, followed 
            // by the size of the uncompressed data
            out.write((size >> 24) & 0xFF);
            out.write((size >> 16) & 0xFF);
            out.write((size >>  8) & 0xFF);
            out.write((size >>  0) & 0xFF);

            out.write((len >> 24) & 0xFF);
            out.write((len >> 16) & 0xFF);
            out.write((len >>  8) & 0xFF);
            out.write((len >>  0) & 0xFF);

            out.write(outBuf, 0, size);
            out.flush();

            len = 0;
            deflater.reset();
        }
    }

    public void write(int b) throws IOException {
        inBuf[len++] = (byte) b;
        if (len == inBuf.length) {
            compressAndSend();
        }
    }

    public void write(byte[] b, int boff, int blen) 
        throws IOException {
        while ((len + blen) > inBuf.length) {
            int toCopy = inBuf.length - len;
            System.arraycopy(b, boff, inBuf, len, toCopy);
            len += toCopy;
            compressAndSend();
            boff += toCopy;
            blen -= toCopy;
        }
        System.arraycopy(b, boff, inBuf, len, blen);
        len += blen;
    }

    public void flush() throws IOException {
        compressAndSend();
        out.flush();
    }

    public void close() throws IOException {
        compressAndSend();
        out.close();
    }
}



Figure 1. The CompressedBlockOutputStream class.



/**
 * Input stream that decompresses data. 
 * 
 * Copyright 2005 - Philip Isenhour - http://javatechniques.com/
 * 
 * This software is provided 'as-is', without any express or 
 * implied warranty. In no event will the authors be held liable 
 * for any damages arising from the use of this software.
 *
 * Permission is granted to anyone to use this software for any 
 * purpose, including commercial applications, and to alter it and 
 * redistribute it freely, subject to the following restrictions:
 *
 *  1. The origin of this software must not be misrepresented; you 
 *     must not claim that you wrote the original software. If you 
 *     use this software in a product, an acknowledgment in the 
 *     product documentation would be appreciated but is not required.
 *
 *  2. Altered source versions must be plainly marked as such, and 
 *     must not be misrepresented as being the original software.
 *
 *  3. This notice may not be removed or altered from any source 
 *     distribution.
 *
 * $Id:  1.2 2005/10/26 17:40:19 isenhour Exp $
 */
import java.io.*;
import java.util.zip.Inflater;
import java.util.zip.DataFormatException;

public class CompressedBlockInputStream extends FilterInputStream {
    /**
     * Buffer of compressed data read from the stream
     */
    private byte[] inBuf = null;

    /**
     * Length of data in the input data
     */
    private int inLength = 0;

    /**
     * Buffer of uncompressed data
     */
    private byte[] outBuf = null;

    /**
     * Offset and length of uncompressed data
     */
    private int outOffs = 0;
    private int outLength = 0;

    /**
     * Inflater for decompressing
     */
    private Inflater inflater = null;

    public CompressedBlockInputStream(InputStream is) 
        throws IOException {
        super(is);
        inflater = new Inflater();
    }

    private void readAndDecompress() throws IOException {
        // Read the length of the compressed block
        int ch1 = in.read();
        int ch2 = in.read();
        int ch3 = in.read();
        int ch4 = in.read();
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        inLength = ((ch1 << 24) + (ch2 << 16) + 
            (ch3 << 8) + (ch4 << 0));

        ch1 = in.read();
        ch2 = in.read();
        ch3 = in.read();
        ch4 = in.read();
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        outLength = ((ch1 << 24) + (ch2 << 16) + 
            (ch3 << 8) + (ch4 << 0));

        // Make sure we've got enough space to read the block
        if ((inBuf == null) || (inLength > inBuf.length)) {
            inBuf = new byte[inLength];
        }

        if ((outBuf == null) || (outLength > outBuf.length)) {
            outBuf = new byte[outLength];
        }

        // Read until we're got the entire compressed buffer. 
        // read(...) will not necessarily block until all 
        // requested data has been read, so we loop until 
        // we're done.
        int inOffs = 0;
        while (inOffs < inLength) {
            int inCount = 
                in.read(inBuf, inOffs, inLength - inOffs);
            if (inCount == -1) {
                throw new EOFException();
            }
            inOffs += inCount;
        }

        inflater.setInput(inBuf, 0, inLength);
        try {
            inflater.inflate(outBuf);
        }
        catch(DataFormatException dfe) {
            throw new IOException(
                "Data format exception - " + 
                dfe.getMessage());
        }

        // Reset the inflator so we can re-use it for the 
        // next block
        inflater.reset();

        outOffs = 0;
    }

    public int read() throws IOException {
        if (outOffs >= outLength) {
            try {
                readAndDecompress();
            }
            catch(EOFException eof) {
                return -1;
            }
        }

        return outBuf[outOffs++] & 0xff;
    }

    public int read(byte[] b, int off, int len) 
        throws IOException {
        int count = 0;
        while (count < len) {
            if (outOffs >= outLength) {
                try {
                    // If we've read at least one decompressed 
                    // byte and further decompression would 
                    // require blocking, return the count.
                    if ((count > 0) && (in.available() == 0))
                        return count;
                    else
                        readAndDecompress();
                }
                catch(EOFException eof) {
                    if (count == 0)
                        count = -1;
                    return count;
                }
            }

            int toCopy = 
                Math.min(outLength - outOffs, len - count);
            System.arraycopy(
                outBuf, outOffs, b, off + count, toCopy);
            outOffs += toCopy;
            count += toCopy;
        }

        return count;
    }

    public int available() throws IOException {
        // This isn't precise, but should be an adequate 
        // lower bound on the actual amount of available data
        return (outLength - outOffs) + in.available();
    }

}



Figure 2. The CompressedBlockInputStream class.

Here is a simple demo program that opens a socket connection and sends 1000 lines of (compressed) data across it:



/**
 * Simple demonstration of the CompressedBlockOutputStream and 
 * CompressedBlockInputStream transmitting data across a socket.
 * 
 * $Id: Demo.java,v 1.1 2005/10/26 17:40:19 isenhour Exp $
 */
import java.io.*;
import java.net.*;

public class Demo {
    private static class Server extends Thread {
        int port;
        public Server(int port) {
            this.port = port;
            setDaemon(true);
            start();
        }

        public void run() {
            try {
                // Accept connections, spawning a worker thread 
                // when we get one.
                ServerSocket ss = new ServerSocket(port);
                while (true) {
                    ServerWorker worker = 
                        new ServerWorker(ss.accept());
                }
            }
            catch(IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }

    private static class ServerWorker extends Thread {
        Socket s = null;
        public ServerWorker(Socket s) {
            this.s = s;
            setDaemon(false);
            start();
        }

        public void run() {
            try {
                // Build a Reader object that wraps the 
                // (decompressed) socket input stream.
                BufferedReader in = new BufferedReader(
                    new InputStreamReader(
                    new CompressedBlockInputStream(
                    s.getInputStream())));
                String line = in.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = in.readLine();
                }
                System.out.flush();
            }
            catch(IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }

    public static void sendData(int port) throws IOException {
        // Connect to the server
        Socket s = new Socket("localhost", port);

        // Make a stream that compresses outgoing data, 
        // sending a compressed block for every 1K worth of 
        // uncompressed data written.
        CompressedBlockOutputStream compressed =
            new CompressedBlockOutputStream(
                s.getOutputStream(), 1024);

        // Build a writer that wraps the (compressed) socket 
        // output stream
        PrintWriter out = 
            new PrintWriter(new OutputStreamWriter(compressed));

        // Send across 1000 lines of output
        for (int i = 0; i < 1000; i++) {
            out.println("This is line " + (i + 1) + 
                " of the test output.");
        }

        // Note that if we don't close the stream, the last 
        // block of data may not be sent across the connection. We 
        // could also force the last block to be sent by calling 
        // flush(), which would leave the socket connection open.
        out.close();
    }


    public static void main(String[] args) throws IOException {
        // Pick an obscure default port. An alternate port 
        // can be given as the first command line argument.
        int port = 11535;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        Server s = new Server(port);

        sendData(port);
    }
}



Figure 3. A simple Demo class, illustrating the use of CompressedBlockOutputStream and CompressedBlockInputStream.

Here are a few things to keep in mind when using these classes:


  • The classes are not thread-safe. Synchronization will be needed if, for example, multiple threads will be writing to a CompressedBlockOutputStream.
  • Any buffered data will be sent when the CompressedBlockOutputStream is flushed or closed. If you need to ensure that buffered data is always sent when the program terminates, you can use Runtime‘s addShutdownHook method to register a thread that will flush the stream even if the program exits uncleanly.
  • Generally speaking, you will get higher compression levels for larger blocks of data, since the compression algorithm will have more opportunities to find patterns and redundancies. The tradeoff, of course, is that larger buffer sizes increase memory usage and latency.