diff --git a/server/src/main.rs b/server/src/main.rs index 72bacf323..5e9a1dae8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,194 +20,194 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// Entry point for our route fn chat_route( - req: HttpRequest, - stream: web::Payload, - chat_server: web::Data>, -) -> Result { - ws::start( - WSSession { - cs_addr: chat_server.get_ref().to_owned(), - id: 0, - hb: Instant::now(), - ip: req - .connection_info() - .remote() - .unwrap_or("127.0.0.1:12345") - .split(":") - .next() - .unwrap_or("127.0.0.1") - .to_string(), - }, - &req, - stream, + req: HttpRequest, + stream: web::Payload, + chat_server: web::Data>, + ) -> Result { + ws::start( + WSSession { + cs_addr: chat_server.get_ref().to_owned(), + id: 0, + hb: Instant::now(), + ip: req + .connection_info() + .remote() + .unwrap_or("127.0.0.1:12345") + .split(":") + .next() + .unwrap_or("127.0.0.1") + .to_string(), + }, + &req, + stream, ) } struct WSSession { - cs_addr: Addr, - /// unique session id - id: usize, - ip: String, - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), - /// otherwise we drop connection. - hb: Instant, + cs_addr: Addr, + /// unique session id + id: usize, + ip: String, + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant, } impl Actor for WSSession { - type Context = ws::WebsocketContext; + type Context = ws::WebsocketContext; - /// Method is called on actor start. - /// We register ws session with ChatServer - fn started(&mut self, ctx: &mut Self::Context) { - // we'll start heartbeat process on session start. - self.hb(ctx); + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); - // register self in chat server. `AsyncContext::wait` register - // future within context, but context waits until this future resolves - // before processing any other events. - // across all routes within application - let addr = ctx.address(); - self.cs_addr - .send(Connect { - addr: addr.recipient(), - ip: self.ip.to_owned(), - }) - .into_actor(self) - .then(|res, act, ctx| { - match res { - Ok(res) => act.id = res, - // something is wrong with chat server - _ => ctx.stop(), - } - fut::ok(()) - }) - .wait(ctx); - } + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // across all routes within application + let addr = ctx.address(); + self.cs_addr + .send(Connect { + addr: addr.recipient(), + ip: self.ip.to_owned(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }) + .wait(ctx); + } - fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { - // notify chat server - self.cs_addr.do_send(Disconnect { - id: self.id, - ip: self.ip.to_owned(), - }); - Running::Stop - } + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + // notify chat server + self.cs_addr.do_send(Disconnect { + id: self.id, + ip: self.ip.to_owned(), + }); + Running::Stop + } } /// Handle messages from chat server, we simply send it to peer websocket /// These are room messages, IE sent to others in the room impl Handler for WSSession { - type Result = (); + type Result = (); - fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { - // println!("id: {} msg: {}", self.id, msg.0); - ctx.text(msg.0); - } + fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + // println!("id: {} msg: {}", self.id, msg.0); + ctx.text(msg.0); + } } /// WebSocket message handler impl StreamHandler for WSSession { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); - match msg { - ws::Message::Ping(msg) => { - self.hb = Instant::now(); - ctx.pong(&msg); - } - ws::Message::Pong(_) => { - self.hb = Instant::now(); - } - ws::Message::Text(text) => { - let m = text.trim().to_owned(); - println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + ws::Message::Text(text) => { + let m = text.trim().to_owned(); + println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); - self.cs_addr - .send(StandardMessage { - id: self.id, - msg: m, - }) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(res) => ctx.text(res), - Err(e) => { - eprintln!("{}", &e); - } - } - fut::ok(()) - }) - .wait(ctx); + self.cs_addr + .send(StandardMessage { + id: self.id, + msg: m, + }) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(res) => ctx.text(res), + Err(e) => { + eprintln!("{}", &e); + } } - ws::Message::Binary(_bin) => println!("Unexpected binary"), - ws::Message::Close(_) => { - ctx.stop(); - } - _ => {} - } + fut::ok(()) + }) + .wait(ctx); + } + ws::Message::Binary(_bin) => println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + } + _ => {} } + } } impl WSSession { - /// helper method that sends ping to client every second. - /// - /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // heartbeat timed out - println!("Websocket Client heartbeat failed, disconnecting!"); + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); - // notify chat server - act.cs_addr.do_send(Disconnect { - id: act.id, - ip: act.ip.to_owned(), - }); - - // stop actor - ctx.stop(); - - // don't try to send a ping - return; - } - - ctx.ping(""); + // notify chat server + act.cs_addr.do_send(Disconnect { + id: act.id, + ip: act.ip.to_owned(), }); - } + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(""); + }); + } } fn main() { - let _ = env_logger::init(); - let sys = actix::System::new("lemmy"); + let _ = env_logger::init(); + let sys = actix::System::new("lemmy"); - // Run the migrations from code - let conn = establish_connection(); - embedded_migrations::run(&conn).unwrap(); + // Run the migrations from code + let conn = establish_connection(); + embedded_migrations::run(&conn).unwrap(); - // Start chat server actor in separate thread - let server = ChatServer::default().start(); - // Create Http server with websocket support - HttpServer::new(move || { - App::new() - .data(server.clone()) - .service(web::resource("/api/v1/ws").to(chat_route)) - // .service(web::resource("/api/v1/rest").route(web::post().to(||{}))) - .service(web::resource("/").to(index)) - // static resources - .service(actix_files::Files::new("/static", front_end_dir())) - }) - .bind("0.0.0.0:8536") + // Start chat server actor in separate thread + let server = ChatServer::default().start(); + // Create Http server with websocket support + HttpServer::new(move || { + App::new() + .data(server.clone()) + .service(web::resource("/api/v1/ws").to(chat_route)) + // .service(web::resource("/api/v1/rest").route(web::post().to(||{}))) + .service(web::resource("/").to(index)) + // static resources + .service(actix_files::Files::new("/static", front_end_dir())) + }) + .bind("0.0.0.0:8536") .unwrap() .start(); - println!("Started http server: 0.0.0.0:8536"); - let _ = sys.run(); + println!("Started http server: 0.0.0.0:8536"); + let _ = sys.run(); } fn index() -> Result { - Ok(NamedFile::open(front_end_dir() + "/index.html")?) + Ok(NamedFile::open(front_end_dir() + "/index.html")?) } fn front_end_dir() -> String { - env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string()) + env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string()) }