initial commit

This commit is contained in:
Andre Pawlowski 2021-12-27 13:52:26 +01:00
parent f82f54ec2c
commit 224fa8185c
27 changed files with 2277 additions and 0 deletions

36
README.md Normal file
View file

@ -0,0 +1,36 @@
# Linux Security and Monitoring Scripts
These are a collection of security and monitoring scripts you can use to monitor your Linux installation for security-related events or for an investigation. Each script works on its own and is independent from other scripts. The scripts can be set up to either print out their results, send them to you via mail, or using [AlertR](https://github.com/sqall01/alertR) as notification channel.
## Repository Structure
The scripts are located in the directory `scripts/`. Each script contains a short summary in the header of the file with a description of what it is supposed to do, (if needed) dependencies that have to be installed and (if available) references to where the idea for this script stems from.
Each script has a configuration file in the `scripts/config/` directory to configure it. If the configuration file was not found during the execution of the script, the script will fall back to default settings and print out the results. Hence, it is not necessary to provide a configuration file.
The `scripts/lib/` directory contains code that is shared between different scripts.
Scripts using a `monitor_` prefix hold a state and are only useful for monitoring purposes. A single usage of them for an investigation will only result in showing the current state the Linux system and not changes that might be relevant for the system's security.
## Usage
Take a look at the header of the script you want to execute. It contains a short description what this script is supposed to do and what requirements are needed (if any needed at all). If requirements are needed, install them before running the script.
The shared configuration file `scripts/config/config.py` contains settings that are used by all scripts. Furthermore, each script can be configured by using the corresponding configuration file in the `scripts/config/` directory. If no configuration file was found, a default setting is used and the results are printed out.
Finally, you can run all configured scripts by executing `start_search.py` (which is located in the main directory) or by executing each script manually. A Python3 interpreter is needed to run the scripts.
## List of Scripts
| Name | Script |
|---|---|
| Monitoring /etc/hosts file | [monitor_hosts_file.py](scripts/monitor_hosts_file.py) |
| Monitoring /etc/ld.so.preload file | [monitor_ld_preload.py](scripts/monitor_ld_preload.py) |
| Monitoring /etc/passwd file | [monitor_passwd.py](scripts/monitor_passwd.py) |
| Monitoring SSH authorized_keys files | [monitor_ssh_authorized_keys.py](scripts/monitor_ssh_authorized_keys.py) |
| Search for executables in /dev/shm | [search_dev_shm.py](scripts/search_dev_shm.py) |
| Search immutable files | [search_immutable_files.py](scripts/search_immutable_files.py) |
| Search for fileless programs (memfd_create) | [search_memfd_create.py](scripts/search_memfd_create.py) |
| Search for kernel thread impersonations | [search_non_kthreads.py](scripts/search_non_kthreads.py) |
| Test script to check if alerting works | [test_alert.py](scripts/test_alert.py) |
| Verify integrity of installed .deb packages | [verify_deb_packages.py](scripts/verify_deb_packages.py) |

0
scripts/__init__.py Normal file
View file

View file

18
scripts/config/config.py Normal file
View file

@ -0,0 +1,18 @@
from typing import Optional
# NOTE: If no "AlertR alert settings" and "Mail alert settings" are set to
# None, each script will fall back to print its output.
# AlertR alert settings.
ALERTR_FIFO = None # type: Optional[str]
# Mail alert settings.
FROM_ADDR = None # type: Optional[str]
TO_ADDR = None # type: Optional[str]
# Directory to hold states in. Defaults to "/tmp" if not set.
STATE_DIR = "state"
# If "start_search.py" is used to execute all scripts, this setting configures
# the time in seconds before a script times out.
START_PROCESS_TIMEOUT = 60

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,21 @@
from typing import List
# List of directories to search for immutablle files. Defaults to "/".
SEARCH_LOCATIONS = [] # type: List[str]
# To prevent a timeout if this script is run regularly for monitoring,
# the search can be done in steps for each location given in SEARCH_LOCATIONS.
# Steps mean if you have location_A, the first execution of this script will
# process location_A non-recursively and terminates,
# the second execution will process location_A/subdir_A recursively and terminates,
# the third execution will process location_A/subdir_B recursively and terminates and so on.
SEARCH_IN_STEPS = False
# List of directories to ignore.
IMMUTABLE_DIRECTORY_WHITELIST = [] # type: List[str]
# List of immutable files to ignore.
IMMUTABLE_FILE_WHITELIST = [] # type: List[str]
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,7 @@
from typing import List
# List of process names that are ignored.
NON_KTHREAD_WHITELIST = [] # type: List[str]
# Is the script allowed to run or not?
ACTIVATED = True

View file

@ -0,0 +1,2 @@
# Is the script allowed to run or not?
ACTIVATED = False

View file

@ -0,0 +1,10 @@
from typing import List
# Executable of debsums.
DEBSUMS_EXE = "/usr/bin/debsums"
# List of changed deb package files to ignore.
FILE_WHITELIST = [] # type: List[str]
# Is the script allowed to run or not?
ACTIVATED = True

0
scripts/lib/__init__.py Normal file
View file

57
scripts/lib/alerts.py Normal file
View file

@ -0,0 +1,57 @@
import json
import smtplib
import os
import time
from typing import Dict, Any
def raise_alert_alertr(alertr_fifo: str,
optional_data_dict: Dict[str, Any]):
# Send message to AlertR.
msg_dict = dict()
msg_dict["message"] = "sensoralert"
payload_dict = dict()
payload_dict["state"] = 1
payload_dict["dataType"] = 0
payload_dict["data"] = {}
payload_dict["hasLatestData"] = False
payload_dict["changeState"] = False
payload_dict["hasOptionalData"] = True
payload_dict["optionalData"] = optional_data_dict
msg_dict["payload"] = payload_dict
for i in range(10):
try:
# Will throw an exception if FIFO file does not have a reader instead of blocking.
fd = os.open(alertr_fifo, os.O_WRONLY | os.O_NONBLOCK)
os.write(fd, json.dumps(msg_dict).encode("ascii"))
os.close(fd)
# Give AlertR sensor time to process the data.
# Otherwise a parsing error might occur on the FIFO sensor when multiple messages were mixed.
time.sleep(2)
break
except Exception:
time.sleep(5)
def raise_alert_mail(from_addr: str,
to_addr: str,
subject: str,
message: str):
email_header = "From: %s\r\nTo: %s\r\nSubject: %s\r\n" \
% (from_addr, to_addr, subject)
for i in range(10):
try:
smtp_server = smtplib.SMTP("127.0.0.1", 25)
smtp_server.sendmail(from_addr,
to_addr,
email_header + message)
smtp_server.quit()
break
except Exception:
time.sleep(5)

301
scripts/monitor_hosts_file.py Executable file
View file

@ -0,0 +1,301 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Monitor /etc/hosts for changes to detect malicious attempts to divert traffic.
Requirements:
None
"""
import os
import json
import socket
import stat
from typing import Dict, Set
from lib.alerts import raise_alert_alertr, raise_alert_mail
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR, STATE_DIR
from config.monitor_hosts_file import ACTIVATED
STATE_DIR = os.path.join(os.path.dirname(__file__), STATE_DIR, os.path.basename(__file__))
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
STATE_DIR = os.path.join("/tmp", os.path.basename(__file__))
MAIL_SUBJECT = "[Security] Monitoring /etc/hosts on host '%s'" % socket.gethostname()
class MonitorHostsException(Exception):
def __init__(self, msg: str):
self._msg = msg
def __str__(self):
return self._msg
def _get_hosts() -> Dict[str, Set[str]]:
hosts_data = {}
with open("/etc/hosts", 'rt') as fp:
for line in fp:
line = line.strip()
if line == "":
continue
# Ignore comments.
if line[0] == "#":
continue
entry = line.split()
if len(entry) < 2:
raise MonitorHostsException("Not able to parse line: %s" % line)
ip = entry[0]
hosts = set(entry[1:])
if ip not in hosts_data.keys():
hosts_data[ip] = hosts
else:
for host in hosts:
hosts_data[ip].add(host)
return hosts_data
def _load_hosts_data() -> Dict[str, Set[str]]:
state_file = os.path.join(STATE_DIR, "state")
hosts_data = {}
if os.path.isfile(state_file):
data = None
try:
with open(state_file, 'rt') as fp:
data = fp.read()
if data is None:
raise MonitorHostsException("Read state data is None.")
temp = json.loads(data)
# Convert list to set.
for k,v in temp.items():
hosts_data[k] = set(v)
except Exception as e:
raise MonitorHostsException("State data: '%s'; Exception: '%s'" % (str(data), str(e)))
return hosts_data
def _output_error(msg: str):
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if print_output:
print(msg)
else:
hostname = socket.gethostname()
message = "Error monitoring /etc/hosts on host '%s': %s" \
% (hostname, msg)
if ALERTR_FIFO:
optional_data = dict()
optional_data["error"] = True
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
def _store_hosts_data(hosts_data: Dict[str, Set[str]]):
# Create state dir if it does not exist.
if not os.path.exists(STATE_DIR):
os.makedirs(STATE_DIR)
state_file = os.path.join(STATE_DIR, "state")
# Convert set to list.
temp = {}
for k, v in hosts_data.items():
temp[k] = list(v)
with open(state_file, 'wt') as fp:
fp.write(json.dumps(temp))
os.chmod(state_file, stat.S_IREAD | stat.S_IWRITE)
def monitor_hosts():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
stored_hosts_data = {}
try:
stored_hosts_data =_load_hosts_data()
except Exception as e:
_output_error(str(e))
return
curr_hosts_data = {}
try:
curr_hosts_data = _get_hosts()
except Exception as e:
_output_error(str(e))
return
# Compare stored data with current one.
for stored_entry_ip in stored_hosts_data.keys():
# Extract current entry belonging to the same ip.
if stored_entry_ip not in curr_hosts_data.keys():
hostname = socket.gethostname()
message = "Host name for IP '%s' was deleted on host '%s'." \
% (stored_entry_ip, hostname)
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["ip"] = stored_entry_ip
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
continue
# Check host entry was removed.
for host in stored_hosts_data[stored_entry_ip]:
if host not in curr_hosts_data[stored_entry_ip]:
hostname = socket.gethostname()
message = "Host name entry for IP '%s' was removed on host '%s'.\n\n" % (stored_entry_ip, hostname)
message += "Entry: %s" % host
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["ip"] = stored_entry_ip
optional_data["host_entry"] = host
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Check host entry was added.
for host in curr_hosts_data[stored_entry_ip]:
if host not in stored_hosts_data[stored_entry_ip]:
hostname = socket.gethostname()
message = "Host name entry for IP '%s' was added on host '%s'.\n\n" % (stored_entry_ip, hostname)
message += "Entry: %s" % host
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["ip"] = stored_entry_ip
optional_data["host_entry"] = host
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Check new data was added.
for curr_entry_ip in curr_hosts_data.keys():
if curr_entry_ip not in stored_hosts_data.keys():
hostname = socket.gethostname()
message = "New host name was added for IP '%s' on host '%s'.\n\n" \
% (curr_entry_ip, hostname)
message += "Entries:\n"
for host in curr_hosts_data[curr_entry_ip]:
message += host
message += "\n"
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["ip"] = curr_entry_ip
optional_data["hosts"] = list(curr_hosts_data[curr_entry_ip])
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
try:
_store_hosts_data(curr_hosts_data)
except Exception as e:
_output_error(str(e))
if __name__ == '__main__':
monitor_hosts()

218
scripts/monitor_ld_preload.py Executable file
View file

@ -0,0 +1,218 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Monitor /etc/ld.so.preload for changes to detect malicious attempts to alter the control flow of binaries.
Requirements:
None
"""
import os
import json
import socket
import stat
from typing import Set
from lib.alerts import raise_alert_alertr, raise_alert_mail
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR, STATE_DIR
from config.monitor_ld_preload import ACTIVATED
STATE_DIR = os.path.join(os.path.dirname(__file__), STATE_DIR, os.path.basename(__file__))
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
STATE_DIR = os.path.join("/tmp", os.path.basename(__file__))
MAIL_SUBJECT = "[Security] Monitoring /etc/ld.so.preload on host '%s'" % socket.gethostname()
class MonitorLdPreloadException(Exception):
def __init__(self, msg: str):
self._msg = msg
def __str__(self):
return self._msg
def _get_ld_preload() -> Set[str]:
path = "/etc/ld.so.preload"
ld_data = set()
if os.path.isfile(path):
with open(path, 'rt') as fp:
for line in fp:
if line.strip() == "":
continue
ld_data.add(line.strip())
return ld_data
def _load_ld_preload_data() -> Set[str]:
state_file = os.path.join(STATE_DIR, "state")
ld_data = set()
if os.path.isfile(state_file):
data = None
try:
with open(state_file, 'rt') as fp:
data = fp.read()
if data is None:
raise MonitorLdPreloadException("Read state data is None.")
temp = json.loads(data)
ld_data = set(temp)
except Exception as e:
raise MonitorLdPreloadException("State data: '%s'; Exception: '%s'" % (str(data), str(e)))
return ld_data
def _output_error(msg: str):
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if print_output:
print(msg)
else:
hostname = socket.gethostname()
message = "Error monitoring /etc/ld.so.preload on host '%s': %s" \
% (hostname, msg)
if ALERTR_FIFO:
optional_data = dict()
optional_data["error"] = True
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
def _store_ld_preload_data(ld_data: Set[str]):
# Create state dir if it does not exist.
if not os.path.exists(STATE_DIR):
os.makedirs(STATE_DIR)
state_file = os.path.join(STATE_DIR, "state")
# Convert set to list.
temp = list(ld_data)
with open(state_file, 'wt') as fp:
fp.write(json.dumps(temp))
os.chmod(state_file, stat.S_IREAD | stat.S_IWRITE)
def monitor_ld_preload():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
stored_ld_data = set()
try:
stored_ld_data =_load_ld_preload_data()
except Exception as e:
_output_error(str(e))
return
curr_ld_data = set()
try:
curr_ld_data = _get_ld_preload()
except Exception as e:
_output_error(str(e))
return
# Compare stored data with current one.
for stored_entry in stored_ld_data:
if stored_entry not in curr_ld_data:
hostname = socket.gethostname()
message = "LD_PRELOAD entry '%s' was deleted on host '%s'." \
% (stored_entry, hostname)
if print_output:
print(message)
if ALERTR_FIFO:
optional_data = dict()
optional_data["entry"] = stored_entry
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
continue
# Check new data was added.
for curr_entry in curr_ld_data:
if curr_entry not in stored_ld_data:
hostname = socket.gethostname()
message = "LD_PRELOAD entry '%s' was added on host '%s'.\n\n" \
% (curr_entry, hostname)
if print_output:
print(message)
if ALERTR_FIFO:
optional_data = dict()
optional_data["entry"] = curr_entry
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
try:
_store_ld_preload_data(curr_ld_data)
except Exception as e:
_output_error(str(e))
if __name__ == '__main__':
monitor_ld_preload()

251
scripts/monitor_passwd.py Executable file
View file

@ -0,0 +1,251 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Monitor /etc/passwd for changes to detect malicious attempts to hijack/change users.
Requirements:
None
"""
import os
import json
import socket
import stat
from typing import Dict
from lib.alerts import raise_alert_alertr, raise_alert_mail
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR, STATE_DIR
from config.monitor_passwd import ACTIVATED
STATE_DIR = os.path.join(os.path.dirname(__file__), STATE_DIR, os.path.basename(__file__))
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
STATE_DIR = os.path.join("/tmp", os.path.basename(__file__))
MAIL_SUBJECT = "[Security] Monitoring /etc/passwd on host '%s'" % socket.gethostname()
class MonitorPasswdException(Exception):
def __init__(self, msg: str):
self._msg = msg
def __str__(self):
return self._msg
def _get_passwd() -> Dict[str, str]:
passwd_data = {}
with open("/etc/passwd", 'rt') as fp:
for line in fp:
line = line.strip()
if line == "":
continue
entry = line.strip().split(":")
user = entry[0]
passwd_data[user] = line
return passwd_data
def _load_passwd_data() -> Dict[str, str]:
state_file = os.path.join(STATE_DIR, "state")
passwd_data = {}
if os.path.isfile(state_file):
data = None
try:
with open(state_file, 'rt') as fp:
data = fp.read()
if data is None:
raise MonitorPasswdException("Read state data is None.")
passwd_data = json.loads(data)
except Exception as e:
raise MonitorPasswdException("State data: '%s'; Exception: '%s'" % (str(data), str(e)))
return passwd_data
def _output_error(msg: str):
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if print_output:
print(msg)
else:
hostname = socket.gethostname()
message = "Error monitoring /etc/passwd on host '%s': %s" \
% (hostname, msg)
if ALERTR_FIFO:
optional_data = dict()
optional_data["error"] = True
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
def _store_passwd_data(passwd_data: Dict[str, str]):
# Create state dir if it does not exist.
if not os.path.exists(STATE_DIR):
os.makedirs(STATE_DIR)
state_file = os.path.join(STATE_DIR, "state")
with open(state_file, 'wt') as fp:
fp.write(json.dumps(passwd_data))
os.chmod(state_file, stat.S_IREAD | stat.S_IWRITE)
def monitor_hosts():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
stored_passwd_data = {}
try:
stored_passwd_data =_load_passwd_data()
except Exception as e:
_output_error(str(e))
return
curr_passwd_data = {}
try:
curr_passwd_data = _get_passwd()
except Exception as e:
_output_error(str(e))
return
# Compare stored data with current one.
for stored_entry_user in stored_passwd_data.keys():
# Extract current entry belonging to the same user.
if stored_entry_user not in curr_passwd_data.keys():
hostname = socket.gethostname()
message = "User '%s' was deleted on host '%s'." \
% (stored_entry_user, hostname)
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["user"] = stored_entry_user
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
continue
# Check entry was modified.
if stored_passwd_data[stored_entry_user] != curr_passwd_data[stored_entry_user]:
hostname = socket.gethostname()
message = "Passwd entry for user '%s' was modified on host '%s'.\n\n" % (stored_entry_user, hostname)
message += "Old entry: %s\n" % stored_passwd_data[stored_entry_user]
message += "New entry: %s" % curr_passwd_data[stored_entry_user]
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["user"] = stored_entry_user
optional_data["old_entry"] = stored_passwd_data[stored_entry_user]
optional_data["new_entry"] = curr_passwd_data[stored_entry_user]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Check new data was added.
for curr_entry_user in curr_passwd_data.keys():
if curr_entry_user not in stored_passwd_data.keys():
hostname = socket.gethostname()
message = "User '%s' was added on host '%s'.\n\n" \
% (curr_entry_user, hostname)
message += "Entry: %s" % curr_passwd_data[curr_entry_user]
if print_output:
print(message)
print("#"*80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["user"] = curr_entry_user
optional_data["entry"] = curr_passwd_data[curr_entry_user]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
try:
_store_passwd_data(curr_passwd_data)
except Exception as e:
_output_error(str(e))
if __name__ == '__main__':
monitor_hosts()

View file

@ -0,0 +1,371 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Monitor ~/.ssh/authorized_keys for changes to detect malicious backdoor attempts.
Requirements:
None
"""
import os
import json
import stat
import socket
from typing import List, Tuple, Dict, Any
from lib.alerts import raise_alert_alertr, raise_alert_mail
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR, STATE_DIR
from config.monitor_ssh_authorized_keys import ACTIVATED
STATE_DIR = os.path.join(os.path.dirname(__file__), STATE_DIR, os.path.basename(__file__))
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
STATE_DIR = os.path.join("/tmp", os.path.basename(__file__))
MAIL_SUBJECT = "[Security] Monitoring SSH authorized_keys on host '%s'" % socket.gethostname()
class MonitorSSHException(Exception):
def __init__(self, msg: str):
self._msg = msg
def __str__(self):
return self._msg
def _get_home_dirs_from_passwd() -> List[Tuple[str, str]]:
user_home_list = []
try:
with open("/etc/passwd", 'rt') as fp:
for line in fp:
line_split = line.split(":")
user_home_list.append((line_split[0], line_split[5]))
except Exception as e:
raise MonitorSSHException(str(e))
return user_home_list
def _get_system_ssh_data() -> List[Dict[str, Any]]:
ssh_data = []
user_home_list = _get_home_dirs_from_passwd()
for user, home in user_home_list:
# Monitor "authorized_keys2" too since SSH also checks this file for keys (even though it is deprecated).
for authorized_file_name in ["authorized_keys", "authorized_keys2"]:
authorized_keys_file = os.path.join(home, ".ssh", authorized_file_name)
if os.path.isfile(authorized_keys_file):
ssh_user_data = {"user": user,
"authorized_keys_file": authorized_keys_file,
"authorized_keys_entries": _parse_authorized_keys_file(authorized_keys_file)}
ssh_data.append(ssh_user_data)
return ssh_data
def _load_ssh_data() -> List[Dict[str, Any]]:
state_file = os.path.join(STATE_DIR, "state")
ssh_data = []
if os.path.isfile(state_file):
data = None
try:
with open(state_file, 'rt') as fp:
data = fp.read()
if data is None:
raise MonitorSSHException("Read state data is None.")
ssh_data = json.loads(data)
except Exception as e:
raise MonitorSSHException("State data: '%s'; Exception: '%s'" % (str(data), str(e)))
return ssh_data
def _output_error(msg: str):
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if print_output:
print(msg)
else:
hostname = socket.gethostname()
message = "Error monitoring SSH authorized_keys on host '%s': %s" \
% (hostname, msg)
if ALERTR_FIFO:
optional_data = dict()
optional_data["error"] = True
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
def _parse_authorized_keys_file(authorized_keys_file: str) -> List[str]:
entries = set()
try:
with open(authorized_keys_file, 'rt') as fp:
for line in fp:
entries.add(line.strip())
except Exception as e:
raise MonitorSSHException("Unable to parse file '%s'; Exception: '%s'" % (authorized_keys_file, str(e)))
return list(entries)
def _store_ssh_data(ssh_data: List[Dict[str, Any]]):
# Create state dir if it does not exist.
if not os.path.exists(STATE_DIR):
os.makedirs(STATE_DIR)
state_file = os.path.join(STATE_DIR, "state")
with open(state_file, 'wt') as fp:
fp.write(json.dumps(ssh_data))
os.chmod(state_file, stat.S_IREAD | stat.S_IWRITE)
def monitor_ssh_authorized_keys():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
stored_ssh_data = []
curr_ssh_data = []
try:
stored_ssh_data = _load_ssh_data()
curr_ssh_data = _get_system_ssh_data()
except Exception as e:
_output_error(str(e))
return
# Check if any authorized_keys file is world writable.
for curr_entry in curr_ssh_data:
authorized_keys_file = curr_entry["authorized_keys_file"]
file_stat = os.stat(authorized_keys_file)
if file_stat.st_mode & stat.S_IWOTH:
hostname = socket.gethostname()
message = "SSH authorized_keys file for user '%s' is world writable on host '%s'." \
% (curr_entry["user"], hostname)
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = curr_entry["user"]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Compare stored data with current one.
for stored_entry in stored_ssh_data:
# Extract current entry belonging to the same user.
curr_user_entry = None
for curr_entry in curr_ssh_data:
if stored_entry["user"] == curr_entry["user"]:
curr_user_entry = curr_entry
break
if curr_user_entry is None:
hostname = socket.gethostname()
message = "SSH authorized_keys file for user '%s' was deleted on host '%s'." \
% (stored_entry["user"], hostname)
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = stored_entry["user"]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
continue
# Check authorized_keys path has changed.
if stored_entry["authorized_keys_file"] != curr_user_entry["authorized_keys_file"]:
hostname = socket.gethostname()
message = "SSH authorized_keys location for user '%s' changed from '%s' to '%s' on host '%s'." \
% (stored_entry["user"],
stored_entry["authorized_keys_file"],
curr_user_entry["authorized_keys_file"],
hostname)
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = stored_entry["user"]
optional_data["from"] = stored_entry["authorized_keys_file"]
optional_data["to"] = curr_user_entry["authorized_keys_file"]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Check authorized_key was removed.
for authorized_key in stored_entry["authorized_keys_entries"]:
if authorized_key not in curr_user_entry["authorized_keys_entries"]:
hostname = socket.gethostname()
message = "SSH authorized_keys entry was removed on host '%s'.\n\n" % hostname
message += "Entry: %s" % authorized_key
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = stored_entry["user"]
optional_data["authorized_keys_entry"] = authorized_key
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
# Check authorized_key was added.
for authorized_key in curr_user_entry["authorized_keys_entries"]:
if authorized_key not in stored_entry["authorized_keys_entries"]:
hostname = socket.gethostname()
message = "SSH authorized_keys entry was added on host '%s'.\n\n" % hostname
message += "Entry: %s" % authorized_key
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = stored_entry["user"]
optional_data["authorized_keys_entry"] = authorized_key
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
for curr_entry in curr_ssh_data:
found = False
for stored_entry in stored_ssh_data:
if curr_entry["user"] == stored_entry["user"]:
found = True
break
if not found:
hostname = socket.gethostname()
message = "New authorized_keys file was added for user '%s' on host '%s'.\n\n" \
% (curr_entry["user"], hostname)
message += "Entries:\n"
for authorized_key in curr_entry["authorized_keys_entries"]:
message += authorized_key
message += "\n"
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["username"] = curr_entry["user"]
optional_data["authorized_keys_entries"] = curr_entry["authorized_keys_entries"]
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
try:
_store_ssh_data(curr_ssh_data)
except Exception as e:
_output_error(str(e))
if __name__ == '__main__':
monitor_ssh_authorized_keys()

101
scripts/search_dev_shm.py Executable file
View file

@ -0,0 +1,101 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Search for binaries and scripts in /dev/shm.
Malware that tries to hide is often stored there.
Requirements:
None
Reference:
https://twitter.com/CraigHRowland/status/1268863172825346050?s=20
https://twitter.com/CraigHRowland/status/1269196509079166976
"""
import os
import socket
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR
from config.search_dev_shm import ACTIVATED
from lib.alerts import raise_alert_alertr, raise_alert_mail
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
def search_suspicious_files():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
# Get all suspicious ELF files.
fd = os.popen("find /dev/shm -type f -exec file -p '{}' \\; | grep ELF")
elf_raw = fd.read().strip()
fd.close()
# Get all suspicious script files.
fd = os.popen("find /dev/shm -type f -exec file -p '{}' \\; | grep script")
script_raw = fd.read().strip()
fd.close()
suspicious_files = []
if elf_raw.strip():
suspicious_files.extend(elf_raw.strip().split("\n"))
if script_raw.strip():
suspicious_files.extend(script_raw.strip().split("\n"))
for suspicious_file in suspicious_files:
if print_output:
print("SUSPICIOUS")
print(suspicious_file)
print("")
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["suspicious_file"] = suspicious_file
optional_data["hostname"] = hostname
message = "File in /dev/shm on host '%s' suspicious.\n\n" % hostname
message += suspicious_file
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Suspicious file found on '%s'" % hostname
message = "File in /dev/shm on host '%s' suspicious.\n\n" % hostname
message += suspicious_file
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
if __name__ == '__main__':
search_suspicious_files()

306
scripts/search_immutable_files.py Executable file
View file

@ -0,0 +1,306 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Searches for immutable files in the filesystem.
Requirements:
None
"""
import os
import json
import socket
import stat
from typing import Dict, Any, List, Tuple
from lib.alerts import raise_alert_alertr, raise_alert_mail
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR, STATE_DIR
from config.search_immutable_files import ACTIVATED, SEARCH_IN_STEPS, SEARCH_LOCATIONS, \
IMMUTABLE_DIRECTORY_WHITELIST, IMMUTABLE_FILE_WHITELIST
STATE_DIR = os.path.join(os.path.dirname(__file__), STATE_DIR, os.path.basename(__file__))
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
SEARCH_IN_STEPS = False
SEARCH_LOCATIONS = ["/"]
IMMUTABLE_DIRECTORY_WHITELIST = []
IMMUTABLE_FILE_WHITELIST = []
STATE_DIR = os.path.join("/tmp", os.path.basename(__file__))
MAIL_SUBJECT = "[Security] Searching immutable files on host '%s'" % socket.gethostname()
class SearchImmutableException(Exception):
def __init__(self, msg: str):
self._msg = msg
def __str__(self):
return self._msg
def _load_state() -> Dict[str, Any]:
state_file = os.path.join(STATE_DIR, "state")
state_data = {"next_step": 0}
if os.path.isfile(state_file):
data = None
try:
with open(state_file, 'rt') as fp:
data = fp.read()
if data is None:
raise SearchImmutableException("Read state data is None.")
state_data = json.loads(data)
except Exception as e:
raise SearchImmutableException("State data: '%s'; Exception: '%s'" % (str(data), str(e)))
return state_data
def _output_error(msg: str):
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if print_output:
print(msg)
else:
hostname = socket.gethostname()
message = "Error searching immutable files on host on host '%s': %s" \
% (hostname, msg)
if ALERTR_FIFO:
optional_data = dict()
optional_data["error"] = True
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
def _process_directory_whitelist(immutable_files: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
if not IMMUTABLE_DIRECTORY_WHITELIST:
return immutable_files
# Extract the components of the whitelist paths (pre-process it to reduces processing steps).
whitelist_path_components_list = []
for whitelist_entry in IMMUTABLE_DIRECTORY_WHITELIST:
whitelist_path = os.path.normpath(whitelist_entry)
whitelist_path_components = []
while True:
whitelist_path, component = os.path.split(whitelist_path)
if not component:
break
whitelist_path_components.insert(0, component)
whitelist_path_components_list.append(whitelist_path_components)
new_immutable_files = []
for immutable_file in immutable_files:
is_whitelisted = False
# Extract the components of the path to the immutable file.
immutable_path = os.path.dirname(os.path.normpath(immutable_file[0]))
immutable_path_components = []
while True:
immutable_path, component = os.path.split(immutable_path)
if not component:
break
immutable_path_components.insert(0, component)
for whitelist_path_components in whitelist_path_components_list:
# Skip case such as "whitelist: /usr/local/bin" and "immutable path: /usr"
if len(whitelist_path_components) > len(immutable_path_components):
continue
# NOTE: this check also works if "/" is whitelisted, since the whitelist components are empty and
# thus the file is counted as whitelisted.
is_whitelisted = True
for i in range(len(whitelist_path_components)):
if whitelist_path_components[i] != immutable_path_components[i]:
is_whitelisted = False
if is_whitelisted:
break
if not is_whitelisted:
new_immutable_files.append(immutable_file)
return new_immutable_files
def _process_file_whitelist(immutable_files: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
if not IMMUTABLE_FILE_WHITELIST:
return immutable_files
new_immutable_files = []
for immutable_file in immutable_files:
is_whitelisted = False
for whitelist_entry in IMMUTABLE_FILE_WHITELIST:
if os.path.samefile(immutable_file[0], whitelist_entry):
is_whitelisted = True
break
if not is_whitelisted:
new_immutable_files.append(immutable_file)
return new_immutable_files
def _store_state(state_data: Dict[str, Any]):
# Create state dir if it does not exist.
if not os.path.exists(STATE_DIR):
os.makedirs(STATE_DIR)
state_file = os.path.join(STATE_DIR, "state")
with open(state_file, 'wt') as fp:
fp.write(json.dumps(state_data))
os.chmod(state_file, stat.S_IREAD | stat.S_IWRITE)
def search_immutable_files():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
state_data = {}
try:
state_data = _load_state()
except Exception as e:
_output_error(str(e))
return
# Reset step if we do not search in steps but everything.
if not SEARCH_IN_STEPS:
state_data["next_step"] = 0
if not SEARCH_LOCATIONS:
SEARCH_LOCATIONS.append("/")
# Gather all search locations.
search_locations = []
# If SEARCH_IN_STEPS is active, build a list of directories to search in
if SEARCH_IN_STEPS:
for search_location in SEARCH_LOCATIONS:
# Add parent directory as non-recursive search location in order to search in it without going deeper.
# Tuple with directory as first element and recursive search as second element.
search_locations.append((search_location, False))
# Add all containing sub-directories as recursive search locations.
elements = os.listdir(search_location)
elements.sort()
for element in elements:
path = os.path.join(search_location, element)
if os.path.isdir(path):
# Tuple with directory as first element and recursive search as second element.
search_locations.append((path, True))
# If we do not search in separated steps, just add each directory as a recursive search location.
else:
for search_location in SEARCH_LOCATIONS:
# Tuple with directory as first element and recursive search as second element.
search_locations.append((search_location, True))
# Reset index if it is outside the search locations.
if state_data["next_step"] >= len(search_locations):
state_data["next_step"] = 0
while True:
search_location, is_recursive = search_locations[state_data["next_step"]]
# Get all immutable files.
if is_recursive:
fd = os.popen("lsattr -R -a %s 2> /dev/null | sed -rn '/^[aAcCdDeijPsStTu\\-]{4}i/p'"
% search_location)
else:
fd = os.popen("lsattr -a %s 2> /dev/null | sed -rn '/^[aAcCdDeijPsStTu\\-]{4}i/p'"
% search_location)
output_raw = fd.read().strip()
fd.close()
if output_raw != "":
immutable_files = []
output_list = output_raw.split("\n")
for output_entry in output_list:
output_entry_list = output_entry.split(" ")
# Notify and skip line if sanity check fails.
if len(output_entry_list) != 2:
_output_error("Unable to process line '%s'" % output_entry)
continue
attributes = output_entry_list[0]
file_location = output_entry_list[1]
immutable_files.append((file_location, attributes))
immutable_files = _process_directory_whitelist(immutable_files)
immutable_files = _process_file_whitelist(immutable_files)
hostname = socket.gethostname()
message = "Immutable files found on host '%s'.\n\n" % hostname
message += "\n".join(["File: %s; Attributes: %s" % (x[0], x[1]) for x in immutable_files])
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["immutable_files"] = output_raw.split("\n")
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
state_data["next_step"] += 1
# Stop search if we are finished.
if SEARCH_IN_STEPS or state_data["next_step"] >= len(search_locations):
break
try:
_store_state(state_data)
except Exception as e:
_output_error(str(e))
if __name__ == '__main__':
search_immutable_files()

92
scripts/search_memfd_create.py Executable file
View file

@ -0,0 +1,92 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Malware uses calls such as memfd_create() to create an anonymous file in RAM that can be run.
Requirements:
None
Reference:
https://www.sandflysecurity.com/blog/detecting-linux-memfd_create-fileless-malware-with-command-line-forensics/
"""
import os
import socket
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR
from config.search_memfd_create import ACTIVATED
from lib.alerts import raise_alert_alertr, raise_alert_mail
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = True
def search_deleted_memfd_files():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
# Get all suspicious ELF files.
fd = os.popen("ls -laR /proc/*/exe 2> /dev/null | grep memfd:.*\(deleted\)")
suspicious_exe_raw = fd.read().strip()
fd.close()
suspicious_exes = []
if suspicious_exe_raw.strip():
suspicious_exes.extend(suspicious_exe_raw.strip().split("\n"))
for suspicious_exe in suspicious_exes:
if print_output:
print("SUSPICIOUS")
print(suspicious_exe)
print("")
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["suspicious_exe"] = suspicious_exe
optional_data["hostname"] = hostname
message = "Deleted memfd file on host '%s' found.\n\n" % hostname
message += suspicious_exe
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Deleted memfd file on '%s'" % hostname
message = "Deleted memfd file on host '%s' found.\n\n" % hostname
message += suspicious_exe
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
if __name__ == '__main__':
search_deleted_memfd_files()

136
scripts/search_non_kthreads.py Executable file
View file

@ -0,0 +1,136 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Malware will name itself with [brackets] to impersonate a Linux kernel thread.
Any Linux process that looks like a [kernel thread] should have an empty maps file.
Site note:
when using ps auxwf | grep "\[" they are children of [kthreadd]
Requirements:
None
Reference:
https://twitter.com/CraigHRowland/status/1232399132632813568
https://www.sandflysecurity.com/blog/detecting-linux-kernel-process-masquerading-with-command-line-forensics/
"""
import os
import socket
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR
from config.search_non_kthreads import NON_KTHREAD_WHITELIST, ACTIVATED
from lib.alerts import raise_alert_alertr, raise_alert_mail
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
NON_KTHREAD_WHITELIST = []
ACTIVATED = True
def search_suspicious_process():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
# Iterate over all processes that have a "[".
fd = os.popen("ps auxw | grep \\\\[ | awk '{print $2}'")
pids_raw = fd.read().strip()
fd.close()
for pid in pids_raw.split("\n"):
# Get process name of pid.
fd = os.popen("ps u -p %s" % pid)
ps_output = fd.read().strip()
fd.close()
fd = os.popen("ps u -p %s | awk '{$1=$2=$3=$4=$5=$6=$7=$8=$9=$10=\"\"; print $0}'" % pid)
process_name_raw = fd.read().strip()
fd.close()
for process_name in process_name_raw.split("\n"):
process_name = process_name.strip()
# Ignore COMMAND since it is part of the headline of ps output.
if process_name == "COMMAND":
continue
# Check if we have whitelisted the process
# (e.g., [lxc monitor] /var/lib/lxc satellite).
elif process_name in NON_KTHREAD_WHITELIST:
continue
# Only consider process names that start with a "["
# (e.g., "avahi-daemon: running [towelie.local]"" does not)
elif process_name.startswith("["):
if print_output:
print("Checking pid: %s (%s) - " % (pid, process_name),
end="")
file_path = "/proc/%s/maps" % pid
try:
with open(file_path, 'rt') as fp:
data = fp.read()
if data == "":
if print_output:
print("ok")
continue
except Exception as e:
print("Exception")
print(e)
print("")
continue
if print_output:
print("SUSPICIOUS")
print(ps_output)
print("")
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["pid"] = pid
optional_data["ps_output"] = ps_output
optional_data["hostname"] = hostname
message = "Process with pid '%s' on host '%s' suspicious.\n\n" % (pid, hostname)
message += ps_output
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Suspicious process found on '%s'" % hostname
message = "Process with pid '%s' on host '%s' suspicious.\n\n" % (pid, hostname)
message += ps_output
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
if __name__ == '__main__':
search_suspicious_process()

52
scripts/test_alert.py Executable file
View file

@ -0,0 +1,52 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
If scripts are executed via cronjob, this script helps to check if the alert functions work.
Requirements:
None
"""
import socket
# Read configuration and and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR
from config.test_alert import ACTIVATED
from lib.alerts import raise_alert_alertr, raise_alert_mail
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
ACTIVATED = False
if __name__ == '__main__':
if ACTIVATED:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["hostname"] = hostname
message = "Alert test on host '%s'." % hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Alert test on '%s'" % hostname
message = "Alert test on host '%s'." % hostname
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)

102
scripts/verify_deb_packages.py Executable file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# # Licensed under the MIT License.
"""
Short summary:
Use `debsums` to verify the integrity of installed deb packages using /var/lib/dpkg/info/*.md5sums.
Requirements:
`debsums` installed on system
Reference:
https://www.sandflysecurity.com/blog/detecting-linux-binary-file-poisoning/
"""
import os
import socket
from typing import List
# Read configuration and library functions.
try:
from config.config import ALERTR_FIFO, FROM_ADDR, TO_ADDR
from config.verify_deb_packages import ACTIVATED, DEBSUMS_EXE, FILE_WHITELIST
from lib.alerts import raise_alert_alertr, raise_alert_mail
except:
ALERTR_FIFO = None
FROM_ADDR = None
TO_ADDR = None
DEBSUMS_EXE = "debsums"
FILE_WHITELIST = []
ACTIVATED = True
MAIL_SUBJECT = "[Security] Verifying deb package files on host '%s'" % socket.gethostname()
def _process_whitelist(changed_files: List[str]) -> List[str]:
if not FILE_WHITELIST:
return changed_files
new_changed_files = []
for changed_file in changed_files:
if changed_file in FILE_WHITELIST:
continue
new_changed_files.append(changed_file)
return new_changed_files
def verify_deb_packages():
# Decide where to output results.
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
if not ACTIVATED:
if print_output:
print("Module deactivated.")
return
fd = os.popen("%s -c" % DEBSUMS_EXE)
output_raw = fd.read().strip()
fd.close()
if output_raw != "":
changed_files = output_raw.split("\n")
changed_files = _process_whitelist(changed_files)
if changed_files:
hostname = socket.gethostname()
message = "Changed deb package files found on host '%s'.\n\n" % hostname
message += "\n".join(["File: %s" % x for x in changed_files])
if print_output:
print(message)
print("#" * 80)
if ALERTR_FIFO:
optional_data = dict()
optional_data["immutable_files"] = output_raw.split("\n")
optional_data["hostname"] = hostname
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
raise_alert_mail(FROM_ADDR,
TO_ADDR,
MAIL_SUBJECT,
message)
if __name__ == '__main__':
verify_deb_packages()

184
start_search.py Executable file
View file

@ -0,0 +1,184 @@
#!/usr/bin/env python3
# written by sqall
# twitter: https://twitter.com/sqall01
# blog: https://h4des.org
# github: https://github.com/sqall01
#
# Licensed under the MIT License.
import os
import subprocess
import socket
import time
from scripts.config.config import START_PROCESS_TIMEOUT, TO_ADDR, FROM_ADDR, ALERTR_FIFO
from scripts.lib.alerts import raise_alert_alertr, raise_alert_mail
if __name__ == '__main__':
print_output = False
if ALERTR_FIFO is None and FROM_ADDR is None and TO_ADDR is None:
print_output = True
script_dir = os.path.dirname(os.path.abspath(__file__)) + "/scripts/"
for script in os.listdir(script_dir):
# Execute all python scripts.
if script[-3:] == ".py" and script != "__init__.py":
if print_output:
print("Executing %s" % script)
to_execute = script_dir + script
process = None
try:
process = subprocess.Popen(to_execute,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
process.wait(START_PROCESS_TIMEOUT)
# Catch timeout.
except subprocess.TimeoutExpired:
if print_output:
print("Script '%s' timed out." % script)
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["script"] = script
optional_data["hostname"] = hostname
message = "Script '%s' on host '%s' timed out." % (script, hostname)
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Script '%s' on '%s' timed out" % (script, hostname)
message = "Script '%s' on host '%s' timed out." % (script, hostname)
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
# Catch any execution error.
except Exception as e:
if print_output:
print("Executing script '%s' raised error: %s" % (script, str(e)))
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["script"] = script
optional_data["hostname"] = hostname
message = "Executing script '%s' on host '%s' raised error: %s" % (script, hostname, str(e))
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Executing script '%s' on '%s' raised error" % (script, hostname)
message = "Executing script '%s' on host '%s' raised error: %s" % (script, hostname, str(e))
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
continue
exit_code = process.poll()
# Process did not terminate yet.
if exit_code is None:
process.terminate()
time.sleep(5)
exit_code = process.poll()
# Kill process if not exited.
if exit_code != -15:
if print_output:
print("Script '%s' did not terminate. Killing it." % script)
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["script"] = script
optional_data["hostname"] = hostname
message = "Script '%s' on host '%s' did not terminate. Killing it." % (script, hostname)
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Script '%s' on '%s' did not terminate" % (script, hostname)
message = "Script '%s' on host '%s' did not terminate. Killing it." % (script, hostname)
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
# noinspection PyBroadException
try:
process.kill()
except:
pass
# Process executed successfully.
elif exit_code == 0:
if print_output:
stdout, stderr = process.communicate()
print(stdout.decode("ascii"))
print("")
continue
# Process encountered error.
else:
if print_output:
print("Script '%s' exited with exit code: %d" % (script, exit_code))
else:
if ALERTR_FIFO is not None:
hostname = socket.gethostname()
optional_data = dict()
optional_data["script"] = script
optional_data["hostname"] = hostname
message = "Script '%s' on host '%s' exited with exit code '%d'." % (script, hostname, exit_code)
optional_data["message"] = message
raise_alert_alertr(ALERTR_FIFO,
optional_data)
if FROM_ADDR is not None and TO_ADDR is not None:
hostname = socket.gethostname()
subject = "[Security] Script '%s' on '%s' unsuccessful" % (script, hostname)
message = "Script '%s' on host '%s' exited with exit code '%d'." % (script, hostname, exit_code)
raise_alert_mail(FROM_ADDR,
TO_ADDR,
subject,
message)
# noinspection PyBroadException
try:
process.kill()
except:
pass