346 lines
14 KiB
Python
346 lines
14 KiB
Python
import requests
|
|
import socket
|
|
from loguru import logger
|
|
from pythorhead import Lemmy
|
|
from fediseer.consts import FEDISEER_VERSION
|
|
import fediseer.exceptions as e
|
|
|
|
class InstanceInfo():
|
|
|
|
domain = None
|
|
node_info = None
|
|
instance_info = None
|
|
admin_usernames = set()
|
|
software = None
|
|
version = None
|
|
open_registrations = None
|
|
approval_required = None
|
|
email_verify = None
|
|
has_captcha = None
|
|
_allow_unreachable = False
|
|
_req_timeout = 5
|
|
_nodeinfo_err: Exception = None
|
|
_siteinfo_err: Exception = None
|
|
|
|
def __init__(self, domain, allow_unreachable=False, req_timeout=5):
|
|
self.domain = domain
|
|
self._allow_unreachable = allow_unreachable
|
|
self._req_timeout = req_timeout
|
|
if domain.endswith("test.dbzer0.com"):
|
|
# Fake instances for testing chain of trust
|
|
self.open_registrations = False
|
|
self.approval_required = False
|
|
self.email_verify = True
|
|
self.has_captcha = True
|
|
self.software = "lemmy"
|
|
self.version = "0.19.3"
|
|
self.admin_usernames = {"db0"}
|
|
self.node_info = InstanceInfo.get_nodeinfo("lemmy.dbzer0.com")
|
|
self.instance_info = {}
|
|
return
|
|
if domain == "fediseer.com":
|
|
self.open_registrations = False
|
|
self.approval_required = False
|
|
self.email_verify = False
|
|
self.has_captcha = False
|
|
self.software = "fediseer"
|
|
self.version = FEDISEER_VERSION
|
|
self.admin_usernames = {"fediseer"}
|
|
self.node_info = {}
|
|
self.instance_info = {}
|
|
return
|
|
|
|
try:
|
|
self.node_info = InstanceInfo.get_nodeinfo(domain,req_timeout=self._req_timeout)
|
|
except Exception as err:
|
|
self._nodeinfo_err = err
|
|
|
|
def get_instance_info(self):
|
|
try:
|
|
self.parse_instance_info()
|
|
except Exception as err:
|
|
self._siteinfo_err = err
|
|
# This is just to report for the error message
|
|
if self.software is not None:
|
|
sw = self.software
|
|
else:
|
|
sw = 'unknown'
|
|
if not self._allow_unreachable:
|
|
logger.error(f"Error retrieving {sw} site info for {self.domain}: {err}")
|
|
raise Exception(f"Error retrieving {sw} site info for {self.domain}: {err}")
|
|
try:
|
|
self.retrieve_admins()
|
|
except:
|
|
pass
|
|
|
|
def get_lemmy_admins(self):
|
|
self.admin_usernames = set([a["person"]["name"] for a in self.instance_info["admins"]])
|
|
|
|
def get_mastodon_admins(self):
|
|
if "contact_account" in self.instance_info: # New API
|
|
if "username" not in self.instance_info["contact_account"]:
|
|
raise Exception(f"No admin contact is specified for {self.domain}.")
|
|
self.admin_usernames = {self.instance_info["contact_account"]["username"]}
|
|
elif "contact" in self.instance_info: # Old API
|
|
if "account" not in self.instance_info["contact"]:
|
|
raise Exception(f"No admin contact is specified for {self.domain}.")
|
|
self.admin_usernames = {self.instance_info["contact"]["account"]["username"]}
|
|
else:
|
|
raise Exception(f"Could not determine admin contacts for {self.domain}.")
|
|
|
|
def get_misskey_admins(self):
|
|
site_users = None
|
|
users_json = None
|
|
offset = 0
|
|
admins_found = set()
|
|
while users_json is None or len(users_json) != 0 and offset < 500:
|
|
payload = {
|
|
"limit": 10,
|
|
"offset": offset,
|
|
"sort": "+createdAt",
|
|
"state": "alive",
|
|
"origin": "local",
|
|
"hostname": None
|
|
}
|
|
site_users = requests.post(f"https://{self.domain}/api/users", json=payload)
|
|
users_json = site_users.json()
|
|
for user_entry in users_json:
|
|
if user_entry.get("isAdmin") is True:
|
|
admins_found.add(user_entry["username"])
|
|
for role in user_entry.get("roles",[]):
|
|
if role.get("isAdministrator") is True:
|
|
admins_found.add(user_entry["username"])
|
|
offset += 10
|
|
if len(admins_found) == 0:
|
|
raise Exception(f"No admin contact is specified for {self.domain}.")
|
|
self.admin_usernames = admins_found
|
|
|
|
def get_pleroma_admins(self):
|
|
if "staffAccounts" not in self.node_info["metadata"] or len(self.node_info["metadata"]["staffAccounts"]) == 0:
|
|
raise Exception(f"No admin contact is specified for {self.domain}.")
|
|
for staff in self.node_info["metadata"]["staffAccounts"]:
|
|
self.admin_usernames.add(staff.split('/')[-1])
|
|
|
|
def discover_admins(self):
|
|
try:
|
|
self.get_mastodon_admins()
|
|
return
|
|
except:
|
|
pass
|
|
try:
|
|
self.get_lemmy_admins()
|
|
return
|
|
except:
|
|
pass
|
|
try:
|
|
self.get_pleroma_admins()
|
|
return
|
|
except:
|
|
pass
|
|
try:
|
|
self.get_misskey_admins()
|
|
return
|
|
except:
|
|
pass
|
|
logger.warning(f"Site software '{self.software} does not match any of the known APIs")
|
|
raise Exception(f"Site software '{self.software} does not match any of the known APIs")
|
|
|
|
def get_unknown_admins(self):
|
|
return []
|
|
|
|
def retrieve_admins(self):
|
|
software_map = {
|
|
"lemmy": self.get_lemmy_admins,
|
|
"mastodon": self.get_mastodon_admins,
|
|
"friendica": self.get_mastodon_admins,
|
|
"pleroma": self.get_pleroma_admins,
|
|
"akkoma": self.get_pleroma_admins,
|
|
"misskey": self.get_misskey_admins,
|
|
"firefish": self.get_mastodon_admins,
|
|
"iceshrimp": self.get_mastodon_admins,
|
|
"mitra": self.get_mastodon_admins,
|
|
"unknown": self.get_unknown_admins,
|
|
"wildcard": self.get_unknown_admins,
|
|
}
|
|
if self.software not in software_map:
|
|
self.discover_admins()
|
|
else:
|
|
software_map[self.software]()
|
|
|
|
def get_lemmy_info(self):
|
|
requested_lemmy = Lemmy(f"https://{self.domain}")
|
|
self.instance_info = requested_lemmy.site.get()
|
|
if not self.instance_info:
|
|
raise Exception(f"Error encountered while polling lemmy domain. Please check it's running correctly")
|
|
self.open_registrations = self.instance_info["site_view"]["local_site"]["registration_mode"] == "open"
|
|
self.email_verify = self.instance_info["site_view"]["local_site"]["require_email_verification"]
|
|
self.approval_required = self.instance_info["site_view"]["local_site"]["registration_mode"] == "RequireApplication"
|
|
self.has_captcha = self.instance_info["site_view"]["local_site"]["captcha_enabled"]
|
|
|
|
def get_mastodon_info(self):
|
|
site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout)
|
|
try:
|
|
self.instance_info = site.json()
|
|
except Exception as err:
|
|
if "challenge-error-text" in site.text:
|
|
raise Exception("Instance is preventing scripted retrieval of their site info.")
|
|
raise err
|
|
self.approval_required = self.instance_info["approval_required"]
|
|
if self.node_info is None:
|
|
raise Exception("Error retrieving nodeinfo")
|
|
self.open_registrations = self.node_info["openRegistrations"]
|
|
self.email_verify = None
|
|
self.has_captcha = None
|
|
|
|
def get_pleroma_info(self):
|
|
site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout)
|
|
try:
|
|
self.instance_info = site.json()
|
|
except Exception as err:
|
|
if "challenge-error-text" in site.text:
|
|
raise Exception("Instance is preventing scripted retrieval of their site info.")
|
|
raise err
|
|
self.approval_required = self.instance_info["approval_required"]
|
|
if self.node_info is None:
|
|
raise Exception("Error retrieving nodeinfo")
|
|
self.open_registrations = self.node_info["openRegistrations"]
|
|
self.email_verify = None
|
|
self.has_captcha = None
|
|
|
|
def get_firefish_info(self):
|
|
site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout)
|
|
try:
|
|
self.instance_info = site.json()
|
|
except Exception as err:
|
|
if "challenge-error-text" in site.text:
|
|
raise Exception("Instance is preventing scripted retrieval of their site info.")
|
|
raise err
|
|
self.approval_required = self.instance_info["approval_required"]
|
|
if self.node_info is None:
|
|
raise Exception("Error retrieving nodeinfo")
|
|
self.open_registrations = self.node_info["openRegistrations"]
|
|
self.email_verify = self.node_info["metadata"]["emailRequiredForSignup"]
|
|
self.has_captcha = self.node_info["metadata"]["enableHcaptcha"] is True or self.node_info["metadata"]["enableRecaptcha"] is True
|
|
|
|
def get_unknown_info(self):
|
|
if self.node_info is not None:
|
|
self.open_registrations = self.node_info.get("openRegistrations", False)
|
|
|
|
def discover_info(self):
|
|
# Mastodon API
|
|
site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout,allow_redirects=False)
|
|
if site.status_code != 200:
|
|
raise Exception(f"Unexpected status code retrieved when discovering instance info: {site.status_code}")
|
|
try:
|
|
self.instance_info = site.json()
|
|
except Exception as err:
|
|
if "challenge-error-text" in site.text:
|
|
raise Exception("Instance is preventing scripted retrieval of their site info.")
|
|
raise err
|
|
self.approval_required = self.instance_info.get("approval_required")
|
|
if self.node_info is None:
|
|
raise Exception("Error retrieving nodeinfo")
|
|
self.open_registrations = self.node_info.get("openRegistrations")
|
|
# Only firefish and lemmy report the next two
|
|
if "metadata" in self.node_info:
|
|
self.email_verify = self.node_info["metadata"].get("emailRequiredForSignup")
|
|
self.has_captcha = None
|
|
if self.node_info["metadata"].get("enableHcaptcha") is True or self.node_info.get("enableRecaptcha") is True:
|
|
self.has_captcha = True
|
|
|
|
|
|
def parse_instance_info(self):
|
|
if self.domain == "fediseer.com":
|
|
return
|
|
if not self.node_info:
|
|
if self._allow_unreachable:
|
|
self.software = "unknown"
|
|
self.version = "unknown"
|
|
if "*" in self.domain:
|
|
self.software = "wildcard"
|
|
else:
|
|
self.software = self.node_info["software"]["name"].lower()
|
|
self.version = self.node_info["software"].get("version","unknown")
|
|
software_map = {
|
|
"lemmy": self.get_lemmy_info,
|
|
"mastodon": self.get_mastodon_info,
|
|
"friendica": self.get_mastodon_info,
|
|
"pleroma": self.get_pleroma_info,
|
|
"akkoma": self.get_pleroma_info,
|
|
"firefish": self.get_firefish_info,
|
|
"iceshrimp": self.get_firefish_info,
|
|
"mitra": self.get_firefish_info,
|
|
"unknown": self.get_unknown_info,
|
|
"wildcard": self.get_unknown_info,
|
|
# Instance info not supported for misskey yet
|
|
"misskey": self.get_unknown_info,
|
|
}
|
|
if self.software not in software_map:
|
|
self.discover_info()
|
|
else:
|
|
software_map[self.software]()
|
|
|
|
def is_admin(self, user):
|
|
admin = user in self.admin_usernames
|
|
|
|
if not admin and self.software == "firefish":
|
|
payload = {
|
|
"username": user
|
|
}
|
|
user_info = requests.post(f"https://{self.domain}/api/users/show", timeout=self._req_timeout, json=payload).json()
|
|
admin = user_info.get('isAdmin', False)
|
|
if admin:
|
|
self.admin_usernames.add(user)
|
|
|
|
return admin
|
|
|
|
@staticmethod
|
|
def get_nodeinfo(domain, req_timeout=3):
|
|
headers = {
|
|
"Sec-Fetch-Dest": "document",
|
|
"Sec-Fetch-Mode": "navigate",
|
|
"Sec-Fetch-Site": "none",
|
|
"Sec-Fetch-User": "?1",
|
|
"Sec-GPC": "1",
|
|
"User-Agent": f"Fediseer/{FEDISEER_VERSION}",
|
|
}
|
|
wellknown = requests.get(f"https://{domain}/.well-known/nodeinfo", headers=headers, timeout=req_timeout).json()
|
|
headers["Sec-Fetch-Site"] = "cross-site"
|
|
nodeinfo = requests.get(wellknown['links'][-1]['href'], headers=headers, timeout=req_timeout).json()
|
|
return nodeinfo
|
|
|
|
@staticmethod
|
|
def is_reachable(domain, req_timeout=5):
|
|
# Attempts to check if we can even reach the frontpage of the domain
|
|
# so that we know if it's an issue reaching the nodeinfo, or a problem of reaching the domain
|
|
logger.debug(domain)
|
|
req = requests.get(f"https://{domain}", timeout=req_timeout, allow_redirects=False)
|
|
logger.debug(req.status_code)
|
|
if req.status_code not in [200,401,403]:
|
|
raise Exception(f"Status code unexpected for instance frontpage: {req.status_code}")
|
|
|
|
def domain_exists(self):
|
|
try:
|
|
socket.gethostbyname(self.domain)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
|
|
# # Debug
|
|
# ii = InstanceInfo("outpoa.st")
|
|
# if ii.domain_exists():
|
|
# ii.get_instance_info()
|
|
# logger.info([
|
|
# ii.software,
|
|
# ii.open_registrations,
|
|
# ii.approval_required,
|
|
# ii.email_verify,
|
|
# ii.has_captcha,
|
|
# ii.admin_usernames,
|
|
# ])
|
|
# else:
|
|
# logger.error("Domain does not exist")
|
|
# import sys
|
|
# sys.exit()
|