From 6e64806b2f9bd77ec98296b4306c615880894a1c Mon Sep 17 00:00:00 2001 From: Nathan Reiner Date: Sat, 8 Jul 2023 16:25:49 +0200 Subject: add multithreading to merge and generate --- src/gui/mod.rs | 41 ++++++---- src/index.rs | 237 +++++++++++++++++++++++++++++++++++++++++---------------- src/main.rs | 30 +++++--- 3 files changed, 214 insertions(+), 94 deletions(-) diff --git a/src/gui/mod.rs b/src/gui/mod.rs index 5677db1..389fdb6 100644 --- a/src/gui/mod.rs +++ b/src/gui/mod.rs @@ -1,4 +1,3 @@ -use std::io::Write; use std::thread; use std::time::Duration; @@ -13,12 +12,14 @@ use std::sync::Mutex; use crate::index::Index; use crate::searchresult::SearchResult; use crate::splitter; +use crate::index::GenState; mod easing; mod circular; static SEARCH_ID: Lazy = Lazy::new(text_input::Id::unique); static GENERATE_PROGRESS : Lazy> = Lazy::new(|| { Mutex::new(0) }); +static GENERATE_STATUS : Lazy> = Lazy::new(|| { Mutex::new(GenState::Fetching) }); pub fn run() -> iced::Result { App::run(Settings { @@ -33,16 +34,11 @@ pub fn run() -> iced::Result { #[derive(Debug)] enum App { StartMenu, - Generating(GenState), + Generating(GenProgress), Loading, Search(State), } -#[derive(Debug, Default)] -struct GenState { - progress: u8, -} - #[derive(Debug, Default)] struct State { input_value: String, @@ -55,6 +51,12 @@ struct SearchState { index : Index, } +#[derive(Debug, Clone, Default)] +struct GenProgress { + state: GenState, + progress: u8 +} + impl SearchState { pub async fn search(self) -> Vec { let search_args = splitter::split_to_words(self.search.clone()); @@ -67,7 +69,7 @@ enum Message { Load, Generate, Loaded(Index), - GeneratingUpdate(u8), + GeneratingUpdate(GenProgress), InputChanged(String), SearchSubmit, SearchFinished(Vec) @@ -100,22 +102,23 @@ async fn generate() -> Index { let input = input.unwrap(); let input = input.to_str(); let input = input.unwrap(); - let index = Index::generate(input, |counter, nof| { - let p = ((counter * 100) / nof) as u8; + let index = Index::generate(input, |t, p| { *GENERATE_PROGRESS.lock().unwrap() = p; - std::io::stdout().flush().ok(); + *GENERATE_STATUS.lock().unwrap() = t; }); index.save(file.to_string()); index } -async fn generate_update_timer() -> u8 { +async fn generate_update_timer() -> GenProgress { thread::sleep(Duration::from_millis(100)); let p; + let s; { p = *GENERATE_PROGRESS.lock().unwrap(); + s = *GENERATE_STATUS.lock().unwrap(); } - p + GenProgress { state : s, progress: p } } impl Application for App { @@ -174,8 +177,9 @@ impl Application for App { *self = App::Search(State { index, ..Default::default() }); Command::none() } - Message::GeneratingUpdate(p) => { - state.progress = p; + Message::GeneratingUpdate(s) => { + state.progress = s.progress; + state.state = s.state; Command::perform(generate_update_timer(), Message::GeneratingUpdate) } _ => { @@ -225,8 +229,13 @@ impl Application for App { ].spacing(10).align_items(iced::Alignment::Center) } App::Generating(state) => { + let t = match state.state { + GenState::Fetching => { text("Fetching") } + GenState::Parsing => { text("Parsing") } + GenState::Merging => { text("Merging") } + }; column![ - text("Generating"), + t, progress_bar(0.0..=100.0, state.progress as f32) ].spacing(10).align_items(iced::Alignment::Center) } diff --git a/src/index.rs b/src/index.rs index 27bb56b..78df3a1 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,6 +1,7 @@ use std::collections::{HashSet, HashMap}; use std::fs::File; use std::io::{Write, BufReader, BufRead}; +use std::sync::mpsc::{channel, Sender}; use walkdir::*; use std::thread; use std::option::Option::None; @@ -27,6 +28,14 @@ impl Default for Index { } } +#[derive(Clone, Debug, Default, Copy)] +pub enum GenState { + #[default] + Fetching, + Parsing, + Merging +} + impl Index { pub fn empty() -> Self { Self { @@ -35,58 +44,99 @@ impl Index { } } - pub fn generate(input_path : &str, callback : impl Fn(u64, u64)) -> Self { - let mut dict = Dictionary::new(); - let mut filecache : Vec = Vec::new(); + pub fn generate(input_path : &str, callback : impl Fn(GenState, u8)) -> Self { let mut nof = 0; let mut counter = 0; + let mut crawler_handles = Vec::new(); + let num_threads = thread::available_parallelism().unwrap().get(); + let mut tx_vec : Vec> = Vec::new(); thread::scope(|s| { let mut nof_handle : Option<_> = Some(s.spawn(|| filecount(input_path))); + for _ in 0..num_threads { + let (tx, rx) = channel(); + tx_vec.push(tx); + crawler_handles.push(thread::spawn(move || { + let mut dict = Dictionary::new(); + let mut filecache : Vec = Vec::new(); + + loop { + let path = rx.recv().unwrap(); + if path.is_empty() { + return Self { + dictionary : dict, + filecache + } + } + + let content : String = text::extract_text(path.as_str()); + + if content.is_empty() { + continue; + } + + let words : Vec = splitter::split_to_words(content); + + for word in words.iter() { + dict.set(word.clone()); + } + + let fv = dict.vectorize_word_list(words.clone()); + filecache.push(FileCache { + path, + vector : fv + }); + } + })); + } + + let mut next_crawler = 0; + let mut last_p = u64::MAX; + for entry in WalkDir::new(input_path) .into_iter() .filter_map(|e| e.ok()) { counter += 1; if entry.path().is_file() { - let content : String = text::extract_text(entry.path().to_str().unwrap()); - - if content.is_empty() { - continue + tx_vec[next_crawler].send(entry.path().to_str().unwrap().to_string()).ok(); + next_crawler += 1; + if next_crawler == num_threads { + next_crawler = 0; } - let words : Vec = splitter::split_to_words(content); - - for word in words.iter() { - dict.set(word.clone()); - } - - let fv = dict.vectorize_word_list(words.clone()); - filecache.push(FileCache { - path : entry.path().to_str().unwrap().to_string(), - vector : fv - }); - - - } - match nof_handle { - Some(t) => { - nof = t.join().unwrap(); - nof_handle = None; - } - None => { - callback(counter, nof); + match nof_handle { + Some(t) => { + nof = t.join().unwrap(); + nof_handle = None; + } + None => { + // Make sure that we only push a update + // if there is a visual change to the number + // because updating the screen takes a lot + // of time. + let p = counter * 100 / nof; + if p != last_p { + callback(GenState::Fetching, p as u8); + last_p = p; + } + } } } } - - callback(nof, nof); }); - Self { - dictionary : dict, - filecache + let mut indexes = Vec::new(); + let mut i = 0; + + for handle in crawler_handles { + callback(GenState::Parsing, (i * 100 / num_threads) as u8); + tx_vec[i].send(String::new()).ok(); + indexes.push(handle.join().unwrap()); + i += 1; } + + Index::merge(indexes, |p| { callback(GenState::Merging, p) }) } pub fn from_file(path : String) -> Self { @@ -95,7 +145,6 @@ impl Index { let mut filecache : Vec = Vec::new(); let mut dict = Dictionary::new(); - for line in reader.lines() { let l = line.unwrap(); if l.starts_with('#') { @@ -111,51 +160,105 @@ impl Index { } } - pub fn merge(a : Index, b : Index) -> Self { - let mut a_hash : HashSet = HashSet::new(); - let mut diff : Vec = Vec::new(); - let mut dict = a.dictionary.clone(); - let mut filecache = a.filecache.clone(); + fn merge_two(first : Index, second : Index) -> thread::JoinHandle { + thread::spawn(move || { + let (a, b) = if first.filecache.len() < second.filecache.len() { + (second, first) + } else { + (first, second) + }; + let mut filecache = a.filecache.clone(); + let mut dictionary = Dictionary::default(); - for file in a.filecache.iter() { - a_hash.insert(file.clone()); - } + thread::scope(|s| { + let mut a_hash : HashSet = HashSet::new(); + let mut diff : Vec = Vec::new(); - for file in b.filecache.iter() { - if !a_hash.contains(file) { - diff.push(file.clone()); - } - } + let converter_handle = s.spawn(|| { + let mut b_id_to_word : HashMap = HashMap::new(); - for (word, _) in b.dictionary.iter() { - dict.set(word.clone()); - } + for (value, id) in b.dictionary.iter() { + b_id_to_word.insert(id.clone(), value.clone()); + } + b_id_to_word + }); - let mut b_id_to_word : HashMap = HashMap::new(); + let dict_handle = s.spawn(|| { + let mut dict = a.dictionary.clone(); + for (word, _) in b.dictionary.iter() { + dict.set(word.clone()); + } + dict + }); - for (value, id) in b.dictionary.iter() { - b_id_to_word.insert(*id, value.clone()); - } + for file in a.filecache.iter() { + a_hash.insert(file.clone()); + } + + for file in b.filecache.iter() { + if !a_hash.contains(file) { + diff.push(file.clone()); + } + } + + let b_id_to_word = converter_handle.join().unwrap(); + dictionary = dict_handle.join().unwrap(); + + for file in diff { + let mut words = Vec::new(); + + for (word_id, i) in file.vector.iter() { + for _ in 0..*i { + words.push(b_id_to_word.get(word_id).unwrap().clone()); + } + } - for file in diff { - let mut words = Vec::new(); + filecache.push(FileCache { + path : file.path.clone(), + vector: dictionary.vectorize_word_list(words) + }); + } + }); + + Self { + dictionary, + filecache + } + }) + } - for (word_id, i) in file.vector.iter() { - for _ in 0..*i { - words.push(b_id_to_word.get(word_id).unwrap().clone()); + pub fn merge(indexes : Vec, callback : impl Fn(u8)) -> Self { + let mut idxs : Vec = indexes.clone(); + let max = (idxs.len() as f32).log2().ceil() as u32; + let mut i = 0 as u32; + + while idxs.len() > 1 { + callback((i * 100 / max) as u8); + i += 1; + let mut idxs_handle = Vec::new(); + let mut processed = Vec::new(); + + for chunk in idxs.chunks(2) { + if chunk.len() == 2 { + let a = chunk[0].clone(); + let b = chunk[1].clone(); + idxs_handle.push(Index::merge_two(a, b)); + } else { + for idx in chunk.iter() { + processed.push(idx.clone()) + } } } - filecache.push(FileCache { - path : file.path.clone(), - vector: dict.vectorize_word_list(words) - }); - } + for idx_handle in idxs_handle { + let idx : Index = idx_handle.join().unwrap(); + processed.push(idx) + } - Self { - dictionary: dict, - filecache + idxs = processed; } + + idxs.get(0).unwrap().clone() } pub fn search(&self, search_args : Vec) -> Vec { diff --git a/src/main.rs b/src/main.rs index 8cb8466..9a18e7e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![feature(thread_id_value)] pub mod vector; pub mod dictionary; pub mod text; @@ -12,6 +13,7 @@ pub mod gui; use index::Index; use std::io::*; use std::env; +use crate::index::GenState; fn main() { let args: Vec<_> = env::args().collect(); @@ -27,8 +29,13 @@ fn main() { let input = args.get(2).unwrap(); let file = args.get(3).unwrap(); - let _ = Index::generate(input, |counter, nof| { - eprint!("\r\x1b[2K{} of {} files indexed ({}%)", counter, nof, (counter * 100) / nof); + let _ = Index::generate(input, |t, p| { + eprint!("\r\x1b[2K{}% ", p); + match t { + GenState::Fetching => { eprint!("fetched") } + GenState::Parsing => { eprint!("parsed") } + GenState::Merging => { eprint!("merged") } + }; std::io::stdout().flush().ok(); }).save(file.to_string()); } else if cmd == "-s" { @@ -48,18 +55,19 @@ fn main() { println!("{}", result.path); } } else if cmd == "-m" { - if args.len() != 5 { - eprintln!("{} -m ", args.get(0).unwrap()); + if args.len() < 5 { + eprintln!("{} -m index...", args.get(0).unwrap()); return; } - let index1 = args.get(2).unwrap().clone(); - let index2 = args.get(3).unwrap().clone(); - let merged = args.get(4).unwrap().clone(); - let _ = Index::merge( - Index::from_file(index1), - Index::from_file(index2) - ).save(merged); + let merged = args.get(2).unwrap().clone(); + let v : Vec = args.get(3..(args.len())).unwrap().into(); + let indexes = v.iter().map(|s| Index::from_file(s.clone())).collect(); + let _ = Index::merge(indexes, + |p| { + eprint!("\r\x1b[2K{}% merged", p); + } + ).save(merged); } } else { let _ = gui::run(); -- cgit v1.2.3-70-g09d2