[remote] start to wire up remote support

This commit is contained in:
Timothy Stack 2021-05-01 08:33:16 -07:00
parent 5742e5eb09
commit f050c7b0a7
38 changed files with 1240 additions and 192 deletions

View File

@ -1,6 +1,6 @@
# aminclude_static.am generated automatically by Autoconf
# from AX_AM_MACROS_STATIC on Thu Apr 22 08:56:31 PDT 2021
# from AX_AM_MACROS_STATIC on Thu Apr 29 09:08:09 PDT 2021
# Code coverage

View File

@ -294,6 +294,7 @@ AC_CONFIG_FILES([src/fmtlib/Makefile])
AC_CONFIG_FILES([src/pcrepp/Makefile])
AC_CONFIG_FILES([src/pugixml/Makefile])
AC_CONFIG_FILES([src/tailer/Makefile])
AC_CONFIG_FILES([src/tools/Makefile])
AC_CONFIG_FILES([src/yajl/Makefile])
AC_CONFIG_FILES([src/yajlpp/Makefile])
AC_CONFIG_FILES([test/Makefile])

View File

@ -17,7 +17,7 @@ add_subdirectory(base)
add_subdirectory(remote)
add_subdirectory(tailer)
add_executable(bin2c bin2c.hh bin2c.c)
add_executable(bin2c bin2c.hh tools/bin2c.c)
target_link_libraries(bin2c ZLIB::zlib)
add_executable(ptimec ptimec.hh ptimec.c)
@ -381,7 +381,6 @@ add_library(diag STATIC
attr_line.hh
auto_fd.hh
auto_mem.hh
auto_pid.hh
big_array.hh
bottom_status_source.hh
bound_tags.hh
@ -499,7 +498,13 @@ target_include_directories(
${CMAKE_CURRENT_BINARY_DIR}
)
target_link_libraries(diag base ${lnav_LIBS})
target_link_libraries(diag
base
tailerservice
tailerpp
tailercommon
${lnav_LIBS}
)
target_compile_definitions(diag PRIVATE SQLITE_OMIT_LOAD_EXTENSION)
check_library_exists(util openpty "" HAVE_LIBUTIL)

View File

@ -1,7 +1,7 @@
include $(top_srcdir)/aminclude_static.am
SUBDIRS = fmtlib base tailer pcrepp pugixml yajl yajlpp .
SUBDIRS = tools fmtlib base tailer pcrepp pugixml yajl yajlpp .
bin_PROGRAMS = lnav
@ -23,8 +23,8 @@ RE2C_V_0 = @echo " RE2C " $@;
include formats/formats.am
default-formats.h default-formats.cc: bin2c$(BUILD_EXEEXT) $(FORMAT_FILES)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) -n lnav_format_json default-formats $(FORMAT_FILES)
default-formats.h default-formats.cc: tools/bin2c$(BUILD_EXEEXT) $(FORMAT_FILES)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) -n lnav_format_json default-formats $(FORMAT_FILES)
include keymaps/keymaps.am
include themes/themes.am
@ -35,31 +35,31 @@ CONFIG_FILES = \
$(THEME_FILES) \
$()
default-config.h default-config.cc: bin2c$(BUILD_EXEEXT) $(CONFIG_FILES)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) -n lnav_config_json default-config $(CONFIG_FILES)
default-config.h default-config.cc: tools/bin2c$(BUILD_EXEEXT) $(CONFIG_FILES)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) -n lnav_config_json default-config $(CONFIG_FILES)
include scripts/scripts.am
builtin-scripts.h builtin-scripts.cc: bin2c$(BUILD_EXEEXT) $(BUILTIN_LNAVSCRIPTS)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) -n lnav_scripts builtin-scripts $(BUILTIN_LNAVSCRIPTS)
builtin-scripts.h builtin-scripts.cc: tools/bin2c$(BUILD_EXEEXT) $(BUILTIN_LNAVSCRIPTS)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) -n lnav_scripts builtin-scripts $(BUILTIN_LNAVSCRIPTS)
builtin-sh-scripts.h builtin-sh-scripts.cc: bin2c$(BUILD_EXEEXT) $(BUILTIN_SHSCRIPTS)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) -n lnav_sh_scripts builtin-sh-scripts $(BUILTIN_SHSCRIPTS)
builtin-sh-scripts.h builtin-sh-scripts.cc: tools/bin2c$(BUILD_EXEEXT) $(BUILTIN_SHSCRIPTS)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) -n lnav_sh_scripts builtin-sh-scripts $(BUILTIN_SHSCRIPTS)
%-sh.cc: $(srcdir)/%.sh bin2c$(BUILD_EXEEXT)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) $(*)-sh $<
%-sh.cc: $(srcdir)/%.sh tools/bin2c$(BUILD_EXEEXT)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) $(*)-sh $<
%-txt.cc %-txt.h: $(srcdir)/%.txt bin2c$(BUILD_EXEEXT)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) $(*)-txt $<
%-txt.cc %-txt.h: $(srcdir)/%.txt tools/bin2c$(BUILD_EXEEXT)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) $(*)-txt $<
%-sql.cc %-sql.h: $(srcdir)/%.sql bin2c$(BUILD_EXEEXT)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) $(*)-sql $<
%-sql.cc %-sql.h: $(srcdir)/%.sql tools/bin2c$(BUILD_EXEEXT)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) $(*)-sql $<
%-lnav.cc %-lnav.h: $(srcdir)/%.lnav bin2c$(BUILD_EXEEXT)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) $(*)-lnav $<
%-lnav.cc %-lnav.h: $(srcdir)/%.lnav tools/bin2c$(BUILD_EXEEXT)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) $(*)-lnav $<
%-json.cc %-json.h: $(srcdir)/%.json bin2c$(BUILD_EXEEXT)
$(BIN2C_V)./bin2c$(BUILD_EXEEXT) $(*)-json $<
%-json.cc %-json.h: $(srcdir)/%.json tools/bin2c$(BUILD_EXEEXT)
$(BIN2C_V)tools/bin2c$(BUILD_EXEEXT) $(*)-json $<
include time_formats.am
@ -130,6 +130,9 @@ LDADD = \
fmtlib/libcppfmt.a \
pcrepp/libpcrepp.a \
pugixml/libpugixml.a \
tailer/libtailerservice.a \
tailer/libtailercommon.a \
tailer/libtailerpp.a \
yajl/libyajl.a \
yajlpp/libyajlpp.a \
$(READLINE_LIBS) \
@ -157,7 +160,6 @@ noinst_HEADERS = \
attr_line.hh \
auto_fd.hh \
auto_mem.hh \
auto_pid.hh \
big_array.hh \
bin2c.hh \
bookmarks.hh \
@ -404,9 +406,6 @@ lnav_SOURCES = lnav.cc $(PLUGIN_SRCS)
lnav_test_SOURCES = lnav.cc test_override.c $(PLUGIN_SRCS)
bin2c$(BUILD_EXEEXT): bin2c.c
$(AM_V_CC) $(CC_FOR_BUILD) $(CPPFLAGS) $(LDFLAGS) -g3 -o $@ $? -lz
ptimec$(BUILD_EXEEXT): ptimec.c
$(AM_V_CC) $(CC_FOR_BUILD) $(CPPFLAGS) $(LDFLAGS) -g3 -o $@ $?
@ -415,12 +414,10 @@ RE2C_FILES = data_scanner_re.cc log_level_re.cc
endif
EXTRA_DIST = \
bin2c.c \
ptimec.c
CLEANFILES = \
ptimec$(BUILD_EXEEXT) \
bin2c$(BUILD_EXEEXT)
ptimec$(BUILD_EXEEXT)
DISTCLEANFILES = \
$(LNAV_BUILT_FILES) \

