[remote] support for links

This commit is contained in:
Timothy Stack 2021-05-02 15:08:12 -07:00
parent 5aaea49b4b
commit ca5ec33241
6 changed files with 121 additions and 14 deletions

View File

@ -200,6 +200,9 @@ int main(int argc, char *const *argv)
ftruncate(fd, ptb.ptb_offset);
pwrite(fd, ptb.ptb_bits.data(), ptb.ptb_bits.size(), ptb.ptb_offset);
}
},
[&](const tailer::packet_link &pl) {
}
);
}

View File

@ -48,6 +48,7 @@ typedef enum {
TPT_NEED_BLOCK,
TPT_ACK_BLOCK,
TPT_TAIL_BLOCK,
TPT_LINK_BLOCK,
TPT_LOG,
} tailer_packet_type_t;

View File

@ -417,7 +417,9 @@ void tailer::looper::host_tailer::loop_body()
ghc::filesystem::path(ptb.ptb_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("writing tail to: %s", local_path.c_str());
log_debug("tailer(%s): writing tail to: %s",
this->ht_netloc.c_str(),
local_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
auto fd = auto_fd(
::open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT,
@ -431,6 +433,34 @@ void tailer::looper::host_tailer::loop_body()
ptb.ptb_bits.data(), ptb.ptb_bits.size(),
ptb.ptb_offset);
}
return std::move(this->ht_state);
},
[&](const tailer::packet_link &pl) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pl.pl_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
std::string link_path;
if (remote_path.is_absolute()) {
auto local_link_path =
this->ht_local_path / remote_link_path.relative_path();
link_path = local_link_path.string();
} else {
link_path = remote_link_path.string();
}
log_debug("tailer(%s): symlinking %s -> %s",
this->ht_netloc.c_str(),
local_path.c_str(),
link_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
ghc::filesystem::remove_all(local_path);
if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
log_error("symlink failed: %s", strerror(errno));
}
return std::move(this->ht_state);
}
);

View File

@ -110,6 +110,7 @@ typedef enum {
CS_INIT,
CS_OFFERED,
CS_TAILING,
CS_SYNCED,
} client_state_t;
typedef enum {
@ -143,15 +144,22 @@ struct client_path_state *create_client_path_state(const char *path)
return retval;
}
void delete_client_path_state(struct client_path_state *cps)
{
free(cps->cps_path);
while (cps->cps_children.l_head->n_succ != NULL) {
struct client_path_state *child_cps = (struct client_path_state *) cps->cps_children.l_head;
void delete_client_path_state(struct client_path_state *cps);
void delete_client_path_list(struct list *l)
{
struct client_path_state *child_cps;
while ((child_cps = (struct client_path_state *) list_remove_head(l)) != NULL) {
list_remove(&child_cps->cps_node);
delete_client_path_state(child_cps);
}
}
void delete_client_path_state(struct client_path_state *cps)
{
free(cps->cps_path);
delete_client_path_list(&cps->cps_children);
free(cps);
}
@ -200,6 +208,7 @@ void set_client_path_state_error(struct client_path_state *cps)
cps->cps_last_path_state = PS_ERROR;
cps->cps_client_file_offset = -1;
cps->cps_client_state = CS_INIT;
delete_client_path_list(&cps->cps_children);
}
static int readall(int sock, void *buf, size_t len)
@ -281,13 +290,12 @@ struct client_path_state *find_client_path_state(struct list *path_list, const c
if (strcmp(curr->cps_path, path) == 0) {
return curr;
}
if (strncmp(curr->cps_path, path, strlen(curr->cps_path)) == 0) {
struct client_path_state *child =
find_client_path_state(&curr->cps_children, path);
if (child != NULL) {
return child;
}
struct client_path_state *child =
find_client_path_state(&curr->cps_children, path);
if (child != NULL) {
return child;
}
curr = (struct client_path_state *) curr->cps_node.n_succ;
@ -303,7 +311,7 @@ int poll_paths(struct list *path_list)
while (curr->cps_node.n_succ != NULL) {
struct stat st;
int rc = stat(curr->cps_path, &st);
int rc = lstat(curr->cps_path, &st);
if (rc == -1) {
memset(&st, 0, sizeof(st));
@ -314,6 +322,51 @@ int poll_paths(struct list *path_list)
(st.st_size < curr->cps_last_stat.st_size))) {
send_error(curr, "replaced");
set_client_path_state_error(curr);
} else if (S_ISLNK(st.st_mode)) {
switch (curr->cps_client_state) {
case CS_INIT: {
char buffer[PATH_MAX];
char *target_path;
ssize_t link_len;
link_len = readlink(curr->cps_path, buffer, sizeof(buffer));
target_path = realpath(curr->cps_path, NULL);
if (link_len < 0 || target_path == NULL) {
set_client_path_state_error(curr);
} else {
buffer[link_len] = '\0';
send_packet(STDOUT_FILENO,
TPT_LINK_BLOCK,
TPPT_STRING, curr->cps_path,
TPPT_STRING, buffer,
TPPT_DONE);
curr->cps_client_state = CS_SYNCED;
struct client_path_state *child =
create_client_path_state(target_path);
fprintf(stderr, "info: monitoring link path %s\n",
target_path);
list_append(&curr->cps_children, &child->cps_node);
retval += 1;
}
free(target_path);
break;
}
case CS_SYNCED:
break;
case CS_OFFERED:
case CS_TAILING:
fprintf(stderr,
"internal-error: unexpected state for path -- %s\n",
curr->cps_path);
break;
}
retval += poll_paths(&curr->cps_children);
curr->cps_last_path_state = PS_OK;
} else if (S_ISREG(st.st_mode)) {
switch (curr->cps_client_state) {
case CS_INIT:
@ -373,6 +426,11 @@ int poll_paths(struct list *path_list)
// Still waiting for the client ack
break;
}
case CS_SYNCED: {
fprintf(stderr, "internal-error: got synced for %s\n",
curr->cps_path);
break;
}
}
curr->cps_last_path_state = PS_OK;
@ -430,6 +488,8 @@ int poll_paths(struct list *path_list)
curr = (struct client_path_state *) curr->cps_node.n_succ;
}
fflush(stderr);
return retval;
}

View File

@ -92,6 +92,14 @@ Result<packet, std::string> read_packet(int fd)
ptb.ptb_bits));
return Ok(packet{ptb});
}
case TPT_LINK_BLOCK: {
packet_link pl;
TRY(read_payloads_into(fd,
pl.pl_path,
pl.pl_link_value));
return Ok(packet{pl});
}
default:
assert(0);
break;

View File

@ -77,8 +77,13 @@ struct packet_tail_block {
std::vector<uint8_t> ptb_bits;
};
struct packet_link {
std::string pl_path;
std::string pl_link_value;
};
using packet = mapbox::util::variant<
packet_eof, packet_error, packet_offer_block, packet_tail_block>;
packet_eof, packet_error, packet_offer_block, packet_tail_block, packet_link>;
int readall(int sock, void *buf, size_t len);