Add a gzip indexing class for faster gz navigation

The gzread function is slow.  Every time you seek to a new location, the
whole file up to that position has to be decompressed again.  This causes
massive lags when trying to do simple things in lnav on a large .gz file.

Use the zlib inflate* functions instead and record the dictionary
periodically while processing the file the first time.  Then use
inflateSetDictionary to restore the dictionary to a convenient
location when trying to seek into the file again in the future.

Use a default period of 1MB of compressed data for syncpoints.
Each syncpoint uses 32KB. This is a ratio of 3.2%.  For example,
a 1GB .gz file (compressed size) will require us to keep 32MB
of index data in memory. A better method may be to use a fixed
number of syncpoints and divide the file appropriately. This
would keep the memory bounded at the cost of slower file
navigation on large .gz files.

Use pread to read the data for the stream decompressor and remove
the lock_hack previously employed.

NB. The documentation on these zlib functions is sparse. I followed
the example in zlib/examples/zran.c, but I used the z_stream total_in
and total_out variables instead of keeping my own separately as zran.c
does.  Maybe this is incompatible with some very old zlib versions.
I haven't looked.
This commit is contained in:
Phil Hord 2019-07-30 17:00:24 -07:00
parent 165cd69c8d
commit 5513deeade
4 changed files with 292 additions and 29 deletions

View File

@ -60,9 +60,9 @@ static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
/*
* XXX REMOVE ME
*
* The stock gzipped file code does not use pread, so we need to use a lock to
* The stock bzipped file code does not use pread, so we need to use a lock to
* get exclusive access to the file. In the future, we should just rewrite
* the gzipped file code to use pread.
* the bzipped file code to use pread.
*/
class lock_hack {
public:
@ -116,9 +116,173 @@ private:
};
/* XXX END */
/* Size in bytes of the compressed-input buffer and of each pread() request. */
#define Z_BUFSIZE 65536U
/* Minimum number of compressed bytes between two recorded syncpoints. */
#define SYNCPOINT_SIZE (1024 * 1024)
/**
 * Allocate the Z_BUFSIZE-byte buffer that holds compressed input read from
 * the file.
 *
 * Throws bad_alloc when the allocation fails.
 */
line_buffer::gz_indexed::gz_indexed()
{
    Bytef *raw_buf = static_cast<Bytef *>(malloc(Z_BUFSIZE));

    if (raw_buf == nullptr) {
        throw bad_alloc();
    }
    this->inbuf = raw_buf;
}
/**
 * Release the inflate state, the file descriptor and every recorded
 * syncpoint.  Does nothing when no file is open.
 */
void line_buffer::gz_indexed::close()
{
    if (!*this) {
        // Nothing was opened, so there is nothing to release.
        return;
    }

    inflateEnd(&this->strm);
    ::close(this->gz_fd);
    this->gz_fd = -1;
    this->syncpoints.clear();
}
void line_buffer::gz_indexed::init_stream()
{
if (*this) {
inflateEnd(&this->strm);
}
// initialize inflate struct
this->strm.zalloc = Z_NULL;
this->strm.zfree = Z_NULL;
this->strm.opaque = Z_NULL;
this->strm.avail_in = 0;
this->strm.next_in = Z_NULL;
this->strm.avail_out = 0;
int rc = inflateInit2(&strm, GZ_HEADER_MODE);
if (rc != Z_OK) {
throw(rc); // FIXME: exception wrapper
}
}
void line_buffer::gz_indexed::continue_stream()
{
// Save our position and output buffer
auto total_in = this->strm.total_in;
auto total_out = this->strm.total_out;
auto avail_out = this->strm.avail_out;
auto next_out = this->strm.next_out;
init_stream();
// Restore position and output buffer
this->strm.total_in = total_in;
this->strm.total_out = total_out;
this->strm.avail_out = avail_out;
this->strm.next_out = next_out;
}
/**
 * Start reading a gzip file from `fd`, discarding any file that was open
 * before.  The descriptor is stored in gz_fd, which close() later passes to
 * ::close(), so this object takes ownership of it.
 */
