diff --git a/server/src/bin/main.rs b/server/src/bin/main.rs index 96f8181d0..701a4e36e 100644 --- a/server/src/bin/main.rs +++ b/server/src/bin/main.rs @@ -29,7 +29,8 @@ fn chat_route(req: &HttpRequest) -> Result) -> Result Running { // notify chat server - ctx.state().addr.do_send(Disconnect { id: self.id }); + ctx.state().addr.do_send(Disconnect { + id: self.id, + ip: self.ip.to_owned(), + }); Running::Stop } } @@ -111,7 +117,7 @@ impl StreamHandler for WSSession { .addr .send(StandardMessage { id: self.id, - msg: m + msg: m, }) .into_actor(self) .then(|res, _, ctx| { @@ -215,7 +221,7 @@ impl WSSession { // notify chat server ctx.state() .addr - .do_send(Disconnect { id: act.id }); + .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() }); // stop actor ctx.stop(); diff --git a/server/src/websocket_server/server.rs b/server/src/websocket_server/server.rs index fef60e8b4..b24169513 100644 --- a/server/src/websocket_server/server.rs +++ b/server/src/websocket_server/server.rs @@ -11,6 +11,7 @@ use bcrypt::{verify}; use std::str::FromStr; use diesel::PgConnection; use failure::Error; +use std::time::{SystemTime}; use {Crud, Joinable, Likeable, Followable, Bannable, Saveable, establish_connection, naive_now, naive_from_unix, SortType, SearchType, has_slurs, remove_slurs}; use actions::community::*; @@ -25,6 +26,11 @@ use actions::user_view::*; use actions::moderator_views::*; use actions::moderator::*; +const RATE_LIMIT_MESSAGES: i32 = 30; +const RATE_LIMIT_PER_SECOND: i32 = 60; +const RATE_LIMIT_REGISTER_MESSAGES: i32 = 1; +const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60; + #[derive(EnumString,ToString,Debug)] pub enum UserOperation { Login, Register, CreateCommunity, CreatePost, ListCommunities, ListCategories, GetPost, GetCommunity, CreateComment, EditComment, SaveComment, CreateCommentLike, GetPosts, CreatePostLike, EditPost, SavePost, EditCommunity, FollowCommunity, GetFollowedCommunities, GetUserDetails, GetReplies, GetModlog, BanFromCommunity, AddModToCommunity, CreateSite, EditSite, GetSite, AddAdmin, BanUser, Search, MarkAllAsRead @@ -48,12 +54,14 @@ pub struct WSMessage(pub String); #[rtype(usize)] pub struct Connect { pub addr: Recipient, + pub ip: String, } /// Session is disconnected #[derive(Message)] pub struct Disconnect { pub id: usize, + pub ip: String, } /// Send message to specific room @@ -487,10 +495,22 @@ pub struct MarkAllAsRead { auth: String } +#[derive(Debug)] +pub struct RateLimitBucket { + last_checked: SystemTime, + allowance: f64 +} + +pub struct SessionInfo { + pub addr: Recipient, + pub ip: String, +} + /// `ChatServer` manages chat rooms and responsible for coordinating chat /// session. implementation is super primitive pub struct ChatServer { - sessions: HashMap>, // A map from generated random ID to session addr + sessions: HashMap, // A map from generated random ID to session addr + rate_limits: HashMap, rooms: HashMap>, // A map from room / post name to set of connectionIDs rng: ThreadRng, } @@ -502,6 +522,7 @@ impl Default for ChatServer { ChatServer { sessions: HashMap::new(), + rate_limits: HashMap::new(), rooms: rooms, rng: rand::thread_rng(), } @@ -514,8 +535,8 @@ impl ChatServer { if let Some(sessions) = self.rooms.get(&room) { for id in sessions { if *id != skip_id { - if let Some(addr) = self.sessions.get(id) { - let _ = addr.do_send(WSMessage(message.to_owned())); + if let Some(info) = self.sessions.get(id) { + let _ = info.addr.do_send(WSMessage(message.to_owned())); } } } @@ -540,8 +561,50 @@ impl ChatServer { Ok(()) } + + fn check_rate_limit_register(&mut self, addr: usize) -> Result<(), Error> { + self.check_rate_limit_full(addr, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND) + } + + fn check_rate_limit(&mut self, addr: usize) -> Result<(), Error> { + self.check_rate_limit_full(addr, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND) + } + + fn check_rate_limit_full(&mut self, addr: usize, rate: i32, per: i32) -> Result<(), Error> { + if let Some(info) = self.sessions.get(&addr) { + if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) { + if rate_limit.allowance == -2f64 { + rate_limit.allowance = rate as f64; + }; + + let current = SystemTime::now(); + let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64; + rate_limit.last_checked = current; + rate_limit.allowance += time_passed * (rate as f64 / per as f64); + if rate_limit.allowance > rate as f64 { + rate_limit.allowance = rate as f64; + } + + if rate_limit.allowance < 1.0 { + println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance); + Err(ErrorMessage { + op: "Rate Limit".to_string(), + message: format!("Too many requests. {} per {} seconds", rate, per), + })? + } else { + rate_limit.allowance -= 1.0; + Ok(()) + } + } else { + Ok(()) + } + } else { + Ok(()) + } + } } + /// Make actor from `ChatServer` impl Actor for ChatServer { /// We are going to use simple Context, we just need ability to communicate @@ -555,14 +618,30 @@ impl Actor for ChatServer { impl Handler for ChatServer { type Result = usize; - fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + fn handle(&mut self, msg: Connect, _ctx: &mut Context) -> Self::Result { // notify all users in same room // self.send_room_message(&"Main".to_owned(), "Someone joined", 0); // register session with random id let id = self.rng.gen::(); - self.sessions.insert(id, msg.addr); + println!("{} Joined", &msg.ip); + + self.sessions.insert(id, SessionInfo { + addr: msg.addr, + ip: msg.ip.to_owned(), + }); + + if self.rate_limits.get(&msg.ip).is_none() { + self.rate_limits.insert(msg.ip, RateLimitBucket { + last_checked: SystemTime::now(), + allowance: -2f64, + }); + } + + // for (k,v) in &self.rate_limits { + // println!("{}: {:?}", k,v); + // } // auto join session to Main room // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); @@ -572,6 +651,7 @@ impl Handler for ChatServer { } } + /// Handler for Disconnect message. impl Handler for ChatServer { type Result = (); @@ -794,10 +874,12 @@ impl Perform for Register { fn op_type(&self) -> UserOperation { UserOperation::Register } - fn perform(&self, _chat: &mut ChatServer, _addr: usize) -> Result { + fn perform(&self, chat: &mut ChatServer, addr: usize) -> Result { let conn = establish_connection(); + chat.check_rate_limit_register(addr)?; + // Make sure passwords match if &self.password != &self.password_verify { return Err(self.error("Passwords do not match."))? @@ -884,10 +966,12 @@ impl Perform for CreateCommunity { UserOperation::CreateCommunity } - fn perform(&self, _chat: &mut ChatServer, _addr: usize) -> Result { + fn perform(&self, chat: &mut ChatServer, addr: usize) -> Result { let conn = establish_connection(); + chat.check_rate_limit_register(addr)?; + let claims = match Claims::decode(&self.auth) { Ok(claims) => claims.claims, Err(_e) => { @@ -1030,10 +1114,12 @@ impl Perform for CreatePost { UserOperation::CreatePost } - fn perform(&self, _chat: &mut ChatServer, _addr: usize) -> Result { + fn perform(&self, chat: &mut ChatServer, addr: usize) -> Result { let conn = establish_connection(); + chat.check_rate_limit_register(addr)?; + let claims = match Claims::decode(&self.auth) { Ok(claims) => claims.claims, Err(_e) => { @@ -1242,6 +1328,8 @@ impl Perform for CreateComment { let conn = establish_connection(); + chat.check_rate_limit(addr)?; + let claims = match Claims::decode(&self.auth) { Ok(claims) => claims.claims, Err(_e) => { @@ -1390,7 +1478,7 @@ impl Perform for EditComment { deleted: self.deleted.to_owned(), read: self.read.to_owned(), updated: if self.read.is_some() { orig_comment.updated } else {Some(naive_now())} - }; + }; let _updated_comment = match Comment::update(&conn, self.edit_id, &comment_form) { Ok(comment) => comment, @@ -1500,6 +1588,8 @@ impl Perform for CreateCommentLike { let conn = establish_connection(); + chat.check_rate_limit(addr)?; + let claims = match Claims::decode(&self.auth) { Ok(claims) => claims.claims, Err(_e) => { @@ -1628,10 +1718,12 @@ impl Perform for CreatePostLike { UserOperation::CreatePostLike } - fn perform(&self, _chat: &mut ChatServer, _addr: usize) -> Result { + fn perform(&self, chat: &mut ChatServer, addr: usize) -> Result { let conn = establish_connection(); + chat.check_rate_limit(addr)?; + let claims = match Claims::decode(&self.auth) { Ok(claims) => claims.claims, Err(_e) => { @@ -2695,7 +2787,7 @@ impl Perform for Search { }, SearchType::Comments => { comments = CommentView::list(&conn, - &sort, + &sort, None, None, Some(self.q.to_owned()), @@ -2717,7 +2809,7 @@ impl Perform for Search { self.page, self.limit)?; comments = CommentView::list(&conn, - &sort, + &sort, None, None, Some(self.q.to_owned()), diff --git a/ui/src/components/main.tsx b/ui/src/components/main.tsx index f4ad37713..44025fef6 100644 --- a/ui/src/components/main.tsx +++ b/ui/src/components/main.tsx @@ -128,7 +128,7 @@ export class Main extends Component { {this.trendingCommunities()} {UserService.Instance.user && this.state.subscribedCommunities.length > 0 &&
-
Subscribed communities
+
Subscribed communities
    {this.state.subscribedCommunities.map(community =>
  • {community.community_name}