Move to producer/consumer in wiki2graph

master
Stephen, 2 months ago
parent commit 8884ad2ef6
4 changed files with 95 additions and 68 deletions:
  1. Cargo.lock  +2 -0
  2. Cargo.toml  +2 -1
  3. src/bin/wiki2graph.rs  +89 -65
  4. src/process.rs  +2 -2

Cargo.lock  +2 -0

@@ -290,8 +290,10 @@ name = "wikigraph"
 version = "0.1.0"
 dependencies = [
  "bincode",
+ "crossbeam-queue",
  "fnv",
  "lazy_static",
+ "num_cpus",
  "rayon",
  "regex",
  "serde",


Cargo.toml  +2 -1

@@ -13,4 +13,5 @@ regex = "1.3"
 lazy_static = "1.4"
 bincode = "1.3"
 fnv = "1.0"
+crossbeam-queue = "0.2"
+num_cpus = "1.13"

src/bin/wiki2graph.rs  +89 -65

@@ -1,14 +1,17 @@
 extern crate wikigraph;
+use crossbeam_queue::{ArrayQueue, PushError};
 use wikigraph::process::{process_graph, save_graph, GraphIntermediate};
 use wikigraph::wiki::WikiPage;
-use rayon::ThreadPoolBuilder;
 use std::env;
 use std::fs::File;
 use std::io::BufReader;
 use std::sync::{Arc, Mutex};
+use std::thread;
+use std::thread::sleep;
+use std::time::Duration;
 use xml::reader::{EventReader, XmlEvent};
@@ -41,78 +44,99 @@ fn main() {
     let file = BufReader::new(file);
     let parser = EventReader::new(file);
-    let pool = ThreadPoolBuilder::new().build().unwrap();
     let graph = Arc::new(Mutex::new(GraphIntermediate::new()));
+    let job_queue = Arc::new(ArrayQueue::new(num_cpus::get() * 10));
-    pool.install(|| {
-        rayon::scope(|scope| {
-            let mut state: FileState = FileState::Root;
-            let mut cur_page = WikiPage::new();
-            for e in parser {
-                // Keeps the job queue from overflowing
-                // Amazingly, Rayon has absolutely no way of dealing with this
-                // There isn't even a method to get the current number of jobs in the queue ):
-                while pool.current_thread_has_pending_tasks().unwrap() {}
-                match e {
-                    Ok(XmlEvent::StartElement { name, .. }) => {
-                        let name = name.local_name;
-                        // println!("{}", name);
-                        state = match (&state, name.as_str()) {
-                            (FileState::Root, "sitename") => FileState::Title,
-                            (FileState::Root, "page") => FileState::Page,
-                            (FileState::Page, "title") => FileState::PageTitle,
-                            (FileState::Page, "text") => FileState::PageBody,
-                            _ => state,
-                        }
-                    }
-                    Ok(XmlEvent::EndElement { name, .. }) => {
-                        let name = name.local_name;
-                        match (&state, name.as_str()) {
-                            (FileState::Title, "sitename") => {
-                                state = FileState::Root;
-                            }
-                            (FileState::Page, "page") => {
-                                state = FileState::Root;
-                                // Process page
-                                scope.spawn(|_| {
-                                    process_graph(graph.clone(), cur_page);
-                                });
-                                // Reset page
-                                cur_page = WikiPage::new();
-                            }
-                            (FileState::PageTitle, "title") => {
-                                state = FileState::Page;
-                            }
-                            (FileState::PageBody, "text") => {
-                                state = FileState::Page;
-                                // Freeze text so that we don't accidentally combine revisions
-                                cur_page.frozen = true;
-                            }
-                            _ => {}
-                        }
-                    }
-                    Ok(XmlEvent::Characters(data)) => match state {
-                        FileState::Title => {
-                            println!("Title: {}", data);
-                        }
-                        FileState::PageTitle => {
-                            cur_page.title += &data;
-                        }
-                        FileState::PageBody => {
-                            cur_page.add_content(&data);
-                        }
-                        _ => {}
-                    },
-                    Err(e) => {
-                        println!("Error: {}", e);
-                        break;
-                    }
-                    _ => {}
-                }
-            }
-        });
-    });
+    // Consumer
+    for _ in 0..num_cpus::get() {
+        let job_queue = job_queue.clone();
+        let graph = graph.clone();
+        thread::spawn(move || loop {
+            let page = loop {
+                match job_queue.pop() {
+                    Ok(x) => break x,
+                    Err(_) => sleep(Duration::from_millis(1)),
+                }
+            };
+            process_graph(&graph, page);
+        });
+    }
+    // Producer
+    let mut state: FileState = FileState::Root;
+    let mut cur_page = WikiPage::new();
+    for e in parser {
+        match e {
+            Ok(XmlEvent::StartElement { name, .. }) => {
+                let name = name.local_name;
+                // println!("{}", name);
+                state = match (&state, name.as_str()) {
+                    (FileState::Root, "sitename") => FileState::Title,
+                    (FileState::Root, "page") => FileState::Page,
+                    (FileState::Page, "title") => FileState::PageTitle,
+                    (FileState::Page, "text") => FileState::PageBody,
+                    _ => state,
+                }
+            }
+            Ok(XmlEvent::EndElement { name, .. }) => {
+                let name = name.local_name;
+                match (&state, name.as_str()) {
+                    (FileState::Title, "sitename") => {
+                        state = FileState::Root;
+                    }
+                    (FileState::Page, "page") => {
+                        state = FileState::Root;
+                        // Add to job queue
+                        loop {
+                            cur_page = match job_queue.push(cur_page) {
+                                Ok(_) => break,
+                                Err(PushError(x)) => x,
+                            };
+                            sleep(Duration::from_millis(1));
+                        }
+                        // Reset page
+                        cur_page = WikiPage::new();
+                    }
+                    (FileState::PageTitle, "title") => {
+                        state = FileState::Page;
+                    }
+                    (FileState::PageBody, "text") => {
+                        state = FileState::Page;
+                        // Freeze text so that we don't accidentally combine revisions
+                        cur_page.frozen = true;
+                    }
+                    _ => {}
+                }
+            }
+            Ok(XmlEvent::Characters(data)) => match state {
+                FileState::Title => {
+                    println!("Title: {}", data);
+                }
+                FileState::PageTitle => {
+                    cur_page.title += &data;
+                }
+                FileState::PageBody => {
+                    cur_page.add_content(&data);
+                }
+                _ => {}
+            },
+            Err(e) => {
+                println!("Error: {}", e);
+                break;
+            }
+            _ => {}
+        }
+    }
+    // Wait for queue to empty
+    while job_queue.is_full() {
+        sleep(Duration::from_millis(100));
+    }
     println!("Done! Saving...");
     if let Ok(graph) = Arc::try_unwrap(graph) {
         save_graph(graph.into_inner().unwrap(), output_filename);
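One subtlety on the new shutdown path: the drain loop spins only while the queue is still full, and the detached consumers hold their Arc clones forever, so pages can still be in flight (and Arc::try_unwrap can fail) when saving starts. A conventional way to drain is a sentinel job plus JoinHandles; a sketch under the assumption that the queue carries Option<Job> (None meaning "no more work"), which is not how this commit does it:

// Sentinel-based drain: join every consumer before unwrapping the Arc.
use crossbeam_queue::{ArrayQueue, PushError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    let queue: Arc<ArrayQueue<Option<u32>>> = Arc::new(ArrayQueue::new(8));
    let total = Arc::new(Mutex::new(0u32));
    let workers = 4;

    let consumers: Vec<_> = (0..workers)
        .map(|_| {
            let queue = queue.clone();
            let total = total.clone();
            thread::spawn(move || loop {
                match queue.pop() {
                    Ok(Some(n)) => *total.lock().unwrap() += n,
                    Ok(None) => break, // sentinel: exit cleanly
                    Err(_) => sleep(Duration::from_millis(1)),
                }
            })
        })
        .collect();

    // Real work first, then one sentinel per consumer thread.
    let jobs = (1..=100u32).map(Some).chain((0..workers).map(|_| None));
    for mut job in jobs {
        loop {
            job = match queue.push(job) {
                Ok(()) => break,
                Err(PushError(j)) => j,
            };
            sleep(Duration::from_millis(1));
        }
    }

    for handle in consumers {
        handle.join().unwrap(); // every consumer has drained and exited
    }
    // The joined consumers dropped their clones, so this is the last owner.
    let total = Arc::try_unwrap(total).unwrap().into_inner().unwrap();
    println!("total = {}", total); // 5050
}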


src/process.rs  +2 -2

@@ -70,7 +70,7 @@ impl Default for GraphIntermediate {
     }
 }
-pub fn process_graph(graph: Arc<Mutex<GraphIntermediate>>, page: WikiPage) {
+pub fn process_graph(graph: &Arc<Mutex<GraphIntermediate>>, page: WikiPage) {
     let links = extract_links(&page.body);
     let mut graph = graph.lock().unwrap();
     links.iter().for_each(|(target, weight)| {
@@ -107,7 +107,7 @@ pub fn save_graph(graph_og: GraphIntermediate, filename: &str) {
         graph_og.edges,
     );
     std::mem::drop(graph_og.nodes); // Save some RAM
-    let mut buffer = BufWriter::new(File::create(filename).unwrap());
+    /* BINCODE */
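The signature change lets each consumer clone the Arc once at spawn time and then lend it out per call, instead of cloning it for every page as the old by-value signature required. A small illustration, with a u64 payload standing in for GraphIntermediate:

use std::sync::{Arc, Mutex};

// Borrowing the Arc avoids a reference-count bump per call;
// taking &Mutex<T> would work just as well via deref coercion.
fn process(shared: &Arc<Mutex<u64>>, weight: u64) {
    *shared.lock().unwrap() += weight;
}

fn main() {
    let shared = Arc::new(Mutex::new(0u64));
    for w in 1..=3 {
        process(&shared, w); // the old signature would need shared.clone() here
    }
    assert_eq!(*shared.lock().unwrap(), 6);
}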