void line_buffer::gz_indexed::open(int fd)
{
// Order matters: init_stream() only releases an old stream while gz_fd is
// still set, so the previous file is closed first and the new descriptor is
// stored last.
this->close();
this->init_stream();
this->gz_fd = fd;
}
int line_buffer::gz_indexed::stream_data(void * buf, size_t size)
{
this->strm.avail_out = size;
this->strm.next_out = (unsigned char *) buf;
int last = this->syncpoints.empty() ? 0 :
this->syncpoints.back().in;
while (this->strm.avail_out) {
if (!this->strm.avail_in) {
int rc = ::pread(this->gz_fd,
&this->inbuf[0],
Z_BUFSIZE,
this->strm.total_in);
if (rc < 0) {
return rc;
}
this->strm.next_in = this->inbuf;
this->strm.avail_in = rc;
}
if (this->strm.avail_in) {
int flush = last > this->strm.total_in
? Z_SYNC_FLUSH : Z_BLOCK;
auto err = inflate(&this->strm, flush);
if (err == Z_STREAM_END) {
// Reached end of stream; re-init for a possible subsequent stream
continue_stream();
} else if (err != Z_OK) {
log_error(" inflate-error: %d", (int)err);
throw error(err); // FIXME: exception wrapper
}
if (this->strm.total_in >= last + SYNCPOINT_SIZE &&
size > this->strm.avail_out + GZ_WINSIZE &&
(this->strm.data_type & GZ_END_OF_BLOCK_MASK) &&
!(this->strm.data_type & GZ_END_OF_FILE_MASK))
{
this->syncpoints.emplace_back(this->strm, size);
last = this->strm.total_out;
}
} else if (this->strm.avail_out) {
// Processed all the gz file data but didn't fill
// the output buffer. We're done, even though we
// produced fewer bytes than requested.
break;
}
}
return size - this->strm.avail_out;
}
void line_buffer::gz_indexed::seek(size_t offset)
{
if (offset == this->strm.total_out) {
return;
}
indexDict * dict = NULL;
// Find highest syncpoint not past offset
// FIXME: Make this a binary-tree search
for (auto &d : this->syncpoints) {
if (d.out <= offset) {
dict = &d;
} else {
break;
}
}
// Choose highest available syncpoint, or keep current offset if it's ok
if (offset < this->strm.total_out ||
(dict && this->strm.total_out < dict->out)) {
// Release the old z_stream
inflateEnd(&this->strm);
if (dict) {
dict->apply(&this->strm);
} else {
init_stream();
}
}
// Stream from compressed file until we reach our offset
unsigned char dummy[Z_BUFSIZE];
while ( offset > this->strm.total_out) {
size_t to_copy = std::min(static_cast<size_t>(Z_BUFSIZE),
offset - this->strm.total_out);
auto bytes = stream_data(dummy, to_copy);
if (bytes <= 0) {
break;
}
}
}
/**
 * Decompress at most `size` bytes starting at decompressed offset `offset`
 * into `buf`, repositioning the stream first when it is not already there.
 *
 * @return Whatever stream_data() returns for the request.
 */
