Merge pull request #677 from phord/gzip-index

Add a gzip indexing class for faster gz navigation
This commit is contained in:
Tim Stack 2019-09-09 21:35:44 -07:00 committed by GitHub
commit b2463e44a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 292 additions and 29 deletions

View File

@ -60,9 +60,9 @@ static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
/*
* XXX REMOVE ME
*
* The stock gzipped file code does not use pread, so we need to use a lock to
* The stock bzipped file code does not use pread, so we need to use a lock to
* get exclusive access to the file. In the future, we should just rewrite
* the gzipped file code to use pread.
* the bzipped file code to use pread.
*/
class lock_hack {
public:
@ -116,9 +116,173 @@ private:
};
/* XXX END */
/* Size in bytes of the compressed-input buffer; also the chunk size used for
 * each pread() and for the scratch output buffer used while seeking. */
#define Z_BUFSIZE 65536U
/* Minimum number of compressed bytes that must be consumed past the previous
 * syncpoint before another syncpoint is recorded. */
#define SYNCPOINT_SIZE (1024 * 1024)
/**
 * Allocate the Z_BUFSIZE-byte buffer that compressed input is read into.
 *
 * @throws bad_alloc if the buffer cannot be allocated.
 */
line_buffer::gz_indexed::gz_indexed()
{
    Bytef *raw_buf = (Bytef *)malloc(Z_BUFSIZE);

    if (raw_buf == NULL) {
        throw bad_alloc();
    }
    // Hand the allocation to the owning member only once it is known good.
    this->inbuf = raw_buf;
}
/**
 * Detach from the current file, if any: tear down the inflate state, close
 * the descriptor and forget every recorded syncpoint.  Calling this on a
 * reader that is not open does nothing.
 */
void line_buffer::gz_indexed::close()
{
    // Nothing to release unless a file is currently attached.
    if (!*this) {
        return;
    }

    inflateEnd(&this->strm);
    ::close(this->gz_fd);
    this->gz_fd = -1;
    this->syncpoints.clear();
}
void line_buffer::gz_indexed::init_stream()
{
if (*this) {
inflateEnd(&this->strm);
}
// initialize inflate struct
this->strm.zalloc = Z_NULL;
this->strm.zfree = Z_NULL;
this->strm.opaque = Z_NULL;
this->strm.avail_in = 0;
this->strm.next_in = Z_NULL;
this->strm.avail_out = 0;
int rc = inflateInit2(&strm, GZ_HEADER_MODE);
if (rc != Z_OK) {
throw(rc); // FIXME: exception wrapper
}
}
void line_buffer::gz_indexed::continue_stream()
{
// Save our position and output buffer
auto total_in = this->strm.total_in;
auto total_out = this->strm.total_out;
auto avail_out = this->strm.avail_out;
auto next_out = this->strm.next_out;
init_stream();
// Restore position and output buffer
this->strm.total_in = total_in;
this->strm.total_out = total_out;
this->strm.avail_out = avail_out;
this->strm.next_out = next_out;
}
/**
 * Attach this reader to an open gzip file descriptor.
 *
 * Any previously attached file is closed first.  The reader takes ownership
 * of `fd`: it is closed by close() or the destructor, and it is also closed
 * here if the inflate state cannot be initialised, so a failed open does not
 * leak the descriptor.
 *
 * @param fd Descriptor of the compressed file; read with pread().
 * @throws Whatever init_stream() throws when zlib initialisation fails.
 */
