aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Reiner <nathan@nathanreiner.xyz>2023-07-08 16:25:49 +0200
committerNathan Reiner <nathan@nathanreiner.xyz>2023-07-08 16:25:49 +0200
commit6e64806b2f9bd77ec98296b4306c615880894a1c (patch)
tree017d77e961037a42d14113acf1a78327cd7182fb
parentae8fad30cd9e76bcba9949095a2cafabb4f1ca8a (diff)
add multithreading to merge and generate
-rw-r--r--src/gui/mod.rs41
-rw-r--r--src/index.rs237
-rw-r--r--src/main.rs30
3 files changed, 214 insertions, 94 deletions
diff --git a/src/gui/mod.rs b/src/gui/mod.rs
index 5677db1..389fdb6 100644
--- a/src/gui/mod.rs
+++ b/src/gui/mod.rs
@@ -1,4 +1,3 @@
-use std::io::Write;
use std::thread;
use std::time::Duration;
@@ -13,12 +12,14 @@ use std::sync::Mutex;
use crate::index::Index;
use crate::searchresult::SearchResult;
use crate::splitter;
+use crate::index::GenState;
mod easing;
mod circular;
static SEARCH_ID: Lazy<text_input::Id> = Lazy::new(text_input::Id::unique);
static GENERATE_PROGRESS : Lazy<Mutex<u8>> = Lazy::new(|| { Mutex::new(0) });
+static GENERATE_STATUS : Lazy<Mutex<GenState>> = Lazy::new(|| { Mutex::new(GenState::Fetching) });
pub fn run() -> iced::Result {
App::run(Settings {
@@ -33,17 +34,12 @@ pub fn run() -> iced::Result {
#[derive(Debug)]
enum App {
StartMenu,
- Generating(GenState),
+ Generating(GenProgress),
Loading,
Search(State),
}
#[derive(Debug, Default)]
-struct GenState {
- progress: u8,
-}
-
-#[derive(Debug, Default)]
struct State {
input_value: String,
index : Index,
@@ -55,6 +51,12 @@ struct SearchState {
index : Index,
}
+#[derive(Debug, Clone, Default)]
+struct GenProgress {
+ state: GenState,
+ progress: u8
+}
+
impl SearchState {
pub async fn search(self) -> Vec<SearchResult> {
let search_args = splitter::split_to_words(self.search.clone());
@@ -67,7 +69,7 @@ enum Message {
Load,
Generate,
Loaded(Index),
- GeneratingUpdate(u8),
+ GeneratingUpdate(GenProgress),
InputChanged(String),
SearchSubmit,
SearchFinished(Vec<SearchResult>)
@@ -100,22 +102,23 @@ async fn generate() -> Index {
let input = input.unwrap();
let input = input.to_str();
let input = input.unwrap();
- let index = Index::generate(input, |counter, nof| {
- let p = ((counter * 100) / nof) as u8;
+ let index = Index::generate(input, |t, p| {
*GENERATE_PROGRESS.lock().unwrap() = p;
- std::io::stdout().flush().ok();
+ *GENERATE_STATUS.lock().unwrap() = t;
});
index.save(file.to_string());
index
}
-async fn generate_update_timer() -> u8 {
+async fn generate_update_timer() -> GenProgress {
thread::sleep(Duration::from_millis(100));
let p;
+ let s;
{
p = *GENERATE_PROGRESS.lock().unwrap();
+ s = *GENERATE_STATUS.lock().unwrap();
}
- p
+ GenProgress { state : s, progress: p }
}
impl Application for App {
@@ -174,8 +177,9 @@ impl Application for App {
*self = App::Search(State { index, ..Default::default() });
Command::none()
}
- Message::GeneratingUpdate(p) => {
- state.progress = p;
+ Message::GeneratingUpdate(s) => {
+ state.progress = s.progress;
+ state.state = s.state;
Command::perform(generate_update_timer(), Message::GeneratingUpdate)
}
_ => {
@@ -225,8 +229,13 @@ impl Application for App {
].spacing(10).align_items(iced::Alignment::Center)
}
App::Generating(state) => {
+ let t = match state.state {
+ GenState::Fetching => { text("Fetching") }
+ GenState::Parsing => { text("Parsing") }
+ GenState::Merging => { text("Merging") }
+ };
column![
- text("Generating"),
+ t,
progress_bar(0.0..=100.0, state.progress as f32)
].spacing(10).align_items(iced::Alignment::Center)
}
diff --git a/src/index.rs b/src/index.rs
index 27bb56b..78df3a1 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -1,6 +1,7 @@
use std::collections::{HashSet, HashMap};
use std::fs::File;
use std::io::{Write, BufReader, BufRead};
+use std::sync::mpsc::{channel, Sender};
use walkdir::*;
use std::thread;
use std::option::Option::None;
@@ -27,6 +28,14 @@ impl Default for Index {
}
}
+#[derive(Clone, Debug, Default, Copy)]
+pub enum GenState {
+ #[default]
+ Fetching,
+ Parsing,
+ Merging
+}
+
impl Index {
pub fn empty() -> Self {
Self {
@@ -35,58 +44,99 @@ impl Index {
}
}
- pub fn generate(input_path : &str, callback : impl Fn(u64, u64)) -> Self {
- let mut dict = Dictionary::new();
- let mut filecache : Vec<FileCache> = Vec::new();
+ pub fn generate(input_path : &str, callback : impl Fn(GenState, u8)) -> Self {
let mut nof = 0;
let mut counter = 0;
+ let mut crawler_handles = Vec::new();
+ let num_threads = thread::available_parallelism().unwrap().get();
+ let mut tx_vec : Vec<Sender<String>> = Vec::new();
thread::scope(|s| {
let mut nof_handle : Option<_> = Some(s.spawn(|| filecount(input_path)));
- for entry in WalkDir::new(input_path)
- .into_iter()
- .filter_map(|e| e.ok()) {
- counter += 1;
- if entry.path().is_file() {
- let content : String = text::extract_text(entry.path().to_str().unwrap());
+ for _ in 0..num_threads {
+ let (tx, rx) = channel();
+ tx_vec.push(tx);
+ crawler_handles.push(thread::spawn(move || {
+ let mut dict = Dictionary::new();
+ let mut filecache : Vec<FileCache> = Vec::new();
- if content.is_empty() {
- continue
- }
+ loop {
+ let path = rx.recv().unwrap();
+ if path.is_empty() {
+ return Self {
+ dictionary : dict,
+ filecache
+ }
+ }
- let words : Vec<String> = splitter::split_to_words(content);
+ let content : String = text::extract_text(path.as_str());
- for word in words.iter() {
- dict.set(word.clone());
- }
+ if content.is_empty() {
+ continue;
+ }
- let fv = dict.vectorize_word_list(words.clone());
- filecache.push(FileCache {
- path : entry.path().to_str().unwrap().to_string(),
- vector : fv
- });
+ let words : Vec<String> = splitter::split_to_words(content);
+ for word in words.iter() {
+ dict.set(word.clone());
+ }
- }
- match nof_handle {
- Some(t) => {
- nof = t.join().unwrap();
- nof_handle = None;
+ let fv = dict.vectorize_word_list(words.clone());
+ filecache.push(FileCache {
+ path,
+ vector : fv
+ });
}
- None => {
- callback(counter, nof);
+ }));
+ }
+
+ let mut next_crawler = 0;
+ let mut last_p = u64::MAX;
+
+ for entry in WalkDir::new(input_path)
+ .into_iter()
+ .filter_map(|e| e.ok()) {
+ counter += 1;
+ if entry.path().is_file() {
+ tx_vec[next_crawler].send(entry.path().to_str().unwrap().to_string()).ok();
+ next_crawler += 1;
+ if next_crawler == num_threads {
+ next_crawler = 0;
+ }
+
+ match nof_handle {
+ Some(t) => {
+ nof = t.join().unwrap();
+ nof_handle = None;
+ }
+ None => {
+ // Make sure that we only push a update
+ // if there is a visual change to the number
+ // because updating the screen takes a lot
+ // of time.
+ let p = counter * 100 / nof;
+ if p != last_p {
+ callback(GenState::Fetching, p as u8);
+ last_p = p;
+ }
+ }
}
}
}
-
- callback(nof, nof);
});
- Self {
- dictionary : dict,
- filecache
+ let mut indexes = Vec::new();
+ let mut i = 0;
+
+ for handle in crawler_handles {
+ callback(GenState::Parsing, (i * 100 / num_threads) as u8);
+ tx_vec[i].send(String::new()).ok();
+ indexes.push(handle.join().unwrap());
+ i += 1;
}
+
+ Index::merge(indexes, |p| { callback(GenState::Merging, p) })
}
pub fn from_file(path : String) -> Self {
@@ -95,7 +145,6 @@ impl Index {
let mut filecache : Vec<FileCache> = Vec::new();
let mut dict = Dictionary::new();
-
for line in reader.lines() {
let l = line.unwrap();
if l.starts_with('#') {
@@ -111,51 +160,105 @@ impl Index {
}
}
- pub fn merge(a : Index, b : Index) -> Self {
- let mut a_hash : HashSet<FileCache> = HashSet::new();
- let mut diff : Vec<FileCache> = Vec::new();
- let mut dict = a.dictionary.clone();
- let mut filecache = a.filecache.clone();
+ fn merge_two(first : Index, second : Index) -> thread::JoinHandle<Self> {
+ thread::spawn(move || {
+ let (a, b) = if first.filecache.len() < second.filecache.len() {
+ (second, first)
+ } else {
+ (first, second)
+ };
+ let mut filecache = a.filecache.clone();
+ let mut dictionary = Dictionary::default();
- for file in a.filecache.iter() {
- a_hash.insert(file.clone());
- }
+ thread::scope(|s| {
+ let mut a_hash : HashSet<FileCache> = HashSet::new();
+ let mut diff : Vec<FileCache> = Vec::new();
- for file in b.filecache.iter() {
- if !a_hash.contains(file) {
- diff.push(file.clone());
- }
- }
+ let converter_handle = s.spawn(|| {
+ let mut b_id_to_word : HashMap<u64, String> = HashMap::new();
- for (word, _) in b.dictionary.iter() {
- dict.set(word.clone());
- }
+ for (value, id) in b.dictionary.iter() {
+ b_id_to_word.insert(id.clone(), value.clone());
+ }
+ b_id_to_word
+ });
- let mut b_id_to_word : HashMap<u64, String> = HashMap::new();
+ let dict_handle = s.spawn(|| {
+ let mut dict = a.dictionary.clone();
+ for (word, _) in b.dictionary.iter() {
+ dict.set(word.clone());
+ }
+ dict
+ });
- for (value, id) in b.dictionary.iter() {
- b_id_to_word.insert(*id, value.clone());
- }
+ for file in a.filecache.iter() {
+ a_hash.insert(file.clone());
+ }
+
+ for file in b.filecache.iter() {
+ if !a_hash.contains(file) {
+ diff.push(file.clone());
+ }
+ }
+
+ let b_id_to_word = converter_handle.join().unwrap();
+ dictionary = dict_handle.join().unwrap();
+
+ for file in diff {
+ let mut words = Vec::new();
+
+ for (word_id, i) in file.vector.iter() {
+ for _ in 0..*i {
+ words.push(b_id_to_word.get(word_id).unwrap().clone());
+ }
+ }
- for file in diff {
- let mut words = Vec::new();
+ filecache.push(FileCache {
+ path : file.path.clone(),
+ vector: dictionary.vectorize_word_list(words)
+ });
+ }
+ });
+
+ Self {
+ dictionary,
+ filecache
+ }
+ })
+ }
+
+ pub fn merge(indexes : Vec<Index>, callback : impl Fn(u8)) -> Self {
+ let mut idxs : Vec<Index> = indexes.clone();
+ let max = (idxs.len() as f32).log2().ceil() as u32;
+ let mut i = 0 as u32;
- for (word_id, i) in file.vector.iter() {
- for _ in 0..*i {
- words.push(b_id_to_word.get(word_id).unwrap().clone());
+ while idxs.len() > 1 {
+ callback((i * 100 / max) as u8);
+ i += 1;
+ let mut idxs_handle = Vec::new();
+ let mut processed = Vec::new();
+
+ for chunk in idxs.chunks(2) {
+ if chunk.len() == 2 {
+ let a = chunk[0].clone();
+ let b = chunk[1].clone();
+ idxs_handle.push(Index::merge_two(a, b));
+ } else {
+ for idx in chunk.iter() {
+ processed.push(idx.clone())
+ }
}
}
- filecache.push(FileCache {
- path : file.path.clone(),
- vector: dict.vectorize_word_list(words)
- });
- }
+ for idx_handle in idxs_handle {
+ let idx : Index = idx_handle.join().unwrap();
+ processed.push(idx)
+ }
- Self {
- dictionary: dict,
- filecache
+ idxs = processed;
}
+
+ idxs.get(0).unwrap().clone()
}
pub fn search(&self, search_args : Vec<String>) -> Vec<SearchResult> {
diff --git a/src/main.rs b/src/main.rs
index 8cb8466..9a18e7e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+#![feature(thread_id_value)]
pub mod vector;
pub mod dictionary;
pub mod text;
@@ -12,6 +13,7 @@ pub mod gui;
use index::Index;
use std::io::*;
use std::env;
+use crate::index::GenState;
fn main() {
let args: Vec<_> = env::args().collect();
@@ -27,8 +29,13 @@ fn main() {
let input = args.get(2).unwrap();
let file = args.get(3).unwrap();
- let _ = Index::generate(input, |counter, nof| {
- eprint!("\r\x1b[2K{} of {} files indexed ({}%)", counter, nof, (counter * 100) / nof);
+ let _ = Index::generate(input, |t, p| {
+ eprint!("\r\x1b[2K{}% ", p);
+ match t {
+ GenState::Fetching => { eprint!("fetched") }
+ GenState::Parsing => { eprint!("parsed") }
+ GenState::Merging => { eprint!("merged") }
+ };
std::io::stdout().flush().ok();
}).save(file.to_string());
} else if cmd == "-s" {
@@ -48,18 +55,19 @@ fn main() {
println!("{}", result.path);
}
} else if cmd == "-m" {
- if args.len() != 5 {
- eprintln!("{} -m <index1> <index2> <merged index>", args.get(0).unwrap());
+ if args.len() < 5 {
+ eprintln!("{} -m <output> index...", args.get(0).unwrap());
return;
}
- let index1 = args.get(2).unwrap().clone();
- let index2 = args.get(3).unwrap().clone();
- let merged = args.get(4).unwrap().clone();
- let _ = Index::merge(
- Index::from_file(index1),
- Index::from_file(index2)
- ).save(merged);
+ let merged = args.get(2).unwrap().clone();
+ let v : Vec<String> = args.get(3..(args.len())).unwrap().into();
+ let indexes = v.iter().map(|s| Index::from_file(s.clone())).collect();
+ let _ = Index::merge(indexes,
+ |p| {
+ eprint!("\r\x1b[2K{}% merged", p);
+ }
+ ).save(merged);
}
} else {
let _ = gui::run();