ML-система обнаружения угроз с датасетами honeypot

Создание системы обнаружения угроз на основе ML с использованием датасетов honeypot на Hugging Face

Введение

Представьте себе: вы анализируете логи безопасности и видите тысячи событий в день. Какие из них реально опасны? Какие можно проигнорировать? Традиционные методы обнаружения на основе сигнатур уже не справляются с современным потоком угроз — злоумышленники научились их обходить быстрее, чем мы успеваем их обновлять.

Вот тут-то и приходит на помощь машинное обучение! Представьте систему, которая автоматически изучает поведенческие паттерны атакующих и предсказывает новые угрозы еще до того, как они попадут в базы сигнатур. Звучит как фантастика? А вот и нет!

Данные honeypot — это золотая жила для таких систем. В отличие от скучных академических датасетов, honeypot собирают настоящие данные о реальных атаках. Это как подслушивать разговоры хакеров в их естественной среде обитания — бесценная информация для понимания их методов и паттернов поведения.

В этом руководстве мы пройдем весь путь — от сырых данных honeypot до работающей системы обнаружения угроз. Готовы превратить хаос событий безопасности в понятную картину угроз? Тогда поехали!

Что скрывается в данных honeypot?

Прежде чем превращать данные в умные модели, давайте разберемся — что же на самом деле собирают наши honeypot-ловушки? Представьте honeypot как скрытую камеру в мире кибернапападений. Вот какие “кадры” мы получаем:

Данные сетевых потоков

Сырые сетевые соединения содержат фундаментальную информацию о паттернах атак:

  • IP-адреса и порты источника/назначения: Географические паттерны и паттерны таргетинга сервисов
  • Информация о протоколах: Использование TCP/UDP, протоколы прикладного уровня
  • Статистика потоков: Количество пакетов, объемы байт, длительность сессий
  • Временные данные: Метки времени соединений, интервалы сессий

События прикладного уровня

Взаимодействия более высокого уровня предоставляют поведенческие инсайты:

  • Попытки входа: Паттерны credential stuffing, брутфорс
  • Выполнение команд: Shell-команды, развертывание вредоносных программ
  • Файловые операции: Активности загрузки/скачивания, попытки эксфильтрации данных
  • Протокол-специфичные действия: HTTP-запросы, SSH-сессии, запросы к БД

Обогащенные метаданные

Дополнительный контекст улучшает сырые данные:

  • Геолокация: Информация о стране, регионе, ASN
  • Threat Intelligence: Репутация IP, известные сигнатуры вредоносных программ
  • Поведенческие паттерны: Кластеризация сессий, атрибуция кампаний атак

Превращаем хаос в порядок: очистка данных

Сырые данные honeypot — это как необработанная нефть: полно потенциала, но сначала нужно прилично поработать над очисткой. Представьте, что вы детектив, разбирающий улики — некоторые показания свидетелей недостоверны, даты не сходятся, а некоторые записи просто дублируются. Вот как мы наводим порядок в этом хаосе:

1. Валидация и санитизация данных

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def validate_network_data(df):
    # Маппинг имен колонок (датасет использует 'dest_port' вместо 'dst_port')
    if 'dest_port' in df.columns and 'dst_port' not in df.columns:
        df['dst_port'] = df['dest_port']

    # Конвертация временных меток с обработкой UTC для смешанных часовых поясов
    if '@timestamp' in df.columns and 'timestamp' not in df.columns:
        df['timestamp'] = pd.to_datetime(df['@timestamp'], format='mixed', errors='coerce', utc=True)
    elif 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'], format='mixed', errors='coerce', utc=True)

    # Конвертация портов в числовые значения, обработка строковых портов
    for port_col in ['src_port', 'dst_port']:
        if port_col in df.columns:
            df[port_col] = pd.to_numeric(df[port_col], errors='coerce')

    # Удаление неверных IP-адресов
    if 'src_ip' in df.columns:
        df['src_ip'] = df['src_ip'].astype(str)
        df = df[df['src_ip'].str.match(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$', na=False)]

    # Валидация диапазонов портов (после конвертации в числовые)
    if 'src_port' in df.columns:
        df = df[df['src_port'].notna() & (df['src_port'] >= 1) & (df['src_port'] <= 65535)]
    if 'dst_port' in df.columns:
        df = df[df['dst_port'].notna() & (df['dst_port'] >= 1) & (df['dst_port'] <= 65535)]

    # Удаление строк с неверными временными метками - делаем current_time timezone aware
    if 'timestamp' in df.columns:
        current_time = pd.Timestamp.now(tz='UTC')
        df = df[df['timestamp'].notna() & (df['timestamp'] <= current_time)]

    return df

2. Обработка пропущенных данных и выбросов

def preprocess_security_events(df):
    # Обработка пропущенных геолокационных данных
    df['country'].fillna('Unknown', inplace=True)
    df['asn'].fillna(0, inplace=True)

    # Ограничение экстремальных выбросов в числовых признаках
    for col in ['bytes_sent', 'bytes_received', 'session_duration']:
        q99 = df[col].quantile(0.99)
        df[col] = df[col].clip(upper=q99)

    # Удаление дубликатов событий (оставляем первое вхождение)
    df.drop_duplicates(subset=['src_ip', 'dst_port', 'timestamp'], keep='first', inplace=True)

    return df

Создание признаков для обнаружения угроз

А теперь самое интересное — превращаем сырые данные в “пищу” для наших ML-моделей! Это как готовить блюдо из отдельных ингредиентов: каждый признак — это специя, которая добавляет свой особый вкус в общее понимание атак. Давайте посмотрим, какие “рецепты” работают лучше всего:

1. Временные признаки

Временные паттерны критичны для идентификации кампаний атак и поведенческих аномалий:

def create_temporal_features(df):
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)

    # Время с первого соединения от того же источника
    df['first_seen'] = df.groupby('src_ip')['timestamp'].transform('min')
    df['hours_since_first_seen'] = (df['timestamp'] - df['first_seen']).dt.total_seconds() / 3600

    # Признаки частоты соединений
    df['daily_connection_count'] = df.groupby(['src_ip', df['timestamp'].dt.date])['src_ip'].transform('count')

    return df

2. Признаки поведенческой агрегации

Статистические сводки выявляют паттерны атакующих:

def create_behavioral_features(df):
    # Агрегации по исходному IP
    source_stats = df.groupby('src_ip').agg({
        'dst_port': ['nunique', 'count'],
        'bytes_sent': ['mean', 'std', 'max'],
        'session_duration': ['mean', 'median'],
        'protocol': lambda x: x.mode().iloc[0] if len(x) > 0 else 'unknown'
    }).reset_index()

    # Выравнивание названий колонок
    source_stats.columns = ['src_ip', 'unique_ports', 'total_connections',
                           'avg_bytes', 'std_bytes', 'max_bytes',
                           'avg_duration', 'median_duration', 'primary_protocol']

    # Объединение с исходным датасетом
    df = df.merge(source_stats, on='src_ip', how='left')

    # Индикаторы сканирования портов
    df['port_diversity'] = df['unique_ports'] / df['total_connections']
    df['is_port_scanner'] = (df['unique_ports'] > 10).astype(int)

    return df

3. Географические и сетевые признаки

Географические паттерны помогают идентифицировать координированные атаки:

def create_geographic_features(df):
    # Оценка угроз на уровне стран
    country_threat_scores = df.groupby('country').agg({
        'src_ip': 'nunique',
        'attack_type': lambda x: (x != 'benign').sum()
    }).reset_index()
    country_threat_scores['threat_ratio'] = (
        country_threat_scores['attack_type'] / country_threat_scores['src_ip']
    )

    df = df.merge(country_threat_scores[['country', 'threat_ratio']],
                 on='country', how='left')

    # ASN-основанные признаки
    asn_stats = df.groupby('asn').agg({
        'src_ip': 'nunique',
        'bytes_sent': 'mean'
    }).reset_index()

    df = df.merge(asn_stats, on='asn', suffixes=('', '_asn_avg'))

    return df

Выбираем идеальную ML-модель для охоты на угрозы

Время выбирать наше оружие! Как в видеоиграх, где для разных боссов нужно разное оружие, так и в машинном обучении — для разных типов угроз работают разные подходы. Давайте разберемся, какая “экипировка” лучше всего подойдет для наших задач:

1. Бинарная классификация: Атака против нормального трафика

Для базового обнаружения угроз начните с бинарной классификации:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

def train_binary_classifier(df):
    # Подготовка признаков
    feature_cols = ['hour', 'day_of_week', 'unique_ports', 'total_connections',
                   'avg_bytes', 'port_diversity', 'threat_ratio']

    X = df[feature_cols].fillna(0)
    y = (df['attack_type'] != 'benign').astype(int)

    # Разделение данных
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Обучение модели
    rf_model = RandomForestClassifier(
        n_estimators=100,
        max_depth=10,
        min_samples_split=5,
        random_state=42
    )

    rf_model.fit(X_train, y_train)

    # Оценка
    y_pred = rf_model.predict(X_test)
    print(classification_report(y_test, y_pred))

    return rf_model

2. Многоклассовая классификация атак

Для детальной категоризации угроз:

from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier

def train_multiclass_classifier(df):
    # Кодирование типов атак
    le = LabelEncoder()
    df['attack_label'] = le.fit_transform(df['attack_type'])

    feature_cols = ['hour', 'day_of_week', 'unique_ports', 'total_connections',
                   'avg_bytes', 'std_bytes', 'port_diversity', 'avg_duration']

    X = df[feature_cols].fillna(0)
    y = df['attack_label']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # XGBoost для многоклассовой классификации
    xgb_model = XGBClassifier(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        random_state=42
    )

    xgb_model.fit(X_train, y_train)

    y_pred = xgb_model.predict(X_test)
    print(classification_report(y_test, y_pred, target_names=le.classes_))

    return xgb_model, le

3. Обнаружение аномалий для Zero-Day угроз

Необучаемое обучение идентифицирует ранее невиданные паттерны атак:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

def train_anomaly_detector(df):
    # Используем только нормальный трафик для обучения
    benign_data = df[df['attack_type'] == 'benign']

    feature_cols = ['unique_ports', 'total_connections', 'avg_bytes',
                   'port_diversity', 'avg_duration', 'hours_since_first_seen']

    X_benign = benign_data[feature_cols].fillna(0)

    # Масштабирование признаков
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_benign)

    # Обучение isolation forest
    iso_forest = IsolationForest(
        n_estimators=100,
        contamination=0.05,  # Ожидаем 5% аномалий
        random_state=42
    )

    iso_forest.fit(X_scaled)

    # Тестирование на полном датасете
    X_all = df[feature_cols].fillna(0)
    X_all_scaled = scaler.transform(X_all)
    anomaly_scores = iso_forest.decision_function(X_all_scaled)

    df['anomaly_score'] = anomaly_scores
    df['is_anomaly'] = iso_forest.predict(X_all_scaled) == -1

    return iso_forest, scaler