int line_buffer::gz_indexed::read(void * buf, size_t offset, size_t size)
{
    const bool already_there = (offset == this->strm.total_out);

    if (!already_there) {
        this->seek(offset);
    }
    return this->stream_data(buf, size);
}
line_buffer::line_buffer()
: lb_gz_file(NULL),
lb_bz_file(false),
: lb_bz_file(false),
lb_compressed_offset(0),
lb_file_size(-1),
lb_file_offset(0),
@ -149,8 +313,7 @@ void line_buffer::set_fd(auto_fd &fd)
off_t newoff = 0;
if (this->lb_gz_file) {
gzclose(this->lb_gz_file);
this->lb_gz_file = NULL;
this->lb_gz_file.close();
}
if (this->lb_bz_file) {
@ -181,15 +344,7 @@ void line_buffer::set_fd(auto_fd &fd)
close(gzfd);
throw error(errno);
}
if ((this->lb_gz_file = gzdopen(gzfd, "r")) == NULL) {
close(gzfd);
if (errno == 0) {
throw bad_alloc();
}
else{
throw error(errno);
}
}
lb_gz_file.open(gzfd);
this->lb_file_time = read_le32(
(const unsigned char *)&gz_id[4]);
if (this->lb_file_time < 0) {
@ -343,16 +498,10 @@ bool line_buffer::fill_range(off_t start, ssize_t max_length)
rc = 0;
}
else {
lock_hack::guard guard;
lseek(this->lb_fd, this->lb_compressed_offset, SEEK_SET);
gzseek(this->lb_gz_file,
this->lb_file_offset + this->lb_buffer_size,
SEEK_SET);
rc = gzread(this->lb_gz_file,
&this->lb_buffer[this->lb_buffer_size],
this->lb_buffer_max - this->lb_buffer_size);
this->lb_compressed_offset = lseek(this->lb_fd, 0, SEEK_CUR);
rc = this->lb_gz_file.read(&this->lb_buffer[this->lb_buffer_size],
this->lb_file_offset + this->lb_buffer_size,
this->lb_buffer_max - this->lb_buffer_size);
this->lb_compressed_offset = this->lb_gz_file.get_source_offset();
if (rc != -1 && (
rc < (this->lb_buffer_max - this->lb_buffer_size))) {
this->lb_file_size = (

View File

@ -38,6 +38,7 @@
#include <zlib.h>
#include <exception>
#include <vector>
#include "base/lnav_log.hh"
#include "base/file_range.hh"
@ -74,6 +75,91 @@ public:
int e_err;
};
#define GZ_WINSIZE 32768U /*> gzip's max supported dictionary is 15-bits */
#define GZ_RAW_MODE (-15) /*> Raw inflate data mode (no header or trailer) */
#define GZ_HEADER_MODE (15 + 32) /*> Automatic zlib or gzip header detection */
#define GZ_BORROW_BITS_MASK 7 /*> Bits (0-7) consumed in previous block */
#define GZ_END_OF_BLOCK_MASK 128 /*> Stopped because reached end-of-block */
#define GZ_END_OF_FILE_MASK 64 /*> Stopped because reached end-of-file */
/**
 * A memoized gzip file reader that can do random file access faster than
 * gzseek/gzread alone.
 *
 * While the file is decompressed, syncpoints holding the inflate sliding
 * window are recorded so that a later seek can resume from the nearest one.
 */
class gz_indexed {
public:
    gz_indexed();
    // NOTE(review): the defaulted move copies `strm` and `gz_fd` verbatim, so
    // both objects would release the same stream and descriptor -- confirm
    // that instances are never actually moved while open.
    gz_indexed(gz_indexed &&other) = default;
    ~gz_indexed() {
        this->close();
    }

    /** True while a file is open. */
    inline operator bool() const {
        return this->gz_fd != -1;
    }

    /**
     * Offset in the compressed file just past the data fetched so far
     * (consumed bytes plus bytes still waiting in the input buffer), or 0
     * when no file is open.
     */
    uLong get_source_offset() {
        return !!*this ? this->strm.total_in + this->strm.avail_in : 0;
    }

    void close();
    void init_stream();
    void continue_stream();
    void open(int fd);
    int stream_data(void * buf, size_t size);
    void seek(size_t offset);

    /**
     * Decompress bytes from the gz file returning at most `size` bytes.
     * offset is the byte-offset in the decompressed data stream.
     */
    int read(void * buf, size_t offset, size_t size);

    /**
     * A syncpoint: everything needed to restart inflate at a deflate block
     * boundary without decoding the data that precedes it.
     */
    struct indexDict {
        off_t in = 0;              /*< Compressed offset of the syncpoint */
        off_t out = 0;             /*< Decompressed offset of the syncpoint */
        unsigned char bits = 0;    /*< Number of bits borrowed from the last input byte */
        unsigned char in_bits = 0; /*< Value of those borrowed bits */
        Bytef index[GZ_WINSIZE];   /*< Last GZ_WINSIZE bytes of output */

        /**
         * Capture a syncpoint from a stream that just stopped at the end of
         * a non-final block.
         *
         * @param s    The stream; its next_out must be preceded by at least
         *             GZ_WINSIZE bytes of freshly produced output.
         * @param size Capacity of the output buffer given to the stream.
         */
        indexDict(z_stream const & s, const off_t size) {
            assert((s.data_type & GZ_END_OF_BLOCK_MASK));
            assert(!(s.data_type & GZ_END_OF_FILE_MASK));
            assert(size >= s.avail_out + GZ_WINSIZE);
            this->bits = s.data_type & GZ_BORROW_BITS_MASK;
            this->in = s.total_in;
            this->out = s.total_out;
            // The previous input byte is only needed when bits were
            // borrowed from it; otherwise there is nothing to remember.
            this->in_bits = this->bits
                ? (s.next_in[-1] >> (8 - this->bits))
                : 0;
            // Copy the last 32k uncompressed data (sliding window) to our index
            memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE);
        }

        /**
         * Re-create `s` as a raw inflate stream positioned at this
         * syncpoint.
         *
         * @return Z_OK on success, otherwise the zlib error code of the
         *         first step that failed.  After a failure past the
         *         initialization step the stream still needs inflateEnd().
         */
        int apply(z_streamp s) {
            s->zalloc = Z_NULL;
            s->zfree = Z_NULL;
            s->opaque = Z_NULL;
            s->avail_in = 0;
            s->next_in = Z_NULL;
            auto ret = inflateInit2(s, GZ_RAW_MODE);
            if (ret != Z_OK) {
                return ret;
            }
            if (this->bits) {
                // Feed back the bits that belonged to the previous block's
                // final byte.
                ret = inflatePrime(s, this->bits, this->in_bits);
                if (ret != Z_OK) {
                    return ret;
                }
            }
            s->total_in = this->in;
            s->total_out = this->out;
            return inflateSetDictionary(s, this->index, GZ_WINSIZE);
        }
    };

private:
    z_stream strm{};                   /*< gzip streams structure */
    std::vector<indexDict> syncpoints; /*< indexed dictionaries as discovered */
    auto_mem<Bytef> inbuf;             /*< Compressed data buffer */
    int gz_fd = -1;                    /*< The file to read data from. */
};
/** Construct an empty line_buffer. */
line_buffer();
@ -103,7 +189,7 @@ public:
};
bool is_compressed() const {
return this->lb_gz_file != NULL || this->lb_bz_file;
return this->lb_gz_file || this->lb_bz_file;
};
off_t get_read_offset(off_t off) const
@ -235,7 +321,7 @@ private:
shared_buffer lb_share_manager;
auto_fd lb_fd; /*< The file to read data from. */
gzFile lb_gz_file; /*< File handle for gzipped files. */
gz_indexed lb_gz_file; /*< File reader for gzipped files. */
bool lb_bz_file; /*< Flag set for bzip2 compressed files. */
off_t lb_compressed_offset; /*< The offset into the compressed file. */