View File

@ -38,9 +38,11 @@
#include <sys/select.h>
#include <new>
#include <string>
#include <exception>
#include "base/lnav_log.hh"
#include "base/result.h"
/**
* Resource management class for file descriptors.
@ -229,6 +231,16 @@ private:
class auto_pipe {
public:
static Result<auto_pipe, std::string> for_child_fd(int child_fd) {
auto_pipe retval(child_fd);
if (retval.open() == -1) {
return Err(std::string(strerror(errno)));
}
return Ok(retval);
}
explicit auto_pipe(int child_fd = -1, int child_flags = O_RDONLY)
: ap_child_flags(child_flags), ap_child_fd(child_fd)
{

View File

@ -2,6 +2,7 @@
add_library(base STATIC
../config.h
auto_pid.cc
date_time_scanner.cc
humanize.cc
humanize.time.cc
@ -15,6 +16,7 @@ add_library(base STATIC
time_util.cc
auto_pid.hh
date_time_scanner.hh
enum_util.hh
func_util.hh

View File

@ -14,6 +14,7 @@ AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
noinst_LIBRARIES = libbase.a
noinst_HEADERS = \
auto_pid.hh \
date_time_scanner.hh \
enum_util.hh \
file_range.hh \
@ -37,6 +38,7 @@ noinst_HEADERS = \
time_util.hh
libbase_a_SOURCES = \
auto_pid.cc \
date_time_scanner.cc \
humanize.cc \
humanize.time.cc \

55
src/base/auto_pid.cc Normal file
View File

@ -0,0 +1,55 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include <unistd.h>
#include "lnav_log.hh"
#include "fmt/format.h"
#include "auto_pid.hh"
namespace lnav {
namespace pid {
Result<auto_pid<process_state::RUNNING>, std::string> from_fork()
{
auto pid = ::fork();
if (pid == -1) {
return Err(fmt::format("fork() failed: {}", strerror(errno)));
}
if (pid != 0) {
log_debug("started child: %d", pid);
}
return Ok(auto_pid<process_state::RUNNING>(pid));
}
}
}

View File

@ -37,93 +37,106 @@
#include <sys/types.h>
#include <sys/wait.h>
#include "base/result.h"
#include "base/lnav_log.hh"
enum class process_state {
RUNNING,
FINISHED,
};
template<process_state ProcState>
class auto_pid {
public:
explicit auto_pid(pid_t child = -1) : ap_child(child)
explicit auto_pid(pid_t child, int status = 0) : ap_child(child),
ap_status(status)
{};
auto_pid(const auto_pid &other) = delete;
auto_pid(auto_pid &&other) noexcept : ap_child(other.release())
auto_pid(auto_pid &&other) noexcept: ap_child(std::move(other).release())
{};
~auto_pid()
{ this->reset(); };
auto_pid &operator=(auto_pid &&other) noexcept {
this->reset(other.release());
auto_pid &operator=(auto_pid &&other) noexcept
{
this->reset(std::move(other).release());
this->ap_status = other.ap_status;
return *this;
};
pid_t in() const {
return this->ap_child;
}
bool failed() const {
return this->ap_child == -1;
}
pid_t in() const
{ return this->ap_child; }
bool in_child() const
{
static_assert(ProcState == process_state::RUNNING,
"this method is only available in the RUNNING state");
return this->ap_child == 0;
};
pid_t release()
pid_t release() &&
{
pid_t retval = this->ap_child;
this->ap_child = -1;
return retval;
return std::exchange(this->ap_child, -1);
};
int status() const
{
static_assert(ProcState == process_state::FINISHED,
"wait_for_child() must be called first");
return this->ap_status;
};
bool was_normal_exit() const
{
static_assert(ProcState == process_state::FINISHED,
"wait_for_child() must be called first");
return WIFEXITED(this->ap_status);
}
int exit_status() const
{
static_assert(ProcState == process_state::FINISHED,
"wait_for_child() must be called first");
return WEXITSTATUS(this->ap_status);
}
bool wait_for_child(int options = 0)
auto_pid<process_state::FINISHED> wait_for_child(int options = 0) &&
{
if (this->ap_child != -1) {
int rc;
while ((rc = waitpid(this->ap_child,
&this->ap_status,
options)) < 0 && (errno == EINTR)) { ;
}
if (rc > 0) {
this->ap_child = -1;
while ((waitpid(this->ap_child,
&this->ap_status,
options)) < 0 && (errno == EINTR)) { ;
}
}
return this->ap_child == -1;
};
return auto_pid<process_state::FINISHED>(
std::exchange(this->ap_child, -1), this->ap_status);
}
void reset(pid_t child = -1)
{
if (this->ap_child != child) {
this->ap_status = 0;
if (this->ap_child != -1) {
log_debug("sending SIGTERM to child: %d", this->ap_child);
kill(this->ap_child, SIGTERM);
this->wait_for_child();
}
this->ap_child = child;
}
};
}
private:
pid_t ap_child;
int ap_status{0};
};
namespace lnav {
namespace pid {
Result<auto_pid<process_state::RUNNING>, std::string> from_fork();
}
}
#endif

View File

@ -35,20 +35,93 @@
namespace isc {
void *service::run()
void service_base::start()
{
log_info("isc thread started");
log_debug("starting service thread for: %s", this->s_name.c_str());
this->s_thread = std::thread(&service_base::run, this);
this->s_started = true;
}
void *service_base::run()
{
log_info("BEGIN isc thread: %s", this->s_name.c_str());
while (this->s_looping) {
mstime_t current_time = getmstime();
auto timeout = this->compute_timeout(current_time);
this->s_port.process_for(timeout);
this->s_children.cleanup_children();
this->loop_body();
}
log_info("isc thread exiting");
if (!this->s_children.empty()) {
log_debug("stopping children of service: %s", this->s_name.c_str());
this->s_children.stop_children();
}
this->stopped();
log_info("END isc thread: %s", this->s_name.c_str());
return nullptr;
}
void service_base::stop()
{
if (this->s_started) {
log_debug("stopping service thread: %s", this->s_name.c_str());
if (this->s_looping) {
this->s_looping = false;
this->s_port.send(empty_msg());
}
log_debug("waiting for service thread: %s", this->s_name.c_str());
this->s_thread.join();
log_debug("joined service thread: %s", this->s_name.c_str());
this->s_started = false;
}
}
supervisor::supervisor(service_list servs, service_base *parent)
: s_service_list(std::move(servs)), s_parent(parent) {
for (auto& serv : this->s_service_list) {
serv->start();
}
}
supervisor::~supervisor()
{
this->stop_children();
}
void supervisor::stop_children()
{
for (auto& serv : this->s_service_list) {
serv->stop();
}
this->cleanup_children();
}
void supervisor::cleanup_children()
{
this->s_service_list.erase(
std::remove_if(
this->s_service_list.begin(), this->s_service_list.end(),
[this](auto &child) {
if (child->is_looping()) {
return false;
}
child->stop();
if (this->s_parent != nullptr) {
this->s_parent->child_finished(child);
}
return true;
}),
this->s_service_list.end());
}
void supervisor::add_child_service(std::shared_ptr<service_base> new_service)
{
this->s_service_list.emplace_back(new_service);
new_service->start();
}
}

View File

@ -35,6 +35,7 @@
#include <mutex>
#include <thread>
#include <condition_variable>
#include <utility>
#include "injector.hh"
#include "time_util.hh"
@ -93,88 +94,115 @@ private:
safe_message_list mp_messages;
};
class service {
class service_base;
using service_list = std::vector<std::shared_ptr<service_base>>;
struct supervisor {
explicit supervisor(service_list servs = {},
service_base *parent = nullptr);
~supervisor();
bool empty() const {
return this->s_service_list.empty();
}
void add_child_service(std::shared_ptr<service_base> new_service);
void stop_children();
void cleanup_children();
protected:
service_list s_service_list;
service_base *s_parent;
};
class service_base : public std::enable_shared_from_this<service_base> {
public:
virtual ~service() = default;
explicit service_base(std::string name)
: s_name(std::move(name)), s_children({}, this) {
}
void start() {
log_debug("starting service thread");
this->s_thread = std::thread(&service::run, this);
this->s_started = true;
};
virtual ~service_base() = default;
void stop() {
if (this->s_started) {
this->s_looping = false;
this->s_port.send(empty_msg());
log_debug("waiting for service thread");
this->s_thread.join();
log_debug("service thread joined");
this->s_started = false;
}
};
bool is_looping() const {
return this->s_looping;
}
msg_port& get_port() { return this->s_port; };
msg_port& get_port() {
return this->s_port;
}
friend supervisor;
private:
void start();
void stop();
protected:
void *run();
virtual void loop_body() {};
virtual void child_finished(std::shared_ptr<service_base> child) {};
virtual void stopped() {};
virtual std::chrono::milliseconds compute_timeout(mstime_t current_time) const {
using namespace std::literals::chrono_literals;
return 1s;
};
const std::string s_name;
bool s_started{false};
std::thread s_thread;
std::atomic<bool> s_looping{true};
msg_port s_port;
supervisor s_children;
};
using service_list = std::vector<std::shared_ptr<service>>;
struct service_guard {
service_guard(service_list servs)
: sg_service_list(servs) {
for (auto& serv : servs) {
serv->start();
}
};
~service_guard() {
for (auto& serv : this->sg_service_list) {
serv->stop();
}
template<typename T>
class service : public service_base {
public:
explicit service(std::string sub_name = "")
: service_base(std::string(__PRETTY_FUNCTION__) + " " + sub_name) {
}
template<typename F>
void send(F msg) {
this->s_port.send({
[lifetime = this->shared_from_this(), this, msg]() {
msg(*(static_cast<T *>(this)));
}
});
}
template<typename F, class Rep, class Period>
void send_and_wait(F msg,
const std::chrono::duration<Rep, Period>& rel_time) {
msg_port reply_port;
this->s_port.send({
[lifetime = this->shared_from_this(), this, &reply_port, msg]() {
msg(*(static_cast<T *>(this)));
reply_port.send(empty_msg());
}
});
reply_port.template process_for(rel_time);
}
private:
service_list sg_service_list;
};
template<typename T, typename Service, typename...Annotations>
struct to {
void send(std::function<void(T)> cb) {
auto& service = injector::get<isc::service&, Service>();
void send(std::function<void(T&)> cb) {
auto& service = injector::get<T&, Service>();
service.get_port().send({
[cb]() {
cb(injector::get<T, Service, Annotations...>());
}
});
service.send(cb);
}
template<class Rep, class Period>
void send_and_wait(std::function<void(T)> cb,
const std::chrono::duration<Rep, Period>& rel_time) {
auto& service = injector::get<isc::service&, Service>();
msg_port reply_port;
auto& service = injector::get<T&, Service>();
service.get_port().send({
[cb, &reply_port]() {
cb(injector::get<T, Service, Annotations...>());
reply_port.send(empty_msg());
}
});
reply_port.template process_for(rel_time);
service.send_and_wait(cb, rel_time);
}
void send_and_wait(std::function<void(T)> cb) {

View File

@ -885,6 +885,34 @@ struct Result {
std::terminate();
}
template<typename U = T>
typename std::enable_if<
!std::is_same<U, void>::value,
U
>::type
unwrap() {
if (isOk()) {
return std::move(storage().template get<U>());
}
::fprintf(stderr, "Attempting to unwrap an error Result\n");
std::terminate();
}
template<typename U = T>
typename std::enable_if<
std::is_same<U, void>::value,
U
>::type
unwrap() const {
if (isOk()) {
return;
}
::fprintf(stderr, "Attempting to unwrap an error Result\n");
std::terminate();
}
E unwrapErr() const {
if (isErr()) {
return storage().template get<E>();
@ -944,6 +972,5 @@ bool operator==(const Result<T, E>& lhs, types::Err<E> err) {
typedef details::ResultErrType<decltype(res)>::type E; \
return types::Err<E>(res.storage().get<E>()); \
} \
typedef details::ResultOkType<decltype(res)>::type T; \
res.storage().get<T>(); \
res.unwrap(); \
})

View File

@ -143,7 +143,7 @@ protected:
int cr_completions;
};
class curl_looper : public isc::service {
class curl_looper : public isc::service<curl_looper> {
public:
curl_looper()
: cl_curl_multi(curl_multi_cleanup) {

View File

@ -36,8 +36,13 @@
#include <unordered_map>
#include "base/opt_util.hh"
#include "base/isc.hh"
#include "logfile.hh"
#include "file_collection.hh"
#include "pcrepp/pcrepp.hh"
#include "tailer/tailer.looper.hh"
#include "service_tags.hh"
#include "lnav_util.hh"
static std::mutex REALPATH_CACHE_MUTEX;
static std::unordered_map<std::string, std::string> REALPATH_CACHE;
@ -45,10 +50,21 @@ static std::unordered_map<std::string, std::string> REALPATH_CACHE;
void file_collection::close_files(const std::vector<std::shared_ptr<logfile>> &files)
{
for (const auto& lf : files) {
if (lf->is_valid_filename()) {
std::lock_guard<std::mutex> lg(REALPATH_CACHE_MUTEX);
auto actual_path_opt = lf->get_actual_path();
if (actual_path_opt) {
std::lock_guard<std::mutex> lg(REALPATH_CACHE_MUTEX);
auto path_str = actual_path_opt.value().string();
for (auto iter = REALPATH_CACHE.begin();
iter != REALPATH_CACHE.end();) {
if (iter->first == path_str || iter->second == path_str) {
iter = REALPATH_CACHE.erase(iter);
} else {
++iter;
}
}
REALPATH_CACHE.erase(lf->get_filename());
} else {
this->fc_file_names.erase(lf->get_filename());
}
@ -107,6 +123,9 @@ void file_collection::merge(const file_collection &other)
this->fc_file_names.insert(other.fc_file_names.begin(),
other.fc_file_names.end());
if (!other.fc_files.empty()) {
for (const auto& lf : other.fc_files) {
this->fc_name_to_errors.erase(lf->get_filename());
}
this->fc_files.insert(this->fc_files.end(),
other.fc_files.begin(),
other.fc_files.end());
@ -343,6 +362,9 @@ void file_collection::expand_filename(lnav::futures::future_queue<file_collectio
logfile_open_options &loo,
bool required)
{
static const pcrepp REMOTE_PATTERN(
"(?:(?<username>[^@]+)@)?(?<hostname>[^:]+):(?<path>.*)");
static_root_mem<glob_t, globfree> gl;
{
@ -353,8 +375,32 @@ void file_collection::expand_filename(lnav::futures::future_queue<file_collectio
}
}
pcre_context_static<30> pc;
pcre_input pi(path);
if (is_url(path.c_str())) {
return;
}
if (REMOTE_PATTERN.match(pc, pi)) {
auto iter = this->fc_other_files.find(path);
if (iter != this->fc_other_files.end()) {
return;
}
const auto username = pi.get_substr_opt(pc["username"]);
const auto hostname = pi.get_substr(pc["hostname"]);
const auto remote_path = pi.get_substr(pc["path"]);
file_collection retval;
isc::to<tailer::looper &, services::remote_tailer_t>()
.send([=](auto &tlooper) {
tlooper.add_remote(to_netloc(username, hostname), remote_path);
});
retval.fc_other_files[path] = file_format_t::FF_REMOTE;
fq.push_back(lnav::futures::make_ready_future(retval));
} else if (glob(path.c_str(), GLOB_NOCHECK, nullptr, gl.inout()) == 0) {
int lpc;

View File

@ -39,6 +39,7 @@ enum class file_format_t {
FF_UNKNOWN,
FF_SQLITE_DB,
FF_ARCHIVE,
FF_REMOTE,
};
file_format_t detect_file_format(const ghc::filesystem::path& filename);
@ -57,6 +58,9 @@ struct formatter<file_format_t> : formatter<string_view> {
case file_format_t::FF_ARCHIVE:
name = "Archive";
break;
case file_format_t::FF_REMOTE:
name = "Remote";
break;
default:
break;
}

View File

@ -58,6 +58,9 @@ bool files_sub_source::list_input_handle_key(listview_curses &lv, int ch)
if (sel < 0) {
return true;
}
if (sel >= fc.fc_files.size()) {
return true;
}
auto& lss = lnav_data.ld_log_source;
auto &lf = fc.fc_files[sel];

View File

@ -123,6 +123,7 @@
#include "xpath_vtab.hh"
#include "textfile_highlighters.hh"
#include "base/future_util.hh"
#include "tailer/tailer.looper.hh"
#include "service_tags.hh"
#ifdef HAVE_LIBCURL
@ -250,9 +251,17 @@ static auto bound_scripts =
injector::bind<available_scripts>::to_singleton();
static auto bound_curl =
injector::bind_multiple<isc::service>()
injector::bind_multiple<isc::service_base>()
.add_singleton<curl_looper, services::curl_streamer_t>();
static auto bound_tailer =
injector::bind_multiple<isc::service_base>()
.add_singleton<tailer::looper, services::remote_tailer_t>();
static auto bound_main =
injector::bind_multiple<static_service>()
.add_singleton<main_looper, services::main_t>();
namespace injector {
template<>
void force_linking(last_relative_time_tag anno)
@ -268,6 +277,11 @@ template<>
void force_linking(services::remote_tailer_t anno)
{
}
template<>
void force_linking(services::main_t anno)
{
}
}
bool setup_logline_table(exec_context &ec)
@ -1473,6 +1487,13 @@ static void looper()
active_copy,
false);
}
{
auto& mlooper = injector::get<main_looper&, services::main_t>();
mlooper.get_port().process_for(0s);
}
if (initial_rescan_completed) {
rebuild_indexes();
} else {
@ -1638,7 +1659,12 @@ static void looper()
}
if (!initial_build &&
lnav_data.ld_log_source.text_line_count() == 0 &&
!lnav_data.ld_active_files.fc_other_files.empty()) {
!lnav_data.ld_active_files.fc_other_files.empty() &&
std::any_of(lnav_data.ld_active_files.fc_other_files.begin(),
lnav_data.ld_active_files.fc_other_files.end(),
[](const auto& pair) {
return pair.second == file_format_t::FF_ARCHIVE;
})) {
ensure_view(&lnav_data.ld_views[LNV_SCHEMA]);
}
@ -2406,7 +2432,7 @@ SELECT tbl_name FROM sqlite_master WHERE sql LIKE 'CREATE VIRTUAL TABLE%'
});
}
#endif
else if (is_glob(argv[lpc])) {
else if (is_glob(argv[lpc]) || strchr(argv[lpc], ':') != nullptr) {
lnav_data.ld_active_files.fc_file_names
.emplace(argv[lpc], logfile_open_options());
}
@ -2588,7 +2614,7 @@ SELECT tbl_name FROM sqlite_master WHERE sql LIKE 'CREATE VIRTUAL TABLE%'
usage();
}
else {
isc::service_guard serv_guard(injector::get<isc::service_list>());
isc::supervisor root_superv(injector::get<isc::service_list>());
try {
log_info("startup: %s", VCS_PACKAGE_STRING);

View File

@ -45,6 +45,7 @@
#include <unordered_map>
#include "base/future_util.hh"
#include "base/isc.hh"
#include "safe/safe.h"
#include "logfile.hh"
#include "hist_source.hh"
@ -285,6 +286,12 @@ struct lnav_data_t {
struct key_repeat_history ld_key_repeat_history;
};
struct static_service {};
class main_looper : public isc::service<main_looper>, public static_service {
public:
};
extern struct lnav_data_t lnav_data;
extern readline_context::command_map_t lnav_commands;

View File

@ -2002,7 +2002,7 @@ static Result<string, string> com_open(exec_context &ec, string cmdline, vector<
}
#endif
}
else if (is_glob(fn.c_str())) {
else if (is_glob(fn.c_str()) || fn.find(':') != string::npos) {
file_names.emplace(fn, logfile_open_options());
retval = "info: watching -- " + fn;
}

View File

@ -53,7 +53,7 @@
#include "base/lnav_log.hh"
#include "lnav_util.hh"
#include "auto_mem.hh"
#include "auto_pid.hh"
#include "base/auto_pid.hh"
#include "lnav_config.hh"
#include "yajlpp/yajlpp.hh"
#include "yajlpp/yajlpp_def.hh"
@ -212,8 +212,14 @@ bool install_from_git(const char *repo)
auto local_configs_path = configs_path / local_name;
auto local_staging_path = staging_path / local_name;
auto_pid git_cmd(fork());
auto fork_res = lnav::pid::from_fork();
if (fork_res.isErr()) {
fprintf(stderr, "error: cannot fork() to run git: %s\n",
fork_res.unwrapErr().c_str());
_exit(1);
}
auto git_cmd = fork_res.unwrap();
if (git_cmd.in_child()) {
if (ghc::filesystem::is_directory(local_formats_path)) {
printf("Updating format repo: %s\n", repo);
@ -231,9 +237,10 @@ bool install_from_git(const char *repo)
_exit(1);
}
git_cmd.wait_for_child();
auto finished_child = std::move(git_cmd).wait_for_child();
if (!git_cmd.was_normal_exit() || git_cmd.exit_status() != 0) {
if (!finished_child.was_normal_exit() ||
finished_child.exit_status() != 0) {
return false;
}

View File

@ -134,6 +134,15 @@ Result<std::string, std::string> read_file(const ghc::filesystem::path &path)
}
}
std::string to_netloc(const nonstd::optional<std::string>& username,
std::string hostname)
{
return fmt::format("{}{}{}",
username.value_or(std::string()),
username ? "@" : "",
hostname);
}
Result<std::pair<ghc::filesystem::path, int>, std::string>
open_temp_file(const ghc::filesystem::path &pattern)
{

View File

@ -127,6 +127,9 @@ std::string build_path(const std::vector<ghc::filesystem::path> &paths);
Result<std::string, std::string> read_file(const ghc::filesystem::path &path);
std::string to_netloc(const nonstd::optional<std::string>& username,
std::string hostname);
template<typename T>
size_t strtonum(T &num_out, const char *data, size_t len);

View File

@ -88,6 +88,7 @@ logfile::logfile(const string &filename, logfile_open_options &loo)
(long long) this->lf_stat.st_mtime,
filename.c_str());
this->lf_actual_path = filename;
this->lf_valid_filename = true;
}
else {
@ -119,13 +120,13 @@ bool logfile::exists() const
{
struct stat st;
if (!this->lf_valid_filename) {
if (!this->lf_actual_path) {
return true;
}
if (::stat(this->lf_filename.c_str(), &st) == -1) {
if (::stat(this->lf_actual_path.value().c_str(), &st) == -1) {
log_error("%s: stat failed -- %s",
this->lf_filename.c_str(),
this->lf_actual_path.value().c_str(),
strerror(errno));
return false;
}

View File

@ -119,6 +119,10 @@ public:
return this->lf_activity;
};
nonstd::optional<ghc::filesystem::path> get_actual_path() const {
return this->lf_actual_path;
}
/** @return The filename as given in the constructor. */
const std::string &get_filename() const { return this->lf_filename; };
@ -390,6 +394,7 @@ protected:
bool lf_named_file{true};
bool lf_valid_filename;
std::string lf_filename;
nonstd::optional<ghc::filesystem::path> lf_actual_path;
std::string lf_basename;
std::string lf_content_id;
struct stat lf_stat;

