Merge branch 'main' into invite_instances

invite_instances
Dessalines 2021-12-08 16:48:17 -05:00
commit b9978cc141
49 changed files with 542 additions and 362 deletions

437
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -61,6 +61,8 @@ http-signature-normalization-actix = { version = "0.5.0-beta.10", default-featur
tokio = { version = "1.12.0", features = ["sync"] }
anyhow = "1.0.44"
reqwest = { version = "0.11.4", features = ["json"] }
reqwest-middleware = "0.1.2"
reqwest-tracing = "0.2.0"
activitystreams = "0.7.0-alpha.11"
actix-rt = { version = "2.2.0", default-features = false }
serde_json = { version = "1.0.68", features = ["preserve_order"] }

View File

@ -30,7 +30,6 @@ serde = { version = "1.0.130", features = ["derive"] }
actix = "0.12.0"
actix-web = { version = "4.0.0-beta.9", default-features = false }
actix-rt = { version = "2.2.0", default-features = false }
awc = { version = "3.0.0-beta.8", default-features = false }
rand = "0.8.4"
strum = "0.21.0"
strum_macros = "0.21.1"

View File

@ -25,7 +25,6 @@ serde = { version = "1.0.130", features = ["derive"] }
actix = "0.12.0"
actix-web = { version = "4.0.0-beta.9", default-features = false }
actix-rt = { version = "2.2.0", default-features = false }
awc = { version = "3.0.0-beta.8", default-features = false }
tracing = "0.1.29"
rand = "0.8.4"
strum = "0.21.0"

View File

@ -53,7 +53,7 @@ impl PerformCrud for GetCommunity {
.await?;
ObjectId::<ApubCommunity>::new(community_actor_id)
.dereference(context, &mut 0)
.dereference(context, context.client(), &mut 0)
.await
.map_err(LemmyError::from)
.map_err(|e| e.with_message("couldnt_find_community"))?

View File

@ -59,7 +59,7 @@ impl PerformCrud for GetPersonDetails {
webfinger_resolve::<ApubPerson>(&name, EndpointType::Person, context, &mut 0).await?;
let person = ObjectId::<ApubPerson>::new(actor_id)
.dereference(context, &mut 0)
.dereference(context, context.client(), &mut 0)
.await;
person
.map_err(LemmyError::from)

View File

@ -30,7 +30,6 @@ serde_with = "1.10.0"
actix = "0.12.0"
actix-web = { version = "4.0.0-beta.9", default-features = false }
actix-rt = { version = "2.2.0", default-features = false }
awc = { version = "3.0.0-beta.8", default-features = false }
tracing = "0.1.29"
rand = "0.8.4"
strum = "0.21.0"
@ -55,3 +54,4 @@ once_cell = "1.8.0"
[dev-dependencies]
serial_test = "0.5.1"
assert-json-diff = "2.0.1"
reqwest-middleware = "0.1.2"

View File

@ -71,7 +71,9 @@ impl CreateOrUpdateComment {
.collect();
let mut inboxes = vec![];
for t in tagged_users {
let person = t.dereference(context, request_counter).await?;
let person = t
.dereference(context, context.client(), request_counter)
.await?;
inboxes.push(person.shared_inbox_or_inbox_url());
}

View File

@ -21,7 +21,9 @@ async fn get_notif_recipients(
) -> Result<Vec<LocalUserId>, LemmyError> {
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
let actor = actor.dereference(context, request_counter).await?;
let actor = actor
.dereference(context, context.client(), request_counter)
.await?;
// Note:
// Although mentions could be gotten from the post tags (they are included there), or the ccs,

View File

@ -86,7 +86,10 @@ impl ActivityHandler for AddMod {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community = self.get_community(context, request_counter).await?;
let new_mod = self.object.dereference(context, request_counter).await?;
let new_mod = self
.object
.dereference(context, context.client(), request_counter)
.await?;
// If we had to refetch the community while parsing the activity, then the new mod has already
// been added. Skip it here as it would result in a duplicate key error.

View File

@ -93,7 +93,10 @@ impl ActivityHandler for BlockUserFromCommunity {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community = self.get_community(context, request_counter).await?;
let blocked_user = self.object.dereference(context, request_counter).await?;
let blocked_user = self
.object
.dereference(context, context.client(), request_counter)
.await?;
let community_user_ban_form = CommunityPersonBanForm {
community_id: community.id,
@ -129,6 +132,9 @@ impl GetCommunity for BlockUserFromCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<ApubCommunity, LemmyError> {
self.target.dereference(context, request_counter).await
self
.target
.dereference(context, context.client(), request_counter)
.await
}
}

View File

@ -44,6 +44,6 @@ async fn get_community_from_moderators_url(
) -> Result<ApubCommunity, LemmyError> {
let community_id = Url::parse(&moderators.to_string().replace("/moderators", ""))?;
ObjectId::new(community_id)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await
}

View File

@ -86,7 +86,10 @@ impl ActivityHandler for RemoveMod {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community = self.get_community(context, request_counter).await?;
let remove_mod = self.object.dereference(context, request_counter).await?;
let remove_mod = self
.object
.dereference(context, context.client(), request_counter)
.await?;
let form = CommunityModeratorForm {
community_id: community.id,

View File

@ -74,7 +74,9 @@ impl ActivityHandler for Report {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_activity(&self.id, self.actor.inner(), &context.settings())?;
let community = self.to[0].dereference(context, request_counter).await?;
let community = self.to[0]
.dereference(context, context.client(), request_counter)
.await?;
verify_person_in_community(&self.actor, &community, context, request_counter).await?;
Ok(())
}
@ -85,8 +87,15 @@ impl ActivityHandler for Report {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = self.actor.dereference(context, request_counter).await?;
match self.object.dereference(context, request_counter).await? {
let actor = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
match self
.object
.dereference(context, context.client(), request_counter)
.await?
{
PostOrComment::Post(post) => {
let report_form = PostReportForm {
creator_id: actor.id,

View File

@ -87,7 +87,7 @@ impl ActivityHandler for UndoBlockUserFromCommunity {
let blocked_user = self
.object
.object
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let community_user_ban_form = CommunityPersonBanForm {

View File

@ -121,6 +121,8 @@ impl GetCommunity for UpdateCommunity {
request_counter: &mut i32,
) -> Result<ApubCommunity, LemmyError> {
let cid = ObjectId::new(self.object.id.clone());
cid.dereference(context, request_counter).await
cid
.dereference(context, context.client(), request_counter)
.await
}
}

View File

@ -139,7 +139,9 @@ pub(in crate::activities) async fn receive_remove_action(
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = actor.dereference(context, request_counter).await?;
let actor = actor
.dereference(context, context.client(), request_counter)
.await?;
use UserOperationCrud::*;
match DeletableObjects::read_from_db(object, context).await? {
DeletableObjects::Community(community) => {

View File

@ -165,7 +165,9 @@ async fn receive_delete_action(
match DeletableObjects::read_from_db(object, context).await? {
DeletableObjects::Community(community) => {
if community.local {
let mod_ = actor.dereference(context, request_counter).await?;
let mod_ = actor
.dereference(context, context.client(), request_counter)
.await?;
let object = DeletableObjects::Community(community.clone());
send_apub_delete(&mod_, &community.clone(), object, true, context).await?;
}

View File

@ -25,7 +25,7 @@ impl AcceptFollowCommunity {
let person = follow
.actor
.clone()
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let accept = AcceptFollowCommunity {
actor: ObjectId::new(community.actor_id()),
@ -65,11 +65,14 @@ impl ActivityHandler for AcceptFollowCommunity {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = self.actor.dereference(context, request_counter).await?;
let person = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
let community = self
.object
.actor
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
// This will throw an error if no follow was requested
blocking(context.pool(), move |conn| {

View File

@ -75,7 +75,10 @@ impl ActivityHandler for FollowCommunity {
) -> Result<(), LemmyError> {
verify_activity(&self.id, self.actor.inner(), &context.settings())?;
verify_person(&self.actor, context, request_counter).await?;
let community = self.object.dereference(context, request_counter).await?;
let community = self
.object
.dereference(context, context.client(), request_counter)
.await?;
verify_person_in_community(&self.actor, &community, context, request_counter).await?;
Ok(())
}
@ -86,8 +89,14 @@ impl ActivityHandler for FollowCommunity {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = self.actor.dereference(context, request_counter).await?;
let community = self.object.dereference(context, request_counter).await?;
let person = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
let community = self
.object
.dereference(context, context.client(), request_counter)
.await?;
let community_follower_form = CommunityFollowerForm {
community_id: community.id,
person_id: person.id,

View File

@ -64,11 +64,14 @@ impl ActivityHandler for UndoFollowCommunity {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = self.actor.dereference(context, request_counter).await?;
let person = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
let community = self
.object
.object
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let community_follower_form = CommunityFollowerForm {

View File

@ -41,7 +41,9 @@ async fn verify_person(
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = person_id.dereference(context, request_counter).await?;
let person = person_id
.dereference(context, context.client(), request_counter)
.await?;
if person.banned {
let error = LemmyError::from(anyhow::anyhow!("Person {} is banned", person_id));
return Err(error.with_message("banned"));
@ -58,7 +60,9 @@ pub(crate) async fn verify_person_in_community(
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = person_id.dereference(context, request_counter).await?;
let person = person_id
.dereference(context, context.client(), request_counter)
.await?;
if person.banned {
return Err(LemmyError::from_message("Person is banned from site"));
}
@ -90,7 +94,9 @@ pub(crate) async fn verify_mod_action(
request_counter: &mut i32,
) -> Result<(), LemmyError> {
if community.local {
let actor = actor_id.dereference(context, request_counter).await?;
let actor = actor_id
.dereference(context, context.client(), request_counter)
.await?;
// Note: this will also return true for admins in addition to mods, but as we dont know about
// remote admins, it doesnt make any difference.

View File

@ -86,11 +86,14 @@ impl ActivityHandler for UndoVote {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = self.actor.dereference(context, request_counter).await?;
let actor = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
let object = self
.object
.object
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
match object {
PostOrComment::Post(p) => undo_vote_post(actor, &p, context).await,

View File

@ -90,8 +90,14 @@ impl ActivityHandler for Vote {
context: &Data<LemmyContext>,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = self.actor.dereference(context, request_counter).await?;
let object = self.object.dereference(context, request_counter).await?;
let actor = self
.actor
.dereference(context, context.client(), request_counter)
.await?;
let object = self
.object
.dereference(context, context.client(), request_counter)
.await?;
match object {
PostOrComment::Post(p) => vote_post(&self.kind, actor, &p, context).await,
PostOrComment::Comment(c) => vote_comment(&self.kind, actor, &c, context).await,
@ -107,7 +113,10 @@ impl GetCommunity for Vote {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<ApubCommunity, LemmyError> {
let object = self.object.dereference(context, request_counter).await?;
let object = self
.object
.dereference(context, context.client(), request_counter)
.await?;
let cid = match object {
PostOrComment::Post(p) => p.community_id,
PostOrComment::Comment(c) => {

View File

@ -110,7 +110,9 @@ impl ApubObject for ApubCommunityModerators {
// Add new mods to database which have been added to moderators collection
for mod_id in apub.ordered_items {
let mod_id = ObjectId::new(mod_id);
let mod_user: ApubPerson = mod_id.dereference(&data.1, request_counter).await?;
let mod_user: ApubPerson = mod_id
.dereference(&data.1, data.1.client(), request_counter)
.await?;
if !current_moderators
.iter()
@ -154,7 +156,8 @@ mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community_moderators() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;
let community_id = community.id;

View File

@ -27,7 +27,7 @@ pub async fn search_by_apub_id(
match Url::parse(query) {
Ok(url) => {
ObjectId::new(url)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await
}
Err(_) => {
@ -43,7 +43,7 @@ pub async fn search_by_apub_id(
.await?;
Ok(SearchableObjects::Person(
ObjectId::new(id)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?,
))
}
@ -57,7 +57,7 @@ pub async fn search_by_apub_id(
.await?;
Ok(SearchableObjects::Community(
ObjectId::new(id)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?,
))
}

View File

@ -109,7 +109,7 @@ where
.collect();
for l in links {
let object = ObjectId::<Kind>::new(l)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await;
if object.is_ok() {
return object.map(|o| o.actor_id().into());

View File

@ -119,7 +119,9 @@ pub(crate) async fn get_apub_community_outbox(
.await??;
let id = ObjectId::new(generate_outbox_url(&community.actor_id)?);
let outbox_data = CommunityContext(community.into(), context.get_ref().clone());
let outbox: ApubCommunityOutbox = id.dereference(&outbox_data, &mut 0).await?;
let outbox: ApubCommunityOutbox = id
.dereference(&outbox_data, context.client(), &mut 0)
.await?;
Ok(create_apub_response(&outbox.into_apub(&outbox_data).await?))
}
@ -135,7 +137,9 @@ pub(crate) async fn get_apub_community_moderators(
.into();
let id = ObjectId::new(generate_outbox_url(&community.actor_id)?);
let outbox_data = CommunityContext(community, context.get_ref().clone());
let moderators: ApubCommunityModerators = id.dereference(&outbox_data, &mut 0).await?;
let moderators: ApubCommunityModerators = id
.dereference(&outbox_data, context.client(), &mut 0)
.await?;
Ok(create_apub_response(
&moderators.into_apub(&outbox_data).await?,
))

View File

@ -95,7 +95,7 @@ where
check_is_apub_id_valid(&activity_data.actor, false, &context.settings())?;
let request_counter = &mut 0;
let actor = ObjectId::<UserOrCommunity>::new(activity_data.actor)
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
verify_signature(&request, &actor.public_key())?;

View File

@ -179,7 +179,7 @@ impl ApubObject for ApubComment {
) -> Result<ApubComment, LemmyError> {
let creator = note
.attributed_to
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let (post, parent_comment_id) = note.get_parents(context, request_counter).await?;
@ -246,7 +246,8 @@ pub(crate) mod tests {
#[actix_rt::test]
#[serial]
pub(crate) async fn test_parse_lemmy_comment() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;
@ -276,7 +277,8 @@ pub(crate) mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_comment() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;

View File

@ -147,14 +147,14 @@ impl ApubObject for ApubCommunity {
group
.outbox
.dereference(&outbox_data, request_counter)
.dereference(&outbox_data, context.client(), request_counter)
.await
.map_err(|e| debug!("{}", e))
.ok();
if let Some(moderators) = &group.moderators {
moderators
.dereference(&outbox_data, request_counter)
.dereference(&outbox_data, context.client(), request_counter)
.await
.map_err(|e| debug!("{}", e))
.ok();
@ -247,7 +247,8 @@ pub(crate) mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;

View File

@ -39,6 +39,7 @@ pub(crate) mod tests {
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use serde::de::DeserializeOwned;
use std::{fs::File, io::BufReader, sync::Arc};
use tokio::sync::Mutex;
@ -57,6 +58,8 @@ pub(crate) mod tests {
.user_agent(build_user_agent(&settings))
.build()
.unwrap();
let client = ClientBuilder::new(client).build();
let secret = Secret {
id: 0,
jwt_secret: "".to_string(),

View File

@ -224,7 +224,8 @@ pub(crate) mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_person() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let person = parse_lemmy_person(&context).await;
@ -238,7 +239,8 @@ pub(crate) mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_person() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let json = file_to_json_object("assets/pleroma/objects/person.json");
let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();

View File

@ -158,7 +158,7 @@ impl ApubObject for ApubPost {
) -> Result<ApubPost, LemmyError> {
let creator = page
.attributed_to
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let community = page.extract_community(context, request_counter).await?;
@ -216,7 +216,8 @@ mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_post() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;
let person = parse_lemmy_person(&context).await;

View File

@ -113,7 +113,7 @@ impl ApubObject for ApubPrivateMessage {
verify_domains_match(note.attributed_to.inner(), note.id.inner())?;
let person = note
.attributed_to
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
if person.banned {
return Err(LemmyError::from_message("Person is banned from site"));
@ -129,9 +129,11 @@ impl ApubObject for ApubPrivateMessage {
) -> Result<ApubPrivateMessage, LemmyError> {
let creator = note
.attributed_to
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?;
let recipient = note.to[0]
.dereference(context, context.client(), request_counter)
.await?;
let recipient = note.to[0].dereference(context, request_counter).await?;
let content = if let Some(source) = &note.source {
source.content.clone()
} else {
@ -195,7 +197,8 @@ mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_pm() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;
@ -223,7 +226,8 @@ mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_pm() {
let manager = create_activity_queue();
let client = reqwest::Client::new().into();
let manager = create_activity_queue(client);
let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;

View File

@ -67,7 +67,7 @@ impl Note {
let parent = Box::pin(
self
.in_reply_to
.dereference(context, request_counter)
.dereference(context, context.client(), request_counter)
.await?,
);
match parent.deref() {

View File

@ -70,7 +70,10 @@ impl Page {
loop {
if let Some(cid) = to_iter.next() {
let cid = ObjectId::new(cid.clone());
if let Ok(c) = cid.dereference(context, request_counter).await {
if let Ok(c) = cid
.dereference(context, context.client(), request_counter)
.await
{
break Ok(c);
}
} else {

View File

@ -17,6 +17,7 @@ url = { version = "2.2.2", features = ["serde"] }
serde_json = { version = "1.0.68", features = ["preserve_order"] }
anyhow = "1.0.44"
reqwest = { version = "0.11.4", features = ["json"] }
reqwest-middleware = "0.1.2"
tracing = "0.1.29"
base64 = "0.13.0"
openssl = "0.10.36"
@ -25,6 +26,6 @@ http = "0.2.5"
sha2 = "0.9.8"
actix-web = { version = "4.0.0-beta.9", default-features = false }
http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
http-signature-normalization-reqwest = { version = "0.3.0", default-features = false, features = ["sha-2", "middleware"] }
background-jobs = "0.11.0"
diesel = "1.4.8"

View File

@ -10,7 +10,7 @@ use background_jobs::{
WorkerConfig,
};
use lemmy_utils::{location_info, LemmyError};
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::{env, fmt::Debug, future::Future, pin::Pin};
use tracing::{info, warn};
@ -21,7 +21,7 @@ pub async fn send_activity(
actor: &dyn ActorType,
inboxes: Vec<&Url>,
activity: String,
client: &Client,
client: &ClientWithMiddleware,
activity_queue: &QueueHandle,
) -> Result<(), LemmyError> {
for i in inboxes {
@ -66,7 +66,7 @@ impl ActixJob for SendActivityTask {
}
}
async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
async fn do_send(task: SendActivityTask, client: &ClientWithMiddleware) -> Result<(), Error> {
info!("Sending {} to {}", task.activity_id, task.inbox);
let result = sign_and_send(
client,
@ -101,10 +101,10 @@ async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
Ok(())
}
pub fn create_activity_queue() -> Manager {
pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager {
// Configure and start our workers
WorkerConfig::new_managed(Storage::new(), |_| MyState {
client: Client::default(),
WorkerConfig::new_managed(Storage::new(), move |_| MyState {
client: client.clone(),
})
.register::<SendActivityTask>()
.start()
@ -112,5 +112,5 @@ pub fn create_activity_queue() -> Manager {
#[derive(Clone)]
struct MyState {
pub client: Client,
pub client: ClientWithMiddleware,
}

View File

@ -2,13 +2,9 @@ use crate::{traits::ApubObject, APUB_JSON_CONTENT_TYPE};
use activitystreams::chrono::{Duration as ChronoDuration, NaiveDateTime, Utc};
use anyhow::anyhow;
use diesel::NotFound;
use lemmy_utils::{
request::{build_user_agent, retry},
settings::structs::Settings,
LemmyError,
};
use once_cell::sync::Lazy;
use reqwest::{Client, StatusCode};
use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
use reqwest::StatusCode;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
@ -18,13 +14,6 @@ use std::{
use tracing::info;
use url::Url;
static CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.user_agent(build_user_agent(&Settings::get()))
.build()
.expect("Couldn't build client")
});
/// We store Url on the heap because it is quite large (88 bytes).
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(transparent)]
@ -53,6 +42,7 @@ where
pub async fn dereference(
&self,
data: &<Kind as ApubObject>::DataType,
client: &ClientWithMiddleware,
request_counter: &mut i32,
) -> Result<Kind, LemmyError> {
let db_object = self.dereference_from_db(data).await?;
@ -71,7 +61,7 @@ where
if let Some(last_refreshed_at) = object.last_refreshed_at() {
if should_refetch_object(last_refreshed_at) {
return self
.dereference_from_http(data, request_counter, Some(object))
.dereference_from_http(data, client, request_counter, Some(object))
.await;
}
}
@ -80,7 +70,7 @@ where
// object not found, need to fetch over http
else {
self
.dereference_from_http(data, request_counter, None)
.dereference_from_http(data, client, request_counter, None)
.await
}
}
@ -107,6 +97,7 @@ where
async fn dereference_from_http(
&self,
data: &<Kind as ApubObject>::DataType,
client: &ClientWithMiddleware,
request_counter: &mut i32,
db_object: Option<Kind>,
) -> Result<Kind, LemmyError> {
@ -120,7 +111,7 @@ where
}
let res = retry(|| {
CLIENT
client
.get(self.0.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(Duration::from_secs(60))

View File

@ -11,7 +11,8 @@ use openssl::{
pkey::PKey,
sign::{Signer, Verifier},
};
use reqwest::{Client, Response};
use reqwest::Response;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::str::FromStr;
@ -24,7 +25,7 @@ static HTTP_SIG_CONFIG: Lazy<Config> = Lazy::new(Config::new);
/// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and
/// `activity` as request body. The request is signed with `private_key` and then sent.
pub async fn sign_and_send(
client: &Client,
client: &ClientWithMiddleware,
inbox_url: &Url,
activity: String,
actor_id: &Url,
@ -43,7 +44,7 @@ pub async fn sign_and_send(
);
headers.insert(HeaderName::from_str("Host")?, HeaderValue::from_str(&host)?);
let response = client
let request = client
.post(&inbox_url.to_string())
.headers(headers)
.signature_with_digest(
@ -61,6 +62,8 @@ pub async fn sign_and_send(
)
.await?;
let response = client.execute(request).await?;
Ok(response)
}

View File

@ -26,10 +26,13 @@ actix-http = "3.0.0-beta.10"
sha2 = "0.9.8"
anyhow = "1.0.44"
chrono = { version = "0.4.19", features = ["serde"] }
futures = "0.3.18"
reqwest = { version = "0.11.7", features = ["stream"] }
reqwest-middleware = "0.1.2"
rss = "1.10.0"
serde = { version = "1.0.130", features = ["derive"] }
awc = { version = "3.0.0-beta.8", default-features = false }
url = { version = "2.2.2", features = ["serde"] }
strum = "0.21.0"
once_cell = "1.8.0"
tracing = "0.1.29"
tokio = { version = "1", features = ["sync"] }

View File

@ -1,18 +1,18 @@
use actix_http::http::header::ACCEPT_ENCODING;
use actix_http::{
header::{HeaderName, HOST},
http::header::ACCEPT_ENCODING,
};
use actix_web::{body::BodyStream, http::StatusCode, web::Data, *};
use anyhow::anyhow;
use awc::Client;
use futures::stream::{Stream, StreamExt};
use lemmy_utils::{claims::Claims, rate_limit::RateLimit, LemmyError};
use lemmy_websocket::LemmyContext;
use reqwest::Body;
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use serde::{Deserialize, Serialize};
use std::time::Duration;
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimit) {
let client = Client::builder()
.header("User-Agent", "pict-rs-frontend, v0.1.0")
.timeout(Duration::from_secs(30))
.finish();
pub fn config(cfg: &mut web::ServiceConfig, client: ClientWithMiddleware, rate_limit: &RateLimit) {
cfg
.app_data(Data::new(client))
.service(
@ -43,10 +43,34 @@ struct PictrsParams {
thumbnail: Option<String>,
}
fn adapt_request(
request: &HttpRequest,
client: &ClientWithMiddleware,
url: String,
) -> RequestBuilder {
// remove accept-encoding header so that pictrs doesnt compress the response
const INVALID_HEADERS: &[HeaderName] = &[ACCEPT_ENCODING, HOST];
let client_request = client
.request(request.method().clone(), url)
.timeout(Duration::from_secs(30));
request
.headers()
.iter()
.fold(client_request, |client_req, (key, value)| {
if INVALID_HEADERS.contains(key) {
client_req
} else {
client_req.header(key, value)
}
})
}
async fn upload(
req: HttpRequest,
body: web::Payload,
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, Error> {
// TODO: check rate limit here
@ -58,32 +82,31 @@ async fn upload(
return Ok(HttpResponse::Unauthorized().finish());
};
let mut client_req = client.request_from(
format!("{}/image", pictrs_url(context.settings().pictrs_url)?),
req.head(),
);
// remove content-encoding header so that pictrs doesnt send gzipped response
client_req.headers_mut().remove(ACCEPT_ENCODING);
let image_url = format!("{}/image", pictrs_url(context.settings().pictrs_url)?);
let mut client_req = adapt_request(&req, &client, image_url);
if let Some(addr) = req.head().peer_addr {
client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string()))
client_req = client_req.header("X-Forwarded-For", addr.to_string())
};
let mut res = client_req
.send_stream(body)
let res = client_req
.body(Body::wrap_stream(make_send(body)))
.send()
.await
.map_err(error::ErrorBadRequest)?;
let status = res.status();
let images = res.json::<Images>().await.map_err(error::ErrorBadRequest)?;
Ok(HttpResponse::build(res.status()).json(images))
Ok(HttpResponse::build(status).json(images))
}
async fn full_res(
filename: web::Path<String>,
web::Query(params): web::Query<PictrsParams>,
req: HttpRequest,
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, Error> {
let name = &filename.into_inner();
@ -119,20 +142,19 @@ async fn full_res(
async fn image(
url: String,
req: HttpRequest,
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
) -> Result<HttpResponse, Error> {
let mut client_req = client.request_from(url, req.head());
client_req.headers_mut().remove(ACCEPT_ENCODING);
let mut client_req = adapt_request(&req, &client, url);
if let Some(addr) = req.head().peer_addr {
client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string()))
};
client_req = client_req.header("X-Forwarded-For", addr.to_string());
}
let res = client_req
.no_decompress()
.send()
.await
.map_err(error::ErrorBadRequest)?;
if let Some(addr) = req.head().peer_addr {
client_req = client_req.header("X-Forwarded-For", addr.to_string());
}
let res = client_req.send().await.map_err(error::ErrorBadRequest)?;
if res.status() == StatusCode::NOT_FOUND {
return Ok(HttpResponse::NotFound().finish());
@ -144,13 +166,13 @@ async fn image(
client_res.insert_header((name.clone(), value.clone()));
}
Ok(client_res.body(BodyStream::new(res)))
Ok(client_res.body(BodyStream::new(res.bytes_stream())))
}
async fn delete(
components: web::Path<(String, String)>,
req: HttpRequest,
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, Error> {
let (token, file) = components.into_inner();
@ -162,22 +184,59 @@ async fn delete(
&file
);
let mut client_req = client.request_from(url, req.head());
client_req.headers_mut().remove(ACCEPT_ENCODING);
let mut client_req = adapt_request(&req, &client, url);
if let Some(addr) = req.head().peer_addr {
client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string()))
};
client_req = client_req.header("X-Forwarded-For", addr.to_string());
}
let res = client_req
.no_decompress()
.send()
.await
.map_err(error::ErrorBadRequest)?;
let res = client_req.send().await.map_err(error::ErrorBadRequest)?;
Ok(HttpResponse::build(res.status()).body(BodyStream::new(res)))
Ok(HttpResponse::build(res.status()).body(BodyStream::new(res.bytes_stream())))
}
fn pictrs_url(pictrs_url: Option<String>) -> Result<String, LemmyError> {
pictrs_url.ok_or_else(|| anyhow!("images_disabled").into())
}
fn make_send<S>(mut stream: S) -> impl Stream<Item = S::Item> + Send + Unpin + 'static
where
S: Stream + Unpin + 'static,
S::Item: Send,
{
// NOTE: the 8 here is arbitrary
let (tx, rx) = tokio::sync::mpsc::channel(8);
// NOTE: spawning stream into a new task can potentially hit this bug:
// - https://github.com/actix/actix-web/issues/1679
//
// Since 4.0.0-beta.2 this issue is incredibly less frequent. I have not personally reproduced it.
// That said, it is still technically possible to encounter.
actix_web::rt::spawn(async move {
while let Some(res) = stream.next().await {
if tx.send(res).await.is_err() {
break;
}
}
});
SendStream { rx }
}
struct SendStream<T> {
rx: tokio::sync::mpsc::Receiver<T>,
}
impl<T> Stream for SendStream<T>
where
T: Send,
{
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.rx).poll_recv(cx)
}
}

View File

@ -32,6 +32,7 @@ actix-web = { version = "4.0.0-beta.9", default-features = false, features = ["r
actix-rt = { version = "2.2.0", default-features = false }
anyhow = "1.0.44"
reqwest = { version = "0.11.4", features = ["json"] }
reqwest-middleware = "0.1.2"
tokio = { version = "1.12.0", features = ["sync"] }
strum = "0.21.0"
strum_macros = "0.21.1"

View File

@ -1,7 +1,7 @@
use crate::{settings::structs::Settings, version::VERSION, LemmyError};
use anyhow::anyhow;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::future::Future;
use thiserror::Error;
@ -17,30 +17,33 @@ struct SendError(pub String);
#[error("Error receiving response, {0}")]
pub struct RecvError(pub String);
pub async fn retry<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
pub async fn retry<F, Fut, T>(f: F) -> Result<T, reqwest_middleware::Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, reqwest::Error>>,
Fut: Future<Output = Result<T, reqwest_middleware::Error>>,
{
retry_custom(|| async { Ok((f)().await) }).await
}
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, reqwest_middleware::Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<Result<T, reqwest::Error>, reqwest::Error>>,
Fut: Future<Output = Result<Result<T, reqwest_middleware::Error>, reqwest_middleware::Error>>,
{
let mut response: Option<Result<T, reqwest::Error>> = None;
let mut response: Option<Result<T, reqwest_middleware::Error>> = None;
for _ in 0u8..3 {
match (f)().await? {
Ok(t) => return Ok(t),
Err(e) => {
Err(reqwest_middleware::Error::Reqwest(e)) => {
if e.is_timeout() {
response = Some(Err(e));
response = Some(Err(reqwest_middleware::Error::Reqwest(e)));
continue;
}
return Err(e);
return Err(reqwest_middleware::Error::Reqwest(e));
}
Err(otherwise) => {
return Err(otherwise);
}
}
}
@ -57,7 +60,10 @@ pub struct SiteMetadata {
}
/// Fetches the post link html tags (like title, description, image, etc)
pub async fn fetch_site_metadata(client: &Client, url: &Url) -> Result<SiteMetadata, LemmyError> {
pub async fn fetch_site_metadata(
client: &ClientWithMiddleware,
url: &Url,
) -> Result<SiteMetadata, LemmyError> {
let response = client.get(url.as_str()).send().await?;
let html = response
@ -119,7 +125,7 @@ pub(crate) struct PictrsFile {
}
pub(crate) async fn fetch_pictrs(
client: &Client,
client: &ClientWithMiddleware,
settings: &Settings,
image_url: &Url,
) -> Result<PictrsResponse, LemmyError> {
@ -152,7 +158,7 @@ pub(crate) async fn fetch_pictrs(
/// Both are options, since the URL might be either an html page, or an image
/// Returns the SiteMetadata, and a Pictrs URL, if there is a picture associated
pub async fn fetch_site_data(
client: &Client,
client: &ClientWithMiddleware,
settings: &Settings,
url: Option<&Url>,
) -> (Option<SiteMetadata>, Option<Url>) {
@ -201,7 +207,7 @@ pub async fn fetch_site_data(
}
}
async fn is_image_content_type(client: &Client, url: &Url) -> Result<(), LemmyError> {
async fn is_image_content_type(client: &ClientWithMiddleware, url: &Url) -> Result<(), LemmyError> {
let response = client.get(url.as_str()).send().await?;
if response
.headers()
@ -239,7 +245,8 @@ mod tests {
let client = reqwest::Client::builder()
.user_agent(build_user_agent(&settings))
.build()
.unwrap();
.unwrap()
.into();
let sample_url = Url::parse("https://www.redspark.nu/en/peoples-war/district-leader-of-chand-led-cpn-arrested-in-bhojpur/").unwrap();
let sample_res = fetch_site_metadata(&client, &sample_url).await.unwrap();
assert_eq!(

View File

@ -19,6 +19,7 @@ lemmy_db_schema = { version = "=0.14.4-rc.4", path = "../db_schema" }
lemmy_db_views = { version = "=0.14.4-rc.4", path = "../db_views" }
lemmy_db_views_actor = { version = "=0.14.4-rc.4", path = "../db_views_actor" }
reqwest = { version = "0.11.4", features = ["json"] }
reqwest-middleware = "0.1.2"
tracing = "0.1.29"
rand = "0.8.4"
serde = { version = "1.0.130", features = ["derive"] }

View File

@ -27,7 +27,7 @@ use lemmy_utils::{
LemmyError,
};
use rand::rngs::ThreadRng;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize;
use serde_json::Value;
use std::{
@ -90,7 +90,7 @@ pub struct ChatServer {
message_handler_crud: MessageHandlerCrudType,
/// An HTTP Client
client: Client,
client: ClientWithMiddleware,
activity_queue: QueueHandle,
}
@ -110,7 +110,7 @@ impl ChatServer {
rate_limiter: RateLimit,
message_handler: MessageHandlerType,
message_handler_crud: MessageHandlerCrudType,
client: Client,
client: ClientWithMiddleware,
activity_queue: QueueHandle,
settings: Settings,
secret: Secret,

View File

@ -6,7 +6,7 @@ use actix::Addr;
use background_jobs::QueueHandle;
use lemmy_db_schema::{source::secret::Secret, DbPool};
use lemmy_utils::{settings::structs::Settings, LemmyError};
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize;
pub mod chat_server;
@ -18,7 +18,7 @@ pub mod send;
pub struct LemmyContext {
pool: DbPool,
chat_server: Addr<ChatServer>,
client: Client,
client: ClientWithMiddleware,
activity_queue: QueueHandle,
settings: Settings,
secret: Secret,
@ -28,7 +28,7 @@ impl LemmyContext {
pub fn create(
pool: DbPool,
chat_server: Addr<ChatServer>,
client: Client,
client: ClientWithMiddleware,
activity_queue: QueueHandle,
settings: Settings,
secret: Secret,
@ -48,7 +48,7 @@ impl LemmyContext {
pub fn chat_server(&self) -> &Addr<ChatServer> {
&self.chat_server
}
pub fn client(&self) -> &Client {
pub fn client(&self) -> &ClientWithMiddleware {
&self.client
}
pub fn activity_queue(&self) -> &QueueHandle {

View File

@ -29,6 +29,8 @@ use lemmy_utils::{
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use std::{env, sync::Arc, thread};
use tokio::sync::Mutex;
use tracing_actix_web::TracingLogger;
@ -95,7 +97,9 @@ async fn main() -> Result<(), LemmyError> {
.user_agent(build_user_agent(&settings))
.build()?;
let queue_manager = create_activity_queue();
let client = ClientBuilder::new(client).with(TracingMiddleware).build();
let queue_manager = create_activity_queue(client.clone());
let activity_queue = queue_manager.queue_handle().clone();
@ -131,7 +135,7 @@ async fn main() -> Result<(), LemmyError> {
.configure(|cfg| api_routes::config(cfg, &rate_limiter))
.configure(|cfg| lemmy_apub::http::routes::config(cfg, &settings))
.configure(feeds::config)
.configure(|cfg| images::config(cfg, &rate_limiter))
.configure(|cfg| images::config(cfg, client.clone(), &rate_limiter))
.configure(nodeinfo::config)
.configure(|cfg| webfinger::config(cfg, &settings))
})