synawk

Machine Learning para detectar malware.

Actualmente hacer un malware indetectable se ha vuelto una tarea simple; cualquiera puede hacerlo, incluso usando solo tools. Quizá la solución a este problema tenga que ir más allá de la heurística y comenzar a usar modelos de IA para poder detectar nuevas amenazas.

Esta vez voy a mostrar (de manera básica) de que forma se podría implementar Machine Learning para detectar malwares usando python3. Para empezar primero debo saber como funciona un malware; es decir que es lo que lo hace malicioso y basado en esto poder extraer esos atributos. Está de más decir que no hablo del propósito del malware, si no desde un enfoque técnico; que hace que sea detectado actualmente como malware y de que forma se podría no solo automatizar si no también futuras amenazas aun no desarrolladas.

Bien, lo primero es saber que un malware es un software que invoca funciones que instancia objetos y demás cosas que hace un software convencional. La diferencia está en el propósito de la misma; es decir para que fue hecho. Una aplicación convencional normalmente no es detectada porque esta firmada, autorizada o simplemente no hace nada “comprometedor”; pero es muy probable que esta app realice acciones incluso mas intrusivas que un malware. Para poner un ejemplo, Una aplicación de escritorio remoto. Este tipo de aplicaciones tienen un proceso complejo invocando funciones y procedimientos que en otro contexto podrían ser maliciosos. Por otro lado un malware puede hacer lo mismo o menos, pero su propósito es netamente malicioso e intrusivo. Entonces dicho esto queda claro que no es tan simple detectar cuando se trata o no de un malware, sobre todo en un análisis dinámico.

Entonces para este tipo de problemas se necesita una solución no determinista. Para esto voy a crear un modelo en python3 usando scikit-learn que luego de entrenarlo con atributos de malwares y apps convencionales sepa reconocer estos patrones y pueda detectar las diferencias.

machine learning fail

Ahora viene lo interesante, cuales son los atributos que voy a entrenar al modelo para determinar si es un malware o no. Pero antes a que voy a considerar atributos a mejor dicho que son atributos en este contexto.

Por ejemplo una particularidad de muchos malwares es la entropía del código; y esto esto se debe a los packers o algunos tipos de ofuscación. Dicho de manera simple: el código es ilegible para el sistema que analiza el malware. Esta definitivamente es un atributo a considerar; si bien es cierto muchas apps son ofuscadas para proteger su código; es un buen indicador pero no el único ya que adicionando más atributos se puede generar un patrón de detección. Otro buen atributo sería que funciones usa el malware; funciones comunes en malwares son GetProcAddress o LoadLibrary incluso funciones de lectura y escritura en ciertos directorios. Por otro lado tenemos el nombre de las secciones que en su mayoría de veces cumple conciertas características, como por ejemplo tienen de nombre .text .data .rsc y así (upx por ejemplo si se ha usado). Pues bien, estos son los atributos que debemos considerar al momento de entrenar al modelo; pero claro no son los únicos.

Implementación en Python

Ya con lo anterior dicho se pueden hacer una idea de lo que se necesita hacer. Los pasos que seguiré para implementar el anti-malware serán:

1. Obtener una colección grande de malwares.
2. Obtener una colección grande para aplicaciones convencionales (no malwares)
3. Extraer los mismo atributos en ambos set de datos.
4. Evaluar diferentes modelos con la data recolectada y obtener su score.
5. Seleccionar el modelo con mejor score y guardarlo para un uso posterior.

Primero inspeccionaré un pefile malicioso para tener la base de lo que voy a extraer y poder generar una función que extraiga esos atributos de mi colección de malwares. Voy a tomar como ejemplo una variante del malware SPYBANKER; del cual voy a extraer algunos atributos para intentar reconocer patrones que puedan ser útiles para poder entrenar al modelo.

iat malware

Al extraer algunas de las funciones que el código importa se pueden reconocer algunas. Funciones para añadir un valor al registro; muy probablemente para hacer que el malware inicie al cargar. Funciones para escritura y también funciones como LoadLibraryY GetProcAddressque se usan mucho para evitar cargar dependencias; en este caso podría ser para las funciones de socket ya que no vi ninguna por ahí.

Ahora si luego de mucha teoría viene la implementación. Lo primero será instalar el paquete pefile que me va a permitir obtener esta información. No expliqué a detalle la composición de un pefile pero puedes revisarlo en la doc. de Microsoft. https://docs.microsoft.com/en-us/windows/win32/debug/pe-format.

