Malware Analysis Toolkit
Malware Analysis Toolkit
A comprehensive toolkit for malware analysis, reverse engineering, and threat research.
Repository: GitHub - Malware Analysis Toolkit
Overview
This toolkit provides security researchers and analysts with the necessary tools to perform both static and dynamic malware analysis in a safe, controlled environment.
Key Features
🔬 Static Analysis
- File Format Analysis: PE, ELF, Mach-O file inspection
- String Extraction: Automated string extraction and analysis
- Hash Calculation: MD5, SHA1, SHA256, fuzzy hashing
- Signature Detection: YARA rule matching and creation
⚡ Dynamic Analysis
- Sandbox Integration: Cuckoo Sandbox compatibility
- Behavioral Monitoring: API call monitoring and analysis
- Network Traffic: Network behavior analysis
- File System Changes: Real-time file system monitoring
Technical Implementation
Core Analysis Engine
#!/usr/bin/env python3
"""
Malware Analysis Toolkit
Author: Gaurav Singh
"""
import os
import hashlib
import yara
import pefile
import json
import subprocess
from datetime import datetime
import requests
class MalwareAnalyzer:
    """Static triage of a single sample file.

    Every analysis method returns its result and also records it in
    ``self.analysis_results`` under its own key, so ``generate_report`` can
    serialize whatever has been run so far.
    """

    # Extensions that are always handed to pefile, even without an MZ header.
    PE_EXTENSIONS = ('.exe', '.dll', '.scr')
    # Samples are hashed in chunks so large files are never fully in memory.
    HASH_CHUNK_SIZE = 1024 * 1024
    # NOTE(review): this is the legacy v2 report endpoint kept from the
    # original code; confirm it is still served before relying on it.
    VT_REPORT_URL = "https://www.virustotal.com/vtapi/v2/file/report"
    # Seconds to wait on the VirusTotal request before giving up.
    REQUEST_TIMEOUT = 30
    # Maximum number of indicator strings reported per category.
    MAX_INDICATORS = 10
    # Case-insensitive substrings that flag a string as a behavioral hint.
    SUSPICIOUS_PATTERNS = {
        'network': ['http://', 'https://', 'ftp://', 'tcp://', 'udp://'],
        'registry': ['HKEY_', 'SOFTWARE\\', 'CurrentVersion'],
        'files': ['temp', 'system32', 'documents', 'startup'],
        'crypto': ['base64', 'encrypt', 'decrypt', 'key', 'cipher'],
        'anti_analysis': ['debugger', 'analysis', 'sandbox', 'virtual']
    }

    def __init__(self, sample_path):
        """Remember the sample location; no file access happens here."""
        self.sample_path = sample_path
        self.sample_name = os.path.basename(sample_path)
        self.analysis_results = {}
        self.yara_rules_path = "yara_rules/"

    def calculate_hashes(self):
        """Calculate MD5, SHA1 and SHA256 digests plus the file size.

        Returns:
            dict with keys ``md5``, ``sha1``, ``sha256`` (hex strings) and
            ``file_size`` (bytes).

        Raises:
            OSError: if the sample cannot be opened or read.
        """
        digests = {
            'md5': hashlib.md5(),
            'sha1': hashlib.sha1(),
            'sha256': hashlib.sha256(),
        }
        file_size = 0
        with open(self.sample_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self.HASH_CHUNK_SIZE), b''):
                file_size += len(chunk)
                for digest in digests.values():
                    digest.update(chunk)
        hashes = {name: digest.hexdigest() for name, digest in digests.items()}
        hashes['file_size'] = file_size
        self.analysis_results['hashes'] = hashes
        return hashes

    def extract_strings(self, min_length=4):
        """Extract ASCII and 16-bit little-endian strings via ``strings``.

        Relies on the external ``strings`` binary. If it cannot be run, the
        error is printed and both lists come back empty (best effort).

        Args:
            min_length: minimum string length passed to ``strings -n``.

        Returns:
            dict with keys ``ascii`` and ``unicode``, each a list of strings.
        """
        encoding_flags = {
            'ascii': [],
            'unicode': ['-e', 'l'],
        }
        strings = {}
        try:
            for kind, flags in encoding_flags.items():
                # "--" stops option parsing so a sample whose name starts
                # with "-" is not interpreted as a flag.
                command = ['strings', *flags, '-n', str(min_length),
                           '--', self.sample_path]
                # errors='replace' keeps undecodable bytes from discarding
                # the whole result.
                result = subprocess.run(command, capture_output=True,
                                        text=True, errors='replace')
                strings[kind] = [s for s in result.stdout.split('\n') if s.strip()]
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"Error extracting strings: {e}")
            strings = {'ascii': [], 'unicode': []}
        self.analysis_results['strings'] = strings
        return strings

    def _looks_like_pe(self):
        """Return True for a known PE extension or an ``MZ`` file header."""
        if self.sample_path.lower().endswith(self.PE_EXTENSIONS):
            return True
        try:
            with open(self.sample_path, 'rb') as f:
                return f.read(2) == b'MZ'
        except OSError:
            return False

    @staticmethod
    def _decode_pe_name(raw):
        """Decode a PE name field without failing on non-UTF-8 bytes."""
        return raw.decode('utf-8', errors='replace')

    def pe_analysis(self):
        """Analyze PE file structure.

        Returns:
            dict describing headers, sections, imports and exports, or
            ``None`` when the sample does not look like a PE file or pefile
            fails to parse it (the error is printed).
        """
        from datetime import timezone

        if not self._looks_like_pe():
            return None
        pe = None
        try:
            pe = pefile.PE(self.sample_path)
            file_header = pe.FILE_HEADER
            optional_header = pe.OPTIONAL_HEADER
            pe_info = {
                'machine': hex(file_header.Machine),
                # The header timestamp is seconds since the epoch in UTC;
                # rendering it in UTC keeps reports independent of the
                # analyst machine's timezone.
                'timestamp': datetime.fromtimestamp(
                    file_header.TimeDateStamp, tz=timezone.utc).isoformat(),
                'characteristics': hex(file_header.Characteristics),
                'entry_point': hex(optional_header.AddressOfEntryPoint),
                'image_base': hex(optional_header.ImageBase),
                'sections': [],
                'imports': [],
                'exports': []
            }
            # Analyze sections
            for section in pe.sections:
                pe_info['sections'].append({
                    'name': self._decode_pe_name(section.Name).rstrip('\x00'),
                    'virtual_address': hex(section.VirtualAddress),
                    'virtual_size': section.Misc_VirtualSize,
                    'raw_size': section.SizeOfRawData,
                    'entropy': section.get_entropy()
                })
            # Analyze imports (entries imported by ordinal have no name)
            if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
                for entry in pe.DIRECTORY_ENTRY_IMPORT:
                    pe_info['imports'].append({
                        'dll': self._decode_pe_name(entry.dll),
                        'functions': [self._decode_pe_name(imp.name)
                                      for imp in entry.imports if imp.name]
                    })
            # Analyze exports
            if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
                pe_info['exports'] = [
                    self._decode_pe_name(exp.name)
                    for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols if exp.name
                ]
            self.analysis_results['pe_analysis'] = pe_info
            return pe_info
        except Exception as e:
            # Best effort: a malformed sample must not abort the whole run.
            print(f"PE analysis error: {e}")
            return None
        finally:
            if pe is not None:
                pe.close()

    @staticmethod
    def _describe_yara_string(string_match):
        """Return ``(identifier, instances)`` with JSON-serializable instances.

        Handles both result shapes yara-python has used: plain
        ``(offset, identifier, data)`` tuples, and objects exposing
        ``identifier`` and ``instances`` (each instance with ``offset`` and
        ``matched_data``).
        """
        if isinstance(string_match, tuple):
            offset, identifier, data = string_match
            return (identifier, [{'offset': offset, 'data': data.hex()}])
        instances = [
            {'offset': instance.offset, 'data': instance.matched_data.hex()}
            for instance in string_match.instances
        ]
        return (string_match.identifier, instances)

    def yara_scan(self):
        """Scan the sample with every ``.yar`` file in ``yara_rules_path``.

        A rule file that fails to compile or match is reported and skipped,
        so it does not hide matches from the remaining files.

        Returns:
            list of dicts with keys ``rule``, ``tags`` and ``strings``;
            an empty list when the rules directory does not exist.
        """
        if not os.path.isdir(self.yara_rules_path):
            return []
        matches = []
        for rule_file in sorted(os.listdir(self.yara_rules_path)):
            if not rule_file.endswith('.yar'):
                continue
            rule_path = os.path.join(self.yara_rules_path, rule_file)
            try:
                rules = yara.compile(rule_path)
                for match in rules.match(self.sample_path):
                    matches.append({
                        'rule': match.rule,
                        'tags': match.tags,
                        'strings': [self._describe_yara_string(s)
                                    for s in match.strings]
                    })
            except Exception as e:
                print(f"YARA scan error: {e}")
        self.analysis_results['yara_matches'] = matches
        return matches

    def virustotal_check(self, api_key):
        """Check the sample's SHA256 against VirusTotal.

        ``calculate_hashes`` must have been run first.

        Args:
            api_key: VirusTotal API key; a falsy value skips the check.

        Returns:
            dict with ``detection_ratio``, ``scan_date``, ``permalink`` and
            ``scans``, or ``None`` when the check is skipped, the request
            fails, or the hash is unknown to VirusTotal.
        """
        if not api_key:
            return None
        sha256 = self.analysis_results.get('hashes', {}).get('sha256')
        if not sha256:
            return None
        params = {'apikey': api_key, 'resource': sha256}
        try:
            # Without a timeout an unresponsive server would hang the run.
            response = requests.get(self.VT_REPORT_URL, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            vt_data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"VirusTotal check error: {e}")
            return None
        if vt_data.get('response_code') != 1:
            return None
        vt_results = {
            'detection_ratio': f"{vt_data.get('positives', 0)}/{vt_data.get('total', 0)}",
            'scan_date': vt_data.get('scan_date'),
            'permalink': vt_data.get('permalink'),
            'scans': vt_data.get('scans', {})
        }
        self.analysis_results['virustotal'] = vt_results
        return vt_results

    def behavioral_analysis(self):
        """Flag extracted strings that contain suspicious substrings.

        Works on the output of ``extract_strings``. Each string is listed at
        most once per category, even if it matches several patterns or
        appears in both the ASCII and Unicode lists.

        Returns:
            list of ``{'category': ..., 'indicators': [...]}`` dicts, one per
            category with at least one hit, capped at ``MAX_INDICATORS``
            indicators each.
        """
        strings = self.analysis_results.get('strings', {})
        all_strings = strings.get('ascii', []) + strings.get('unicode', [])
        # Lower-case each distinct string once; dict keys keep first-seen order.
        lowered = {s: s.lower() for s in all_strings}
        behaviors = []
        for category, patterns in self.SUSPICIOUS_PATTERNS.items():
            needles = [pattern.lower() for pattern in patterns]
            found = [
                original for original, folded in lowered.items()
                if any(needle in folded for needle in needles)
            ]
            if found:
                behaviors.append({
                    'category': category,
                    'indicators': found[:self.MAX_INDICATORS]
                })
        self.analysis_results['behavioral_indicators'] = behaviors
        return behaviors

    def generate_report(self, output_format='json'):
        """Generate a report of everything analyzed so far.

        Args:
            output_format: ``'json'`` or ``'html'``; any other value returns
                ``str()`` of the report dict.

        Returns:
            the report as a string.
        """
        report = {
            'sample_info': {
                'name': self.sample_name,
                'path': self.sample_path,
                'analysis_date': datetime.now().isoformat()
            },
            'analysis_results': self.analysis_results
        }
        if output_format == 'json':
            return json.dumps(report, indent=2)
        if output_format == 'html':
            return self.generate_html_report(report)
        return str(report)

    def generate_html_report(self, report_data):
        """Render the sample info and hashes as a standalone HTML page.

        Args:
            report_data: dict shaped like the one built by
                ``generate_report`` (``sample_info`` and ``analysis_results``).

        Returns:
            the HTML document as a string.
        """
        from html import escape

        # CSS braces are doubled so str.format treats them as literals and
        # only the named placeholders are substituted.
        html_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Malware Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: #2c3e50; color: white; padding: 20px; border-radius: 5px; }}