От лаборатории к боевым условиям: развертывание модели

Метрики производительности для моделей безопасности

Модели безопасности требуют специализированных метрик оценки:

from sklearn.metrics import precision_recall_curve, roc_curve, auc
import matplotlib.pyplot as plt

def evaluate_security_model(y_true, y_pred_proba):
    # Кривая точность-полнота (лучше для несбалансированных данных)
    precision, recall, pr_thresholds = precision_recall_curve(y_true, y_pred_proba)
    pr_auc = auc(recall, precision)

    # ROC-кривая
    fpr, tpr, roc_thresholds = roc_curve(y_true, y_pred_proba)
    roc_auc = auc(fpr, tpr)

    # Поиск оптимального порога для минимальных ложных срабатываний
    optimal_idx = np.argmax(precision * recall)
    optimal_threshold = pr_thresholds[optimal_idx]

    print(f"PR-AUC: {pr_auc:.3f}")
    print(f"ROC-AUC: {roc_auc:.3f}")
    print(f"Оптимальный порог: {optimal_threshold:.3f}")

    return optimal_threshold

Пайплайн вывода в реальном времени

Развертывание моделей для обнаружения угроз в реальном времени:

import joblib
from datetime import datetime

class ThreatDetectionPipeline:
    def __init__(self, model_path, scaler_path=None):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path) if scaler_path else None

    def preprocess_event(self, event):
        # Преобразование сырого события в вектор признаков
        features = {
            'hour': datetime.fromisoformat(event['timestamp']).hour,
            'unique_ports': event.get('unique_ports', 1),
            'total_connections': event.get('connection_count', 1),
            'avg_bytes': event.get('bytes_sent', 0),
            'port_diversity': event.get('port_diversity', 0),
        }

        feature_vector = np.array(list(features.values())).reshape(1, -1)

        if self.scaler:
            feature_vector = self.scaler.transform(feature_vector)

        return feature_vector

    def predict_threat(self, event):
        features = self.preprocess_event(event)

        # Получение вероятности предсказания
        threat_probability = self.model.predict_proba(features)[0][1]

        return {
            'threat_probability': float(threat_probability),
            'is_threat': threat_probability > 0.5,
            'risk_level': 'высокий' if threat_probability > 0.8 else 'средний' if threat_probability > 0.5 else 'низкий'
        }

# Пример использования
pipeline = ThreatDetectionPipeline('threat_model.pkl', 'feature_scaler.pkl')

sample_event = {
    'timestamp': '2024-01-15T14:30:00',
    'src_ip': '192.168.1.100',
    'dst_port': 22,
    'bytes_sent': 1024,
    'unique_ports': 5,
    'connection_count': 15
}

result = pipeline.predict_threat(sample_event)
print(f"Оценка угрозы: {result}")

Лучшие практики и соображения

1. Качество датасета и маркировка

  • Валидация истинности: Регулярная валидация логов honeypot против известных сигнатур атак
  • Непрерывная маркировка: Реализация автоматизированных пайплайнов маркировки для новых типов атак
  • Свежесть данных: Переобучение моделей с недавними данными атак для поддержания эффективности