void line_buffer::gz_indexed::open(int fd)
{
    this->close();
    try {
        // gz_fd is still -1 here, so init_stream() will not try to release
        // an inflate state that was never set up.
        this->init_stream();
    } catch (...) {
        ::close(fd);
        throw;
    }
    this->gz_fd = fd;
}
int line_buffer::gz_indexed::stream_data(void * buf, size_t size)
{
this->strm.avail_out = size;
this->strm.next_out = (unsigned char *) buf;
int last = this->syncpoints.empty() ? 0 :
this->syncpoints.back().in;
while (this->strm.avail_out) {
if (!this->strm.avail_in) {
int rc = ::pread(this->gz_fd,
&this->inbuf[0],
Z_BUFSIZE,
this->strm.total_in);
if (rc < 0) {
return rc;
}
this->strm.next_in = this->inbuf;
this->strm.avail_in = rc;
}
if (this->strm.avail_in) {
int flush = last > this->strm.total_in
? Z_SYNC_FLUSH : Z_BLOCK;
auto err = inflate(&this->strm, flush);
if (err == Z_STREAM_END) {
// Reached end of stream; re-init for a possible subsequent stream
continue_stream();
} else if (err != Z_OK) {
log_error(" inflate-error: %d", (int)err);
throw error(err); // FIXME: exception wrapper
}
if (this->strm.total_in >= last + SYNCPOINT_SIZE &&
size > this->strm.avail_out + GZ_WINSIZE &&
(this->strm.data_type & GZ_END_OF_BLOCK_MASK) &&
!(this->strm.data_type & GZ_END_OF_FILE_MASK))
{
this->syncpoints.emplace_back(this->strm, size);
last = this->strm.total_out;
}
} else if (this->strm.avail_out) {
// Processed all the gz file data but didn't fill
// the output buffer. We're done, even though we
// produced fewer bytes than requested.
break;
}
}
return size - this->strm.avail_out;
}
void line_buffer::gz_indexed::seek(size_t offset)
{
if (offset == this->strm.total_out) {
return;
}
indexDict * dict = NULL;
// Find highest syncpoint not past offset
// FIXME: Make this a binary-tree search
for (auto &d : this->syncpoints) {
if (d.out <= offset) {
dict = &d;
} else {
break;
}
}
// Choose highest available syncpoint, or keep current offset if it's ok
if (offset < this->strm.total_out ||
(dict && this->strm.total_out < dict->out)) {
// Release the old z_stream
inflateEnd(&this->strm);
if (dict) {
dict->apply(&this->strm);
} else {
init_stream();
}
}
// Stream from compressed file until we reach our offset
unsigned char dummy[Z_BUFSIZE];
while ( offset > this->strm.total_out) {
size_t to_copy = std::min(static_cast<size_t>(Z_BUFSIZE),
offset - this->strm.total_out);
auto bytes = stream_data(dummy, to_copy);
if (bytes <= 0) {
break;
}
}
}
/**
 * Random-access read: decompress up to `size` bytes that start at `offset`
 * in the uncompressed data into `buf`.
 *
 * @return Whatever stream_data() returns for the request.
 */