View File

@ -42,6 +42,7 @@ class logline_observer;
enum class logfile_name_source {
USER,
ARCHIVE,
REMOTE,
};
struct logfile_open_options {

View File

@ -259,6 +259,14 @@ public:
return intern_string::lookup(&this->pi_string[iter->c_begin], iter->length());
};
nonstd::optional<std::string> get_substr_opt(pcre_context::const_iterator iter) const {
if (iter->is_valid()) {
return std::string(&this->pi_string[iter->c_begin], iter->length());
}
return nonstd::nullopt;
}
void get_substr(pcre_context::const_iterator iter, char *dst) const {
memcpy(dst, &this->pi_string[iter->c_begin], iter->length());
dst[iter->length()] = '\0';
@ -563,6 +571,17 @@ public:
bool match(pcre_context &pc, pcre_input &pi, int options = 0) const;
template<size_t MATCH_COUNT>
nonstd::optional<pcre_context_static<MATCH_COUNT>> match(pcre_input &pi, int options = 0) const {
pcre_context_static<MATCH_COUNT> pc;
if (this->match(pc, pi, options)) {
return pc;
}
return nonstd::nullopt;
}
std::string replace(const char *str, const char *repl) const;
size_t match_partial(pcre_input &pi) const {

View File

@ -34,6 +34,7 @@
namespace services {
struct main_t {};
struct ui_t {};
struct curl_streamer_t {};
struct remote_tailer_t {};

View File

@ -25,6 +25,27 @@ add_library(
)
target_link_libraries(tailerpp base)
add_custom_command(
OUTPUT tailerbin.h tailerbin.cc
COMMAND bin2c -n tailer_bin tailerbin tailer
DEPENDS bin2c tailer
)
add_library(
tailerservice
tailer.looper.hh
tailer.looper.cc
tailerbin.h
tailerbin.cc
)
target_include_directories(
tailerservice
PUBLIC
${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries(tailerservice base)
add_executable(
drive_tailer

View File

@ -8,11 +8,15 @@ AM_LIBS = $(CODE_COVERAGE_LIBS)
AM_CFLAGS = $(CODE_COVERAGE_CFLAGS)
AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
noinst_LIBRARIES = libtailercommon.a libtailerpp.a
noinst_LIBRARIES = \
libtailercommon.a \
libtailerpp.a \
libtailerservice.a
noinst_HEADERS = \
sha-256.h \
tailer.h \
tailer.looper.hh \
tailerpp.hh
libtailercommon_a_SOURCES = \
@ -26,6 +30,21 @@ libtailerpp_a_CPPFLAGS = \
libtailerpp_a_SOURCES = \
tailerpp.cc
tailerbin.h tailerbin.cc: tailer ../tools/bin2c$(BUILD_EXEEXT)
../tools/bin2c$(BUILD_EXEEXT) -n tailer_bin $(*) $<
libtailerservice_a_CPPFLAGS = \
-I$(srcdir)/.. \
-I$(srcdir)/../fmtlib
libtailerservice_a_LIBADD = \
libtailercommon.a \
libtailerpp.a
libtailerservice_a_SOURCES = \
tailerbin.cc \
tailer.looper.cc
noinst_PROGRAMS = \
drive_tailer \
tailer
@ -47,3 +66,7 @@ drive_tailer_LDADD = \
libtailerpp.a \
../base/libbase.a \
../fmtlib/libcppfmt.a
DISTCLEANFILES = \
tailerbin.h \
tailerbin.cc

View File

@ -31,7 +31,7 @@
#include "config.h"
#include "ghc/filesystem.hpp"
#include "auto_pid.hh"
#include "base/auto_pid.hh"
#include "auto_fd.hh"
#include "tailerpp.hh"
@ -43,18 +43,34 @@ int main(int argc, char *const *argv)
// ghc::filesystem::remove_all(tmppath);
ghc::filesystem::create_directories(tmppath);
auto_pipe in_pipe(STDIN_FILENO);
auto_pipe out_pipe(STDOUT_FILENO);
in_pipe.open();
out_pipe.open();
auto child = auto_pid(fork());
if (child.failed()) {
auto in_pipe_res = auto_pipe::for_child_fd(STDIN_FILENO);
if (in_pipe_res.isErr()) {
fprintf(stderr,
"cannot open stdin pipe for child: %s\n",
in_pipe_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto out_pipe_res = auto_pipe::for_child_fd(STDOUT_FILENO);
if (out_pipe_res.isErr()) {
fprintf(stderr,
"cannot open stdout pipe for child: %s\n",
out_pipe_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto fork_res = lnav::pid::from_fork();
if (fork_res.isErr()) {
fprintf(stderr,
"cannot start tailer: %s\n",
fork_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto in_pipe = in_pipe_res.unwrap();
auto out_pipe = out_pipe_res.unwrap();
auto child = fork_res.unwrap();
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
@ -98,23 +114,26 @@ int main(int argc, char *const *argv)
printf("all done!\n");
done = true;
},
[&](const tailer::packet_error &te) {
printf("Got an error: %s -- %s\n", te.te_path.c_str(),
te.te_msg.c_str());
[&](const tailer::packet_log &te) {
printf("log: %s\n", te.pl_msg.c_str());
},
[&](const tailer::packet_error &pe) {
printf("Got an error: %s -- %s\n", pe.pe_path.c_str(),
pe.pe_msg.c_str());
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(te.te_path)).relative_path();
ghc::filesystem::path(pe.pe_path)).relative_path();
auto local_path = tmppath / remote_path;
printf("removing %s\n", local_path.c_str());
ghc::filesystem::remove_all(local_path);
},
[&](const tailer::packet_offer_block &tob) {
printf("Got an offer: %s %lld - %lld\n", tob.tob_path.c_str(),
tob.tob_offset, tob.tob_length);
[&](const tailer::packet_offer_block &pob) {
printf("Got an offer: %s %lld - %lld\n", pob.pob_path.c_str(),
pob.pob_offset, pob.pob_length);
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(tob.tob_path)).relative_path();
ghc::filesystem::path(pob.pob_path)).relative_path();
auto local_path = tmppath / remote_path;
auto fd = auto_fd(open(local_path.c_str(), O_RDONLY));
@ -122,7 +141,7 @@ int main(int argc, char *const *argv)
printf("sending need block\n");
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return;
}
@ -133,25 +152,25 @@ int main(int argc, char *const *argv)
ghc::filesystem::remove_all(local_path);
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return;
}
auto_mem<char> buffer;
buffer = (char *) malloc(tob.tob_length);
auto bytes_read = pread(fd, buffer, tob.tob_length,
tob.tob_offset);
buffer = (char *) malloc(pob.pob_length);
auto bytes_read = pread(fd, buffer, pob.pob_length,
pob.pob_offset);
// fprintf(stderr, "debug: bytes_read %ld\n", bytes_read);
if (bytes_read == tob.tob_length) {
if (bytes_read == pob.pob_length) {
tailer::hash_frag thf;
calc_sha_256(thf.thf_hash, buffer, bytes_read);
if (thf == tob.tob_hash) {
if (thf == pob.pob_hash) {
send_packet(to_child.get(),
TPT_ACK_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return;
}
@ -160,14 +179,14 @@ int main(int argc, char *const *argv)
}
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
},
[&](const tailer::packet_tail_block &ttb) {
//printf("got a tail: %s %lld %ld\n", ttb.ttb_path.c_str(),
// ttb.ptb_offset, ttb.ttb_bits.size());
[&](const tailer::packet_tail_block &ptb) {
//printf("got a tail: %s %lld %ld\n", ptb.ptb_path.c_str(),
// ptb.ptb_offset, ptb.ptb_bits.size());
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(ttb.ttb_path)).relative_path();
ghc::filesystem::path(ptb.ptb_path)).relative_path();
auto local_path = tmppath / remote_path;
ghc::filesystem::create_directories(local_path.parent_path());
@ -178,15 +197,15 @@ int main(int argc, char *const *argv)
if (fd == -1) {
perror("open");
} else {
ftruncate(fd, ttb.ptb_offset);
pwrite(fd, ttb.ttb_bits.data(), ttb.ttb_bits.size(), ttb.ptb_offset);
ftruncate(fd, ptb.ptb_offset);
pwrite(fd, ptb.ptb_bits.data(), ptb.ptb_bits.size(), ptb.ptb_offset);
}
}
);
}
child.wait_for_child();
if (!child.was_normal_exit()) {
auto finished_child = std::move(child).wait_for_child();
if (!finished_child.was_normal_exit()) {
fprintf(stderr, "error: child exited abnormally\n");
}
}

View File

@ -48,6 +48,7 @@ typedef enum {
TPT_NEED_BLOCK,
TPT_ACK_BLOCK,
TPT_TAIL_BLOCK,
TPT_LOG,
} tailer_packet_type_t;
#ifdef __cplusplus

491
src/tailer/tailer.looper.cc Normal file
View File

@ -0,0 +1,491 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "base/lnav_log.hh"
#include "tailer.looper.hh"
#include "tailer.h"
#include "tailerpp.hh"
#include "lnav.hh"
#include "service_tags.hh"
#include "line_buffer.hh"
#include "tailerbin.h"
using namespace std::chrono_literals;
static const auto HOST_RETRY_DELAY = 1min;
static void read_err_pipe(const std::string &netloc, auto_fd &err,
std::vector<std::string> &eq)
{
line_buffer lb;
file_range pipe_range;
bool done = false;
lb.set_fd(err);
while (!done) {
auto load_res = lb.load_next_line(pipe_range);
if (load_res.isErr()) {
done = true;
} else {
auto li = load_res.unwrap();
pipe_range = li.li_file_range;
if (li.li_file_range.empty()) {
done = true;
} else {
lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
auto line_str = string_fragment(sbr.get_data(),
0,
sbr.length());
line_str.trim("\n");
if (eq.size() < 10) {
eq.template emplace_back(line_str.to_string());
}
log_debug("tailer(%s): %.*s",
netloc.c_str(),
line_str.length(),
line_str.data());
});
}
}
}
}
void tailer::looper::loop_body()
{
auto now = std::chrono::steady_clock::now();
for (auto& qpair : this->l_netlocs_to_paths) {
auto& netloc = qpair.first;
auto& rpq = qpair.second;
if (now < rpq.rpq_next_attempt_time) {
continue;
}
if (this->l_remotes.count(netloc) == 0) {
auto create_res = host_tailer::for_host(netloc);
if (create_res.isErr()) {
this->report_error(netloc, create_res.unwrapErr());
rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
continue;
}
auto ht = create_res.unwrap();
this->l_remotes[netloc] = ht;
this->s_children.add_child_service(ht);
rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
rpq.rpq_existing_paths.end());
rpq.rpq_existing_paths.clear();
}
if (!rpq.rpq_new_paths.empty()) {
log_debug("%s: new paths to monitor -- %s",
netloc.c_str(),
rpq.rpq_new_paths.begin()->c_str());
this->l_remotes[netloc]->send(
[paths = rpq.rpq_new_paths](auto &ht) {
for (const auto &path : paths) {
log_debug(" adding path to tailer -- %s",
path.c_str());
ht.open_remote_path(path);
}
});
rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
rpq.rpq_new_paths.end());
rpq.rpq_new_paths.clear();
}
}
}
void tailer::looper::add_remote(std::string netloc, std::string path)
{
this->l_netlocs_to_paths[netloc].rpq_new_paths.insert(path);
}
Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
tailer::looper::host_tailer::for_host(const std::string& netloc)
{
log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
{
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
auto child = TRY(lnav::pid::from_fork());
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
err_pipe.after_fork(child.in());
if (child.in_child()) {
execlp("ssh", "ssh",
"-oStrictHostKeyChecking=no",
"-oBatchMode=yes",
netloc.c_str(),
"cat > tailer.bin && chmod ugo+rx ./tailer.bin",
nullptr);
exit(EXIT_FAILURE);
}
std::vector<std::string> error_queue;
log_debug("starting err reader");
std::thread err_reader([netloc, err = std::move(err_pipe.read_end()), &error_queue]() mutable {
read_err_pipe(netloc, err, error_queue);
});
log_debug("writing to child");
auto sf = tailer_bin[0].to_string_fragment();
ssize_t total_bytes = 0;
while (total_bytes < sf.length()) {
auto rc = write(in_pipe.write_end(), sf.data(), sf.length() - total_bytes);
if (rc < 0) {
break;
}
total_bytes += rc;
}
in_pipe.write_end().reset();
while (true) {
char buffer[1024];
auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
if (rc < 0) {
break;
}
if (rc == 0) {
break;
}
log_debug("tailer(%s): transfer output -- %.*s",
netloc.c_str(), rc, buffer);
}
auto finished_child = std::move(child).wait_for_child();
err_reader.join();
if (!finished_child.was_normal_exit() ||
finished_child.exit_status() != EXIT_SUCCESS) {
auto error_msg = error_queue.empty() ? "unknown" : error_queue.back();
return Err(fmt::format("failed to ssh to host: {}", error_msg));
}
}
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
auto child = TRY(lnav::pid::from_fork());
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
err_pipe.after_fork(child.in());
if (child.in_child()) {
execlp("ssh", "ssh",
// "-q",
"-oStrictHostKeyChecking=no",
"-oBatchMode=yes",
netloc.c_str(),
"./tailer.bin",
nullptr);
exit(EXIT_FAILURE);
}
return Ok(std::make_shared<host_tailer>(
netloc,
std::move(child),
std::move(in_pipe.write_end()),
std::move(out_pipe.read_end()),
std::move(err_pipe.read_end())
));
}
tailer::looper::host_tailer::host_tailer(const std::string &netloc,
auto_pid<process_state::RUNNING> child,
auto_fd to_child,
auto_fd from_child,
auto_fd err_from_child)
: isc::service<host_tailer>(netloc),
ht_netloc(netloc),
ht_local_path(ghc::filesystem::temp_directory_path() /
fmt::format("lnav-{}-remotes", getuid()) /
netloc),
ht_error_reader([
netloc, err = std::move(err_from_child), &eq = this->ht_error_queue]() mutable {
read_err_pipe(netloc, err, eq);
}),
ht_state(connected{
std::move(child),
std::move(to_child),
std::move(from_child),
})
{
}
void tailer::looper::host_tailer::open_remote_path(const std::string& path)
{
this->ht_state.match(
[&](connected& conn) {
conn.c_desired_paths.insert(path);
send_packet(conn.ht_to_child.get(),
TPT_OPEN_PATH,
TPPT_STRING, path.c_str(),
TPPT_DONE);
},
[&](const disconnected& d) {
log_warning("tailer(%s): disconnected from host, cannot tail: %s",
this->ht_netloc.c_str(),
path.c_str());
}
);
}
void tailer::looper::host_tailer::loop_body()
{
if (!this->ht_state.is<connected>()) {
return;
}
auto& conn = this->ht_state.get<connected>();
pollfd pfds[1];
pfds[0].fd = conn.ht_from_child.get();
pfds[0].events = POLLIN;
pfds[0].revents = 0;
auto ready_count = poll(pfds, 1, 100);
if (ready_count > 0) {
auto read_res = tailer::read_packet(conn.ht_from_child);
if (read_res.isErr()) {
log_error("read error: %s", read_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto packet = read_res.unwrap();
this->ht_state = packet.match(
[&](const tailer::packet_eof &te) {
log_debug("tailer(%s): all done!", this->ht_netloc.c_str());
auto finished_child = std::move(conn).close();
if (finished_child.exit_status() != 0 &&
!this->ht_error_queue.empty()) {
report_error(this->ht_netloc,
this->ht_error_queue.back());
}
return state_v{disconnected()};
},
[&](const tailer::packet_log &pl) {
log_debug("tailer(%s): %s\n",
this->ht_netloc.c_str(), pl.pl_msg.c_str());
return std::move(this->ht_state);
},
[&](const tailer::packet_error &pe) {
log_debug("Got an error: %s -- %s", pe.pe_path.c_str(),
pe.pe_msg.c_str());
if (conn.c_desired_paths.count(pe.pe_path)) {
report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
}
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pe.pe_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("removing %s", local_path.c_str());
this->ht_active_files.erase(local_path);
ghc::filesystem::remove_all(local_path);
return std::move(this->ht_state);
},
[&](const tailer::packet_offer_block &pob) {
log_debug("Got an offer: %s %lld - %lld", pob.pob_path.c_str(),
pob.pob_offset, pob.pob_length);
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pob.pob_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
auto fd = auto_fd(::open(local_path.c_str(), O_RDONLY));
if (this->ht_active_files.count(local_path) == 0) {
this->ht_active_files.insert(local_path);
auto custom_name = this->get_display_path(pob.pob_path);
isc::to<main_looper &, services::main_t>()
.send([local_path, custom_name](auto &mlooper) {
auto &active_fc = lnav_data.ld_active_files;
auto lpath_str = local_path.string();
if (active_fc.fc_file_names.count(lpath_str) > 0) {
log_debug("already in fc_file_names");
return;
}
if (active_fc.fc_closed_files.count(custom_name) > 0) {
log_debug("in closed");
return;
}
file_collection fc;
fc.fc_file_names[lpath_str]
.with_filename(custom_name)
.with_source(logfile_name_source::REMOTE);
update_active_files(fc);
});
}
if (fd == -1) {
log_debug("sending need block");
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
struct stat st;
if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
ghc::filesystem::remove_all(local_path);
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
auto_mem<char> buffer;
buffer = (char *) malloc(pob.pob_length);
auto bytes_read = pread(fd, buffer, pob.pob_length,
pob.pob_offset);
if (bytes_read == pob.pob_length) {
tailer::hash_frag thf;
calc_sha_256(thf.thf_hash, buffer, bytes_read);
if (thf == pob.pob_hash) {
send_packet(conn.ht_to_child.get(),
TPT_ACK_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
} else if (bytes_read == -1) {
ghc::filesystem::remove_all(local_path);
}
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
},
[&](const tailer::packet_tail_block &ptb) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(ptb.ptb_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("writing tail to: %s", local_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
auto fd = auto_fd(
::open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT,
0600));
if (fd == -1) {
log_error("open: %s", strerror(errno));
} else {
ftruncate(fd, ptb.ptb_offset);
pwrite(fd,
ptb.ptb_bits.data(), ptb.ptb_bits.size(),
ptb.ptb_offset);
}
return std::move(this->ht_state);
}
);
if (this->ht_state.is<disconnected>()) {
this->s_looping = false;
}
}
}
std::chrono::milliseconds
tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
{
return 0s;
}
void tailer::looper::host_tailer::stopped()
{
if (!this->ht_state.is<disconnected>()) {
this->ht_state = disconnected();
}
if (this->ht_error_reader.joinable()) {
this->ht_error_reader.join();
}
}
std::string tailer::looper::host_tailer::get_display_path(const std::string& remote_path) const
{
return fmt::format("{}:{}", this->ht_netloc, remote_path);
}
void
tailer::looper::child_finished(std::shared_ptr<service_base> child)
{
auto child_tailer = std::static_pointer_cast<host_tailer>(child);
for (auto iter = this->l_remotes.begin();
iter != this->l_remotes.end();
++iter) {
if (iter->second != child_tailer) {
continue;
}
this->l_remotes.erase(iter);
return;
}
}
void tailer::looper::report_error(std::string path, std::string msg)
{
isc::to<main_looper&, services::main_t>()
.send([=](auto& mlooper) {
file_collection fc;
fc.fc_name_to_errors[path] = msg;
update_active_files(fc);
});
}

119
src/tailer/tailer.looper.hh Normal file
View File

@ -0,0 +1,119 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef lnav_tailer_looper_hh
#define lnav_tailer_looper_hh
#include <set>
#include "base/isc.hh"
#include "base/auto_pid.hh"
#include "auto_fd.hh"
#include "ghc/filesystem.hpp"
#include "mapbox/variant.hpp"
namespace tailer {
class looper : public isc::service<looper> {
public:
void add_remote(std::string netloc, std::string path);
protected:
void loop_body() override;
void child_finished(std::shared_ptr<service_base> child) override;
private:
class host_tailer : public isc::service<host_tailer> {
public:
static Result<std::shared_ptr<host_tailer>, std::string> for_host(
const std::string& netloc);
host_tailer(const std::string& netloc,
auto_pid<process_state::RUNNING> child,
auto_fd to_child, auto_fd from_child,
auto_fd err_from_child);
void open_remote_path(const std::string& path);
protected:
void loop_body() override;
void stopped() override;
std::chrono::milliseconds
compute_timeout(mstime_t current_time) const override;
private:
std::string get_display_path(const std::string& remote_path) const;
struct connected {
auto_pid<process_state::RUNNING> ht_child;
auto_fd ht_to_child;
auto_fd ht_from_child;
std::set<std::string> c_desired_paths;
auto_pid<process_state::FINISHED> close() && {
this->ht_to_child.reset();
this->ht_from_child.reset();
return std::move(this->ht_child).wait_for_child();
}
};
struct disconnected {};
using state_v = mapbox::util::variant<connected, disconnected>;
const std::string ht_netloc;
const ghc::filesystem::path ht_local_path;
std::set<ghc::filesystem::path> ht_active_files;
std::vector<std::string> ht_error_queue;
std::thread ht_error_reader;
state_v ht_state{disconnected()};
};
static void report_error(std::string path, std::string msg);
using attempt_time_point = std::chrono::time_point<std::chrono::steady_clock>;
struct remote_path_queue {
attempt_time_point rpq_next_attempt_time{std::chrono::steady_clock::now()};
std::set<std::string> rpq_new_paths;
std::set<std::string> rpq_existing_paths;
};
std::map<std::string, remote_path_queue> l_netlocs_to_paths;
std::map<std::string, std::shared_ptr<host_tailer>> l_remotes;
};
}
#endif

View File

@ -303,7 +303,7 @@ int poll_paths(struct list *path_list)
while (curr->cps_node.n_succ != NULL) {
struct stat st;
int rc = lstat(curr->cps_path, &st);
int rc = stat(curr->cps_path, &st);
if (rc == -1) {
memset(&st, 0, sizeof(st));
@ -374,6 +374,8 @@ int poll_paths(struct list *path_list)
break;
}
}
curr->cps_last_path_state = PS_OK;
} else if (S_ISDIR(st.st_mode)) {
DIR *dir = opendir(curr->cps_path);
@ -419,6 +421,8 @@ int poll_paths(struct list *path_list)
retval += poll_paths(&curr->cps_children);
}
curr->cps_last_path_state = PS_OK;
}
curr->cps_last_stat = st;
@ -440,6 +444,7 @@ int main(int argc, char *argv[])
pfds[0].fd = STDIN_FILENO;
pfds[0].events = POLLIN;
pfds[0].revents = 0;
int ready_count = poll(pfds, 1, timeout);
@ -462,10 +467,17 @@ int main(int argc, char *argv[])
fprintf(stderr, "error: invalid open packet\n");
done = 1;
} else if (type == TPT_OPEN_PATH) {
struct client_path_state *cps = create_client_path_state(path);
struct client_path_state *cps;
fprintf(stderr, "info: monitoring path: %s\n", path);
list_append(&client_path_list, &cps->cps_node);
cps = find_client_path_state(&client_path_list, path);
if (cps != NULL) {
fprintf(stderr, "warning: already monitoring -- %s\n", path);
} else {
cps = create_client_path_state(path);
fprintf(stderr, "info: monitoring path: %s\n", path);
list_append(&client_path_list, &cps->cps_node);
}
} else {
struct client_path_state *cps = find_client_path_state(&client_path_list, path);

View File

@ -68,29 +68,29 @@ Result<packet, std::string> read_packet(int fd)
}
switch (type) {
case TPT_ERROR: {
packet_error te;
packet_error pe;
TRY(read_payloads_into(fd, te.te_path, te.te_msg));
return Ok(packet{te});
TRY(read_payloads_into(fd, pe.pe_path, pe.pe_msg));
return Ok(packet{pe});
}
case TPT_OFFER_BLOCK: {
packet_offer_block tob;
packet_offer_block pob;
TRY(read_payloads_into(fd,
tob.tob_path,
tob.tob_offset,
tob.tob_length,
tob.tob_hash));
return Ok(packet{tob});
pob.pob_path,
pob.pob_offset,
pob.pob_length,
pob.pob_hash));
return Ok(packet{pob});
}
case TPT_TAIL_BLOCK: {
packet_tail_block ttb;
packet_tail_block ptb;
TRY(read_payloads_into(fd,
ttb.ttb_path,
ttb.ptb_offset,
ttb.ttb_bits));
return Ok(packet{ttb});
ptb.ptb_path,
ptb.ptb_offset,
ptb.ptb_bits));
return Ok(packet{ptb});
}
default:
assert(0);

View File

@ -46,8 +46,8 @@ struct packet_eof {
};
struct packet_error {
std::string te_path;
std::string te_msg;
std::string pe_path;
std::string pe_msg;
};
struct hash_frag {
@ -60,17 +60,21 @@ struct hash_frag {
}
};
struct packet_log {
std::string pl_msg;
};
struct packet_offer_block {
std::string tob_path;
int64_t tob_offset;
int64_t tob_length;
hash_frag tob_hash;
std::string pob_path;
int64_t pob_offset;
int64_t pob_length;
hash_frag pob_hash;
};
struct packet_tail_block {
std::string ttb_path;
std::string ptb_path;
int64_t ptb_offset;
std::vector<uint8_t> ttb_bits;
std::vector<uint8_t> ptb_bits;
};
using packet = mapbox::util::variant<

11
src/tools/Makefile.am Normal file
View File

@ -0,0 +1,11 @@
all-local: bin2c$(BUILD_EXEEXT)
bin2c$(BUILD_EXEEXT): bin2c.c
$(AM_V_CC) $(CC_FOR_BUILD) $(CPPFLAGS) $(LDFLAGS) -g3 -o $@ $? -lz
EXTRA_DIST = \
bin2c.c
CLEANFILES = \
bin2c$(BUILD_EXEEXT)