from pefile import PE

pe = PE("malware01")
pe.parse_data_directories()

for entry in pe.DIRECTORY_ENTRY_IMPORT:
  for imp in entry.imports:
    print(hex(imp.address), imp.name)

De este modo voy a obtener la lista completa de las funciones que tiene el código. Pero aquí ya está el primer problema; no puedo pasarle a mi modelo de ML texto, es decir tengo que enviarle una representación “numérica” de esto. El resultado en formato simple debería ser algo así:

dataset machine learning

Es decir voy a tener en columnas todos los valores de DOS_HEADER , IMAGE_NT_HEADERS y IMAGE_OPTIONAL_HEADERS. Luego tener una columna que defina si existen o no funciones sospechosas; como mencione previamente, esta columna por si solo no dice nada; el valor se halla al “combinarse” con las demás columnas. Y finalmente lo mas importante mi etiqueta de clasificación que me indica si es un malware o no. Con este set de datos entrenaré al modelo.

from pefile import PE

pe = PE("malware01")

"""
> Obtener funciones del pefile
"""
pe.parse_data_directories()

for entry in pe.DIRECTORY_ENTRY_IMPORT:
  for imp in entry.imports:
    print(hex(imp.address), imp.name)
    

"""
> Obtener atributos estáticos del pefile
Solo voy a extraer los atributos que considero significativos
El objeto debe tener las mismas columnas que todos los casos es por eso 
que no itero las cabceras o uso dump_dict

"""
def get_attr(file):
	data = {
		 "e_magic": file.DOS_HEADER.e_magic,
		 "e_cblp": file.DOS_HEADER.e_cblp,
		 "e_cp": file.DOS_HEADER.e_cp,
		 "e_minalloc": file.DOS_HEADER.e_minalloc,
		 "e_maxalloc": file.DOS_HEADER.e_maxalloc,
		 "e_lfanew": file.DOS_HEADER.e_lfanew,

		 "Machine": file.FILE_HEADER.Machine,
		 "NumberOfSections": file.FILE_HEADER.NumberOfSections,
		 "TimeDateStamp": file.FILE_HEADER.TimeDateStamp,
		 "PointerToSymbolTable": file.FILE_HEADER.PointerToSymbolTable,
		 "NumberOfSymbols": file.FILE_HEADER.NumberOfSymbols,
		 "SizeOfOptionalHeader": file.FILE_HEADER.SizeOfOptionalHeader,
		 "Characteristics": file.FILE_HEADER.Characteristics,

		 "Magic": file.OPTIONAL_HEADER.Magic,
		 "MajorLinkerVersion": file.OPTIONAL_HEADER.MajorLinkerVersion,
		 "MinorLinkerVersion": file.OPTIONAL_HEADER.MinorLinkerVersion,
		 "SizeOfCode": file.OPTIONAL_HEADER.SizeOfCode,
		 "SizeOfInitializedData": file.OPTIONAL_HEADER.SizeOfInitializedData,
		 "SizeOfUninitializedData": file.OPTIONAL_HEADER.SizeOfUninitializedData,
		 "AddressOfEntryPoint": file.OPTIONAL_HEADER.AddressOfEntryPoint,
		 "BaseOfCode": file.OPTIONAL_HEADER.BaseOfCode,
		 "BaseOfData": file.OPTIONAL_HEADER.BaseOfData,
		 "ImageBase": file.OPTIONAL_HEADER.ImageBase,
		 "SectionAlignment": file.OPTIONAL_HEADER.SectionAlignment,
		 "FileAlignment": file.OPTIONAL_HEADER.FileAlignment,
		 "SizeOfImage": file.OPTIONAL_HEADER.SizeOfImage,
		 "SizeOfHeaders": file.OPTIONAL_HEADER.SizeOfHeaders,
		 "CheckSum": file.OPTIONAL_HEADER.CheckSum,
		 "Subsystem": file.OPTIONAL_HEADER.Subsystem,
		 "DllCharacteristics": file.OPTIONAL_HEADER.DllCharacteristics,
		 "SizeOfStackReserve": file.OPTIONAL_HEADER.SizeOfStackReserve,
		 "SizeOfStackCommit": file.OPTIONAL_HEADER.SizeOfStackCommit,
		 "SizeOfHeapReserve": file.OPTIONAL_HEADER.SizeOfHeapReserve,
		 "SizeOfHeapCommit": file.OPTIONAL_HEADER.SizeOfHeapCommit,
		 "NumberOfRvaAndSizes": file.OPTIONAL_HEADER.NumberOfRvaAndSizes
	}
	return data