int line_buffer::gz_indexed::read(void * buf, size_t offset, size_t size)
{
    const bool already_positioned = (offset == this->strm.total_out);

    if (!already_positioned) {
        this->seek(offset);
    }
    return this->stream_data(buf, size);
}
line_buffer::line_buffer()
: lb_gz_file(NULL),
lb_bz_file(false),
: lb_bz_file(false),
lb_compressed_offset(0),
lb_file_size(-1),
lb_file_offset(0),
@ -149,8 +313,7 @@ void line_buffer::set_fd(auto_fd &fd)
off_t newoff = 0;
if (this->lb_gz_file) {
gzclose(this->lb_gz_file);
this->lb_gz_file = NULL;
this->lb_gz_file.close();
}
if (this->lb_bz_file) {
@ -181,15 +344,7 @@ void line_buffer::set_fd(auto_fd &fd)
close(gzfd);
throw error(errno);
}
if ((this->lb_gz_file = gzdopen(gzfd, "r")) == NULL) {
close(gzfd);
if (errno == 0) {
throw bad_alloc();
}
else{
throw error(errno);
}
}
lb_gz_file.open(gzfd);
this->lb_file_time = read_le32(
(const unsigned char *)&gz_id[4]);
if (this->lb_file_time < 0) {
@ -343,16 +498,10 @@ bool line_buffer::fill_range(off_t start, ssize_t max_length)
rc = 0;
}
else {
lock_hack::guard guard;
lseek(this->lb_fd, this->lb_compressed_offset, SEEK_SET);
gzseek(this->lb_gz_file,
this->lb_file_offset + this->lb_buffer_size,
SEEK_SET);
rc = gzread(this->lb_gz_file,
&this->lb_buffer[this->lb_buffer_size],
this->lb_buffer_max - this->lb_buffer_size);
this->lb_compressed_offset = lseek(this->lb_fd, 0, SEEK_CUR);
rc = this->lb_gz_file.read(&this->lb_buffer[this->lb_buffer_size],
this->lb_file_offset + this->lb_buffer_size,
this->lb_buffer_max - this->lb_buffer_size);
this->lb_compressed_offset = this->lb_gz_file.get_source_offset();
if (rc != -1 && (
rc < (this->lb_buffer_max - this->lb_buffer_size))) {
this->lb_file_size = (

View File

@ -38,6 +38,7 @@
#include <zlib.h>
#include <exception>
#include <vector>
#include "base/lnav_log.hh"
#include "base/file_range.hh"
@ -74,6 +75,91 @@ public:
int e_err;
};
#define GZ_WINSIZE 32768U /*< gzip's max supported dictionary is 15-bits */
#define GZ_RAW_MODE (-15) /*< Raw inflate data mode */
#define GZ_HEADER_MODE (15 + 32) /*< Automatic zlib or gzip header detection */
#define GZ_BORROW_BITS_MASK 7 /*< Bits (0-7) consumed in previous block */
#define GZ_END_OF_BLOCK_MASK 128 /*< Stopped because reached end-of-block */
#define GZ_END_OF_FILE_MASK 64 /*< Stopped because reached end-of-file */
/**
 * A memoized gzip file reader that can do random file access faster than
 * gzseek/gzread alone.
 *
 * As data is decompressed, restart points ("syncpoints") are recorded at
 * block boundaries.  A later seek can resume inflating from the nearest
 * preceding syncpoint instead of from the start of the file.
 */
class gz_indexed {
public:
    /** Allocate the compressed-input buffer; the reader starts closed. */
    gz_indexed();

    // NOTE(review): the defaulted move copies `gz_fd` and `strm` without
    // resetting the source, so both objects' destructors would call close()
    // on the same descriptor and inflate state.  Confirm that instances are
    // never actually moved, or replace this with an explicit implementation.
    gz_indexed(gz_indexed &&other) = default;

    ~gz_indexed() {
        this->close();
    }

    /** @return true while a file descriptor is attached. */
    inline operator bool() const {
        return this->gz_fd != -1;
    }

    /**
     * @return The stream's consumed-input count plus the bytes still pending
     *         in the input buffer, or 0 when the reader is closed.
     */
    uLong get_source_offset() {
        return !!*this ? this->strm.total_in + this->strm.avail_in : 0;
    }

    /** Release the inflate state, close the file and drop all syncpoints. */
    void close();

    /** (Re)initialise the inflate state in header-detecting mode. */
    void init_stream();

    /** Re-initialise the inflate state, keeping counters and output buffer. */
    void continue_stream();

    /** Close any current file and attach to `fd`. */
    void open(int fd);

    /** Decompress sequentially into `buf`, returning the bytes produced. */
    int stream_data(void * buf, size_t size);

    /** Position the stream at `offset` in the decompressed data. */
    void seek(size_t offset);

    /**
     * Decompress bytes from the gz file returning at most `size` bytes.
     * offset is the byte-offset in the decompressed data stream.
     */
    int read(void * buf, size_t offset, size_t size);

    /**
     * A restart point: everything needed to resume raw inflation at a
     * deflate block boundary.
     */
    struct indexDict {
        off_t in = 0;               /*< Compressed bytes consumed so far */
        off_t out = 0;              /*< Decompressed bytes produced so far */
        unsigned char bits = 0;     /*< Count of borrowed bits (0-7) */
        unsigned char in_bits = 0;  /*< Value of the borrowed bits */
        Bytef index[GZ_WINSIZE];    /*< Last GZ_WINSIZE bytes of output */

        /**
         * Capture a restart point from a stream that has just stopped at a
         * block boundary.
         *
         * @param s    The stream; at least GZ_WINSIZE bytes of output must
         *             sit immediately before `s.next_out`.
         * @param size Total size of the output buffer passed to inflate.
         */
        indexDict(z_stream const & s, const off_t size) {
            assert((s.data_type & GZ_END_OF_BLOCK_MASK));
            assert(!(s.data_type & GZ_END_OF_FILE_MASK));
            assert(size >= s.avail_out + GZ_WINSIZE);
            this->bits = s.data_type & GZ_BORROW_BITS_MASK;
            this->in = s.total_in;
            this->out = s.total_out;
            // Only look at the previous input byte when some of its bits are
            // actually borrowed; otherwise the value is unused and in_bits
            // keeps its default of 0.
            if (this->bits) {
                auto last_byte_in = s.next_in[-1];
                this->in_bits = last_byte_in >> (8 - this->bits);
            }
            // Copy the last 32k uncompressed data (sliding window) to our index
            memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE);
        }

        /**
         * Initialise `s` as a raw inflate stream positioned at this restart
         * point.  The caller must have released any previous state in `s`.
         *
         * @return Z_OK on success, otherwise the first zlib error reported
         *         by inflateInit2, inflatePrime or inflateSetDictionary.
         */
        int apply(z_streamp s) {
            s->zalloc = Z_NULL;
            s->zfree = Z_NULL;
            s->opaque = Z_NULL;
            s->avail_in = 0;
            s->next_in = Z_NULL;
            auto ret = inflateInit2(s, GZ_RAW_MODE);
            if (ret != Z_OK) {
                return ret;
            }
            // Restore the byte counters right away so the stream position is
            // coherent even if a later step fails.
            s->total_in = this->in;
            s->total_out = this->out;
            if (this->bits) {
                ret = inflatePrime(s, this->bits, this->in_bits);
                if (ret != Z_OK) {
                    return ret;
                }
            }
            return inflateSetDictionary(s, this->index, GZ_WINSIZE);
        }
    };

private:
    z_stream strm;                     /*< gzip streams structure */
    std::vector<indexDict> syncpoints; /*< indexed dictionaries as discovered */
    auto_mem<Bytef> inbuf;             /*< Compressed data buffer */
    int gz_fd = -1;                    /*< The file to read data from. */
};
/** Construct an empty line_buffer. */
line_buffer();
@ -103,7 +189,7 @@ public:
};
bool is_compressed() const {
return this->lb_gz_file != NULL || this->lb_bz_file;
return this->lb_gz_file || this->lb_bz_file;
};
off_t get_read_offset(off_t off) const
@ -235,7 +321,7 @@ private:
shared_buffer lb_share_manager;
auto_fd lb_fd; /*< The file to read data from. */
gzFile lb_gz_file; /*< File handle for gzipped files. */
gz_indexed lb_gz_file; /*< File reader for gzipped files. */
bool lb_bz_file; /*< Flag set for bzip2 compressed files. */
off_t lb_compressed_offset; /*< The offset into the compressed file. */

