import requests import socket from loguru import logger from pythorhead import Lemmy from fediseer.consts import FEDISEER_VERSION import fediseer.exceptions as e class InstanceInfo(): domain = None node_info = None instance_info = None admin_usernames = set() software = None version = None open_registrations = None approval_required = None email_verify = None has_captcha = None _allow_unreachable = False _req_timeout = 5 _nodeinfo_err: Exception = None _siteinfo_err: Exception = None def __init__(self, domain, allow_unreachable=False, req_timeout=5): self.domain = domain self._allow_unreachable = allow_unreachable self._req_timeout = req_timeout if domain.endswith("test.dbzer0.com"): # Fake instances for testing chain of trust self.open_registrations = False self.approval_required = False self.email_verify = True self.has_captcha = True self.software = "lemmy" self.version = "0.19.3" self.admin_usernames = {"db0"} self.node_info = InstanceInfo.get_nodeinfo("lemmy.dbzer0.com") self.instance_info = {} return if domain == "fediseer.com": self.open_registrations = False self.approval_required = False self.email_verify = False self.has_captcha = False self.software = "fediseer" self.version = FEDISEER_VERSION self.admin_usernames = {"fediseer"} self.node_info = {} self.instance_info = {} return try: self.node_info = InstanceInfo.get_nodeinfo(domain,req_timeout=self._req_timeout) except Exception as err: self._nodeinfo_err = err def get_instance_info(self): try: self.parse_instance_info() except Exception as err: self._siteinfo_err = err # This is just to report for the error message if self.software is not None: sw = self.software else: sw = 'unknown' if not self._allow_unreachable: logger.error(f"Error retrieving {sw} site info for {self.domain}: {err}") raise Exception(f"Error retrieving {sw} site info for {self.domain}: {err}") try: self.retrieve_admins() except: pass def get_lemmy_admins(self): self.admin_usernames = set([a["person"]["name"] for a in self.instance_info["admins"]]) def get_mastodon_admins(self): if "contact_account" in self.instance_info: # New API if "username" not in self.instance_info["contact_account"]: raise Exception(f"No admin contact is specified for {self.domain}.") self.admin_usernames = {self.instance_info["contact_account"]["username"]} elif "contact" in self.instance_info: # Old API if "account" not in self.instance_info["contact"]: raise Exception(f"No admin contact is specified for {self.domain}.") self.admin_usernames = {self.instance_info["contact"]["account"]["username"]} else: raise Exception(f"Could not determine admin contacts for {self.domain}.") def get_misskey_admins(self): site_users = None users_json = None offset = 0 admins_found = set() while users_json is None or len(users_json) != 0 and offset < 500: payload = { "limit": 10, "offset": offset, "sort": "+createdAt", "state": "alive", "origin": "local", "hostname": None } site_users = requests.post(f"https://{self.domain}/api/users", json=payload) users_json = site_users.json() for user_entry in users_json: if user_entry.get("isAdmin") is True: admins_found.add(user_entry["username"]) for role in user_entry.get("roles",[]): if role.get("isAdministrator") is True: admins_found.add(user_entry["username"]) offset += 10 if len(admins_found) == 0: raise Exception(f"No admin contact is specified for {self.domain}.") self.admin_usernames = admins_found def get_pleroma_admins(self): if "staffAccounts" not in self.node_info["metadata"] or len(self.node_info["metadata"]["staffAccounts"]) == 0: raise Exception(f"No admin contact is specified for {self.domain}.") for staff in self.node_info["metadata"]["staffAccounts"]: self.admin_usernames.add(staff.split('/')[-1]) def discover_admins(self): try: self.get_mastodon_admins() return except: pass try: self.get_lemmy_admins() return except: pass try: self.get_pleroma_admins() return except: pass try: self.get_misskey_admins() return except: pass logger.warning(f"Site software '{self.software} does not match any of the known APIs") raise Exception(f"Site software '{self.software} does not match any of the known APIs") def get_unknown_admins(self): return [] def retrieve_admins(self): software_map = { "lemmy": self.get_lemmy_admins, "mastodon": self.get_mastodon_admins, "friendica": self.get_mastodon_admins, "pleroma": self.get_pleroma_admins, "akkoma": self.get_pleroma_admins, "misskey": self.get_misskey_admins, "firefish": self.get_mastodon_admins, "iceshrimp": self.get_mastodon_admins, "mitra": self.get_mastodon_admins, "unknown": self.get_unknown_admins, "wildcard": self.get_unknown_admins, } if self.software not in software_map: self.discover_admins() else: software_map[self.software]() def get_lemmy_info(self): requested_lemmy = Lemmy(f"https://{self.domain}") self.instance_info = requested_lemmy.site.get() if not self.instance_info: raise Exception(f"Error encountered while polling lemmy domain. Please check it's running correctly") self.open_registrations = self.instance_info["site_view"]["local_site"]["registration_mode"] == "open" self.email_verify = self.instance_info["site_view"]["local_site"]["require_email_verification"] self.approval_required = self.instance_info["site_view"]["local_site"]["registration_mode"] == "RequireApplication" self.has_captcha = self.instance_info["site_view"]["local_site"]["captcha_enabled"] def get_mastodon_info(self): site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout) try: self.instance_info = site.json() except Exception as err: if "challenge-error-text" in site.text: raise Exception("Instance is preventing scripted retrieval of their site info.") raise err self.approval_required = self.instance_info["approval_required"] if self.node_info is None: raise Exception("Error retrieving nodeinfo") self.open_registrations = self.node_info["openRegistrations"] self.email_verify = None self.has_captcha = None def get_pleroma_info(self): site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout) try: self.instance_info = site.json() except Exception as err: if "challenge-error-text" in site.text: raise Exception("Instance is preventing scripted retrieval of their site info.") raise err self.approval_required = self.instance_info["approval_required"] if self.node_info is None: raise Exception("Error retrieving nodeinfo") self.open_registrations = self.node_info["openRegistrations"] self.email_verify = None self.has_captcha = None def get_firefish_info(self): site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout) try: self.instance_info = site.json() except Exception as err: if "challenge-error-text" in site.text: raise Exception("Instance is preventing scripted retrieval of their site info.") raise err self.approval_required = self.instance_info["approval_required"] if self.node_info is None: raise Exception("Error retrieving nodeinfo") self.open_registrations = self.node_info["openRegistrations"] self.email_verify = self.node_info["metadata"]["emailRequiredForSignup"] self.has_captcha = self.node_info["metadata"]["enableHcaptcha"] is True or self.node_info["metadata"]["enableRecaptcha"] is True def get_unknown_info(self): if self.node_info is not None: self.open_registrations = self.node_info.get("openRegistrations", False) def discover_info(self): # Mastodon API site = requests.get(f"https://{self.domain}/api/v1/instance",timeout=self._req_timeout,allow_redirects=False) if site.status_code != 200: raise Exception(f"Unexpected status code retrieved when discovering instance info: {site.status_code}") try: self.instance_info = site.json() except Exception as err: if "challenge-error-text" in site.text: raise Exception("Instance is preventing scripted retrieval of their site info.") raise err self.approval_required = self.instance_info.get("approval_required") if self.node_info is None: raise Exception("Error retrieving nodeinfo") self.open_registrations = self.node_info.get("openRegistrations") # Only firefish and lemmy report the next two if "metadata" in self.node_info: self.email_verify = self.node_info["metadata"].get("emailRequiredForSignup") self.has_captcha = None if self.node_info["metadata"].get("enableHcaptcha") is True or self.node_info.get("enableRecaptcha") is True: self.has_captcha = True def parse_instance_info(self): if self.domain == "fediseer.com": return if not self.node_info: if self._allow_unreachable: self.software = "unknown" self.version = "unknown" if "*" in self.domain: self.software = "wildcard" else: self.software = self.node_info["software"]["name"].lower() self.version = self.node_info["software"].get("version","unknown") software_map = { "lemmy": self.get_lemmy_info, "mastodon": self.get_mastodon_info, "friendica": self.get_mastodon_info, "pleroma": self.get_pleroma_info, "akkoma": self.get_pleroma_info, "firefish": self.get_firefish_info, "iceshrimp": self.get_firefish_info, "mitra": self.get_firefish_info, "unknown": self.get_unknown_info, "wildcard": self.get_unknown_info, # Instance info not supported for misskey yet "misskey": self.get_unknown_info, } if self.software not in software_map: self.discover_info() else: software_map[self.software]() def is_admin(self, user): admin = user in self.admin_usernames if not admin and self.software == "firefish": payload = { "username": user } user_info = requests.post(f"https://{self.domain}/api/users/show", timeout=self._req_timeout, json=payload).json() admin = user_info.get('isAdmin', False) if admin: self.admin_usernames.add(user) return admin @staticmethod def get_nodeinfo(domain, req_timeout=3): headers = { "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", "Sec-GPC": "1", "User-Agent": f"Fediseer/{FEDISEER_VERSION}", } wellknown = requests.get(f"https://{domain}/.well-known/nodeinfo", headers=headers, timeout=req_timeout).json() headers["Sec-Fetch-Site"] = "cross-site" nodeinfo = requests.get(wellknown['links'][-1]['href'], headers=headers, timeout=req_timeout).json() return nodeinfo @staticmethod def is_reachable(domain, req_timeout=5): # Attempts to check if we can even reach the frontpage of the domain # so that we know if it's an issue reaching the nodeinfo, or a problem of reaching the domain logger.debug(domain) req = requests.get(f"https://{domain}", timeout=req_timeout, allow_redirects=False) logger.debug(req.status_code) if req.status_code not in [200,401,403]: raise Exception(f"Status code unexpected for instance frontpage: {req.status_code}") def domain_exists(self): try: socket.gethostbyname(self.domain) return True except: return False # # Debug # ii = InstanceInfo("outpoa.st") # if ii.domain_exists(): # ii.get_instance_info() # logger.info([ # ii.software, # ii.open_registrations, # ii.approval_required, # ii.email_verify, # ii.has_captcha, # ii.admin_usernames, # ]) # else: # logger.error("Domain does not exist") # import sys # sys.exit()