print(get_attr(pe))

Hasta este punto solamente estoy capturando los atributos estáticos del pefile; sin embargo hay algunos más que debo agregar, el problema es que estos atributos que faltan son colecciones. Por ejemplo tenemos las secciones, cada una tiene un nombre y n atributos (VirtualAddress, SizeOfRawData, etc). Entonces debo de alguna forma establecer estas colecciones como atributos estándares; por ejemplo para el caso de Characteristicssabemos que define el tipo de la sección si tiene código, si es ejecutable etc. Con esta información puedo crear nuevos atributos, y precisamente esa es la idea; generar nuevos a tributos a partir de un análisis del pefile. Tomando como ejemplo este último atributo tenemos la siguiente información

Characteristics malware

Este atributo termina siendo relevante ya que determina si una sección es de escritura, lectura o ejecución (obviamente determina más cosas pero para este ejemplo me importan solo esas 3). Así que un atributo adicional es obtener el máximo valor, que es lo mismo decir la sección que tiene más permisos y también cuantas secciones iguales al máximo tipo de permisos existen. Esto último es porque la mayoría de malwares utiliza diferentes secciones con permisos para ejecución.

Ps: Quizá otra forma sea tomar la representación de execute, read y write, y manejarlo como una valor binario; es decir cuantas secciones son de escritura cuantos de lectura y cuantos de ejecución. No lo he probado pero podría funcionar mejor.

Otro valor que puedo obtener y resulta relevante es la entropía; en síntesis es la “complejidad” o desorden que tiene la sección; este atributo lo puedo obtener con get_entropy. Y bueno agregando de esta forma los demás atributos por sección ya quedaría algo asi.

secs = pe.sections
len_secs = len(secs)
Misc = [x.Misc for x in secs ] if len(secs) > 0 else [0]
Misc_PhysicalAddress = [x.Misc_PhysicalAddress for x in secs ] if len_secs > 0 else [0]
Misc_VirtualSize = [x.Misc_VirtualSize for x in secs ] if len_secs > 0 else [0]
VirtualAddress = [x.VirtualAddress for x in secs ] if len_secs > 0 else [0]
SizeOfRawData = [x.SizeOfRawData for x in secs ] if len_secs > 0 else [0]
PointerToRawData = [x.PointerToRawData for x in secs ] if len_secs > 0 else [0]
Characteristics = [x.Characteristics for x in secs ] if len_secs > 0 else [0]
entropy = [x.get_entropy() for x in secs ] if len_secs > 0 else [0]
{
    ...
  
 "SectionsLength" : len(secs),
 "Misc_min": min(Misc),
 "Misc_max": max(Misc),
 "Misc_PhysicalAddress_min": min(Misc_PhysicalAddress),
 "Misc_PhysicalAddress_max": max(Misc_PhysicalAddress),
 "Misc_VirtualSize_min": min(Misc_VirtualSize),
 "Misc_VirtualSize_max": max(Misc_VirtualSize),
 "SizeOfRawData_min": min(SizeOfRawData),
 "SizeOfRawData_max": max(SizeOfRawData),
 "PointerToRawData_min": min(PointerToRawData),
 "PointerToRawData_max": max(PointerToRawData),
 "Characteristics_min": min(Characteristics),
 "Characteristics_max": max(Characteristics),
 "entropy_min": min(entropy),
 "entropy_max": max(entropy)
    ...
  
}

...

Ahora sí; ya tengo los atributos que por defecto tiene un pefile, toca construir los demás indicadores; por ejemplo las funciones sospechosas etc. Para poder obtener que las funciones mas comunes en malwares necesito una colección de malwares; diferentes samples.

from pefile import PE
from os import listdir
from os.path import isfile, join
from collections import Counter

dataset_path = "./dataset/"
malwares = [f for f in listdir(dataset_path) if isfile(join(dataset_path, f))]

fn = []
for k, mw in enumerate(malwares):
	try:
		pe = PE(dataset_path + mw)
		pe.parse_data_directories()

		for entry in pe.DIRECTORY_ENTRY_IMPORT:
		  for imp in entry.imports:
		  	if imp.name is not None:
	  			fn.append(imp.name)
	except:
		pass
for n in Counter(fn).most_common(20):
	print(n)

funciones comunes en malwares

