For managing web proxies across different projects/processes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

216 lines
5.5 KiB

use rand::prelude::StdRng;
use rand::seq::SliceRandom;
use rand::{thread_rng, SeedableRng};
use serde::Deserialize;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{cmp, fs};
use tokio::time;
use warp::{http, Filter, Reply};
/// One 365-day year expressed as a `Duration` (days * hours * minutes * seconds).
const ONE_YEAR: Duration = Duration::from_secs(365 * 24 * 60 * 60);
/// Per-website settings, deserialized as the values of the `websites` map in the config file.
#[derive(Deserialize, Debug)]
struct WebsiteConfig {
// May add more here in the future
/// Number of seconds a proxy is pushed into the future each time it is handed out
/// for this website (also applied to every proxy at startup).
delay: u32, // seconds
/// Number of minutes a proxy stays unavailable for this website after being blacklisted.
blacklist_minutes: u32,
}
/// Top-level shape of the TOML configuration file parsed in `main`.
#[derive(Deserialize, Debug)]
struct Config {
/// Proxy addresses shared by every configured website; `main` refuses to start if empty.
proxies: Vec<String>,
/// Per-website settings keyed by website name (the name is the `/v1/<name>` path segment);
/// `main` refuses to start if empty.
websites: HashMap<String, WebsiteConfig>,
}
/// Runtime state tracked for one website.
#[derive(Clone)]
struct Website {
/// Maps each proxy address to the instant at which it may next be handed out for this website.
proxies: HashMap<String, Instant>,
/// Initialised to 0 in `main`; not read anywhere in the visible code.
index: usize,
/// Seconds added to a proxy's next-available instant every time it is handed out.
delay: u32,
/// Minutes a proxy is made unavailable when it is blacklisted.
blacklist_minutes: u32,
}
/// State shared between all request handlers (wrapped in `Arc<RwLock<_>>` by `main`).
#[derive(Clone)]
struct ServerState {
/// Per-website runtime state keyed by website name; each entry has its own mutex.
websites: HashMap<String, Arc<Mutex<Website>>>,
/// The proxy list taken from the config file; the visible handlers do not read it.
proxies: Vec<String>,
/// Random number generator used to shuffle the candidate proxies on each request.
rng: Arc<Mutex<StdRng>>,
}
/// JSON request body accepted by the blacklist endpoint (`POST /v1/<website>`).
#[derive(Deserialize)]
struct BlacklistBody {
/// Address of the proxy to blacklist; must match a key of the website's proxy map.
proxy: String,
}
/// Entry point: loads the configuration, builds the per-website proxy tables and serves the
/// HTTP API on `0.0.0.0:3030`.
///
/// Routes:
/// * `GET /v1/<website>` is handled by `get_proxy_fn`.
/// * `POST /v1/<website>` with a JSON body (at most 16 KiB) is handled by `blacklist_proxy_fn`.
///
/// # Panics
/// Panics if no config file can be read, if the config file cannot be parsed, or if it
/// specifies no proxies or no websites.
#[tokio::main]
async fn main() {
    let (filename, file_contents) = read_conf_file();
    // Name the offending file in the panic message so a broken config is easy to locate.
    let config: Config = toml::from_str(&file_contents)
        .unwrap_or_else(|err| panic!("Could not parse {}: {}", filename, err));

    // Validate the proxies
    if config.proxies.is_empty() {
        panic!("No proxies were specified. Edit {} to fix this.", filename);
    }
    // Validate the websites
    if config.websites.is_empty() {
        panic!("No websites were specified. Edit {} to fix this.", filename);
    }

    // Populate the proxies
    let websites: HashMap<String, Arc<Mutex<Website>>> = config
        .websites
        .iter()
        .map(|(site, site_conf)| {
            // Q: Why are we adding the delay time here?
            // A: So that if the server restarts, we don't immediately start handing out proxies
            // which were just used a second ago (causing the rate limit to be exceeded)
            let first_available = Instant::now() + Duration::from_secs(site_conf.delay.into());
            let proxies: HashMap<String, Instant> = config
                .proxies
                .iter()
                .map(|proxy| (proxy.clone(), first_available))
                .collect();
            let website = Website {
                proxies,
                index: 0,
                delay: site_conf.delay,
                blacklist_minutes: site_conf.blacklist_minutes,
            };
            (site.clone(), Arc::new(Mutex::new(website)))
        })
        .collect();

    let state = Arc::new(RwLock::new(ServerState {
        websites,
        proxies: config.proxies,
        rng: Arc::new(Mutex::new(StdRng::from_rng(thread_rng()).unwrap())),
    }));

    let get_proxy = {
        // Each route needs its own handle to the shared state.
        let state = state.clone();
        warp::get()
            .and(warp::path!("v1" / String))
            .and(warp::any().map(move || state.clone()))
            .and_then(get_proxy_fn)
    };
    let blacklist_proxy = warp::post()
        .and(warp::path!("v1" / String))
        .and(warp::any().map(move || state.clone()))
        .and(warp::body::content_length_limit(1024 * 16))
        .and(warp::body::json())
        .and_then(blacklist_proxy_fn);

    warp::serve(get_proxy.or(blacklist_proxy))
        .run(([0, 0, 0, 0], 3030))
        .await;
}
/// Handles `GET /v1/<website>`: picks a proxy for `website_str`, reserves it, and replies with
/// its address once the proxy is ready to be used.
///
/// Selection: the website's proxies are shuffled, then the first one whose ready time has
/// already passed is chosen; if none is ready yet, the one that becomes ready soonest is
/// chosen. The chosen proxy's ready time is moved to `max(ready time, now) + delay`, and the
/// response is held back until the proxy's previous ready time.
///
/// Replies with 404 "Website not found" for an unknown website and with 503
/// "No proxies available" if the website has no proxies at all.
///
/// # Panics
/// Panics if one of the state locks is poisoned.
async fn get_proxy_fn(
    website_str: String,
    state: Arc<RwLock<ServerState>>,
) -> Result<impl warp::Reply, warp::Rejection> {
    // All lock guards live inside this block so none of them is held across the `.await` below.
    let (proxy_address, proxy_time) = {
        let state = state.read().unwrap();
        let mut website = match state.websites.get(&website_str) {
            Some(site) => site.lock().unwrap(),
            None => {
                return Ok(warp::reply::with_status(
                    "Website not found",
                    http::status::StatusCode::NOT_FOUND,
                )
                .into_response())
            }
        };

        // Borrow the addresses instead of cloning every one of them; only the winner is cloned.
        let mut candidates: Vec<(&String, Instant)> = website
            .proxies
            .iter()
            .map(|(address, ready_at)| (address, *ready_at))
            .collect();
        // Shuffle so that equally good proxies are used evenly.
        candidates.shuffle(state.rng.lock().unwrap().deref_mut());
        println!("{:?}", candidates);

        let now = Instant::now();
        // Prefer any proxy that is ready right now; otherwise take the one that is ready soonest.
        // Using `Option` instead of sentinel values means a proxy is always found when the map
        // is non-empty, no matter how far in the future the ready times are.
        let picked = candidates
            .iter()
            .find(|(_, ready_at)| *ready_at < now)
            .or_else(|| candidates.iter().min_by_key(|(_, ready_at)| *ready_at))
            .map(|&(address, ready_at)| (address.clone(), ready_at));
        // Release the shared borrow of the proxy map before mutating it.
        drop(candidates);

        match picked {
            Some((address, ready_at)) => {
                let delay = Duration::from_secs(website.delay.into());
                if let Some(next_ready) = website.proxies.get_mut(&address) {
                    *next_ready = cmp::max(ready_at, now) + delay;
                }
                (address, ready_at)
            }
            None => {
                return Ok(warp::reply::with_status(
                    "No proxies available",
                    http::status::StatusCode::SERVICE_UNAVAILABLE,
                )
                .into_response())
            }
        }
    };
    time::delay_until(proxy_time.into()).await; // Wait until proxy is ready
    Ok(proxy_address.into_response())
}
/// Handles `POST /v1/<website>`: marks `body.proxy` as blacklisted (not working, blocked by
/// the website, etc.) for the website `website_str`.
///
/// The proxy's next-available instant is set to now plus the website's `blacklist_minutes`,
/// so it becomes eligible again automatically once that time has passed.
///
/// Replies with 200 "Success", or with 404 "Website not found" / "Proxy not found" when the
/// website or the proxy is unknown.
async fn blacklist_proxy_fn(
    website_str: String,
    state: Arc<RwLock<ServerState>>,
    body: BlacklistBody,
) -> Result<impl warp::Reply, warp::Rejection> {
    // Builds a 404 response carrying the given message.
    let not_found = |message: &'static str| {
        warp::reply::with_status(message, http::status::StatusCode::NOT_FOUND).into_response()
    };

    let shared = state.read().unwrap();
    let entry = match shared.websites.get(&website_str) {
        Some(entry) => entry,
        None => return Ok(not_found("Website not found")),
    };

    let mut site = entry.lock().unwrap();
    let minutes = site.blacklist_minutes as u64;
    let available_again = match site.proxies.get_mut(&body.proxy) {
        Some(slot) => slot,
        None => return Ok(not_found("Proxy not found")),
    };
    *available_again = Instant::now() + Duration::from_secs(60 * minutes);

    Ok(warp::reply::with_status("Success", http::status::StatusCode::OK).into_response())
}
/// Reads the first readable config file and returns `(path, contents)`.
///
/// Candidates are tried in order: `config.toml` in the current directory, then
/// `/etc/proxy_manager/config.toml`. A candidate that cannot be read is skipped.
///
/// # Panics
/// Panics if none of the candidates can be read.
fn read_conf_file() -> (String, String) {
    const CANDIDATES: [&str; 2] = ["config.toml", "/etc/proxy_manager/config.toml"];
    CANDIDATES
        .iter()
        .find_map(|path| {
            fs::read_to_string(path)
                .ok()
                .map(|contents| (path.to_string(), contents))
        })
        .unwrap_or_else(|| panic!("Could not open config file"))
}