View File

@ -56,7 +56,7 @@ int main(int argc, char *argv[])
{
int c, rnd_iters = 5, retval = EXIT_SUCCESS;
vector<tuple<int, off_t, ssize_t> > index;
auto_fd fd = STDIN_FILENO;
auto_fd fd = STDIN_FILENO, fd_cmp;
int offseti = 0;
off_t offset = 0;
int count = 1000;
@ -135,12 +135,20 @@ int main(int argc, char *argv[])
} else if ((argc > 0) && (fstat(fd, &st) == -1)) {
perror("fstat");
retval = EXIT_FAILURE;
} else if ((argc > 1) && (fd_cmp = open(argv[1], O_RDONLY)) == -1) {
perror("open-cmp");
retval = EXIT_FAILURE;
} else if ((argc > 1) && (fstat(fd_cmp, &st) == -1)) {
perror("fstat-cmp");
retval = EXIT_FAILURE;
} else {
try {
file_range last_range{offset};
line_buffer lb;
char *maddr;
int fd2 = (argc > 1) ? fd_cmp.get() : fd.get();
assert(fd2 >= 0);
lb.set_fd(fd);
if (index.size() == 0) {
while (count) {
@ -179,7 +187,7 @@ int main(int argc, char *argv[])
st.st_size,
PROT_READ,
MAP_FILE | MAP_PRIVATE,
lb.get_fd(),
fd2,
0)) == MAP_FAILED) {
perror("mmap");
retval = EXIT_FAILURE;

View File

@ -56,3 +56,23 @@ run_test ./drive_line_buffer -i lb.index -n 10 lb-2.dat
check_output "Random reads don't match input?" <<EOF
All done
EOF
# Write two gzip members back to back into a single file.
gzip -c ${test_dir}/logfile_access_log.1 > lb-double.gz
gzip -c ${test_dir}/logfile_access_log.1 >> lb-double.gz
run_test ${lnav_test} -n lb-double.gz
# The expected output is whatever gzip itself decompresses from that file.
gzip -dc lb-double.gz | \
check_output "concatenated gzip files don't parse correctly"
# Build lb-3.gz by compressing repeated copies of lb-2.dat until the
# compressed file is larger than 5000000 bytes (presumably so that it spans
# several syncpoints -- confirm against SYNCPOINT_SIZE).  The file is created
# empty first so the stat in the loop condition has something to measure.
> lb-3.gz
while test $(stat -c"%s" lb-3.gz) -le 5000000 ; do
cat lb-2.dat
done | gzip -c -1 > lb-3.gz
# Keep a decompressed reference copy and the byte offset of each of its lines.
gzip -dc lb-3.gz > lb-3.dat
grep -b '$' lb-3.dat | cut -f 1 -d : > lb-3.index
# Read lb-3.gz through the line buffer at the indexed offsets, with lb-3.dat
# given as the comparison file.
run_test ./drive_line_buffer -i lb-3.index -n 10 lb-3.gz lb-3.dat
check_output "Random gzipped reads don't match input" <<EOF
All done
EOF