Luego de ejecutar un código en python que me extrae las funciones que los malwares dentro mi dataset; puedo obtener la imagen anterior. Y se puede ver que tiene sentido. Funciones como GetPorcAddress o LoadLibrary, tambien hay un Sleep y funciones para manipular archivos. Como ya menciones el solo hecho que usen estas funciones no los hace maliciosos; sin embargo en conjunto con el resto de atributos el modelo le va a generar un “peso” o relevancia según el entrenamiento. Entonces con esta información ya puedo crear un atributo nuevo que sea cuantas funciones comunes tiene; y para esto voy a guardar estas funciones en un txt para poder usarlo luego.

Así como este atributo nuevo que he construido se pueden hacer muchos más; por ejemplo si están usando packers o que dlls importa etc. Incluso se podría buscar “patrones” de algoritmos conocidos de ofuscación en el código; pero eso ya lo dejo a tu criterio.

Construyendo el dataset

Ya tengo un dataset para archivos maliciosos; lo que toca conseguir es un dataset para archivos no maliciosos. Para esto fue simple, tomé una máquina virtual de Windows 10 y otra de Windows server 2012 y extraje todos los pefiles. Finalmente el código tendrá esta forma:

from pefile import PE


suspicious_fn=[]
with open('suspicious_fn.txt') as f:
	suspicious_fn = f.readlines()
	suspicious_fn = [n.strip() for n in suspicious_fn]

"""
Solo voy a extraer los atributos que considero significativos
El objeto debe tener las mismas columnas que todos los casos es por eso 
que no itero las cabceras o uso dump_dict

"""

def get_attr(file, is_malware=1):
	secs = pe.sections
	len_secs = len(secs)
	Misc = [x.Misc for x in secs ] if len(secs) > 0 else [0]
	Misc_PhysicalAddress = [x.Misc_PhysicalAddress for x in secs ] if len_secs > 0 else [0]
	Misc_VirtualSize = [x.Misc_VirtualSize for x in secs ] if len_secs > 0 else [0]
	VirtualAddress = [x.VirtualAddress for x in secs ] if len_secs > 0 else [0]
	SizeOfRawData = [x.SizeOfRawData for x in secs ] if len_secs > 0 else [0]
	PointerToRawData = [x.PointerToRawData for x in secs ] if len_secs > 0 else [0]
	Characteristics = [x.Characteristics for x in secs ] if len_secs > 0 else [0]
	entropy = [x.get_entropy() for x in secs ] if len_secs > 0 else [0]
	 

	data = {
		 "e_magic": file.DOS_HEADER.e_magic,
		 "e_cblp": file.DOS_HEADER.e_cblp,
		 "e_cp": file.DOS_HEADER.e_cp,
		 "e_minalloc": file.DOS_HEADER.e_minalloc,
		 "e_maxalloc": file.DOS_HEADER.e_maxalloc,
		 "e_lfanew": file.DOS_HEADER.e_lfanew,

		 "Machine": file.FILE_HEADER.Machine,
		 "NumberOfSections": file.FILE_HEADER.NumberOfSections,
		 "TimeDateStamp": file.FILE_HEADER.TimeDateStamp,
		 "PointerToSymbolTable": file.FILE_HEADER.PointerToSymbolTable,
		 "NumberOfSymbols": file.FILE_HEADER.NumberOfSymbols,
		 "SizeOfOptionalHeader": file.FILE_HEADER.SizeOfOptionalHeader,
		 "Characteristics": file.FILE_HEADER.Characteristics,

		 "Magic": file.OPTIONAL_HEADER.Magic,
		 "MajorLinkerVersion": file.OPTIONAL_HEADER.MajorLinkerVersion,
		 "MinorLinkerVersion": file.OPTIONAL_HEADER.MinorLinkerVersion,
		 "SizeOfCode": file.OPTIONAL_HEADER.SizeOfCode,
		 "SizeOfInitializedData": file.OPTIONAL_HEADER.SizeOfInitializedData,
		 "SizeOfUninitializedData": file.OPTIONAL_HEADER.SizeOfUninitializedData,
		 "AddressOfEntryPoint": file.OPTIONAL_HEADER.AddressOfEntryPoint,
		 "BaseOfCode": file.OPTIONAL_HEADER.BaseOfCode,
		 "BaseOfData": file.OPTIONAL_HEADER.BaseOfData,
		 "ImageBase": file.OPTIONAL_HEADER.ImageBase,
		 "SectionAlignment": file.OPTIONAL_HEADER.SectionAlignment,
		 "FileAlignment": file.OPTIONAL_HEADER.FileAlignment,
		 "SizeOfImage": file.OPTIONAL_HEADER.SizeOfImage,
		 "SizeOfHeaders": file.OPTIONAL_HEADER.SizeOfHeaders,
		 "CheckSum": file.OPTIONAL_HEADER.CheckSum,
		 "Subsystem": file.OPTIONAL_HEADER.Subsystem,
		 "DllCharacteristics": file.OPTIONAL_HEADER.DllCharacteristics,
		 "SizeOfStackReserve": file.OPTIONAL_HEADER.SizeOfStackReserve,
		 "SizeOfStackCommit": file.OPTIONAL_HEADER.SizeOfStackCommit,
		 "SizeOfHeapReserve": file.OPTIONAL_HEADER.SizeOfHeapReserve,
		 "SizeOfHeapCommit": file.OPTIONAL_HEADER.SizeOfHeapCommit,
		 "NumberOfRvaAndSizes": file.OPTIONAL_HEADER.NumberOfRvaAndSizes,

		 "SectionsLength" : len(secs),
		 "Misc_min": min(Misc),
		 "Misc_max": max(Misc),
		 "Misc_PhysicalAddress_min": min(Misc_PhysicalAddress),
		 "Misc_PhysicalAddress_max": max(Misc_PhysicalAddress),
		 "Misc_VirtualSize_min": min(Misc_VirtualSize),
		 "Misc_VirtualSize_max": max(Misc_VirtualSize),
		 "SizeOfRawData_min": min(SizeOfRawData),
		 "SizeOfRawData_max": max(SizeOfRawData),
		 "PointerToRawData_min": min(PointerToRawData),
		 "PointerToRawData_max": max(PointerToRawData),
		 "Characteristics_min": min(Characteristics),
		 "Characteristics_max": max(Characteristics),

		 "entropy_min": min(entropy),
		 "entropy_max": max(entropy),
         "malware" : is_malware
	}

	sp_fn = 0
	try:
		for entry in file.DIRECTORY_ENTRY_IMPORT:
			for func in entry.imports:
				if func.name.decode('utf-8') in suspicious_fn:
					sp_fn+=1
		data['SuspiciousFunctions'] = sp_fn
	except AttributeError:
		data['SuspiciousFunctions'] = 0
	return data