.section {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }}
.hash {{ font-family: monospace; background: #f8f9fa; padding: 5px; }}
.danger {{ color: #e74c3c; font-weight: bold; }}
.warning {{ color: #f39c12; font-weight: bold; }}
.info {{ color: #3498db; }}
</style>
</head>
<body>
<div class="header">
<h1>🛡️ Malware Analysis Report</h1>
<p>Sample: {sample_name}</p>
<p>Analysis Date: {analysis_date}</p>
</div>
<div class="section">
<h2>File Hashes</h2>
<p><strong>MD5:</strong> <span class="hash">{md5}</span></p>
<p><strong>SHA1:</strong> <span class="hash">{sha1}</span></p>
<p><strong>SHA256:</strong> <span class="hash">{sha256}</span></p>
<p><strong>File Size:</strong> {file_size} bytes</p>
</div>
<!-- Additional sections would be populated here -->
</body>
</html>
"""
        hashes = report_data['analysis_results'].get('hashes', {})
        sample_info = report_data['sample_info']
        fields = {
            'sample_name': sample_info['name'],
            'analysis_date': sample_info['analysis_date'],
            'md5': hashes.get('md5', 'N/A'),
            'sha1': hashes.get('sha1', 'N/A'),
            'sha256': hashes.get('sha256', 'N/A'),
            'file_size': hashes.get('file_size', 'N/A'),
        }
        # The sample name is attacker-controlled; escape every value before
        # it reaches the markup.
        return html_template.format(
            **{key: escape(str(value)) for key, value in fields.items()}
        )
def main(argv=None):
    """Command-line entry point: analyze one sample and emit a report.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when the sample is not a
        readable regular file.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Malware Analysis Toolkit")
    parser.add_argument("sample", help="Path to malware sample")
    parser.add_argument("--output", "-o", help="Output file for report")
    parser.add_argument("--format", "-f", choices=['json', 'html'], default='json', help="Report format")
    parser.add_argument("--vt-api-key",
                        help="VirusTotal API key (falls back to the VT_API_KEY environment variable)")
    args = parser.parse_args(argv)

    # isfile (not exists) so that a directory is rejected here instead of
    # crashing later when the sample is opened for hashing.
    if not os.path.isfile(args.sample):
        print(f"Error: Sample file not found: {args.sample}")
        return 1

    # Command-line arguments are visible in the process list, so allow the
    # key to come from the environment instead.
    vt_api_key = args.vt_api_key or os.environ.get("VT_API_KEY")

    print(f"[+] Analyzing sample: {args.sample}")
    analyzer = MalwareAnalyzer(args.sample)

    # Perform analysis; later steps read results recorded by earlier ones
    # (behavioral analysis needs strings, VirusTotal needs hashes).
    print("[*] Calculating hashes...")
    analyzer.calculate_hashes()
    print("[*] Extracting strings...")
    analyzer.extract_strings()
    print("[*] Performing PE analysis...")
    analyzer.pe_analysis()
    print("[*] Running YARA scan...")
    analyzer.yara_scan()
    print("[*] Analyzing behavioral indicators...")
    analyzer.behavioral_analysis()
    if vt_api_key:
        print("[*] Checking VirusTotal...")
        analyzer.virustotal_check(vt_api_key)

    # Generate report
    print("[*] Generating report...")
    report = analyzer.generate_report(args.format)
    if args.output:
        # Explicit UTF-8: the HTML report contains non-ASCII characters that
        # some platform default encodings cannot represent.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"[+] Report saved to: {args.output}")
    else:
        print(report)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Sandbox Integration
class CuckooSandboxAPI:
    """Minimal client for a Cuckoo Sandbox REST API.

    NOTE(review): the API key is sent as an ``api_key`` form field / query
    parameter, as in the original code; confirm this matches how the target
    deployment expects to be authenticated.
    """

    def __init__(self, api_url, api_key=None, timeout=60):
        """Store connection settings.

        Args:
            api_url: base URL of the API; a trailing slash is removed.
            api_key: optional key added to every request.
            timeout: seconds to wait for each HTTP request.
        """
        self.api_url = api_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

    def submit_sample(self, file_path, analysis_options=None):
        """Submit sample to Cuckoo Sandbox.

        Args:
            file_path: path of the sample to upload.
            analysis_options: optional dict of extra form fields; it is
                copied, never modified.

        Returns:
            the ``task_id`` value from the JSON response.

        Raises:
            RuntimeError: if the server does not answer with HTTP 200.
            OSError: if the sample cannot be opened.
        """
        url = f"{self.api_url}/tasks/create/file"
        # Copy so the caller's dict does not silently gain an api_key entry.
        data = dict(analysis_options or {})
        if self.api_key:
            data['api_key'] = self.api_key
        # The context manager closes the sample even when the upload fails.
        with open(file_path, 'rb') as sample:
            response = requests.post(url, files={'file': sample}, data=data,
                                     timeout=self.timeout)
        if response.status_code != 200:
            raise RuntimeError(f"Submission failed: {response.text}")
        return response.json()['task_id']

    def get_report(self, task_id, report_format='json'):
        """Retrieve analysis report.

        Args:
            task_id: identifier returned by ``submit_sample``.
            report_format: format segment appended to the report URL.

        Returns:
            parsed JSON when ``report_format`` is ``'json'``, otherwise the
            response body as text.

        Raises:
            RuntimeError: if the server does not answer with HTTP 200.
        """
        url = f"{self.api_url}/tasks/report/{task_id}/{report_format}"
        params = {'api_key': self.api_key} if self.api_key else {}
        response = requests.get(url, params=params, timeout=self.timeout)
        if response.status_code != 200:
            raise RuntimeError(f"Report retrieval failed: {response.text}")
        return response.json() if report_format == 'json' else response.text
Installation & Usage
# Clone the repository
git clone https://github.com/GauravSingh-CyberSecurity/malware-analysis-toolkit.git
cd malware-analysis-toolkit
# Install dependencies
pip install -r requirements.txt
# Install system dependencies (Ubuntu/Debian)
# The `strings` utility is not a package of its own; it ships in binutils.
sudo apt-get install yara binutils
# Basic analysis
python malware_analyzer.py sample.exe --output report.json
# Analysis with VirusTotal check
python malware_analyzer.py sample.exe --vt-api-key YOUR_API_KEY --format html --output report.html
# Batch analysis
python batch_analyzer.py samples_directory/ --output-dir reports/
YARA Rules Integration
// Flags samples that reference at least two process-injection APIs, or that
// combine the Run-key autostart path with a network URL scheme.
rule Suspicious_Executable {
    meta:
        description = "Detects suspicious executable characteristics"
        author = "Gaurav Singh"
        date = "2024-01-01"
    strings:
        // APIs used to allocate, write and start code in another process.
        $api1 = "CreateRemoteThread" ascii
        $api2 = "WriteProcessMemory" ascii
        $api3 = "VirtualAllocEx" ascii
        // Autostart registry path. `wide` also matches the UTF-16 form, and
        // `nocase` covers different capitalizations of the same key path.
        $reg1 = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run" ascii wide nocase
        // URL schemes; `wide` also matches UTF-16 encoded copies.
        $net1 = "http://" ascii wide
        $net2 = "tcp://" ascii wide
    condition:
        (2 of ($api*)) or (1 of ($reg*) and 1 of ($net*))
}
// Matches when any one of the miner-related strings below is present.
rule Cryptocurrency_Miner {
    meta:
        description = "Detects cryptocurrency mining malware"

    strings:
        // Case-sensitive markers.
        $str1 = "stratum+tcp://" ascii
        $str2 = "mining.pool" ascii
        // Case-insensitive markers.
        $str3 = "xmrig" ascii nocase
        $str4 = "cryptonight" ascii nocase

    condition:
        // Every string in this rule is named $str*, so this is the full set.
        any of ($str*)
}