Browse Source

better prototype

pull/1/head
Stephen 6 months ago
parent
commit
09e76e4c80
3 changed files with 118 additions and 65 deletions
  1. +12
    -12
      Cargo.lock
  2. +1
    -1
      Cargo.toml
  3. +105
    -52
      src/main.rs

+ 12
- 12
Cargo.lock View File

@ -1,18 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "ProxyManager"
version = "0.1.0"
dependencies = [
"chrono",
"rand 0.7.3",
"serde",
"tokio",
"toml",
"warp",
]
[[package]]
name = "autocfg"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@ -715,6 +703,18 @@ dependencies = [
]
[[package]]
name = "proxy_manager"
version = "0.1.0"
dependencies = [
"chrono",
"rand 0.7.3",
"serde",
"tokio",
"toml",
"warp",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"


+ 1
- 1
Cargo.toml View File

@ -1,5 +1,5 @@
[package]
name = "ProxyManager"
name = "proxy_manager"
version = "0.1.0"
authors = ["Stephen D"]
edition = "2018"


+ 105
- 52
src/main.rs View File

@ -1,11 +1,15 @@
use std::collections::HashMap;
use serde::Deserialize;
use std::{fs, thread};
use chrono::{DateTime, Utc, Duration};
use std::sync::{Arc, Mutex};
use std::{fs, cmp};
use std::sync::{Arc, Mutex, RwLock};
use warp::{http, Filter, Reply};
use rand::seq::SliceRandom;
use tokio::time;
use std::time::{Instant, Duration};
use rand::seq::SliceRandom;
use rand::thread_rng;
const BLACKLIST_DELAY_TIME_MINS: u64 = 15;
const ONE_YEAR: Duration = Duration::from_secs(60 * 60 * 24 * 365);
#[derive(Deserialize, Debug)]
struct WebsiteConfig { // May add more here in the future
@ -18,20 +22,22 @@ struct Config {
websites: HashMap<String, WebsiteConfig>
}
#[derive(Clone)]
struct Website {
name: String,
proxies: HashMap<String, Instant>,
index: usize,
delay: u32
}
#[derive(Clone)]
struct ServerState {
// < proxy url, < site name, time which this proxy is valid after> >
proxies: HashMap<String, HashMap<String, DateTime<Utc>>>,
websites: HashMap<String, u32>
websites: HashMap<String, Arc<Mutex<Website>>>,
proxies: Vec<String>
}
#[tokio::main]
async fn main() {
let mut proxies: HashMap<String, HashMap<String, DateTime<Utc>>> = HashMap::new();
let mut websites: HashMap<String, Arc<Mutex<Website>>> = HashMap::new();
// TODO don't hard-code the file name
let config: Config = toml::from_str(
@ -39,71 +45,118 @@ async fn main() {
).unwrap();
// Populate the proxies
for proxy in &config.proxies {
let mut hsh = HashMap::new();
for (site, site_conf) in &config.websites {
for (site, site_conf) in &config.websites {
let mut hsh: HashMap<String, Instant> = HashMap::new();
for proxy in &config.proxies {
// Q: Why are we adding the delay time here?
// A: So that if the server restarts, we don't immediately start handing out proxies
// which were just used a second ago (causing the rate limit to be exceeded)
hsh.insert(site.clone(), Utc::now() + Duration::seconds(site_conf.delay.into()));
hsh.insert(proxy.clone(),
Instant::now() + Duration::from_secs(site_conf.delay.into()));
}
proxies.insert(proxy.clone(), hsh);
websites.insert(site.clone(), Arc::new(Mutex::new(Website {
proxies: hsh,
index: 0,
delay: config.websites[site].delay
})));
}
let mut websites_arr: HashMap<String, u32> = config.websites.into_iter()
.map(|(k, v)| { (k, v.delay) })
.collect();
let state = Arc::new(Mutex::new(ServerState {
proxies, websites: websites_arr
let state = Arc::new(RwLock::new(ServerState {
websites, proxies: config.proxies
}));
let get_proxy = warp::get()
.and(warp::path!("v1" / String))
let get_proxy = {
let state = state.clone();
warp::get()
.and(warp::path!("v1" / String))
.and(warp::any().map(move || state.clone()))
.and_then(get_proxy_fn)
};
let blacklist_proxy = warp::post()
.and(warp::path!("v1" / String / String))
.and(warp::any().map(move || state.clone()))
.and_then(get_proxy_fn);
.and_then(blacklist_proxy_fn);
warp::serve(get_proxy)
warp::serve(get_proxy.or(blacklist_proxy))
.run(([0, 0, 0, 0], 3030))
.await;
}
async fn get_proxy_fn(website: String, state: Arc<Mutex<ServerState>>)
async fn get_proxy_fn(website_str: String, state: Arc<RwLock<ServerState>>)
-> Result<impl warp::Reply, warp::Rejection> {
let mut state = state.lock().unwrap();
// Make sure that the website actually exists
if !state.websites.contains_key(&website) {
return Ok(warp::reply::with_status(
"Given website not found",
http::StatusCode::NOT_FOUND).into_response()
);
}
let proxy = {
let proxies_keys: Vec<String> = state.proxies.keys().map(|x| x.clone()).collect();
let mut proxy: &String;
loop {
proxy = proxies_keys.choose(&mut rand::thread_rng()).unwrap();
if state.proxies[proxy][&website] - Utc::now() < Duration::zero() {
break;
let (proxy_address, proxy_time) = {
let state = state.read().unwrap();
match state.websites.get(&website_str) {
Some(x) => {
let mut website = x.lock().unwrap();
// TODO Is there a better way to do this?
let mut proxies: Vec<(String, Instant)> = website.proxies.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
proxies.shuffle(&mut thread_rng());
let mut min_proxy_address: String = "".to_string();
let mut min_time = Instant::now() + ONE_YEAR;
for (proxy_address, proxy_time) in &proxies {
if proxy_time < &min_time {
min_time = *proxy_time;
min_proxy_address = proxy_address.to_string();
if min_time < Instant::now() {
// We've found a proxy with no delay. Might as well use it
break;
}
}
};
let dur = Duration::from_secs(website.delay.into());
*website.proxies.get_mut(&min_proxy_address).unwrap() =
cmp::max(min_time, Instant::now()) + dur;
(min_proxy_address, min_time)
}
None => {
return Ok(
warp::reply::with_status("Website not found",
http::status::StatusCode::NOT_FOUND)
.into_response()
)
}
// This blocks all threads!
// However, that's okay
thread::sleep(time::Duration::from_millis(10));
}
proxy.clone()
};
// Mark proxy as just used
let delay = state.websites[&website].into();
state.proxies.get_mut(&proxy).unwrap().insert(website.clone(), Utc::now() + Duration::seconds(delay));
time::delay_until(proxy_time.into()).await; // Wait until proxy is ready
Ok((proxy).into_response())
Ok(warp::reply::json(&proxy_address).into_response())
}
// Marks a proxy as blacklisted(not working, blocked by website, etc.)
// Will auto-unblacklist after 15 minutes
async fn blacklist_proxy_fn(website: String, state: ServerState) {
async fn blacklist_proxy_fn(website_str: String, proxy: String, state: Arc<RwLock<ServerState>>)
-> Result<impl warp::Reply, warp::Rejection> {
let state = state.read().unwrap();
match state.websites.get(&website_str) {
Some(x) => {
match x.lock().unwrap().proxies.get_mut(&proxy) {
Some(y) => {
*y = Instant::now() + Duration::from_secs(60 * BLACKLIST_DELAY_TIME_MINS);
return Ok(warp::reply::with_status("Success",
http::status::StatusCode::OK)
.into_response())
}
None => {
return Ok(warp::reply::with_status("Proxy not found",
http::status::StatusCode::NOT_FOUND)
.into_response())
}
}
}
None => {
return Ok(warp::reply::with_status("Website not found",
http::status::StatusCode::NOT_FOUND)
.into_response())
}
}
}

Loading…
Cancel
Save