Upload files to "/"
parent
eef1b25d34
commit
3fe8fb9cc6
|
@ -0,0 +1,219 @@
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import requests
|
||||||
|
from datetime import datetime
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.table import Table
|
||||||
|
from rich import box
|
||||||
|
|
||||||
|
class MultiTool:
|
||||||
|
def __init__(self, notes_file="notes.txt", logs_file="logs.txt", ip_storage_file="ip_storage.txt"):
|
||||||
|
self.notes_file = notes_file
|
||||||
|
self.logs_file = logs_file
|
||||||
|
self.ip_storage_file = ip_storage_file
|
||||||
|
self.console = Console()
|
||||||
|
self._initialize_files()
|
||||||
|
|
||||||
|
def _initialize_files(self):
|
||||||
|
open(self.notes_file, 'a').close()
|
||||||
|
open(self.logs_file, 'a').close()
|
||||||
|
open(self.ip_storage_file, 'a').close()
|
||||||
|
|
||||||
|
def get_user_ip(self):
|
||||||
|
ssh_client_info = os.getenv("SSH_CLIENT")
|
||||||
|
if ssh_client_info:
|
||||||
|
return ssh_client_info.split()[0]
|
||||||
|
return "localhost"
|
||||||
|
|
||||||
|
def get_timestamp(self):
|
||||||
|
return datetime.now().strftime("%m/%d/%Y %I:%M %p")
|
||||||
|
|
||||||
|
def write_note(self, username, content):
|
||||||
|
with open(self.notes_file, 'a') as f:
|
||||||
|
f.write(f"{self.get_timestamp()} - {username} - {content}\n")
|
||||||
|
self._log_action(username, "Added note")
|
||||||
|
|
||||||
|
def view_notes(self, username):
|
||||||
|
with open(self.notes_file, 'r') as f:
|
||||||
|
notes = f.readlines()
|
||||||
|
self.console.print("Notes:")
|
||||||
|
for note in notes:
|
||||||
|
self.console.print(note.strip())
|
||||||
|
self._log_action(username, "Viewed notes")
|
||||||
|
|
||||||
|
def add_log(self, username, content):
|
||||||
|
with open(self.logs_file, 'a') as f:
|
||||||
|
f.write(f"{self.get_timestamp()} - {username} - IP: {self.get_user_ip()} - {content}\n")
|
||||||
|
self.console.print("Log added.")
|
||||||
|
|
||||||
|
def view_logs(self, username):
|
||||||
|
with open(self.logs_file, 'r') as f:
|
||||||
|
logs = f.readlines()
|
||||||
|
self.console.print("Logs:")
|
||||||
|
for log in logs:
|
||||||
|
self.console.print(log.strip())
|
||||||
|
self._log_action(username, "Viewed logs")
|
||||||
|
|
||||||
|
def _log_action(self, username, action):
|
||||||
|
with open(self.logs_file, 'a') as f:
|
||||||
|
f.write(f"{self.get_timestamp()} - {username} - IP: {self.get_user_ip()} - {action}\n")
|
||||||
|
|
||||||
|
def parse_auth_log(self):
|
||||||
|
auth_log = "/var/log/auth.log"
|
||||||
|
successful_logins = []
|
||||||
|
failed_logins = []
|
||||||
|
|
||||||
|
success_pattern = re.compile(r"(\w+\s+\d+ \d+:\d+:\d+) .*sshd.*Accepted.*for (\w+) from ([\d.]+)")
|
||||||
|
failure_pattern = re.compile(r"(\w+\s+\d+ \d+:\d+:\d+) .*sshd.*Failed.*for (\w+) from ([\d.]+)")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(auth_log, 'r') as log_file:
|
||||||
|
for line in log_file:
|
||||||
|
success_match = success_pattern.search(line)
|
||||||
|
if success_match:
|
||||||
|
date_str, user, ip = success_match.groups()
|
||||||
|
timestamp = datetime.strptime(date_str, "%b %d %H:%M:%S").replace(year=datetime.now().year)
|
||||||
|
successful_logins.append({"timestamp": timestamp, "user": user, "ip": ip})
|
||||||
|
|
||||||
|
failure_match = failure_pattern.search(line)
|
||||||
|
if failure_match:
|
||||||
|
date_str, user, ip = failure_match.groups()
|
||||||
|
timestamp = datetime.strptime(date_str, "%b %d %H:%M:%S").replace(year=datetime.now().year)
|
||||||
|
failed_logins.append({"timestamp": timestamp, "user": user, "ip": ip})
|
||||||
|
except PermissionError:
|
||||||
|
self.console.print("[bold red]Permission denied: Run the script as root or with sudo privileges.[/bold red]")
|
||||||
|
return [], []
|
||||||
|
|
||||||
|
return successful_logins, failed_logins
|
||||||
|
|
||||||
|
def display_ssh_logins(self):
|
||||||
|
successful_logins, failed_logins = self.parse_auth_log()
|
||||||
|
if not successful_logins and not failed_logins:
|
||||||
|
self.console.print("[bold yellow]No SSH login attempts found in the log.[/bold yellow]")
|
||||||
|
return
|
||||||
|
|
||||||
|
success_table = Table(title="Successful SSH Logins", box=box.MINIMAL_DOUBLE_HEAD)
|
||||||
|
success_table.add_column("Timestamp", justify="center", style="green")
|
||||||
|
success_table.add_column("Username", justify="center", style="cyan")
|
||||||
|
success_table.add_column("IP Address", justify="center", style="magenta")
|
||||||
|
|
||||||
|
for login in successful_logins:
|
||||||
|
success_table.add_row(
|
||||||
|
login["timestamp"].strftime("%m/%d/%Y %I:%M %p"),
|
||||||
|
login["user"],
|
||||||
|
login["ip"]
|
||||||
|
)
|
||||||
|
|
||||||
|
failed_table = Table(title="Failed SSH Logins", box=box.MINIMAL_DOUBLE_HEAD)
|
||||||
|
failed_table.add_column("Timestamp", justify="center", style="red")
|
||||||
|
failed_table.add_column("Username", justify="center", style="yellow")
|
||||||
|
failed_table.add_column("IP Address", justify="center", style="magenta")
|
||||||
|
|
||||||
|
for login in failed_logins:
|
||||||
|
failed_table.add_row(
|
||||||
|
login["timestamp"].strftime("%m/%d/%Y %I:%M %p"),
|
||||||
|
login["user"],
|
||||||
|
login["ip"]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.console.print(success_table)
|
||||||
|
self.console.print(failed_table)
|
||||||
|
|
||||||
|
def get_ip_info(self, ip):
|
||||||
|
"""Fetch ISP, organization, hosting, mobile, and proxy details from ip-api.com."""
|
||||||
|
try:
|
||||||
|
response = requests.get(f"http://ip-api.com/json/{ip}")
|
||||||
|
data = response.json()
|
||||||
|
if data["status"] == "success":
|
||||||
|
isp = data.get("isp", "Unknown")
|
||||||
|
org = data.get("org", "Unknown")
|
||||||
|
hosting = data.get("hosting", "Unknown")
|
||||||
|
mobile = data.get("mobile", "Unknown")
|
||||||
|
proxy = data.get("proxy", "Unknown")
|
||||||
|
return isp, org, hosting, mobile, proxy
|
||||||
|
else:
|
||||||
|
return "Unknown", "Unknown", "Unknown", "Unknown", "Unknown"
|
||||||
|
except Exception as e:
|
||||||
|
self.console.print(f"[bold red]Failed to fetch IP info: {e}[/bold red]")
|
||||||
|
return "Unknown", "Unknown", "Unknown", "Unknown", "Unknown"
|
||||||
|
|
||||||
|
def add_ip_record(self, username, ip):
|
||||||
|
"""Add a new IP record with associated username, automatically fetching ISP, Organization, Hosting, Mobile, and Proxy."""
|
||||||
|
isp, org, hosting, mobile, proxy = self.get_ip_info(ip)
|
||||||
|
with open(self.ip_storage_file, 'a') as f:
|
||||||
|
f.write(f"{self.get_timestamp()} - Username: {username} - IP: {ip} - ISP: {isp} - Organization: {org} - Hosting: {hosting} - Mobile: {mobile} - Proxy: {proxy}\n")
|
||||||
|
self.console.print("IP record added with ISP, Organization, Hosting, Mobile, and Proxy details.")
|
||||||
|
|
||||||
|
def view_ip_records(self):
|
||||||
|
"""View all stored IP records."""
|
||||||
|
with open(self.ip_storage_file, 'r') as f:
|
||||||
|
ip_records = f.readlines()
|
||||||
|
|
||||||
|
ip_table = Table(title="Stored IP Records", box=box.MINIMAL_DOUBLE_HEAD)
|
||||||
|
ip_table.add_column("Timestamp", justify="center", style="green")
|
||||||
|
ip_table.add_column("Username", justify="center", style="cyan")
|
||||||
|
ip_table.add_column("IP Address", justify="center", style="magenta")
|
||||||
|
ip_table.add_column("ISP", justify="center", style="yellow")
|
||||||
|
ip_table.add_column("Organization", justify="center", style="blue")
|
||||||
|
ip_table.add_column("Hosting", justify="center", style="yellow")
|
||||||
|
ip_table.add_column("Mobile", justify="center", style="magenta")
|
||||||
|
ip_table.add_column("Proxy", justify="center", style="red")
|
||||||
|
|
||||||
|
for record in ip_records:
|
||||||
|
if not record.strip():
|
||||||
|
continue # Skip empty lines
|
||||||
|
try:
|
||||||
|
parts = record.strip().split(" - ")
|
||||||
|
timestamp = parts[0].split(": ", 1)[1]
|
||||||
|
user = parts[1].split(": ", 1)[1]
|
||||||
|
ip = parts[2].split(": ", 1)[1]
|
||||||
|
isp = parts[3].split(": ", 1)[1]
|
||||||
|
org = parts[4].split(": ", 1)[1]
|
||||||
|
hosting = parts[5].split(": ", 1)[1]
|
||||||
|
mobile = parts[6].split(": ", 1)[1]
|
||||||
|
proxy = parts[7].split(": ", 1)[1]
|
||||||
|
ip_table.add_row(timestamp, user, ip, isp, org, hosting, mobile, proxy)
|
||||||
|
except IndexError:
|
||||||
|
# Skip lines that don't match the expected format
|
||||||
|
self.console.print(f"[bold yellow]Warning: Skipping malformed line:[/bold yellow] {record.strip()}")
|
||||||
|
|
||||||
|
self.console.print(ip_table)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
tool = MultiTool()
|
||||||
|
username = input("Enter your username: ")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
tool.console.print("\n--- Multi Tool ---", style="bold blue")
|
||||||
|
tool.console.print("1. Write a note")
|
||||||
|
tool.console.print("2. View notes")
|
||||||
|
tool.console.print("3. Add a log")
|
||||||
|
tool.console.print("4. View logs")
|
||||||
|
tool.console.print("5. View SSH login attempts")
|
||||||
|
tool.console.print("6. Add IP record")
|
||||||
|
tool.console.print("7. View IP records")
|
||||||
|
tool.console.print("8. Exit")
|
||||||
|
|
||||||
|
choice = input("Choose an option: ")
|
||||||
|
if choice == "1":
|
||||||
|
content = input("Enter the note content: ")
|
||||||
|
tool.write_note(username, content)
|
||||||
|
elif choice == "2":
|
||||||
|
tool.view_notes(username)
|
||||||
|
elif choice == "3":
|
||||||
|
content = input("Enter log content: ")
|
||||||
|
tool.add_log(username, content)
|
||||||
|
elif choice == "4":
|
||||||
|
tool.view_logs(username)
|
||||||
|
elif choice == "5":
|
||||||
|
tool.display_ssh_logins()
|
||||||
|
elif choice == "6":
|
||||||
|
ip = input("Enter IP address to record: ")
|
||||||
|
tool.add_ip_record(username, ip)
|
||||||
|
elif choice == "7":
|
||||||
|
tool.view_ip_records()
|
||||||
|
elif choice == "8":
|
||||||
|
tool.console.print("[bold green]Exiting...[/bold green]")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
tool.console.print("[bold red]Invalid choice, please try again.[/bold red]")
|
Loading…
Reference in New Issue