Усиление Wazuh с помощью Ollama: Повышение кибербезопасности (Часть 4)

Продолжение серии: Интеграция кластера Wazuh с Ollama — Часть 4. Настройка и внедрение

В прошлой части мы рассмотрели основные принципы создания интеграций. Теперь настало время объединить все элементы и завершить интеграцию Wazuh с Ollama.

Готовый код для интеграции с Ollama

Сначала установим необходимые зависимости для Ollama.

/var/ossec/framework/python/bin/pip3 install ollama

Ранее я уже описывал установку Wazuh с помощью Docker Compose, но просто развернуть его не получится.

Для корректной работы потребуется создать собственный образ Wazuh, однако процесс пересборки образа мы в этом посте рассматривать не будем.

Теперь переходим к написанию кода. Вы можете выбрать любую удобную IDE и среду для разработки.

Создаем основной файл интеграции — custom-integration-ollama.py и добавляем необходимые импорты.

#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client

Далее объявим несколько глобальных переменных. Использовать их или нет — решать вам.

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)

По умолчанию рекомендуется отключать режим debug. Теперь создадим функцию для ведения отладочных логов.

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
    integration_logs = open(log_file, "a")
    integration_logs.write(str(msg))
    integration_logs.close()

Также добавим внутреннюю проверку в базе данных на наличие IP-адреса.

def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True

Для отправки событий в Wazuh мы будем использовать unix-socket, как уже упоминалось ранее.

Переменная socket_addr будет хранить полный путь до unix-socket Wazuh, который обычно находится по пути /var/ossec/queue/sockets/queue.

def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

Теперь давайте немного объясним код:

if not agent or agent["id"] == "000": - Если агент не указан или его ID равен “000”, создается строка с сообщением, в которой отсутствует информация об агенте.

В противном случае формируется строка, содержащая ID, имя, IP-адрес агента и само сообщение (событие).

string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
        agent["id"],
        agent["name"],
        agent["ip"] if "ip" in agent else "any",
        json.dumps(msg),
)

Создаем сокет с помощью sock = socket(AF_UNIX, SOCK_DGRAM) и подключаемся к нему через sock.connect(socket_addr).

Затем отправляем данные с помощью sock.send(string.encode()), предварительно закодировав строку в байты. Не забываем закрывать сокет командой sock.close().

Теперь перейдем к созданию функции для работы с Ollama API.

def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
            host=ollama_host,
    )
    response = client.chat(model=ollama_model, messages=[
            {
                'role': 'user',
                'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
            },
        ],
    )
    return response.model_dump().get('message').get('content')

Функция принимает следующие аргументы:

  • src_ip: IP-адрес, информацию о котором нужно получить.

  • ollama_host: Адрес Ollama, по умолчанию http://localhost:11434.

  • ollama_model: Модель, которую будем использовать, по умолчанию llama3.2. Убедитесь, что указанная модель доступна в Ollama, иначе функция не будет работать.

Чтобы получить модель, достаточно выполнить следующую команду:

ollama pull llama3.2

Построчное объяснение работы функции:

Создаем клиента с адресом хоста по умолчанию или указываем другой, если это необходимо.

client = Client(
    host=ollama_host,
)

Затем отправляем в чат prompt, который будет содержать IP-адрес, полученный из события безопасности.

response = client.chat(model=ollama_model, messages=[
    {
        'role': 'user',
        'content': 'Tell me more about IP {}'.format(src_ip),
    },
])

После этого возвращаем ответ от Ollama с помощью response.model_dump().get('message').get('content').

Затем необходимо создать функцию, которая будет формировать новое событие, принимая во внимание ответ от Ollama.

def get_ollama_info(alert):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"])
    # Create alert
    alert_output["ollama"] = {}
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output

Вызов функции будет осуществляться следующим образом:

if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base domain
    ollama_domain = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load alerts for parsing JSON objects.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert)
    # If a positive match is detected, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])

Полный скрипт

#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
    integration_logs = open(log_file, "a")
    integration_logs.write(str(msg))
    integration_logs.close()


def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True


def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()


def get_ollama_info(alert):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"])
    # Create alert
    alert_output["ollama"] = {}
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output


def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
            host=ollama_host,
    )
    response = client.chat(model=ollama_model, messages=[
            {
                'role': 'user',
                'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
            },
        ],
    )
    return response.model_dump().get('message').get('content')


if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base domain
    ollama_domain = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load alerts for parsing JSON objects.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert)
    # If a positive match is detected, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])

Создаем файл /var/ossec/integrations/custom-integration-ollama.py с приведенным выше кодом.

Не забудьте установить соответствующие права на файл:

chmod 750 /var/ossec/integrations/custom-integration-ollama.py
chown root:wazuh /var/ossec/integrations/custom-integration-ollama.py

Теперь, когда скрипт интеграции для Ollama готов, необходимо завершить настройку интеграции с Wazuh:

Настройка ossec.conf

В файле /var/ossec/etc/ossec.conf добавьте следующие строки:

<integration>
    <name>custom-integration-ollama.py</name>
    <hook_url>http://localhost:11434</hook_url>
    <level>10</level>
    <rule_id>100004,100005</rule_id>
    <alert_format>json</alert_format>
</integration>

Интеграция будет срабатывать для правил с id 100004 и 100005.

Настройка правила

В файле /var/ossec/etc/rules/local_rules.xml добавьте следующие строки:

<group name="local,syslog,sshd,">
 <rule id="100004" level="10">
    <if_sid>5760</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
 <rule id="100005" level="10">
    <if_sid>5710</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>

<group name="local,syslog,sshd,">
  <rule id="100007" level="10">
    <field name="ollama.srcip">\.+</field>
    <description>[OLLAMA] IP address $(ollama.srcip) trying to connect to the network.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>

После этого, перезапустите службу Wazuh:

Если не используется docker compose

systemctl restart wazuh-manager.service 

Если используется docker compose

docker compose stop
docker compose up -d --force-recreate

That’s it!

Now, when rules with IDs 100004 or 100005 are activated, the attacker’s IP address will be sent to Ollama, and subsequently forwarded to Wazuh via a Unix socket, along with more details about the attacker’s IP.

You can view the results in the Wazuh Dashboard, where events will be shown with the response from Ollama taken into account.


Смотрите также