2. Мониторинг дрейфа модели

  • Отслеживание производительности: Мониторинг метрик точности/полноты в продакшене
  • Распределение признаков: Обнаружение сдвигов в распределениях входных признаков
  • Автоматическое переобучение: Настройка пайплайнов для переобучения моделей при деградации производительности

3. Интеграция с операциями безопасности

  • Интеграция SIEM: Экспорт предсказаний модели в информационные системы безопасности
  • Настройка оповещений: Корректировка порогов на основе толерантности к риску организации
  • Человек в петле: Предоставление механизмов для обратной связи аналитиков безопасности

Работа с публичными датасетами honeypot

Коллекция датасетов: арсенал для изучения угроз

Чтобы показать вам силу этих методов на практике, я собрал целую коллекцию реальных датасетов honeypot на Hugging Face. Каждый датасет — это отдельная “история” о том, как атакующие ведут себя в дикой природе. Давайте познакомимся с этим арсеналом данных:

Большой босс: cyber-security-events-full

pyToshka/cyber-security-events-full

  • Размер: 772K событий — настоящий гигант для серьезных экспериментов
  • Что внутри: Полнометражный фильм о кибератаках с богатым набором признаков
  • Фишки: Сетевые потоки, поведенческие паттерны, географические данные, репутация IP
  • Для кого: Идеален для обучения production-ready моделей обнаружения угроз
  • Особенность: Это как энциклопедия атак — здесь есть все!

Временной аналитик: attacks-daily

pyToshka/attacks-daily

  • Размер: 676K записей — фокус на временных паттернах
  • Что внутри: Ежедневная хроника атак с временными метками
  • Фишки: Временные ряды атак, сезонные паттерны, циклы активности
  • Для кого: Перфектен для предсказания “когда” произойдет следующая атака
  • Особенность: Показывает, что у хакеров тоже есть режим дня!

Компактный тренажер: cyber-security-events

pyToshka/cyber-security-events

  • Размер: 15.1K событий — идеальный размер для быстрых экспериментов
  • Что внутри: Кураторская выборка самых интересных атак
  • Фишки: Сбалансированный набор различных типов атак
  • Для кого: Отлично подходит для первых шагов и прототипирования
  • Особенность: Как starter pack для ML-исследователей!

Специалист по вторжениям: network-intrusion-detection

pyToshka/network-intrusion-detection

  • Размер: 100 записей — небольшой, но меткий
  • Что внутри: Высококачественные примеры сетевых вторжений
  • Фишки: Четкие классификации, образцы для IDS-систем
  • Для кого: Специально для разработчиков систем обнаружения вторжений
  • Особенность: Каждая запись — это учебный пример того, как НЕ надо защищать сеть

Совет от автора: Начните с cyber-security-events для изучения основ, затем переходите к attacks-daily для временного анализа, и заканчивайте cyber-security-events-full для серьезных экспериментов. Это как прохождение уровней в игре — от новичка до эксперта!

Рекомендации по выбору датасетов

При использовании публично доступных датасетов honeypot с платформ вроде Hugging Face, рассмотрите эти практические подходы:

Критерии выбора датасетов

  • Свежесть данных: Выбирайте датасеты с недавними паттернами атак
  • Объем и разнообразие: Обеспечьте достаточное количество образцов для разных типов атак
  • Документация: Ищите хорошо документированные датасеты с четкими описаниями признаков
  • Лицензирование: Проверьте соответствующие права использования для вашего случая

Пример интеграции с датасетами Hugging Face

from datasets import load_dataset

# Загрузка комплексного датасета событий безопасности
dataset = load_dataset("pyToshka/cyber-security-events-full")

# Преобразование в pandas для более легких манипуляций
df = dataset['train'].to_pandas()

# Применение пайплайна предобработки
df = validate_network_data(df)
df = create_features(df)

# Обучение модели
model = train_classifier(df)

Совместимость с macOS M1/M2/M4

При запуске этого кода на компьютерах Apple Silicon (M1, M2, M4) могут возникнуть проблемы с установкой XGBoost. Вот как их решить:

Установка зависимостей для Apple Silicon

# Установка OpenMP runtime (необходим для XGBoost)
brew install libomp

# Установка Python пакетов
pip install pandas numpy scikit-learn datasets xgboost

# Если возникают проблемы с XGBoost, попробуйте:
pip uninstall xgboost
pip install xgboost --no-cache-dir

Решение распространенных проблем

Проблема: XGBoostError: XGBoost Library (libxgboost.dylib) could not be loaded Решение:

brew install libomp
export LDFLAGS="-L/opt/homebrew/opt/libomp/lib"
export CPPFLAGS="-I/opt/homebrew/opt/libomp/include"

Проблема: Проблемы с производительностью на Apple Silicon Решение: Убедитесь, что используете нативную ARM64 установку Python, а не x86_64 через Rosetta.

Дополнительное чтение


Смотрите также