use kqueue_sys::{kevent, kqueue};
use libc::{pid_t, uintptr_t};
use std::convert::{AsRef, Into, TryFrom, TryInto};
use std::default::Default;
use std::fs::File;
use std::io::{self, Error, Result};
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::path::Path;
use std::ptr;
use std::time::Duration;
pub use kqueue_sys::constants::*;
mod os;
use crate::os::vnode;
mod time;
use crate::time::duration_to_timespec;
#[derive(Debug, Eq, Clone)]
pub enum Ident {
Filename(RawFd, String),
Fd(RawFd),
Pid(pid_t),
Signal(i32),
Timer(i32),
}
#[doc(hidden)]
#[derive(Debug, PartialEq, Clone)]
pub struct Watched {
filter: EventFilter,
flags: FilterFlag,
ident: Ident,
}
#[derive(Debug)]
pub struct Watcher {
watched: Vec<Watched>,
queue: RawFd,
started: bool,
opts: KqueueOpts,
}
#[derive(Debug)]
#[non_exhaustive]
pub enum Vnode {
Delete,
Write,
Extend,
Truncate,
Attrib,
Link,
Rename,
Revoke,
Open,
CloseWrite,
Close,
}
#[derive(Debug)]
pub enum Proc {
Exit(usize),
Fork,
Exec,
Track(libc::pid_t),
Trackerr,
Child(libc::pid_t),
}
#[derive(Debug)]
pub enum EventData {
Vnode(Vnode),
Proc(Proc),
ReadReady(usize),
WriteReady(usize),
Signal(usize),
Timer(usize),
Error(Error),
}
#[derive(Debug)]
pub struct Event {
pub ident: Ident,
pub data: EventData,
}
pub struct EventIter<'a> {
watcher: &'a Watcher,
}
#[derive(Debug)]
pub struct KqueueOpts {
clear: bool,
}
impl Default for KqueueOpts {
fn default() -> KqueueOpts {
KqueueOpts { clear: true }
}
}
#[allow(clippy::from_over_into)]
impl Into<usize> for Ident {
fn into(self) -> usize {
match self {
Ident::Filename(fd, _) => fd as usize,
Ident::Fd(fd) => fd as usize,
Ident::Pid(pid) => pid as usize,
Ident::Signal(sig) => sig as usize,
Ident::Timer(timer) => timer as usize,
}
}
}
impl PartialEq<Ident> for Ident {
fn eq(&self, other: &Ident) -> bool {
match *self {
Ident::Filename(_, ref name) => {
if let Ident::Filename(_, ref othername) = *other {
name == othername
} else {
false
}
}
_ => self.as_usize() == other.as_usize(),
}
}
}
impl Ident {
fn as_usize(&self) -> usize {
match *self {
Ident::Filename(fd, _) => fd as usize,
Ident::Fd(fd) => fd as usize,
Ident::Pid(pid) => pid as usize,
Ident::Signal(sig) => sig as usize,
Ident::Timer(timer) => timer as usize,
}
}
}
impl Watcher {
pub fn new() -> Result<Watcher> {
let queue = unsafe { kqueue() };
if queue == -1 {
Err(Error::last_os_error())
} else {
Ok(Watcher {
watched: Vec::new(),
queue,
started: false,
opts: Default::default(),
})
}
}
pub fn disable_clears(&mut self) -> &mut Self {
self.opts.clear = false;
self
}
pub fn add_pid(
&mut self,
pid: libc::pid_t,
filter: EventFilter,
flags: FilterFlag,
) -> Result<()> {
let watch = Watched {
filter,
flags,
ident: Ident::Pid(pid),
};
if !self.watched.contains(&watch) {
self.watched.push(watch);
}
Ok(())
}
pub fn add_filename<P: AsRef<Path>>(
&mut self,
filename: P,
filter: EventFilter,
flags: FilterFlag,
) -> Result<()> {
let file = File::open(filename.as_ref())?;
let watch = Watched {
filter,
flags,
ident: Ident::Filename(
file.into_raw_fd(),
filename.as_ref().to_string_lossy().into_owned(),
),
};
if !self.watched.contains(&watch) {
self.watched.push(watch);
}
Ok(())
}
pub fn add_fd(&mut self, fd: RawFd, filter: EventFilter, flags: FilterFlag) -> Result<()> {
let watch = Watched {
filter,
flags,
ident: Ident::Fd(fd),
};
if !self.watched.contains(&watch) {
self.watched.push(watch);
}
Ok(())
}
pub fn add_file(&mut self, file: &File, filter: EventFilter, flags: FilterFlag) -> Result<()> {
self.add_fd(file.as_raw_fd(), filter, flags)
}
fn delete_kevents(&self, ident: Ident, filter: EventFilter) -> Result<()> {
let kev = vec![kevent::new(
ident.as_usize(),
filter,
EventFlag::EV_DELETE,
FilterFlag::empty(),
)];
let ret = unsafe {
kevent(
self.queue,
kev.as_ptr(),
#[allow(clippy::useless_conversion)]
i32::try_from(kev.len()).unwrap().try_into().unwrap(),
ptr::null_mut(),
0,
ptr::null(),
)
};
match ret {
-1 => Err(Error::last_os_error()),
_ => Ok(()),
}
}
pub fn remove_pid(&mut self, pid: libc::pid_t, filter: EventFilter) -> Result<()> {
let new_watched = self
.watched
.drain(..)
.filter(|x| {
if let Ident::Pid(iterpid) = x.ident {
iterpid != pid
} else {
true
}
})
.collect();
self.watched = new_watched;
self.delete_kevents(Ident::Pid(pid), filter)
}
pub fn remove_filename<P: AsRef<Path>>(
&mut self,
filename: P,
filter: EventFilter,
) -> Result<()> {
let mut fd: RawFd = 0;
let new_watched = self
.watched
.drain(..)
.filter(|x| {
if let Ident::Filename(iterfd, ref iterfile) = x.ident {
if iterfile == filename.as_ref().to_str().unwrap() {
fd = iterfd;
false
} else {
true
}
} else {
true
}
})
.collect();
self.watched = new_watched;
self.delete_kevents(Ident::Fd(fd), filter)
}
pub fn remove_fd(&mut self, fd: RawFd, filter: EventFilter) -> Result<()> {
let new_watched = self
.watched
.drain(..)
.filter(|x| {
if let Ident::Fd(iterfd) = x.ident {
iterfd != fd
} else {
true
}
})
.collect();
self.watched = new_watched;
self.delete_kevents(Ident::Fd(fd), filter)
}
pub fn remove_file(&mut self, file: &File, filter: EventFilter) -> Result<()> {
self.remove_fd(file.as_raw_fd(), filter)
}
pub fn watch(&mut self) -> Result<()> {
let mut kevs: Vec<kevent> = Vec::new();
for watched in &self.watched {
let raw_ident = match watched.ident {
Ident::Fd(fd) => fd as uintptr_t,
Ident::Filename(fd, _) => fd as uintptr_t,
Ident::Pid(pid) => pid as uintptr_t,
Ident::Signal(sig) => sig as uintptr_t,
Ident::Timer(ident) => ident as uintptr_t,
};
kevs.push(kevent::new(
raw_ident,
watched.filter,
if self.opts.clear {
EventFlag::EV_ADD | EventFlag::EV_CLEAR
} else {
EventFlag::EV_ADD
},
watched.flags,
));
}
let ret = unsafe {
kevent(
self.queue,
kevs.as_ptr(),
#[allow(clippy::useless_conversion)]
i32::try_from(kevs.len()).unwrap().try_into().unwrap(),
ptr::null_mut(),
0,
ptr::null(),
)
};
self.started = true;
match ret {
-1 => Err(Error::last_os_error()),
_ => Ok(()),
}
}
pub fn poll(&self, timeout: Option<Duration>) -> Option<Event> {
match timeout {
Some(timeout) => get_event(self, Some(timeout)),
None => get_event(self, Some(Duration::new(0, 0))),
}
}
pub fn poll_forever(&self, timeout: Option<Duration>) -> Option<Event> {
if timeout.is_some() {
self.poll(timeout)
} else {
get_event(self, None)
}
}
pub fn iter(&self) -> EventIter {
EventIter { watcher: self }
}
}
impl AsRawFd for Watcher {
fn as_raw_fd(&self) -> RawFd {
self.queue
}
}
impl Drop for Watcher {
fn drop(&mut self) {
unsafe { libc::close(self.queue) };
for watched in &self.watched {
match watched.ident {
Ident::Fd(fd) => unsafe { libc::close(fd) },
Ident::Filename(fd, _) => unsafe { libc::close(fd) },
_ => continue,
};
}
}
}
fn find_file_ident(watcher: &Watcher, fd: RawFd) -> Option<Ident> {
for watched in &watcher.watched {
match watched.ident.clone() {
Ident::Fd(ident_fd) => {
if fd == ident_fd {
return Some(Ident::Fd(fd));
} else {
continue;
}
}
Ident::Filename(ident_fd, ident_str) => {
if fd == ident_fd {
return Some(Ident::Filename(ident_fd, ident_str));
} else {
continue;
}
}
_ => continue,
}
}
None
}
fn get_event(watcher: &Watcher, timeout: Option<Duration>) -> Option<Event> {
let mut kev = kevent::new(
0,
EventFilter::EVFILT_SYSCOUNT,
EventFlag::empty(),
FilterFlag::empty(),
);
let ret = if let Some(ts) = timeout {
unsafe {
kevent(
watcher.queue,
ptr::null(),
0,
&mut kev,
1,
&duration_to_timespec(ts),
)
}
} else {
unsafe { kevent(watcher.queue, ptr::null(), 0, &mut kev, 1, ptr::null()) }
};
match ret {
-1 => Some(Event::from_error(kev, watcher)),
0 => None, _ => Some(Event::new(kev, watcher)),
}
}
impl Event {
#[doc(hidden)]
pub fn new(ev: kevent, watcher: &Watcher) -> Event {
let data = match ev.filter {
EventFilter::EVFILT_READ => EventData::ReadReady(ev.data as usize),
EventFilter::EVFILT_WRITE => EventData::WriteReady(ev.data as usize),
EventFilter::EVFILT_SIGNAL => EventData::Signal(ev.data as usize),
EventFilter::EVFILT_TIMER => EventData::Timer(ev.data as usize),
EventFilter::EVFILT_PROC => {
let inner = if ev.fflags.contains(FilterFlag::NOTE_EXIT) {
Proc::Exit(ev.data as usize)
} else if ev.fflags.contains(FilterFlag::NOTE_FORK) {
Proc::Fork
} else if ev.fflags.contains(FilterFlag::NOTE_EXEC) {
Proc::Exec
} else if ev.fflags.contains(FilterFlag::NOTE_TRACK) {
Proc::Track(ev.data as libc::pid_t)
} else if ev.fflags.contains(FilterFlag::NOTE_CHILD) {
Proc::Child(ev.data as libc::pid_t)
} else {
panic!("not supported: {:?}", ev.fflags)
};
EventData::Proc(inner)
}
EventFilter::EVFILT_VNODE => {
let inner = if ev.fflags.contains(FilterFlag::NOTE_DELETE) {
Vnode::Delete
} else if ev.fflags.contains(FilterFlag::NOTE_WRITE) {
Vnode::Write
} else if ev.fflags.contains(FilterFlag::NOTE_EXTEND) {
Vnode::Extend
} else if ev.fflags.contains(FilterFlag::NOTE_ATTRIB) {
Vnode::Attrib
} else if ev.fflags.contains(FilterFlag::NOTE_LINK) {
Vnode::Link
} else if ev.fflags.contains(FilterFlag::NOTE_RENAME) {
Vnode::Rename
} else if ev.fflags.contains(FilterFlag::NOTE_REVOKE) {
Vnode::Revoke
} else {
vnode::handle_vnode_extras(ev.fflags)
};
EventData::Vnode(inner)
}
_ => panic!("not supported"),
};
let ident = match ev.filter {
EventFilter::EVFILT_READ => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_WRITE => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_VNODE => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_SIGNAL => Ident::Signal(ev.ident as i32),
EventFilter::EVFILT_TIMER => Ident::Timer(ev.ident as i32),
EventFilter::EVFILT_PROC => Ident::Pid(ev.ident as pid_t),
_ => panic!("not supported"),
};
Event { ident, data }
}
#[doc(hidden)]
pub fn from_error(ev: kevent, watcher: &Watcher) -> Event {
let ident = match ev.filter {
EventFilter::EVFILT_READ => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_WRITE => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_VNODE => find_file_ident(watcher, ev.ident as RawFd).unwrap(),
EventFilter::EVFILT_SIGNAL => Ident::Signal(ev.ident as i32),
EventFilter::EVFILT_TIMER => Ident::Timer(ev.ident as i32),
EventFilter::EVFILT_PROC => Ident::Pid(ev.ident as pid_t),
_ => panic!("not supported"),
};
Event {
data: EventData::Error(io::Error::last_os_error()),
ident,
}
}
#[doc(hidden)]
pub fn is_err(&self) -> bool {
matches!(self.data, EventData::Error(_))
}
}
impl<'a> Iterator for EventIter<'a> {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
if !self.watcher.started {
return None;
}
get_event(self.watcher, None)
}
}
#[cfg(test)]
mod tests {
use super::{EventData, EventFilter, FilterFlag, Ident, Vnode, Watcher};
use std::fs;
use std::io::Write;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::path::Path;
use std::thread;
use std::time;
#[cfg(target_os = "freebsd")]
use std::process;
#[test]
fn test_new_watcher() {
let mut watcher = Watcher::new().expect("new failed");
let file = tempfile::tempfile().expect("Couldn't create tempfile");
watcher
.add_file(&file, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("add failed");
watcher.watch().expect("watch failed");
}
#[test]
fn test_filename() {
let mut watcher = Watcher::new().expect("new failed");
let file = tempfile::NamedTempFile::new().expect("Couldn't create tempfile");
watcher
.add_filename(
file.path(),
EventFilter::EVFILT_VNODE,
FilterFlag::NOTE_WRITE,
)
.expect("add failed");
watcher.watch().expect("watch failed");
let mut new_file = fs::OpenOptions::new()
.write(true)
.open(file.path())
.expect("open failed");
new_file.write_all(b"foo").expect("write failed");
thread::sleep(time::Duration::from_secs(1));
let ev = watcher.iter().next().expect("Could not get a watch");
assert!(matches!(ev.data, EventData::Vnode(Vnode::Write)));
match ev.ident {
Ident::Filename(_, name) => assert!(Path::new(&name) == file.path()),
_ => panic!(),
};
}
#[test]
fn test_file() {
let mut watcher = Watcher::new().expect("new failed");
let mut file = tempfile::tempfile().expect("Could not create tempfile");
watcher
.add_file(&file, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("add failed");
watcher.watch().expect("watch failed");
file.write_all(b"foo").expect("write failed");
thread::sleep(time::Duration::from_secs(1));
let ev = watcher.iter().next().expect("Didn't get an event");
assert!(matches!(ev.data, EventData::Vnode(Vnode::Write)));
assert!(matches!(ev.ident, Ident::Fd(_)));
}
#[test]
fn test_delete_filename() {
let mut watcher = Watcher::new().expect("new failed");
let file = tempfile::NamedTempFile::new().expect("Could not create tempfile");
let filename = file.path();
watcher
.add_filename(filename, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("add failed");
watcher.watch().expect("watch failed");
watcher
.remove_filename(filename, EventFilter::EVFILT_VNODE)
.expect("delete failed");
}
#[test]
fn test_dupe() {
let mut watcher = Watcher::new().expect("new failed");
let file = tempfile::NamedTempFile::new().expect("Couldn't create tempfile");
let filename = file.path();
watcher
.add_filename(filename, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("add failed");
watcher
.add_filename(filename, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("second add failed");
assert_eq!(
watcher.watched.len(),
1,
"Did not get an expected number of events"
);
}
#[test]
fn test_two_files() {
let mut watcher = Watcher::new().expect("new failed");
let mut first_file = tempfile::tempfile().expect("Unable to create first temporary file");
let mut second_file = tempfile::tempfile().expect("Unable to create second temporary file");
watcher
.add_file(
&first_file,
EventFilter::EVFILT_VNODE,
FilterFlag::NOTE_WRITE,
)
.expect("add failed");
watcher
.add_file(
&second_file,
EventFilter::EVFILT_VNODE,
FilterFlag::NOTE_WRITE,
)
.expect("add failed");
watcher.watch().expect("watch failed");
first_file.write_all(b"foo").expect("first write failed");
second_file.write_all(b"foo").expect("second write failed");
thread::sleep(time::Duration::from_secs(1));
watcher.iter().next().expect("didn't get any events");
watcher.iter().next().expect("didn't get any events");
}
#[test]
fn test_nested_kqueue() {
let mut watcher = Watcher::new().expect("Failed to create main watcher");
let mut nested_watcher = Watcher::new().expect("Failed to create nested watcher");
let kqueue_file = unsafe { fs::File::from_raw_fd(nested_watcher.as_raw_fd()) };
watcher
.add_file(&kqueue_file, EventFilter::EVFILT_READ, FilterFlag::empty())
.expect("add_file failed for main watcher");
let mut file = tempfile::tempfile().expect("Couldn't create tempfile");
nested_watcher
.add_file(&file, EventFilter::EVFILT_VNODE, FilterFlag::NOTE_WRITE)
.expect("add_file failed for nested watcher");
watcher.watch().expect("watch failed on main watcher");
nested_watcher
.watch()
.expect("watch failed on nested watcher");
file.write_all(b"foo").expect("write failed");
thread::sleep(time::Duration::from_secs(1));
watcher.iter().next().expect("didn't get any events");
nested_watcher.iter().next().expect("didn't get any events");
}
#[test]
#[cfg(target_os = "freebsd")]
fn test_close_read() {
let mut watcher = Watcher::new().expect("new failed");
{
let file = tempfile::NamedTempFile::new().expect("temporary file failed to create");
watcher
.add_filename(
file.path(),
EventFilter::EVFILT_VNODE,
FilterFlag::NOTE_CLOSE,
)
.expect("add failed");
watcher.watch().expect("watch failed");
process::Command::new("cat")
.arg(file.path())
.spawn()
.expect("should spawn a file");
thread::sleep(time::Duration::from_secs(1));
}
let ev = watcher.iter().next().expect("did not receive event");
assert!(matches!(ev.data, EventData::Vnode(Vnode::Close)));
}
#[test]
#[cfg(target_os = "freebsd")]
fn test_close_write() {
let mut watcher = match Watcher::new() {
Ok(wat) => wat,
Err(_) => panic!("new failed"),
};
{
let file = tempfile::NamedTempFile::new().expect("couldn't create tempfile");
watcher
.add_filename(
file.path(),
EventFilter::EVFILT_VNODE,
FilterFlag::NOTE_CLOSE_WRITE,
)
.expect("add failed");
watcher.watch().expect("watch failed");
process::Command::new("cat")
.arg(file.path())
.spawn()
.expect("should spawn a file");
thread::sleep(time::Duration::from_secs(1));
}
let ev = watcher.iter().next().expect("didn't get an event");
assert!(matches!(ev.data, EventData::Vnode(Vnode::CloseWrite)));
}
}