forked from MarseyWorld/MarseyWorld
branch: master
parent de39ebcaa9
commit d17842d3c7

@@ -412,125 +412,6 @@ def get_post_title(v):
 	except BaseException:
 		return {"error": "Could not find a title"}, 400
 
-
-def thumbs(new_post):
-	pid = new_post.id
-	post = get_post(pid, graceful=True, session=g.db)
-	if not post:
-		# account for possible follower lag
-		time.sleep(60)
-		post = get_post(pid, session=g.db)
-
-	fetch_url = post.url
-
-	# get the content
-
-	# mimic a Chrome browser user agent
-	headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36"}
-
-	try:
-		x = requests.get(fetch_url, headers=headers)
-	except:
-		return False, "Unable to connect to source"
-
-	if x.status_code != 200:
-		return False, f"Source returned status {x.status_code}."
-
-	# if the content is an image, stick with that; otherwise, parse the HTML
-	if x.headers.get("Content-Type", "").startswith("text/html"):
-		# parse the HTML, find an image candidate, then load it
-		soup = BeautifulSoup(x.content, 'html.parser')
-
-		# build a list of candidate URLs to check
-		thumb_candidate_urls = []
-
-		# check the preferred meta tags first, in priority order
-		meta_tags = [
-			"twitter:image",
-			"og:image",
-			"thumbnail"
-		]
-
-		for tag_name in meta_tags:
-			tag = soup.find(
-				'meta',
-				attrs={
-					"name": tag_name,
-					"content": True
-				}
-			)
-			if not tag:
-				tag = soup.find(
-					'meta',
-					attrs={
-						'property': tag_name,
-						'content': True
-					}
-				)
-			if tag:
-				thumb_candidate_urls.append(expand_url(post.url, tag['content']))
-
-		# then fall back to <img> elements in the document
-		for tag in soup.find_all("img", attrs={'src': True}):
-			thumb_candidate_urls.append(expand_url(post.url, tag['src']))
-
-		# try the candidate URLs in order until one yields a usable image
-		for url in thumb_candidate_urls:
-			try:
-				image_req = requests.get(url, headers=headers)
-			except:
-				continue
-
-			if image_req.status_code >= 400:
-				continue
-
-			if not image_req.headers.get("Content-Type", "").startswith("image/"):
-				continue
-
-			if image_req.headers.get("Content-Type", "").startswith("image/svg"):
-				continue
-
-			image = PILimage.open(BytesIO(image_req.content))
-			if image.width < 30 or image.height < 30:
-				continue
-
-			break
-		else:
-			# getting here means we ran out of candidate URLs (or there never were any)
-			return False, "No usable images"
-
-	elif x.headers.get("Content-Type", "").startswith("image/"):
-		# the content at fetch_url is itself an image
-		image_req = x
-		image = PILimage.open(BytesIO(x.content))
-
-	else:
-		print(f'Unknown content type {x.headers.get("Content-Type")}')
-		return False, f'Unknown content type {x.headers.get("Content-Type")} for submitted content'
-
-	if 'pcmemes.net' in request.host:
-		with open("image.webp", "wb") as file:
-			for chunk in image_req.iter_content(1024):
-				file.write(chunk)
-		post.thumburl = upload_ibb(resize=True)
-	else:
-		with open("image.png", "wb") as file:
-			for chunk in image_req.iter_content(1024):
-				file.write(chunk)
-		post.thumburl = upload_imgur(filepath="image.png", resize=True)
-
-	g.db.add(post)
-
-
-def archiveorg(url):
-	try:
-		requests.get(f'https://web.archive.org/save/{url}', headers={'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}, timeout=100)
-	except Exception as e:
-		print(e)
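
Editor's note: the removed helper's selection logic reads more clearly outside the diff. The following is a minimal standalone sketch, not code from this repository: pick_thumbnail and MIN_DIM are illustrative names, urljoin approximates the repo's expand_url helper, and the explicit timeout and narrowed exception are added assumptions.

    from io import BytesIO
    from urllib.parse import urljoin

    import requests
    from bs4 import BeautifulSoup
    from PIL import Image

    MIN_DIM = 30  # the same 30px floor the diff applies to width and height

    def pick_thumbnail(page_url, html, headers):
        soup = BeautifulSoup(html, 'html.parser')
        candidates = []
        # meta tags are checked first, in priority order, under both
        # name= and property= attributes, as in the hunk above
        for tag_name in ("twitter:image", "og:image", "thumbnail"):
            tag = (soup.find('meta', attrs={"name": tag_name, "content": True})
                   or soup.find('meta', attrs={"property": tag_name, "content": True}))
            if tag:
                candidates.append(urljoin(page_url, tag['content']))
        # then every <img src=...> in document order
        for tag in soup.find_all("img", attrs={'src': True}):
            candidates.append(urljoin(page_url, tag['src']))
        # first candidate that is a reachable, non-SVG image of usable size wins
        for url in candidates:
            try:
                resp = requests.get(url, headers=headers, timeout=10)
            except requests.RequestException:
                continue
            ctype = resp.headers.get("Content-Type", "")
            if resp.status_code >= 400 or not ctype.startswith("image/") or ctype.startswith("image/svg"):
                continue
            image = Image.open(BytesIO(resp.content))
            if image.width < MIN_DIM or image.height < MIN_DIM:
                continue
            return url, image
        return None, None

The for/else is the same trick the hunk uses: the else branch runs only when no candidate survived the filters, i.e. when the loop never hit break.
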
@@ -946,8 +827,124 @@ def submit_post(v):
 	g.db.flush()
 
-	# spin off thumbnail generation and csam detection as new threads
-	if (new_post.url or request.files.get('file')) and (v.is_activated or request.headers.get('cf-ipcountry') != "T1"): thumbs(new_post)
+	# thumbnail generation
+	if (new_post.url or request.files.get('file')) and (v.is_activated or request.headers.get('cf-ipcountry') != "T1"):
+
+		# get the content
+
+		# mimic a Chrome browser user agent
+		headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36"}
+
+		try:
+			x = requests.get(new_post.url, headers=headers)
+		except:
+			return False, "Unable to connect to source"
+
+		if x.status_code != 200:
+			return False, f"Source returned status {x.status_code}."
+
+		# if the content is an image, stick with that; otherwise, parse the HTML
+		if x.headers.get("Content-Type", "").startswith("text/html"):
+			# parse the HTML, find an image candidate, then load it
+			soup = BeautifulSoup(x.content, 'html.parser')
+
+			# build a list of candidate URLs to check
+			thumb_candidate_urls = []
+
+			# check the preferred meta tags first, in priority order
+			meta_tags = [
+				"twitter:image",
+				"og:image",
+				"thumbnail"
+			]
+
+			for tag_name in meta_tags:
+				tag = soup.find(
+					'meta',
+					attrs={
+						"name": tag_name,
+						"content": True
+					}
+				)
+				if not tag:
+					tag = soup.find(
+						'meta',
+						attrs={
+							'property': tag_name,
+							'content': True
+						}
+					)
+				if tag:
+					thumb_candidate_urls.append(expand_url(new_post.url, tag['content']))
+
+			# then fall back to <img> elements in the document
+			for tag in soup.find_all("img", attrs={'src': True}):
+				thumb_candidate_urls.append(expand_url(new_post.url, tag['src']))
+
+			# try the candidate URLs in order until one yields a usable image
+			for url in thumb_candidate_urls:
+				try:
+					image_req = requests.get(url, headers=headers)
+				except:
+					continue
+
+				if image_req.status_code >= 400:
+					continue
+
+				if not image_req.headers.get("Content-Type", "").startswith("image/"):
+					continue
+
+				if image_req.headers.get("Content-Type", "").startswith("image/svg"):
+					continue
+
+				image = PILimage.open(BytesIO(image_req.content))
+				if image.width < 30 or image.height < 30:
+					continue
+
+				break
+			else:
+				# getting here means we ran out of candidate URLs (or there never were any)
+				return False, "No usable images"
+
+		elif x.headers.get("Content-Type", "").startswith("image/"):
+			# the content at new_post.url is itself an image
+			image_req = x
+			image = PILimage.open(BytesIO(x.content))
+
+		else:
+			print(f'Unknown content type {x.headers.get("Content-Type")}')
+			return False, f'Unknown content type {x.headers.get("Content-Type")} for submitted content'
+
+		with open("image.webp", "wb") as file:
+			for chunk in image_req.iter_content(1024):
+				file.write(chunk)
+		new_post.thumburl = upload_ibb(resize=True)
+
 	notify_users = set()
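
Editor's note: the old call site's comment ("spin off thumbnail generation and csam detection as new threads") suggests thumbs was meant to run off the request path, which also explains why it re-fetched the post by id and slept 60 seconds for replica lag. Inlined as above, the fetch and upload now block submit_post until the remote image round-trips. A minimal sketch of keeping the work deferred, assuming nothing beyond the standard library (generate_thumbnail is a hypothetical stand-in for the inlined block, not a function in this repository):

    import threading

    def generate_thumbnail(post_id):
        # hypothetical stand-in for the inlined fetch/parse/upload block above;
        # a real version would open its own DB session, since g.db is
        # request-scoped and not safe to share across threads
        ...

    # at the call site inside submit_post: pass the id rather than the ORM
    # object, mirroring the removed helper's pid = new_post.id / re-query
    # pattern; daemon=True so a hung remote fetch never blocks shutdown
    threading.Thread(target=generate_thumbnail, args=(new_post.id,), daemon=True).start()
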