use std::{ borrow::Cow, collections::HashMap, env::var, ffi::{OsStr, OsString}, fs::File, io::{IsTerminal, Write}, process::Stdio, sync::{ atomic::{AtomicBool, AtomicU8, Ordering}, Arc, }, time::Duration, }; use clearscreen::ClearScreen; use miette::{miette, IntoDiagnostic, Report, Result}; use notify_rust::Notification; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; use tokio::{process::Command as TokioCommand, time::sleep}; use tracing::{debug, debug_span, error, instrument, trace, trace_span, Instrument}; use watchexec::{ action::ActionHandler, command::{Command, Program, Shell, SpawnOptions}, error::RuntimeError, job::{CommandState, Job}, sources::fs::Watcher, Config, ErrorHook, Id, }; use watchexec_events::{Event, Keyboard, ProcessEnd, Tag}; use watchexec_signals::Signal; use crate::{ args::{Args, ClearMode, ColourMode, EmitEvents, OnBusyUpdate, SignalMapping, WrapMode}, state::RotatingTempFile, }; use crate::{emits::events_to_simple_format, state::State}; #[derive(Clone, Copy, Debug)] struct OutputFlags { quiet: bool, colour: ColorChoice, timings: bool, bell: bool, toast: bool, } pub fn make_config(args: &Args, state: &State) -> Result { let _span = debug_span!("args-runtime").entered(); let config = Config::default(); config.on_error(|err: ErrorHook| { if let RuntimeError::IoError { about: "waiting on process group", .. } = err.error { // "No child processes" and such // these are often spurious, so condemn them to -v only error!("{}", err.error); return; } if cfg!(debug_assertions) { eprintln!("[[{:?}]]", err.error); } eprintln!("[[Error (not fatal)]]\n{}", Report::new(err.error)); }); config.pathset(args.paths.clone()); config.throttle(args.debounce.0); config.keyboard_events(args.stdin_quit); if let Some(interval) = args.poll { config.file_watcher(Watcher::Poll(interval.0)); } let once = args.once; let clear = args.screen_clear; let emit_events_to = args.emit_events_to; let emit_file = state.emit_file.clone(); if args.only_emit_events { config.on_action(move |mut action| { // if we got a terminate or interrupt signal, quit if action .signals() .any(|sig| sig == Signal::Terminate || sig == Signal::Interrupt) { // no need to be graceful as there's no commands action.quit(); return action; } // clear the screen before printing events if let Some(mode) = clear { match mode { ClearMode::Clear => { clearscreen::clear().ok(); } ClearMode::Reset => { reset_screen(); } } } match emit_events_to { EmitEvents::Stdio => { println!( "{}", events_to_simple_format(action.events.as_ref()).unwrap_or_default() ); } EmitEvents::JsonStdio => { for event in action.events.iter().filter(|e| !e.is_empty()) { println!("{}", serde_json::to_string(event).unwrap_or_default()); } } other => unreachable!( "emit_events_to should have been validated earlier: {:?}", other ), } action }); return Ok(config); } let delay_run = args.delay_run.map(|ts| ts.0); let on_busy = args.on_busy_update; let stdin_quit = args.stdin_quit; let signal = args.signal; let stop_signal = args.stop_signal; let stop_timeout = args.stop_timeout.0; let print_events = args.print_events; let outflags = OutputFlags { quiet: args.quiet, colour: match args.color { ColourMode::Auto if !std::io::stdin().is_terminal() => ColorChoice::Never, ColourMode::Auto => ColorChoice::Auto, ColourMode::Always => ColorChoice::Always, ColourMode::Never => ColorChoice::Never, }, timings: args.timings, bell: args.bell, toast: args.notify, }; let workdir = Arc::new(args.workdir.clone()); let mut add_envs = HashMap::new(); for pair in &args.env { if let Some((k, v)) = pair.split_once('=') { add_envs.insert(k.to_owned(), OsString::from(v)); } else { return Err(miette!("{pair} is not in key=value format")); } } debug!( ?add_envs, "additional environment variables to add to command" ); let id = Id::default(); let command = interpret_command_args(args)?; let signal_map: Arc>> = Arc::new( args.signal_map .iter() .copied() .map(|SignalMapping { from, to }| (from, to)) .collect(), ); let queued = Arc::new(AtomicBool::new(false)); let quit_again = Arc::new(AtomicU8::new(0)); config.on_action_async(move |mut action| { let add_envs = add_envs.clone(); let command = command.clone(); let emit_file = emit_file.clone(); let queued = queued.clone(); let quit_again = quit_again.clone(); let signal_map = signal_map.clone(); let workdir = workdir.clone(); Box::new( async move { trace!(events=?action.events, "handling action"); let add_envs = add_envs.clone(); let command = command.clone(); let emit_file = emit_file.clone(); let queued = queued.clone(); let quit_again = quit_again.clone(); let signal_map = signal_map.clone(); let workdir = workdir.clone(); trace!("set spawn hook for workdir and environment variables"); let job = action.get_or_create_job(id, move || command.clone()); let events = action.events.clone(); job.set_spawn_hook(move |command, _| { let add_envs = add_envs.clone(); let emit_file = emit_file.clone(); let events = events.clone(); if let Some(ref workdir) = workdir.as_ref() { debug!(?workdir, "set command workdir"); command.command_mut().current_dir(workdir); } emit_events_to_command( command.command_mut(), events, emit_file, emit_events_to, add_envs, ); }); let show_events = { let events = action.events.clone(); move || { if print_events { trace!("print events to stderr"); for (n, event) in events.iter().enumerate() { eprintln!("[EVENT {n}] {event}"); } } } }; let clear_screen = { let events = action.events.clone(); move || { if let Some(mode) = clear { match mode { ClearMode::Clear => { clearscreen::clear().ok(); debug!("cleared screen"); } ClearMode::Reset => { reset_screen(); debug!("hard-reset screen"); } } } // re-show events after clearing if print_events { trace!("print events to stderr"); for (n, event) in events.iter().enumerate() { eprintln!("[EVENT {n}] {event}"); } } } }; let quit = |mut action: ActionHandler| { match quit_again.fetch_add(1, Ordering::Relaxed) { 0 => { eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping...]"); // eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping... Ctrl-C again to exit faster]"); // see TODO in action/worker.rs action.quit_gracefully( stop_signal.unwrap_or(Signal::Terminate), stop_timeout, ); } 1 => { action.quit_gracefully(Signal::ForceStop, Duration::ZERO); } _ => { action.quit(); } } action }; if once { debug!("debug mode: run once and quit"); show_events(); if let Some(delay) = delay_run { job.run_async(move |_| { Box::new(async move { sleep(delay).await; }) }); } // this blocks the event loop, but also this is a debug feature so i don't care job.start().await; job.to_wait().await; return quit(action); } let is_keyboard_eof = action .events .iter() .any(|e| e.tags.contains(&Tag::Keyboard(Keyboard::Eof))); if stdin_quit && is_keyboard_eof { debug!("keyboard EOF, quit"); show_events(); return quit(action); } let signals: Vec = action.signals().collect(); trace!(?signals, "received some signals"); // if we got a terminate or interrupt signal and they're not mapped, quit if (signals.contains(&Signal::Terminate) && !signal_map.contains_key(&Signal::Terminate)) || (signals.contains(&Signal::Interrupt) && !signal_map.contains_key(&Signal::Interrupt)) { debug!("unmapped terminate or interrupt signal, quit"); show_events(); return quit(action); } // pass all other signals on for signal in signals { match signal_map.get(&signal) { Some(Some(mapped)) => { debug!(?signal, ?mapped, "passing mapped signal"); job.signal(*mapped); } Some(None) => { debug!(?signal, "discarding signal"); } None => { debug!(?signal, "passing signal on"); job.signal(signal); } } } // only filesystem events below here (or empty synthetic events) if action.paths().next().is_none() && !action.events.iter().any(|e| e.is_empty()) { debug!("no filesystem or synthetic events, skip without doing more"); show_events(); return action; } show_events(); if let Some(delay) = delay_run { trace!("delaying run by sleeping inside the job"); job.run_async(move |_| { Box::new(async move { sleep(delay).await; }) }); } trace!("querying job state via run_async"); job.run_async({ let job = job.clone(); move |context| { let job = job.clone(); let is_running = matches!(context.current, CommandState::Running { .. }); Box::new(async move { let innerjob = job.clone(); if is_running { trace!(?on_busy, "job is running, decide what to do"); match on_busy { OnBusyUpdate::DoNothing => {} OnBusyUpdate::Signal => { job.signal(if cfg!(windows) { Signal::ForceStop } else { stop_signal.or(signal).unwrap_or(Signal::Terminate) }); } OnBusyUpdate::Restart if cfg!(windows) => { job.restart(); job.run(move |context| { clear_screen(); setup_process( innerjob.clone(), context.command.clone(), outflags, ) }); } OnBusyUpdate::Restart => { job.restart_with_signal( stop_signal.unwrap_or(Signal::Terminate), stop_timeout, ); job.run(move |context| { clear_screen(); setup_process( innerjob.clone(), context.command.clone(), outflags, ) }); } OnBusyUpdate::Queue => { let job = job.clone(); let already_queued = queued.fetch_or(true, Ordering::SeqCst); if already_queued { debug!("next start is already queued, do nothing"); } else { debug!("queueing next start of job"); tokio::spawn({ let queued = queued.clone(); async move { trace!("waiting for job to finish"); job.to_wait().await; trace!("job finished, starting queued"); job.start(); job.run(move |context| { clear_screen(); setup_process( innerjob.clone(), context.command.clone(), outflags, ) }) .await; trace!("resetting queued state"); queued.store(false, Ordering::SeqCst); } }); } } } } else { trace!("job is not running, start it"); job.start(); job.run(move |context| { clear_screen(); setup_process( innerjob.clone(), context.command.clone(), outflags, ) }); } }) } }); action } .instrument(trace_span!("action handler")), ) }); Ok(config) } #[instrument(level = "debug")] fn interpret_command_args(args: &Args) -> Result> { let mut cmd = args.command.clone(); if cmd.is_empty() { panic!("(clap) Bug: command is not present"); } let shell = if args.no_shell { None } else { let shell = args.shell.clone().or_else(|| var("SHELL").ok()); match shell .as_deref() .or_else(|| { if cfg!(not(windows)) { Some("sh") } else if var("POWERSHELL_DISTRIBUTION_CHANNEL").is_ok() && (which::which("pwsh").is_ok() || which::which("pwsh.exe").is_ok()) { trace!("detected pwsh"); Some("pwsh") } else if var("PSModulePath").is_ok() && (which::which("powershell").is_ok() || which::which("powershell.exe").is_ok()) { trace!("detected powershell"); Some("powershell") } else { Some("cmd") } }) .or(Some("default")) { Some("") => return Err(RuntimeError::CommandShellEmptyShell).into_diagnostic(), Some("none") | None => None, #[cfg(windows)] Some("cmd") | Some("cmd.exe") | Some("CMD") | Some("CMD.EXE") => Some(Shell::cmd()), Some(other) => { let sh = other.split_ascii_whitespace().collect::>(); // UNWRAP: checked by Some("") #[allow(clippy::unwrap_used)] let (shprog, shopts) = sh.split_first().unwrap(); Some(Shell { prog: shprog.into(), options: shopts.iter().map(|s| (*s).to_string()).collect(), program_option: Some(Cow::Borrowed(OsStr::new("-c"))), }) } } }; let program = if let Some(shell) = shell { Program::Shell { shell, command: cmd.join(" "), args: Vec::new(), } } else { Program::Exec { prog: cmd.remove(0).into(), args: cmd, } }; Ok(Arc::new(Command { program, options: SpawnOptions { grouped: matches!(args.wrap_process, WrapMode::Group), session: matches!(args.wrap_process, WrapMode::Session), ..Default::default() }, })) } #[instrument(level = "trace")] fn setup_process(job: Job, command: Arc, outflags: OutputFlags) { if outflags.toast { Notification::new() .summary("Watchexec: change detected") .body(&format!("Running {command}")) .show() .map_or_else( |err| { eprintln!("[[Failed to send desktop notification: {err}]]"); }, drop, ); } if !outflags.quiet { let mut stderr = StandardStream::stderr(outflags.colour); stderr.reset().ok(); stderr .set_color(ColorSpec::new().set_fg(Some(Color::Green))) .ok(); writeln!(&mut stderr, "[Running: {command}]").ok(); stderr.reset().ok(); } tokio::spawn(async move { job.to_wait().await; job.run(move |context| end_of_process(context.current, outflags)); }); } #[instrument(level = "trace")] fn end_of_process(state: &CommandState, outflags: OutputFlags) { let CommandState::Finished { status, started, finished, } = state else { return; }; let duration = *finished - *started; let timing = if outflags.timings { format!(", lasted {duration:?}") } else { String::new() }; let (msg, fg) = match status { ProcessEnd::ExitError(code) => (format!("Command exited with {code}{timing}"), Color::Red), ProcessEnd::ExitSignal(sig) => { (format!("Command killed by {sig:?}{timing}"), Color::Magenta) } ProcessEnd::ExitStop(sig) => (format!("Command stopped by {sig:?}{timing}"), Color::Blue), ProcessEnd::Continued => (format!("Command continued{timing}"), Color::Cyan), ProcessEnd::Exception(ex) => ( format!("Command ended by exception {ex:#x}{timing}"), Color::Yellow, ), ProcessEnd::Success => (format!("Command was successful{timing}"), Color::Green), }; if outflags.toast { Notification::new() .summary("Watchexec: command ended") .body(&msg) .show() .map_or_else( |err| { eprintln!("[[Failed to send desktop notification: {err}]]"); }, drop, ); } if !outflags.quiet { let mut stderr = StandardStream::stderr(outflags.colour); stderr.reset().ok(); stderr.set_color(ColorSpec::new().set_fg(Some(fg))).ok(); writeln!(&mut stderr, "[{msg}]").ok(); stderr.reset().ok(); } if outflags.bell { let mut stdout = std::io::stdout(); stdout.write_all(b"\x07").ok(); stdout.flush().ok(); } } #[instrument(level = "trace")] fn emit_events_to_command( command: &mut TokioCommand, events: Arc<[Event]>, emit_file: RotatingTempFile, emit_events_to: EmitEvents, mut add_envs: HashMap, ) { use crate::emits::*; let mut stdin = None; match emit_events_to { EmitEvents::Environment => { add_envs.extend(emits_to_environment(&events)); } EmitEvents::Stdio => match emits_to_file(&emit_file, &events) .and_then(|path| File::open(path).into_diagnostic()) { Ok(file) => { stdin.replace(Stdio::from(file)); } Err(err) => { error!("Failed to write events to stdin, continuing without it: {err}"); } }, EmitEvents::File => match emits_to_file(&emit_file, &events) { Ok(path) => { add_envs.insert("WATCHEXEC_EVENTS_FILE".into(), path.into()); } Err(err) => { error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}"); } }, EmitEvents::JsonStdio => match emits_to_json_file(&emit_file, &events) .and_then(|path| File::open(path).into_diagnostic()) { Ok(file) => { stdin.replace(Stdio::from(file)); } Err(err) => { error!("Failed to write events to stdin, continuing without it: {err}"); } }, EmitEvents::JsonFile => match emits_to_json_file(&emit_file, &events) { Ok(path) => { add_envs.insert("WATCHEXEC_EVENTS_FILE".into(), path.into()); } Err(err) => { error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}"); } }, EmitEvents::None => {} } for (k, v) in add_envs { debug!(?k, ?v, "inserting environment variable"); command.env(k, v); } if let Some(stdin) = stdin { debug!("set command stdin"); command.stdin(stdin); } } pub(crate) fn reset_screen() { for cs in [ ClearScreen::WindowsCooked, ClearScreen::WindowsVt, ClearScreen::VtLeaveAlt, ClearScreen::VtWellDone, ClearScreen::default(), ] { cs.clear().ok(); } }