mw_dataset_path = "./dataset/malwares/"
dataset = []
malwares = [f for f in listdir(mw_dataset_path) if isfile(join(mw_dataset_path, f))]
for n in malwares:
	pe = PE(mw_dataset_path + n)
	dataset.append(get_attr(pe, 1))

dataset_path = "./dataset/pefiles/"
dataset = []
pefiles = [f for f in listdir(dataset_path) if isfile(join(dataset_path, f))]
for n in pefiles:
	pe = PE(dataset_path + n)
	dataset.append(get_attr(pe, 0))

Finalmente queda probar hace un análisis de los datos; quizá extraer la relevancia de las columnas y demás. Este trabajo suele ser el más importante (el tratamiento de datos) para que al ingresar al modelo el trabajo para este sea más simple. Pero como este blog no es de machine learning no lo haré. Solo voy a ejecutar el modelo (que se que funciona) para hacer pruebas de pefiles.

import pandas as pd 
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

data = pd.read_csv('./dataset_malwares.csv', sep=',')

X = data.drop(['malware'], axis=1)
y = data['malware']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

rf = RandomForestClassifier(n_estimators=70,  max_depth = 12, random_state=0)
rf.fit(X_train, y_train)

y_pred = rf.predict(X_test)
print(classification_report(y_pred, y_test))

Obteniendo finalmente un buen resultado en la predicción.

resultado score malware

Esto definitivamente es mejorable cambiando los atributos o agregando algunos más; como ya mencione incluso extraer atributos del código mismo. Ya con el modelo almacenado se puede comenzar a probar de forma aislada estos resultados.

deteccion antimalware machine learning

Conclusión

Al utilizar el modelo entrenado para detectar estas anomalías en los pefiles, el resultado no es perfecto para los archivos que no son malwares; como se ve en la imagen hay varios falsos positivos e incluso el umbral es bastante corto. Por el contrario, los malwares si son detectado en su totalidad como maliciosos. Como ya mencione este es mejorable quizá analizando algunos atributos que están de más y generan estos resultados; pero como un paso inicial a algo más elaborado lo veo suficiente.