Создание системы обнаружения угроз на основе ML с использованием датасетов honeypot на Hugging Face
Введение
Представьте себе: вы анализируете логи безопасности и видите тысячи событий в день. Какие из них реально опасны? Какие можно проигнорировать? Традиционные методы обнаружения на основе сигнатур уже не справляются с современным потоком угроз — злоумышленники научились их обходить быстрее, чем мы успеваем их обновлять.
Вот тут-то и приходит на помощь машинное обучение! Представьте систему, которая автоматически изучает поведенческие паттерны атакующих и предсказывает новые угрозы еще до того, как они попадут в базы сигнатур. Звучит как фантастика? А вот и нет!
Данные honeypot — это золотая жила для таких систем. В отличие от скучных академических датасетов, honeypot собирают настоящие данные о реальных атаках. Это как подслушивать разговоры хакеров в их естественной среде обитания — бесценная информация для понимания их методов и паттернов поведения.
В этом руководстве мы пройдем весь путь — от сырых данных honeypot до работающей системы обнаружения угроз. Готовы превратить хаос событий безопасности в понятную картину угроз? Тогда поехали!
Что скрывается в данных honeypot?
Прежде чем превращать данные в умные модели, давайте разберемся — что же на самом деле собирают наши honeypot-ловушки? Представьте honeypot как скрытую камеру в мире кибернапападений. Вот какие “кадры” мы получаем:
Данные сетевых потоков
Сырые сетевые соединения содержат фундаментальную информацию о паттернах атак:
- IP-адреса и порты источника/назначения: Географические паттерны и паттерны таргетинга сервисов
- Информация о протоколах: Использование TCP/UDP, протоколы прикладного уровня
- Статистика потоков: Количество пакетов, объемы байт, длительность сессий
- Временные данные: Метки времени соединений, интервалы сессий
События прикладного уровня
Взаимодействия более высокого уровня предоставляют поведенческие инсайты:
- Попытки входа: Паттерны credential stuffing, брутфорс
- Выполнение команд: Shell-команды, развертывание вредоносных программ
- Файловые операции: Активности загрузки/скачивания, попытки эксфильтрации данных
- Протокол-специфичные действия: HTTP-запросы, SSH-сессии, запросы к БД
Обогащенные метаданные
Дополнительный контекст улучшает сырые данные:
- Геолокация: Информация о стране, регионе, ASN
- Threat Intelligence: Репутация IP, известные сигнатуры вредоносных программ
- Поведенческие паттерны: Кластеризация сессий, атрибуция кампаний атак
Превращаем хаос в порядок: очистка данных
Сырые данные honeypot — это как необработанная нефть: полно потенциала, но сначала нужно прилично поработать над очисткой. Представьте, что вы детектив, разбирающий улики — некоторые показания свидетелей недостоверны, даты не сходятся, а некоторые записи просто дублируются. Вот как мы наводим порядок в этом хаосе:
1. Валидация и санитизация данных
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def validate_network_data(df):
# Маппинг имен колонок (датасет использует 'dest_port' вместо 'dst_port')
if 'dest_port' in df.columns and 'dst_port' not in df.columns:
df['dst_port'] = df['dest_port']
# Конвертация временных меток с обработкой UTC для смешанных часовых поясов
if '@timestamp' in df.columns and 'timestamp' not in df.columns:
df['timestamp'] = pd.to_datetime(df['@timestamp'], format='mixed', errors='coerce', utc=True)
elif 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'], format='mixed', errors='coerce', utc=True)
# Конвертация портов в числовые значения, обработка строковых портов
for port_col in ['src_port', 'dst_port']:
if port_col in df.columns:
df[port_col] = pd.to_numeric(df[port_col], errors='coerce')
# Удаление неверных IP-адресов
if 'src_ip' in df.columns:
df['src_ip'] = df['src_ip'].astype(str)
df = df[df['src_ip'].str.match(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$', na=False)]
# Валидация диапазонов портов (после конвертации в числовые)
if 'src_port' in df.columns:
df = df[df['src_port'].notna() & (df['src_port'] >= 1) & (df['src_port'] <= 65535)]
if 'dst_port' in df.columns:
df = df[df['dst_port'].notna() & (df['dst_port'] >= 1) & (df['dst_port'] <= 65535)]
# Удаление строк с неверными временными метками - делаем current_time timezone aware
if 'timestamp' in df.columns:
current_time = pd.Timestamp.now(tz='UTC')
df = df[df['timestamp'].notna() & (df['timestamp'] <= current_time)]
return df
2. Обработка пропущенных данных и выбросов
def preprocess_security_events(df):
# Обработка пропущенных геолокационных данных
df['country'].fillna('Unknown', inplace=True)
df['asn'].fillna(0, inplace=True)
# Ограничение экстремальных выбросов в числовых признаках
for col in ['bytes_sent', 'bytes_received', 'session_duration']:
q99 = df[col].quantile(0.99)
df[col] = df[col].clip(upper=q99)
# Удаление дубликатов событий (оставляем первое вхождение)
df.drop_duplicates(subset=['src_ip', 'dst_port', 'timestamp'], keep='first', inplace=True)
return df
Создание признаков для обнаружения угроз
А теперь самое интересное — превращаем сырые данные в “пищу” для наших ML-моделей! Это как готовить блюдо из отдельных ингредиентов: каждый признак — это специя, которая добавляет свой особый вкус в общее понимание атак. Давайте посмотрим, какие “рецепты” работают лучше всего:
1. Временные признаки
Временные паттерны критичны для идентификации кампаний атак и поведенческих аномалий:
def create_temporal_features(df):
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# Время с первого соединения от того же источника
df['first_seen'] = df.groupby('src_ip')['timestamp'].transform('min')
df['hours_since_first_seen'] = (df['timestamp'] - df['first_seen']).dt.total_seconds() / 3600
# Признаки частоты соединений
df['daily_connection_count'] = df.groupby(['src_ip', df['timestamp'].dt.date])['src_ip'].transform('count')
return df
2. Признаки поведенческой агрегации
Статистические сводки выявляют паттерны атакующих:
def create_behavioral_features(df):
# Агрегации по исходному IP
source_stats = df.groupby('src_ip').agg({
'dst_port': ['nunique', 'count'],
'bytes_sent': ['mean', 'std', 'max'],
'session_duration': ['mean', 'median'],
'protocol': lambda x: x.mode().iloc[0] if len(x) > 0 else 'unknown'
}).reset_index()
# Выравнивание названий колонок
source_stats.columns = ['src_ip', 'unique_ports', 'total_connections',
'avg_bytes', 'std_bytes', 'max_bytes',
'avg_duration', 'median_duration', 'primary_protocol']
# Объединение с исходным датасетом
df = df.merge(source_stats, on='src_ip', how='left')
# Индикаторы сканирования портов
df['port_diversity'] = df['unique_ports'] / df['total_connections']
df['is_port_scanner'] = (df['unique_ports'] > 10).astype(int)
return df
3. Географические и сетевые признаки
Географические паттерны помогают идентифицировать координированные атаки:
def create_geographic_features(df):
# Оценка угроз на уровне стран
country_threat_scores = df.groupby('country').agg({
'src_ip': 'nunique',
'attack_type': lambda x: (x != 'benign').sum()
}).reset_index()
country_threat_scores['threat_ratio'] = (
country_threat_scores['attack_type'] / country_threat_scores['src_ip']
)
df = df.merge(country_threat_scores[['country', 'threat_ratio']],
on='country', how='left')
# ASN-основанные признаки
asn_stats = df.groupby('asn').agg({
'src_ip': 'nunique',
'bytes_sent': 'mean'
}).reset_index()
df = df.merge(asn_stats, on='asn', suffixes=('', '_asn_avg'))
return df
Выбираем идеальную ML-модель для охоты на угрозы
Время выбирать наше оружие! Как в видеоиграх, где для разных боссов нужно разное оружие, так и в машинном обучении — для разных типов угроз работают разные подходы. Давайте разберемся, какая “экипировка” лучше всего подойдет для наших задач:
1. Бинарная классификация: Атака против нормального трафика
Для базового обнаружения угроз начните с бинарной классификации:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
def train_binary_classifier(df):
# Подготовка признаков
feature_cols = ['hour', 'day_of_week', 'unique_ports', 'total_connections',
'avg_bytes', 'port_diversity', 'threat_ratio']
X = df[feature_cols].fillna(0)
y = (df['attack_type'] != 'benign').astype(int)
# Разделение данных
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Обучение модели
rf_model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=5,
random_state=42
)
rf_model.fit(X_train, y_train)
# Оценка
y_pred = rf_model.predict(X_test)
print(classification_report(y_test, y_pred))
return rf_model
2. Многоклассовая классификация атак
Для детальной категоризации угроз:
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier
def train_multiclass_classifier(df):
# Кодирование типов атак
le = LabelEncoder()
df['attack_label'] = le.fit_transform(df['attack_type'])
feature_cols = ['hour', 'day_of_week', 'unique_ports', 'total_connections',
'avg_bytes', 'std_bytes', 'port_diversity', 'avg_duration']
X = df[feature_cols].fillna(0)
y = df['attack_label']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# XGBoost для многоклассовой классификации
xgb_model = XGBClassifier(
n_estimators=200,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
random_state=42
)
xgb_model.fit(X_train, y_train)
y_pred = xgb_model.predict(X_test)
print(classification_report(y_test, y_pred, target_names=le.classes_))
return xgb_model, le
3. Обнаружение аномалий для Zero-Day угроз
Необучаемое обучение идентифицирует ранее невиданные паттерны атак:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
def train_anomaly_detector(df):
# Используем только нормальный трафик для обучения
benign_data = df[df['attack_type'] == 'benign']
feature_cols = ['unique_ports', 'total_connections', 'avg_bytes',
'port_diversity', 'avg_duration', 'hours_since_first_seen']
X_benign = benign_data[feature_cols].fillna(0)
# Масштабирование признаков
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_benign)
# Обучение isolation forest
iso_forest = IsolationForest(
n_estimators=100,
contamination=0.05, # Ожидаем 5% аномалий
random_state=42
)
iso_forest.fit(X_scaled)
# Тестирование на полном датасете
X_all = df[feature_cols].fillna(0)
X_all_scaled = scaler.transform(X_all)
anomaly_scores = iso_forest.decision_function(X_all_scaled)
df['anomaly_score'] = anomaly_scores
df['is_anomaly'] = iso_forest.predict(X_all_scaled) == -1
return iso_forest, scaler
От лаборатории к боевым условиям: развертывание модели
Метрики производительности для моделей безопасности
Модели безопасности требуют специализированных метрик оценки:
from sklearn.metrics import precision_recall_curve, roc_curve, auc
import matplotlib.pyplot as plt
def evaluate_security_model(y_true, y_pred_proba):
# Кривая точность-полнота (лучше для несбалансированных данных)
precision, recall, pr_thresholds = precision_recall_curve(y_true, y_pred_proba)
pr_auc = auc(recall, precision)
# ROC-кривая
fpr, tpr, roc_thresholds = roc_curve(y_true, y_pred_proba)
roc_auc = auc(fpr, tpr)
# Поиск оптимального порога для минимальных ложных срабатываний
optimal_idx = np.argmax(precision * recall)
optimal_threshold = pr_thresholds[optimal_idx]
print(f"PR-AUC: {pr_auc:.3f}")
print(f"ROC-AUC: {roc_auc:.3f}")
print(f"Оптимальный порог: {optimal_threshold:.3f}")
return optimal_threshold
Пайплайн вывода в реальном времени
Развертывание моделей для обнаружения угроз в реальном времени:
import joblib
from datetime import datetime
class ThreatDetectionPipeline:
def __init__(self, model_path, scaler_path=None):
self.model = joblib.load(model_path)
self.scaler = joblib.load(scaler_path) if scaler_path else None
def preprocess_event(self, event):
# Преобразование сырого события в вектор признаков
features = {
'hour': datetime.fromisoformat(event['timestamp']).hour,
'unique_ports': event.get('unique_ports', 1),
'total_connections': event.get('connection_count', 1),
'avg_bytes': event.get('bytes_sent', 0),
'port_diversity': event.get('port_diversity', 0),
}
feature_vector = np.array(list(features.values())).reshape(1, -1)
if self.scaler:
feature_vector = self.scaler.transform(feature_vector)
return feature_vector
def predict_threat(self, event):
features = self.preprocess_event(event)
# Получение вероятности предсказания
threat_probability = self.model.predict_proba(features)[0][1]
return {
'threat_probability': float(threat_probability),
'is_threat': threat_probability > 0.5,
'risk_level': 'высокий' if threat_probability > 0.8 else 'средний' if threat_probability > 0.5 else 'низкий'
}
# Пример использования
pipeline = ThreatDetectionPipeline('threat_model.pkl', 'feature_scaler.pkl')
sample_event = {
'timestamp': '2024-01-15T14:30:00',
'src_ip': '192.168.1.100',
'dst_port': 22,
'bytes_sent': 1024,
'unique_ports': 5,
'connection_count': 15
}
result = pipeline.predict_threat(sample_event)
print(f"Оценка угрозы: {result}")
Лучшие практики и соображения
1. Качество датасета и маркировка
- Валидация истинности: Регулярная валидация логов honeypot против известных сигнатур атак
- Непрерывная маркировка: Реализация автоматизированных пайплайнов маркировки для новых типов атак
- Свежесть данных: Переобучение моделей с недавними данными атак для поддержания эффективности
2. Мониторинг дрейфа модели
- Отслеживание производительности: Мониторинг метрик точности/полноты в продакшене
- Распределение признаков: Обнаружение сдвигов в распределениях входных признаков
- Автоматическое переобучение: Настройка пайплайнов для переобучения моделей при деградации производительности
3. Интеграция с операциями безопасности
- Интеграция SIEM: Экспорт предсказаний модели в информационные системы безопасности
- Настройка оповещений: Корректировка порогов на основе толерантности к риску организации
- Человек в петле: Предоставление механизмов для обратной связи аналитиков безопасности
Работа с публичными датасетами honeypot
Коллекция датасетов: арсенал для изучения угроз
Чтобы показать вам силу этих методов на практике, я собрал целую коллекцию реальных датасетов honeypot на Hugging Face. Каждый датасет — это отдельная “история” о том, как атакующие ведут себя в дикой природе. Давайте познакомимся с этим арсеналом данных:
Большой босс: cyber-security-events-full
pyToshka/cyber-security-events-full
- Размер: 772K событий — настоящий гигант для серьезных экспериментов
- Что внутри: Полнометражный фильм о кибератаках с богатым набором признаков
- Фишки: Сетевые потоки, поведенческие паттерны, географические данные, репутация IP
- Для кого: Идеален для обучения production-ready моделей обнаружения угроз
- Особенность: Это как энциклопедия атак — здесь есть все!
Временной аналитик: attacks-daily
- Размер: 676K записей — фокус на временных паттернах
- Что внутри: Ежедневная хроника атак с временными метками
- Фишки: Временные ряды атак, сезонные паттерны, циклы активности
- Для кого: Перфектен для предсказания “когда” произойдет следующая атака
- Особенность: Показывает, что у хакеров тоже есть режим дня!
Компактный тренажер: cyber-security-events
pyToshka/cyber-security-events
- Размер: 15.1K событий — идеальный размер для быстрых экспериментов
- Что внутри: Кураторская выборка самых интересных атак
- Фишки: Сбалансированный набор различных типов атак
- Для кого: Отлично подходит для первых шагов и прототипирования
- Особенность: Как starter pack для ML-исследователей!
Специалист по вторжениям: network-intrusion-detection
pyToshka/network-intrusion-detection
- Размер: 100 записей — небольшой, но меткий
- Что внутри: Высококачественные примеры сетевых вторжений
- Фишки: Четкие классификации, образцы для IDS-систем
- Для кого: Специально для разработчиков систем обнаружения вторжений
- Особенность: Каждая запись — это учебный пример того, как НЕ надо защищать сеть
Совет от автора: Начните с
cyber-security-eventsдля изучения основ, затем переходите кattacks-dailyдля временного анализа, и заканчивайтеcyber-security-events-fullдля серьезных экспериментов. Это как прохождение уровней в игре — от новичка до эксперта!
Рекомендации по выбору датасетов
При использовании публично доступных датасетов honeypot с платформ вроде Hugging Face, рассмотрите эти практические подходы:
Критерии выбора датасетов
- Свежесть данных: Выбирайте датасеты с недавними паттернами атак
- Объем и разнообразие: Обеспечьте достаточное количество образцов для разных типов атак
- Документация: Ищите хорошо документированные датасеты с четкими описаниями признаков
- Лицензирование: Проверьте соответствующие права использования для вашего случая
Пример интеграции с датасетами Hugging Face
from datasets import load_dataset
# Загрузка комплексного датасета событий безопасности
dataset = load_dataset("pyToshka/cyber-security-events-full")
# Преобразование в pandas для более легких манипуляций
df = dataset['train'].to_pandas()
# Применение пайплайна предобработки
df = validate_network_data(df)
df = create_features(df)
# Обучение модели
model = train_classifier(df)
Совместимость с macOS M1/M2/M4
При запуске этого кода на компьютерах Apple Silicon (M1, M2, M4) могут возникнуть проблемы с установкой XGBoost. Вот как их решить:
Установка зависимостей для Apple Silicon
# Установка OpenMP runtime (необходим для XGBoost)
brew install libomp
# Установка Python пакетов
pip install pandas numpy scikit-learn datasets xgboost
# Если возникают проблемы с XGBoost, попробуйте:
pip uninstall xgboost
pip install xgboost --no-cache-dir
Решение распространенных проблем
Проблема: XGBoostError: XGBoost Library (libxgboost.dylib) could not be loaded
Решение:
brew install libomp
export LDFLAGS="-L/opt/homebrew/opt/libomp/lib"
export CPPFLAGS="-I/opt/homebrew/opt/libomp/include"
Проблема: Проблемы с производительностью на Apple Silicon Решение: Убедитесь, что используете нативную ARM64 установку Python, а не x86_64 через Rosetta.
Дополнительное чтение
- Mitigation Anomaly Revelation Keeper (MARK) - Платформа анализа угроз на основе ИИ
- Интеграция Wazuh с Ollama: Часть 1 - SIEM с усилением ИИ для обнаружения угроз
- Как настроить пользовательскую интеграцию между Wazuh и MARK - Продвинутая автоматизация безопасности
- Чек-лист соответствия Amazon EKS SOC 2 Type II Часть 1 - Соответствие корпоративной безопасности