Browse Source

first commit

master
Stephen 4 months ago
commit
19d682eadd
3 changed files with 279 additions and 0 deletions
  1. +1
    -0
      .gitignore
  2. +11
    -0
      Cargo.toml
  3. +267
    -0
      src/main.rs

+ 1
- 0
.gitignore View File

@ -0,0 +1 @@
/target

+ 11
- 0
Cargo.toml View File

@ -0,0 +1,11 @@
[package]
name = "telnet_chat_roulette"
version = "0.1.0"
authors = ["Stephen <webmaster@scd31.com>"]
edition = "2018"
[dependencies]
tokio = { version = "0.2", features = ["full"] }
tokio-util = { version = "0.3.0", features = ["full"] }
futures = "0.3.0"
colored = "1.9"

+ 267
- 0
src/main.rs View File

@ -0,0 +1,267 @@
use std::net::SocketAddr;
use std::collections::HashMap;
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::error::Error;
use std::str;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::sync::{mpsc, Mutex};
use tokio::stream::{Stream, StreamExt};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use futures::SinkExt;
extern crate colored;
use colored::*;
// Channel stuff
type Tx = mpsc::UnboundedSender<String>;
type Rx = mpsc::UnboundedReceiver<String>;
enum Message {
Send(String), // Send to client
Receive(String) // Receive from client
}
#[derive(Debug)]
struct StateClient {
tx: Tx,
partner: Option<SocketAddr>,
username: String,
}
impl StateClient {
fn new(tx: Tx, username: String) -> Self {
Self {
tx,
username,
partner: None,
}
}
fn send(&mut self, msg: String) -> Result<(), tokio::sync::mpsc::error::SendError<String>> {
self.tx.send(msg)
}
fn send_status(&mut self, msg: String) -> Result<(), tokio::sync::mpsc::error::SendError<String>> {
self.tx.send(format!("{}", msg.yellow()))
}
fn send_success(&mut self, msg: String) -> Result<(), tokio::sync::mpsc::error::SendError<String>> {
self.tx.send(format!("{}", msg.bright_green()))
}
}
#[derive(Debug)]
struct Shared
{
clients: HashMap<SocketAddr, StateClient>,
single: Option<SocketAddr>,
}
impl Shared {
fn new() -> Self {
Shared { clients: HashMap::new(), single: None }
}
async fn disconnect(&mut self, addr: &SocketAddr) {
if self.single == Some(*addr) {
self.single = None;
}
if let Some(partner_addr) = self.clients.get_mut(addr).unwrap().partner {
{
let partner = self.clients.get_mut(&partner_addr).unwrap();
partner.send(format!("{} {}", partner.username.yellow(), "disconnected.".yellow())).unwrap();
}
self.find_partner(&partner_addr).await;
}
self.clients.remove(addr);
}
async fn send_partner(&mut self, addr: SocketAddr, msg: &str) {
if let Some(x) = self.clients[&addr].partner {
if let Some(y) = self.clients.get_mut(&x) {
if y.send(msg.into()).is_err() {
self.disconnect(&x).await;
}
}
else {
self.disconnect(&x).await;
}
}
}
async fn find_partner(&mut self, addr: &SocketAddr) {
{
let client = self.clients.get_mut(addr).unwrap();
client.partner = None;
client.send_status("Waiting for another person...".to_owned()).unwrap();
}
match self.single {
Some(x) => {
if x == *addr { return; } // don't match with ourselves
let our_username = self.clients.get_mut(addr).unwrap().username.clone();
let their_username = self.clients.get_mut(&x).unwrap().username.clone();
{
let client = self.clients.get_mut(addr).unwrap();
client.partner = Some(x);
client.send_success(format!("You are now connected to {}.", their_username)).unwrap();
}
{
let partner = self.clients.get_mut(&x).unwrap();
partner.partner = Some(*addr);
partner.send_success(format!("You are now connected to {}.", our_username)).unwrap();
}
self.single = None;
}
None => {
self.single = Some(addr.to_owned());
}
}
}
}
struct Client {
lines: Framed<TcpStream, LinesCodec>,
rx: Rx,
}
impl Client {
async fn new(state: Arc<Mutex<Shared>>, lines: Framed<TcpStream, LinesCodec>, username: String) -> io::Result<Client> {
let addr = lines.get_ref().peer_addr()?;
let (tx, rx) = mpsc::unbounded_channel();
state.lock().await.clients.insert(addr, StateClient::new(tx, username));
Ok(Client { lines, rx })
}
}
impl Stream for Client {
type Item = Result<Message, LinesCodecError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Send(v))));
}
let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));
Poll::Ready(
match result {
Some(Ok(message)) => Some(Ok(Message::Receive(message))),
Some(Err(e)) => Some(Err(e)),
None => None
}
)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
colored::control::SHOULD_COLORIZE.set_override(true);
let mut listener = TcpListener::bind("0.0.0.0:7878").await.unwrap();
let state = Arc::new(Mutex::new(Shared::new()));
loop {
let (socket, addr) = match listener.accept().await {
Ok(x) => x,
Err(_) => continue
};
let state = Arc::clone(&state);
let state2 = Arc::clone(&state);
tokio::spawn(async move {
if let Err(e) = process(state, socket, addr).await {
println!("Stop that! {}", e);
}
// Client disconnected
{
let mut state = state2.lock().await;
state.disconnect(&addr).await;
//println!("goodbye");
}
});
}
}
async fn process(state: Arc<Mutex<Shared>>, socket: TcpStream, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
let mut lines = Framed::new(socket, LinesCodec::new());
motd(state.clone(), &mut lines).await?;
lines.send(&format!("{}", "Please enter a username: \r".bright_blue())).await?;
let username = match lines.next().await {
Some(Ok(x)) => x,
_ => return Ok(())
};
process_with_username(state.clone(), addr, lines, &username).await?;
Ok(())
}
async fn motd(
state: Arc<Mutex<Shared>>,
lines: &mut Framed<TcpStream,LinesCodec>
) -> Result<(), Box<dyn Error>> {
lines.send("TELNET CHAT ROULETTE\r".yellow().to_string()).await?;
lines.send(format!("{} {}\r", state.lock().await.clients.len(), "users online".yellow().to_string())).await?;
lines.send("Type /quit to quit\r".yellow().to_string()).await?;
Ok(())
}
async fn process_with_username(state: Arc<Mutex<Shared>>,
addr: SocketAddr,
lines: Framed<TcpStream, LinesCodec>,
username: &str) -> Result<(), Box<dyn Error>> {
let mut client = Client::new(state.clone(), lines, username.to_string()).await?;
println!("New connection from {}", addr);
state.lock().await.find_partner(&addr).await;
while let Some(result) = client.next().await {
match result {
//Message received from client
Ok(Message::Receive(msg)) => {
if msg.trim() == "/quit" {
client.lines.send(format!("{}", "Goodbye!".yellow())).await?;
return Ok(());
}
if !msg.is_empty() {
state.lock().await.send_partner(
addr,
&format!("<{}> {}", username.red().to_string(), msg.clone())
).await;
}
}
//Send message to client
Ok(Message::Send(msg)) => {
client.lines.send(&msg).await?;
}
Err(e) => {
println!(
"an error occurred while processing messages for {}; error = {:?}", addr, e
);
}
}
}
Ok(())
}

Loading…
Cancel
Save