View File

@ -56,7 +56,7 @@ int main(int argc, char *argv[])
{
int c, rnd_iters = 5, retval = EXIT_SUCCESS;
vector<tuple<int, off_t, ssize_t> > index;
auto_fd fd = STDIN_FILENO;
auto_fd fd = STDIN_FILENO, fd_cmp;
int offseti = 0;
off_t offset = 0;
int count = 1000;
@ -135,12 +135,20 @@ int main(int argc, char *argv[])
} else if ((argc > 0) && (fstat(fd, &st) == -1)) {
perror("fstat");
retval = EXIT_FAILURE;
} else if ((argc > 1) && (fd_cmp = open(argv[1], O_RDONLY)) == -1) {
perror("open-cmp");
retval = EXIT_FAILURE;
} else if ((argc > 1) && (fstat(fd_cmp, &st) == -1)) {
perror("fstat-cmp");
retval = EXIT_FAILURE;
} else {
try {
file_range last_range{offset};
line_buffer lb;
char *maddr;
int fd2 = (argc > 1) ? fd_cmp.get() : fd.get();
assert(fd2 >= 0);
lb.set_fd(fd);
if (index.size() == 0) {
while (count) {
@ -179,7 +187,7 @@ int main(int argc, char *argv[])
st.st_size,
PROT_READ,
MAP_FILE | MAP_PRIVATE,
lb.get_fd(),
fd2,
0)) == MAP_FAILED) {
perror("mmap");
retval = EXIT_FAILURE;

View File

@ -56,3 +56,23 @@ run_test ./drive_line_buffer -i lb.index -n 10 lb-2.dat
check_output "Random reads don't match input?" <<EOF
All done
EOF
# Build a file made of two gzip members appended back to back and check that
# lnav's output matches what `gzip -dc` produces for the same file.
gzip -c ${test_dir}/logfile_access_log.1 > lb-double.gz
gzip -c ${test_dir}/logfile_access_log.1 >> lb-double.gz
run_test ${lnav_test} -n lb-double.gz
gzip -dc lb-double.gz | \
check_output "concatenated gzip files don't parse correctly"
# Start from an empty lb-3.gz, then keep feeding copies of lb-2.dat through
# gzip until the compressed file on disk grows past 5,000,000 bytes.
# NOTE(review): `stat -c"%s"` looks like GNU stat syntax -- confirm it works
# on every platform the test suite targets.
> lb-3.gz
while test $(stat -c"%s" lb-3.gz) -le 5000000 ; do
cat lb-2.dat
done | gzip -c -1 > lb-3.gz
# Keep an uncompressed copy and index the byte offset of every line in it.
gzip -dc lb-3.gz > lb-3.dat
grep -b '$' lb-3.dat | cut -f 1 -d : > lb-3.index
# Read the gzipped file at the indexed offsets; the second file argument is
# the uncompressed copy that the driver maps for comparison.
run_test ./drive_line_buffer -i lb-3.index -n 10 lb-3.gz lb-3.dat
check_output "Random gzipped reads don't match input" <<EOF
All done
EOF