Malware Analysis Toolkit

A comprehensive toolkit for malware analysis, reverse engineering, and threat research.

Repository: Malware Analysis Toolkit on GitHub (https://github.com/GauravSingh-CyberSecurity/malware-analysis-toolkit)

Overview

This toolkit provides security researchers and analysts with the necessary tools to perform both static and dynamic malware analysis in a safe, controlled environment.

Key Features

🔬 Static Analysis

  • File Format Analysis: PE, ELF, Mach-O file inspection
  • String Extraction: Automated string extraction and analysis
  • Hash Calculation: MD5, SHA1, SHA256, fuzzy hashing
  • Signature Detection: YARA rule matching and creation

⚡ Dynamic Analysis

  • Sandbox Integration: Cuckoo Sandbox compatibility
  • Behavioral Monitoring: API call monitoring and analysis
  • Network Traffic: Network behavior analysis
  • File System Changes: Real-time file system monitoring

Technical Implementation

Core Analysis Engine

#!/usr/bin/env python3
"""
Malware Analysis Toolkit
Author: Gaurav Singh
"""

import hashlib
import html
import json
import os
import subprocess
from datetime import datetime, timezone

import pefile
import requests
import yara

class MalwareAnalyzer:
    """Run static-analysis passes over one sample file and collect the results.

    Each analysis method returns its result and, when it produces one, also
    stores it in ``self.analysis_results`` under its own key.
    ``virustotal_check`` reads the hashes stored by ``calculate_hashes`` and
    ``behavioral_analysis`` reads the strings stored by ``extract_strings``.
    """

    # Read size used while hashing so a large sample is never held in memory.
    _HASH_CHUNK_SIZE = 1024 * 1024

    # Seconds to wait on the VirusTotal API before giving up.
    _HTTP_TIMEOUT = 30

    # Maximum number of indicator strings reported per category.
    _MAX_INDICATORS = 10

    # Substrings (compared case-insensitively) searched for in the extracted
    # strings, grouped by the category they are reported under.
    _SUSPICIOUS_PATTERNS = {
        'network': ['http://', 'https://', 'ftp://', 'tcp://', 'udp://'],
        'registry': ['HKEY_', 'SOFTWARE\\', 'CurrentVersion'],
        'files': ['temp', 'system32', 'documents', 'startup'],
        'crypto': ['base64', 'encrypt', 'decrypt', 'key', 'cipher'],
        'anti_analysis': ['debugger', 'analysis', 'sandbox', 'virtual']
    }

    def __init__(self, sample_path):
        """Remember the sample location and start with an empty result set.

        Args:
            sample_path: Path of the file to analyze.
        """
        self.sample_path = sample_path
        self.sample_name = os.path.basename(sample_path)
        self.analysis_results = {}
        self.yara_rules_path = "yara_rules/"

    def calculate_hashes(self):
        """Calculate the MD5, SHA1 and SHA256 digests and the file size.

        The file is read in fixed-size chunks, so memory use does not grow
        with the size of the sample.

        Returns:
            Dict with the keys ``md5``, ``sha1``, ``sha256`` (hex digests)
            and ``file_size`` (bytes). Also stored under ``'hashes'``.
        """
        digests = {
            'md5': hashlib.md5(),
            'sha1': hashlib.sha1(),
            'sha256': hashlib.sha256(),
        }
        file_size = 0

        with open(self.sample_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b''):
                file_size += len(chunk)
                for digest in digests.values():
                    digest.update(chunk)

        hashes = {name: digest.hexdigest() for name, digest in digests.items()}
        hashes['file_size'] = file_size

        self.analysis_results['hashes'] = hashes
        return hashes

    def extract_strings(self, min_length=4):
        """Extract ASCII and 16-bit little-endian strings with ``strings``.

        Args:
            min_length: Minimum string length passed to ``strings -n``.

        Returns:
            Dict with the keys ``ascii`` and ``unicode``, each a list of
            non-blank strings. A pass that fails leaves its list empty.
            Also stored under ``'strings'``.
        """
        strings = {'ascii': [], 'unicode': []}
        # An absolute path can never start with '-', so the sample name
        # cannot be mistaken for a command-line option.
        target = os.path.abspath(self.sample_path)
        # '-e l' asks strings for 16-bit little-endian characters.
        passes = (('ascii', []), ('unicode', ['-e', 'l']))

        try:
            for key, encoding_args in passes:
                result = subprocess.run(
                    ['strings', *encoding_args, '-n', str(min_length), target],
                    capture_output=True, text=True, errors='replace')
                strings[key] = [s for s in result.stdout.split('\n') if s.strip()]
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"Error extracting strings: {e}")

        self.analysis_results['strings'] = strings
        return strings

    @staticmethod
    def _format_pe_timestamp(raw_timestamp):
        """Render a PE header timestamp as an ISO-8601 string in UTC.

        Falls back to the raw value in hex when the platform cannot convert
        it, so an unusual timestamp does not abort the whole PE analysis.
        """
        try:
            return datetime.fromtimestamp(raw_timestamp, tz=timezone.utc).isoformat()
        except (OverflowError, OSError, ValueError):
            return hex(raw_timestamp)

    def pe_analysis(self):
        """Analyze the PE structure of samples named .exe, .dll or .scr.

        Returns:
            Dict with header fields, sections, imports and exports, also
            stored under ``'pe_analysis'``; ``None`` when the extension does
            not match or parsing fails.
        """
        if not self.sample_path.lower().endswith(('.exe', '.dll', '.scr')):
            return None

        pe = None
        try:
            pe = pefile.PE(self.sample_path)

            pe_info = {
                'machine': hex(pe.FILE_HEADER.Machine),
                'timestamp': self._format_pe_timestamp(pe.FILE_HEADER.TimeDateStamp),
                'characteristics': hex(pe.FILE_HEADER.Characteristics),
                'entry_point': hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint),
                'image_base': hex(pe.OPTIONAL_HEADER.ImageBase),
                'sections': [],
                'imports': [],
                'exports': []
            }

            # Names are decoded leniently: a single undecodable byte must not
            # throw away everything else learned about the file.
            for section in pe.sections:
                pe_info['sections'].append({
                    'name': section.Name.decode(errors='replace').rstrip('\x00'),
                    'virtual_address': hex(section.VirtualAddress),
                    'virtual_size': section.Misc_VirtualSize,
                    'raw_size': section.SizeOfRawData,
                    'entropy': section.get_entropy()
                })

            if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
                for entry in pe.DIRECTORY_ENTRY_IMPORT:
                    pe_info['imports'].append({
                        'dll': entry.dll.decode(errors='replace'),
                        # Imports without a name are skipped.
                        'functions': [imp.name.decode(errors='replace')
                                      for imp in entry.imports if imp.name]
                    })

            if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
                pe_info['exports'] = [
                    exp.name.decode(errors='replace')
                    for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols if exp.name
                ]

            self.analysis_results['pe_analysis'] = pe_info
            return pe_info

        except Exception as e:
            # Best effort: any failure while parsing is reported, not raised.
            print(f"PE analysis error: {e}")
            return None
        finally:
            if pe is not None:
                pe.close()

    @staticmethod
    def _serialize_yara_strings(string_matches):
        """Convert YARA string matches into JSON-serializable data.

        Returns:
            List of ``(identifier, instances)`` tuples where ``instances`` is
            a list of dicts holding the match ``offset`` and the matched
            bytes as a hex string under ``data``.
        """
        return [
            (
                string_match.identifier,
                [{'offset': instance.offset, 'data': instance.matched_data.hex()}
                 for instance in string_match.instances],
            )
            for string_match in string_matches
        ]

    def yara_scan(self):
        """Scan the sample with every ``.yar`` file in ``yara_rules_path``.

        A rule file that fails to compile or match is reported and skipped;
        the remaining rule files are still applied.

        Returns:
            List of dicts with the keys ``rule``, ``tags`` and ``strings``,
            also stored under ``'yara_matches'``. When the rules path does
            not exist an empty list is returned and nothing is stored.
        """
        if not os.path.exists(self.yara_rules_path):
            return []

        matches = []

        try:
            # Sorted so the order of reported matches is reproducible.
            rule_files = sorted(os.listdir(self.yara_rules_path))
        except OSError as e:
            print(f"YARA scan error: {e}")
            rule_files = []

        for rule_file in rule_files:
            if not rule_file.endswith('.yar'):
                continue
            rule_path = os.path.join(self.yara_rules_path, rule_file)
            try:
                rules = yara.compile(filepath=rule_path)
                for match in rules.match(self.sample_path):
                    matches.append({
                        'rule': match.rule,
                        'tags': match.tags,
                        'strings': self._serialize_yara_strings(match.strings)
                    })
            except Exception as e:
                # Best effort: one bad rule file must not stop the scan.
                print(f"YARA scan error: {e}")

        self.analysis_results['yara_matches'] = matches
        return matches

    def virustotal_check(self, api_key):
        """Look up the sample's SHA256 in the VirusTotal v2 file report API.

        Args:
            api_key: VirusTotal API key; a falsy value disables the check.

        Returns:
            Dict with ``detection_ratio``, ``scan_date``, ``permalink`` and
            ``scans``, also stored under ``'virustotal'``; ``None`` when no
            key or hash is available, the request fails, or the response
            code is not 1.
        """
        if not api_key:
            return None

        sha256 = self.analysis_results.get('hashes', {}).get('sha256')
        if not sha256:
            return None

        url = "https://www.virustotal.com/vtapi/v2/file/report"
        params = {'apikey': api_key, 'resource': sha256}

        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, params=params, timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
            vt_data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"VirusTotal check error: {e}")
            return None

        if vt_data.get('response_code') != 1:
            return None

        vt_results = {
            'detection_ratio': f"{vt_data.get('positives', 0)}/{vt_data.get('total', 0)}",
            'scan_date': vt_data.get('scan_date'),
            'permalink': vt_data.get('permalink'),
            'scans': vt_data.get('scans', {})
        }

        self.analysis_results['virustotal'] = vt_results
        return vt_results

    def behavioral_analysis(self):
        """Group extracted strings by the suspicious pattern category they hit.

        Each string is reported at most once per category, even when it
        matches several patterns or occurs several times in the sample.

        Returns:
            List of dicts with ``category`` and ``indicators`` (at most
            ``_MAX_INDICATORS`` strings, in extraction order); categories
            without a hit are omitted. Also stored under
            ``'behavioral_indicators'``.
        """
        behaviors = []
        strings = self.analysis_results.get('strings', {})
        all_strings = strings.get('ascii', []) + strings.get('unicode', [])

        for category, patterns in self._SUSPICIOUS_PATTERNS.items():
            needles = [pattern.lower() for pattern in patterns]
            indicators = []
            seen = set()

            for candidate in all_strings:
                if candidate in seen:
                    continue
                lowered = candidate.lower()
                if any(needle in lowered for needle in needles):
                    seen.add(candidate)
                    indicators.append(candidate)
                    # Only the first few hits are reported; stop early.
                    if len(indicators) == self._MAX_INDICATORS:
                        break

            if indicators:
                behaviors.append({
                    'category': category,
                    'indicators': indicators
                })

        self.analysis_results['behavioral_indicators'] = behaviors
        return behaviors

    def generate_report(self, output_format='json'):
        """Render everything collected so far as a report string.

        Args:
            output_format: ``'json'`` or ``'html'``; any other value yields
                ``str()`` of the report dict.

        Returns:
            The report as a string.
        """
        report = {
            'sample_info': {
                'name': self.sample_name,
                'path': self.sample_path,
                'analysis_date': datetime.now().isoformat()
            },
            'analysis_results': self.analysis_results
        }

        if output_format == 'json':
            return json.dumps(report, indent=2)
        elif output_format == 'html':
            return self.generate_html_report(report)
        else:
            return str(report)

    def generate_html_report(self, report_data):
        """Render the sample info and hashes of a report dict as HTML.

        Args:
            report_data: Report dict as built by ``generate_report``.

        Returns:
            The HTML document as a string. Missing hash values are shown as
            ``N/A``; all inserted values are HTML-escaped.
        """
        # Literal CSS braces are doubled so that str.format() treats only the
        # named placeholders as replacement fields.
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Malware Analysis Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .header {{ background: #2c3e50; color: white; padding: 20px; border-radius: 5px; }}
                .section {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }}
                .hash {{ font-family: monospace; background: #f8f9fa; padding: 5px; }}
                .danger {{ color: #e74c3c; font-weight: bold; }}
                .warning {{ color: #f39c12; font-weight: bold; }}
                .info {{ color: #3498db; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>🛡️ Malware Analysis Report</h1>
                <p>Sample: {sample_name}</p>
                <p>Analysis Date: {analysis_date}</p>
            </div>
            
            <div class="section">
                <h2>File Hashes</h2>
                <p><strong>MD5:</strong> <span class="hash">{md5}</span></p>
                <p><strong>SHA1:</strong> <span class="hash">{sha1}</span></p>
                <p><strong>SHA256:</strong> <span class="hash">{sha256}</span></p>
                <p><strong>File Size:</strong> {file_size} bytes</p>
            </div>
            
            <!-- Additional sections would be populated here -->
            
        </body>
        </html>
        """

        hashes = report_data['analysis_results'].get('hashes', {})
        sample_info = report_data['sample_info']
        fields = {
            'sample_name': sample_info['name'],
            'analysis_date': sample_info['analysis_date'],
            'md5': hashes.get('md5', 'N/A'),
            'sha1': hashes.get('sha1', 'N/A'),
            'sha256': hashes.get('sha256', 'N/A'),
            'file_size': hashes.get('file_size', 'N/A'),
        }
        # The sample name comes from an untrusted file, so escape every value.
        return html_template.format(
            **{name: html.escape(str(value)) for name, value in fields.items()}
        )

def main():
    """Command-line entry point: analyze one sample and emit a report.

    Runs the hash, strings, PE, YARA and behavioral passes in that order,
    then the VirusTotal lookup when ``--vt-api-key`` is given, and finally
    writes the report to ``--output`` or prints it.

    Returns:
        0 on success, 1 when the sample path is not an existing file.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Malware Analysis Toolkit")
    parser.add_argument("sample", help="Path to malware sample")
    parser.add_argument("--output", "-o", help="Output file for report")
    parser.add_argument("--format", "-f", choices=['json', 'html'], default='json', help="Report format")
    parser.add_argument("--vt-api-key", help="VirusTotal API key")

    args = parser.parse_args()

    # isfile() also rejects directories, which would otherwise fail later
    # when the sample is opened for hashing.
    if not os.path.isfile(args.sample):
        print(f"Error: Sample file not found: {args.sample}")
        return 1

    print(f"[+] Analyzing sample: {args.sample}")

    analyzer = MalwareAnalyzer(args.sample)

    # Hashes and strings come first: later passes read their stored results.
    print("[*] Calculating hashes...")
    analyzer.calculate_hashes()

    print("[*] Extracting strings...")
    analyzer.extract_strings()

    print("[*] Performing PE analysis...")
    analyzer.pe_analysis()

    print("[*] Running YARA scan...")
    analyzer.yara_scan()

    print("[*] Analyzing behavioral indicators...")
    analyzer.behavioral_analysis()

    if args.vt_api_key:
        print("[*] Checking VirusTotal...")
        analyzer.virustotal_check(args.vt_api_key)

    print("[*] Generating report...")
    report = analyzer.generate_report(args.format)

    if args.output:
        # The HTML report contains non-ASCII characters, so the encoding is
        # fixed instead of depending on the platform default.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"[+] Report saved to: {args.output}")
    else:
        print(report)

    return 0

if __name__ == "__main__":
    # Use main()'s return value as the process exit status; a return value
    # of None exits with status 0.
    raise SystemExit(main())

Sandbox Integration

class CuckooSandboxAPI:
    """Minimal client for submitting samples to a Cuckoo Sandbox REST API.

    When an API key is configured it is sent as the ``api_key`` field of
    every request.
    """

    def __init__(self, api_url, api_key=None, timeout=60):
        """Store the connection settings.

        Args:
            api_url: Base URL of the API; trailing slashes are removed.
            api_key: Optional key added to each request as ``api_key``.
            timeout: Seconds passed to ``requests`` as the request timeout.
        """
        self.api_url = api_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

    def _with_api_key(self, fields=None):
        """Return a copy of ``fields`` with the API key added when one is set."""
        # Copy first so the caller's dict is never modified.
        merged = dict(fields or {})
        if self.api_key:
            merged['api_key'] = self.api_key
        return merged

    def submit_sample(self, file_path, analysis_options=None):
        """Submit a sample file for analysis.

        Args:
            file_path: Path of the file to upload.
            analysis_options: Optional dict of extra form fields; it is not
                modified.

        Returns:
            The ``task_id`` value from the JSON response.

        Raises:
            Exception: If the response status code is not 200.
        """
        url = f"{self.api_url}/tasks/create/file"
        data = self._with_api_key(analysis_options)

        # The context manager closes the sample file even if the upload fails.
        with open(file_path, 'rb') as sample:
            response = requests.post(url, files={'file': sample}, data=data,
                                     timeout=self.timeout)

        if response.status_code == 200:
            return response.json()['task_id']
        raise Exception(f"Submission failed: {response.text}")

    def get_report(self, task_id, report_format='json'):
        """Retrieve the analysis report of a task.

        Args:
            task_id: Identifier returned by ``submit_sample``.
            report_format: Format segment appended to the report URL.

        Returns:
            The decoded JSON body when ``report_format`` is ``'json'``,
            otherwise the response text.

        Raises:
            Exception: If the response status code is not 200.
        """
        url = f"{self.api_url}/tasks/report/{task_id}/{report_format}"

        response = requests.get(url, params=self._with_api_key(),
                                timeout=self.timeout)

        if response.status_code == 200:
            return response.json() if report_format == 'json' else response.text
        raise Exception(f"Report retrieval failed: {response.text}")

Installation & Usage

# Clone the repository
git clone https://github.com/GauravSingh-CyberSecurity/malware-analysis-toolkit.git
cd malware-analysis-toolkit

# Install dependencies
pip install -r requirements.txt

# Install system dependencies (Ubuntu/Debian)
# The `strings` utility ships in the binutils package; there is no package
# named "strings".
sudo apt-get install yara binutils

# Basic analysis
python malware_analyzer.py sample.exe --output report.json

# Analysis with VirusTotal check
python malware_analyzer.py sample.exe --vt-api-key YOUR_API_KEY --format html --output report.html

# Batch analysis
python batch_analyzer.py samples_directory/ --output-dir reports/

YARA Rules Integration

// Matches a file that contains at least two of the $api* strings, or that
// contains a $reg* string together with a $net* string.
rule Suspicious_Executable {
    meta:
        description = "Detects suspicious executable characteristics"
        author = "Gaurav Singh"
        date = "2024-01-01"
        
    strings:
        // API-name strings; the `ascii` modifier means only the single-byte
        // form of each string is searched for.
        $api1 = "CreateRemoteThread" ascii
        $api2 = "WriteProcessMemory" ascii
        $api3 = "VirtualAllocEx" ascii
        // Registry path string (backslashes are escaped inside the literal).
        $reg1 = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run" ascii
        // URL-scheme prefixes.
        $net1 = "http://" ascii
        $net2 = "tcp://" ascii
        
    condition:
        // Either branch is sufficient: two or more API names, or one
        // registry string plus one URL-scheme string.
        (2 of ($api*)) or (1 of ($reg*) and 1 of ($net*))
}

// Matches a file that contains any one of the four strings below.
rule Cryptocurrency_Miner {
    meta:
        description = "Detects cryptocurrency mining malware"
        
    strings:
        // $str1 and $str2 are case-sensitive; $str3 and $str4 carry `nocase`
        // and match regardless of letter case.
        $str1 = "stratum+tcp://" ascii
        $str2 = "mining.pool" ascii
        $str3 = "xmrig" ascii nocase
        $str4 = "cryptonight" ascii nocase
        
    condition:
        // A single string hit is enough to trigger the rule.
        any of them
}