use chrono::Utc; use flexi_logger::LoggerHandle; use log::{debug, error, info, trace, warn}; use regex::RegexBuilder; use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::{Arc, Condvar, Mutex}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; pub mod bonding; pub mod config; pub mod filesum; pub mod helper; pub mod multipath; pub mod processes; pub mod snmp; use bonding::bonding_status; use config::DataFunctionsFilesum; use filesum::filesum_filtered; use helper::{compile_re, compile_re_bin}; use multipath::multipath_status; use processes::Ptypes; use snmp::{Oid, OidData, SnmpData}; fn t_multipath( t_quit: Arc<(Mutex, Condvar)>, t_check_interval: u64, snmp_data: Arc>, options: Option, ) { debug!("Startup of t_multipath function"); #[cfg(target_os = "linux")] { let oidt = Oid::from_str("2.1.0").unwrap(); let oidc = Oid::from_str("2.2.0").unwrap(); let oidtab_dm = Oid::from_str("2.3.1.1").unwrap(); let oidtab_uuid = Oid::from_str("2.3.1.2").unwrap(); let oidtab_desc = Oid::from_str("2.3.1.3").unwrap(); let oidtab_slt = Oid::from_str("2.3.1.4").unwrap(); let oidtab_sle = Oid::from_str("2.3.1.5").unwrap(); let oidtab_end = Oid::from_str("2.3.9999").unwrap(); let filter_re = compile_re(options, "multipath_filter"); let base_path = String::from("/sys/devices/virtual/block"); let (lock, cvar) = &*t_quit; let mut quit = false; while !quit { debug!("Starting multipath_status function"); let mut mplist = Vec::new(); match multipath_status(&base_path, filter_re.as_ref()) { Ok(m) => { trace!("multipath_status returned Ok with {:?}", &m); mplist = m; } Err(e) => { error!("Error at reading sys multipath status files: {}", e); } } let now = Utc::now().timestamp().try_into().unwrap_or(0); { trace!("try to lock mutex snmp_data to update multipath {:?}", now); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update multipath snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oidt.clone(), SnmpData::Gauge(now)); snmp_data.insert(oidc.clone(), SnmpData::Gauge(mplist.len().try_into().unwrap_or(0))); // since we have not implemnted the copy trait, we must use the old way ... // 1. Collect the keys to be removed. let keys_to_remove: Vec = snmp_data .range(&oidtab_dm..&oidtab_end) .map(|(k, _)| k.clone()) .collect(); // 2. Iterate over the collected keys and remove them. for key in keys_to_remove { snmp_data.remove(&key); } if !mplist.is_empty() { let mut n = 1; for e in mplist { snmp_data.insert(oidtab_dm.add_suffix_int(n), SnmpData::String(e.mp)); snmp_data.insert(oidtab_uuid.add_suffix_int(n), SnmpData::String(e.uuid)); snmp_data.insert( oidtab_desc.add_suffix_int(n), SnmpData::String(format!("{},{}", e.vendor, e.model)), ); snmp_data.insert(oidtab_slt.add_suffix_int(n), SnmpData::Gauge(e.slave_count)); snmp_data.insert(oidtab_sle.add_suffix_int(n), SnmpData::Gauge(e.slave_failed)); n += 1; } } } { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } } } debug!("Exit of t_multipath function"); } fn t_bonding( t_quit: Arc<(Mutex, Condvar)>, t_check_interval: u64, snmp_data: Arc>, options: Option, ) { debug!("Startup of t_bonding function"); #[cfg(target_os = "linux")] { let oidt = Oid::from_str("5.1.0").unwrap(); let oidc = Oid::from_str("5.2.0").unwrap(); let oidtab_bond = Oid::from_str("5.3.1.1").unwrap(); let oidtab_bond_state = Oid::from_str("5.3.1.2").unwrap(); let oidtab_slave = Oid::from_str("5.3.1.3").unwrap(); let oidtab_slave_state = Oid::from_str("5.3.1.4").unwrap(); let oidtab_slave_mode = Oid::from_str("5.3.1.5").unwrap(); let oidtab_end = Oid::from_str("5.3.9999").unwrap(); let filter_re = compile_re(options, "bonding_filter"); let base_path = String::from("/sys/devices/virtual/net"); let (lock, cvar) = &*t_quit; let mut quit = false; while !quit { debug!("Starting bonding_status function"); let mut bl = Vec::new(); match bonding_status(&base_path, filter_re.as_ref()) { Ok(b) => { trace!("bonding_status returned Ok with {:?}", &b); bl = b; } Err(e) => { error!("Error at reading sys bonding status files: {}", e); } } let now = Utc::now().timestamp().try_into().unwrap_or(0); { trace!("try to lock mutex snmp_data to update bonding with {:?}", now); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update bonding snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oidt.clone(), SnmpData::Gauge(now)); snmp_data.insert(oidc.clone(), SnmpData::Gauge(bl.len().try_into().unwrap_or(0))); // since we have not implemnted the copy trait, we must use the old way ... // 1. Collect the keys to be removed. let keys_to_remove: Vec = snmp_data .range(&oidtab_bond..&oidtab_end) .map(|(k, _)| k.clone()) .collect(); // 2. Iterate over the collected keys and remove them. for key in keys_to_remove { snmp_data.remove(&key); } if !bl.is_empty() { let mut n = 1; for e in bl { snmp_data.insert(oidtab_bond.add_suffix_int(n), SnmpData::String(e.bond)); snmp_data.insert(oidtab_bond_state.add_suffix_int(n), SnmpData::String(e.master_state)); snmp_data.insert(oidtab_slave.add_suffix_int(n), SnmpData::String(e.slave)); snmp_data.insert(oidtab_slave_state.add_suffix_int(n), SnmpData::String(e.slave_state)); snmp_data.insert(oidtab_slave_mode.add_suffix_int(n), SnmpData::String(e.mode)); n += 1; } } } { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } } } debug!("Exit of t_bonding function"); } fn t_filesum( t_quit: Arc<(Mutex, Condvar)>, t_check_interval: u64, snmp_data: Arc>, options: DataFunctionsFilesum, ) { debug!("Startup of t_filesum function"); #[cfg(target_os = "linux")] { // allocate some strings for holding file contents between check runs let mut oldpasswd = Vec::with_capacity(2048); let mut oldshadow = Vec::with_capacity(2048); let mut oldgroup = Vec::with_capacity(2048); let mut oldauthkey = Vec::with_capacity(2048); let mut hash_passwd = String::with_capacity(128); let mut diff_passwd = String::with_capacity(2048); let mut hash_shadow = String::with_capacity(128); let mut diff_shadow = String::with_capacity(2048); let mut hash_group = String::with_capacity(128); let mut diff_group = String::with_capacity(2048); let mut hash_authkey = String::with_capacity(128); let mut diff_authkey = String::with_capacity(2048); // allocate Option for our regex ... let re_passwd = compile_re_bin(options.passwd, "passwd"); let re_shadow = compile_re_bin(options.shadow, "shadow"); let re_group = compile_re_bin(options.group, "group"); let re_authkey = compile_re_bin(options.authorized_keys, "authorized_keys"); // prepare variables which we use in the whole function let oid_filesum_time = Oid::from_str("6.1.0").unwrap(); let oid_passwd_hash = Oid::from_str("6.3.1.2.1").unwrap(); let oid_passwd_diff = Oid::from_str("6.3.1.3.1").unwrap(); let oid_shadow_hash = Oid::from_str("6.3.1.2.2").unwrap(); let oid_shadow_diff = Oid::from_str("6.3.1.3.2").unwrap(); let oid_group_hash = Oid::from_str("6.3.1.2.3").unwrap(); let oid_group_diff = Oid::from_str("6.3.1.3.3").unwrap(); let oid_authkey_hash = Oid::from_str("6.3.1.2.4").unwrap(); let oid_authkey_diff = Oid::from_str("6.3.1.3.4").unwrap(); let oid_xxx_diff = Oid::from_str("6.3.1.3.999").unwrap(); let fn_passwd_str = String::from("/etc/passwd"); let fn_shadow_str = String::from("/etc/shadow"); let fn_group_str = String::from("/etc/group"); let fn_authkey_str = String::from("/root/.ssh/authorized_keys"); let fn_passwd = PathBuf::from(&fn_passwd_str); let fn_shadow = PathBuf::from(&fn_shadow_str); let fn_group = PathBuf::from(&fn_group_str); let fn_authkey = PathBuf::from(&fn_authkey_str); { let oid_filesum_cnt = Oid::from_str("6.2.0").unwrap(); let oid_passwd_filename = Oid::from_str("6.3.1.1.1").unwrap(); let oid_shadow_filename = Oid::from_str("6.3.1.1.2").unwrap(); let oid_group_filename = Oid::from_str("6.3.1.1.3").unwrap(); let oid_authkey_filename = Oid::from_str("6.3.1.1.4").unwrap(); trace!("try to lock mutex snmp_data to update filesum header data"); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update filesum snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oid_filesum_cnt, SnmpData::Gauge(4)); snmp_data.insert(oid_passwd_filename, SnmpData::String(fn_passwd_str.clone())); snmp_data.insert(oid_shadow_filename, SnmpData::String(fn_shadow_str.clone())); snmp_data.insert(oid_group_filename, SnmpData::String(fn_group_str.clone())); snmp_data.insert(oid_authkey_filename, SnmpData::String(fn_authkey_str.clone())); } let (lock, cvar) = &*t_quit; let mut quit = false; let mut is_changed = true; while !quit { match filesum_filtered(&fn_passwd, &mut oldpasswd, &mut diff_passwd, re_passwd.as_ref()) { Ok((changed, hash)) => { if changed { warn!("Hash of {} is now {}", fn_passwd_str, hash); hash_passwd = hash; is_changed = true; } else { trace!("Hash of {} is still now {}", fn_passwd_str, hash); if hash_passwd.is_empty() { hash_passwd = hash; } } } Err(e) => { error!("Filesum error {}", e); } } match filesum_filtered(&fn_shadow, &mut oldshadow, &mut diff_shadow, re_shadow.as_ref()) { Ok((changed, hash)) => { if changed { warn!("Hash of {} is now {}", fn_shadow_str, hash); hash_shadow = hash; is_changed = true; } else { trace!("Hash of {} is still now {}", fn_shadow_str, hash); if hash_shadow.is_empty() { hash_shadow = hash; } } } Err(e) => { error!("Filesum error {}", e); } } match filesum_filtered(&fn_group, &mut oldgroup, &mut diff_group, re_group.as_ref()) { Ok((changed, hash)) => { if changed { warn!("Hash of {} is now {}", fn_group_str, hash); hash_group = hash; is_changed = true; } else { trace!("Hash of {} is still now {}", fn_group_str, hash); if hash_group.is_empty() { hash_group = hash; } } } Err(e) => { error!("Filesum error {}", e); } } match filesum_filtered(&fn_authkey, &mut oldauthkey, &mut diff_authkey, re_authkey.as_ref()) { Ok((changed, hash)) => { if changed { warn!("Hash of {} is now {}", fn_authkey_str, hash); hash_authkey = hash; is_changed = true; } else { trace!("Hash of {} is still now {}", fn_authkey_str, hash); if hash_authkey.is_empty() { hash_authkey = hash; } } } Err(e) => { error!("Filesum error {}", e); } } let now = Utc::now().timestamp().try_into().unwrap_or(0); { trace!( "try to lock mutex snmp_data to update filesum data, timestamp {:?}", now ); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update filesum snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oid_filesum_time.clone(), SnmpData::Gauge(now)); if is_changed { snmp_data.insert(oid_passwd_hash.clone(), SnmpData::String(hash_passwd.clone())); snmp_data.insert(oid_shadow_hash.clone(), SnmpData::String(hash_shadow.clone())); snmp_data.insert(oid_group_hash.clone(), SnmpData::String(hash_group.clone())); snmp_data.insert(oid_authkey_hash.clone(), SnmpData::String(hash_authkey.clone())); // since we have not implemnted the copy trait, we must use the old way ... // 1. Collect the keys to be removed. let keys_to_remove: Vec = snmp_data .range(&oid_passwd_diff..&oid_xxx_diff) .map(|(k, _)| k.clone()) .collect(); // 2. Iterate over the collected keys and remove them. for key in keys_to_remove { snmp_data.remove(&key); } if !diff_passwd.is_empty() { let mut n = 1; for l in diff_passwd.lines() { snmp_data.insert(oid_passwd_diff.add_suffix_int(n), SnmpData::String(l.to_string())); n += 1; } } if !diff_shadow.is_empty() { let mut n = 1; for l in diff_shadow.lines() { snmp_data.insert(oid_shadow_diff.add_suffix_int(n), SnmpData::String(l.to_string())); n += 1; } } if !diff_group.is_empty() { let mut n = 1; for l in diff_group.lines() { snmp_data.insert(oid_group_diff.add_suffix_int(n), SnmpData::String(l.to_string())); n += 1; } } if !diff_authkey.is_empty() { let mut n = 1; for l in diff_authkey.lines() { snmp_data.insert(oid_authkey_diff.add_suffix_int(n), SnmpData::String(l.to_string())); n += 1; } } } is_changed = false; } { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } } } debug!("Exit of t_filesum function"); } fn t_processes(t_quit: Arc<(Mutex, Condvar)>, t_check_interval: u64, snmp_data: Arc>) { debug!("Startup of t_processes function"); #[cfg(target_os = "linux")] { let proc_path = Path::new("/proc"); let mut proc_data = Ptypes::default(); let oidt = Oid::from_str("4.1.0").unwrap(); let oidz = Oid::from_str("4.2.0").unwrap(); let oidr = Oid::from_str("4.3.0").unwrap(); let oids = Oid::from_str("4.4.0").unwrap(); let oidi = Oid::from_str("4.5.0").unwrap(); let oidw = Oid::from_str("4.6.0").unwrap(); let (lock, cvar) = &*t_quit; let mut quit = false; while !quit { debug!("Starting count_processes function for /proc"); let res = processes::count_processes(proc_path); match res { Ok(v) => { trace!("count_processes finished, result: {:?}", v); proc_data = v; } Err(e) => { error!("Error at reading proc files: {}", e); } } let now = Utc::now().timestamp().try_into().unwrap_or(0); { trace!( "try to lock mutex snmp_data to update processdata with {:#?} => {:?}", now, proc_data ); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update processdata snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oidt.clone(), SnmpData::Gauge(now)); snmp_data.insert(oidz.clone(), SnmpData::Gauge(proc_data.zombie)); snmp_data.insert(oidr.clone(), SnmpData::Gauge(proc_data.running)); snmp_data.insert(oids.clone(), SnmpData::Gauge(proc_data.sleeping)); snmp_data.insert(oidi.clone(), SnmpData::Gauge(proc_data.idle)); snmp_data.insert(oidw.clone(), SnmpData::Gauge(proc_data.waiting)); } { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } } } debug!("Exit of t_processes function"); } fn t_meminfo(t_quit: Arc<(Mutex, Condvar)>, t_check_interval: u64, snmp_data: Arc>) { debug!("Startup of t_meminfo function"); #[cfg(target_os = "linux")] { let re = RegexBuilder::new(r"^MemAvailable:\s+(\d+) kB$") .multi_line(true) .unicode(false) .build() .expect("Unable to compile Regex"); let re_old = RegexBuilder::new(r"^MemFree:\s+(\d+) kB.*?Buffers:\s+(\d+) kB$") .multi_line(true) .dot_matches_new_line(true) .unicode(true) // Muss true sein, sonst bekommen wir eine Runtime Panic .build() .expect("Unable to compile Regex"); let fpath = PathBuf::from("/proc/meminfo"); let oidt = Oid::from_str("3.1.0").unwrap(); let oidm = Oid::from_str("3.2.0").unwrap(); let (lock, cvar) = &*t_quit; let mut quit = false; while !quit { let mut freekb: u64 = 0; if let Ok(meminfo_contents) = fs::read_to_string(&fpath) { trace!("Read /prod/meminfo, contents: {:#?}", meminfo_contents); if let Some(m) = re.captures(&meminfo_contents) { trace!("regex mached {:?}", m); if let Some(m) = m.get(1) { freekb += m.as_str().parse().unwrap_or(0); debug!("freekb via regex parsed as {:#?}", freekb); } } else if let Some(m) = re_old.captures(&meminfo_contents) { trace!("old regex mached {:#?}", m); if let Some(m) = m.get(1) { freekb += m.as_str().parse().unwrap_or(0); } if let Some(m) = m.get(2) { freekb += m.as_str().parse().unwrap_or(0); } debug!("freekb via old regex parsed as {:#?}", freekb); } let now = Utc::now().timestamp().try_into().unwrap_or(0); { trace!( "try to lock mutex snmp_data to update meminfo with {:#?} => {:#?}", now, freekb ); let mut guard = snmp_data.lock().unwrap(); trace!("mutex for update meminfo snmp_data now locked"); let snmp_data = &mut guard.data; snmp_data.insert(oidt.clone(), SnmpData::Gauge(now)); //snmp_data.insert(oidm.clone(), SnmpData::String(freekb.to_string())); snmp_data.insert(oidm.clone(), SnmpData::Counter64(freekb)); } }; // store results in this mutex protected block { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } } } debug!("Exit of t_meminfo function"); } fn log_debug_watcher( t_quit: Arc<(Mutex, Condvar)>, mut log: LoggerHandle, t_marker: PathBuf, t_check_interval: u64, ) { debug!("Start of log_debug_watcher function"); let mut last = false; let (lock, cvar) = &*t_quit; let mut quit = false; while !quit { trace!("quit not requested, going to check marker file now"); match t_marker.try_exists() { Ok(v) => { if v != last { trace!("marker file {} is now readable: {:?}", t_marker.display(), v); let r = match v { true => log.parse_and_push_temp_spec("trace"), false => { log.pop_temp_spec(); Ok(()) } }; match r { Ok(_) => warn!( "Log config changed to {}", log.current_max_level() .expect("Retrive the current log level not possible") ), Err(e) => error!("Unable to change log config {:#?}", e), }; last = v; }; } Err(e) => { error!( "Unable to check debug marker file {}, error: {:#?}", t_marker.display(), e ) } }; { let guard = lock.lock().unwrap(); let qres = cvar.wait_timeout(guard, Duration::from_secs(t_check_interval)).unwrap(); if !qres.1.timed_out() { quit = *qres.0; } } //thread::sleep(Duration::from_secs(t_check_interval)); } debug!("Exit of log_debug_watcher function"); } pub fn start_workers( config: &config::AppConfig, quit_pair: &Arc<(Mutex, Condvar)>, snmp_data: &Arc>, log: flexi_logger::LoggerHandle, ) -> Vec> { // handles collect all our thread handles ... let mut handles: Vec> = Vec::new(); // start log_debug_watcher if config.intervals.log_debug_watcher.is_some() { let t_quit = Arc::clone(quit_pair); let t_marker = config.debug_log_marker.clone(); let t_check_interval = config .intervals .log_debug_watcher .expect("Unable to get config.intervals.log_debug_watcher"); match Builder::new() .name("log_debug_watcher".to_string()) .spawn(move || log_debug_watcher(t_quit, log, t_marker, t_check_interval)) { Ok(v) => { info!( "started log_debug_watcher thread, checking {} every {}s", config.debug_log_marker.display(), t_check_interval ); handles.push(v); } Err(e) => { error!("Unable to start log_debug_watcher thread: {:#?}", e); //eprintln!("Unable to start log_debug_watcher thread: {:#?}", e); } }; } else { warn!("log_debug_watcher thread not started, as it was disabled via config") } // start meminfo worker thread if let Some(t_check_interval) = config.intervals.meminfo() { let t_quit = Arc::clone(quit_pair); let t_snmp_data = Arc::clone(snmp_data); match Builder::new() .name("meminfo".to_string()) .spawn(move || t_meminfo(t_quit, t_check_interval, t_snmp_data)) { Ok(v) => { info!("started meminfo thread, checking meminfo every {}s", t_check_interval); handles.push(v); } Err(e) => { error!("Unable to start meminfo thread: {:#?}", e); //eprintln!("Unable to start meminfo thread: {:#?}", e); } }; } else { warn!("meminfo thread not started, as it was disabled via config") } // start zombie/processes worker thread if let Some(t_check_interval) = config.intervals.processes() { let t_quit = Arc::clone(quit_pair); let t_snmp_data = Arc::clone(snmp_data); match Builder::new() .name("processes".to_string()) .spawn(move || t_processes(t_quit, t_check_interval, t_snmp_data)) { Ok(v) => { info!( "started processes thread, checking processes every {}s", t_check_interval ); handles.push(v); } Err(e) => { error!("Unable to start processes thread: {:#?}", e); //eprintln!("Unable to start processes thread: {:#?}", e); } }; } else { warn!("processes thread not started, as it was disabled via config") } // start filesum worker thread if let Some(t_check_interval) = config.intervals.filesum() { let t_quit = Arc::clone(quit_pair); let t_snmp_data = Arc::clone(snmp_data); let c = config.extra_config.filesum(); match Builder::new() .name("filesum".to_string()) .spawn(move || t_filesum(t_quit, t_check_interval, t_snmp_data, c)) { Ok(v) => { info!("started filesum thread, checking filesum every {}s", t_check_interval); handles.push(v); } Err(e) => { error!("Unable to start filesum thread: {:#?}", e); //eprintln!("Unable to start filesum thread: {:#?}", e); } }; } else { warn!("filesum thread not started, as it was disabled via config") } // start multipath worker thread if let Some(t_check_interval) = config.intervals.multipath() { let t_quit = Arc::clone(quit_pair); let t_snmp_data = Arc::clone(snmp_data); let c = config.extra_config.multipath(); match Builder::new() .name("multipath".to_string()) .spawn(move || t_multipath(t_quit, t_check_interval, t_snmp_data, c)) { Ok(v) => { info!( "started multipath thread, checking multipath every {}s", t_check_interval ); handles.push(v); } Err(e) => { error!("Unable to start multipath thread: {:#?}", e); //eprintln!("Unable to start multipath thread: {:#?}", e); } }; } else { warn!("multipath thread not started, as it was disabled via config") } // start bonding worker thread if let Some(t_check_interval) = config.intervals.bonding() { let t_quit = Arc::clone(quit_pair); let t_snmp_data = Arc::clone(snmp_data); let c = config.extra_config.bonding(); match Builder::new() .name("bonding".to_string()) .spawn(move || t_bonding(t_quit, t_check_interval, t_snmp_data, c)) { Ok(v) => { info!("started bonding thread, checking bonding every {}s", t_check_interval); handles.push(v); } Err(e) => { error!("Unable to start bonding thread: {:#?}", e); //eprintln!("Unable to start bonding thread: {:#?}", e); } }; } else { warn!("bonding thread not started, as it was disabled via config") } // return handles so main can join at the end to all handles ... handles }