Продолжение серии: Интеграция кластера Wazuh с Ollama — Часть 4. Настройка и внедрение
В прошлой части мы рассмотрели основные принципы создания интеграций. Теперь настало время объединить все элементы и завершить интеграцию Wazuh с Ollama.
Готовый код для интеграции с Ollama
Сначала установим необходимые зависимости для Ollama.
/var/ossec/framework/python/bin/pip3 install ollama
Ранее я уже описывал установку Wazuh с помощью Docker Compose, но просто развернуть его не получится.
Для корректной работы потребуется создать собственный образ Wazuh, однако процесс пересборки образа мы в этом посте рассматривать не будем.
Теперь переходим к написанию кода. Вы можете выбрать любую удобную IDE и среду для разработки.
Создаем основной файл интеграции — custom-integration-ollama.py
и добавляем необходимые импорты.
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client
Далее объявим несколько глобальных переменных. Использовать их или нет — решать вам.
# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)
По умолчанию рекомендуется отключать режим debug. Теперь создадим функцию для ведения отладочных логов.
def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
integration_logs = open(log_file, "a")
integration_logs.write(str(msg))
integration_logs.close()
Также добавим внутреннюю проверку в базе данных на наличие IP-адреса.
def in_database(data, srcip):
result = data["src_ip"]
if result == 0:
return False
return True
Для отправки событий в Wazuh мы будем использовать unix-socket
, как уже упоминалось ранее.
Переменная socket_addr
будет хранить полный путь до unix-socket Wazuh, который обычно находится по пути /var/ossec/queue/sockets/queue
.
def send_event(msg, agent=None):
if not agent or agent["id"] == "000":
string = "1:ollama:{0}".format(json.dumps(msg))
else:
string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
agent["id"],
agent["name"],
agent["ip"] if "ip" in agent else "any",
json.dumps(msg),
)
debug(string)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(socket_addr)
sock.send(string.encode())
sock.close()
Теперь давайте немного объясним код:
if not agent or agent["id"] == "000":
- Если агент не указан или его ID равен “000”, создается строка с сообщением, в которой отсутствует информация об агенте.
В противном случае формируется строка, содержащая ID, имя, IP-адрес агента и само сообщение (событие).
string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
agent["id"],
agent["name"],
agent["ip"] if "ip" in agent else "any",
json.dumps(msg),
)
Создаем сокет с помощью sock = socket(AF_UNIX, SOCK_DGRAM)
и подключаемся к нему через sock.connect(socket_addr)
.
Затем отправляем данные с помощью sock.send(string.encode())
, предварительно закодировав строку в байты. Не забываем закрывать сокет командой sock.close()
.
Теперь перейдем к созданию функции для работы с Ollama API.
def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
client = Client(
host=ollama_host,
)
response = client.chat(model=ollama_model, messages=[
{
'role': 'user',
'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
},
],
)
return response.model_dump().get('message').get('content')
Функция принимает следующие аргументы:
src_ip
: IP-адрес, информацию о котором нужно получить.ollama_host
: Адрес Ollama, по умолчанию http://localhost:11434.ollama_model
: Модель, которую будем использовать, по умолчаниюllama3.2
. Убедитесь, что указанная модель доступна в Ollama, иначе функция не будет работать.
Чтобы получить модель, достаточно выполнить следующую команду:
ollama pull llama3.2
Построчное объяснение работы функции:
Создаем клиента с адресом хоста по умолчанию или указываем другой, если это необходимо.
client = Client(
host=ollama_host,
)
Затем отправляем в чат prompt, который будет содержать IP-адрес, полученный из события безопасности.
response = client.chat(model=ollama_model, messages=[
{
'role': 'user',
'content': 'Tell me more about IP {}'.format(src_ip),
},
])
После этого возвращаем ответ от Ollama с помощью response.model_dump().get('message').get('content')
.
Затем необходимо создать функцию, которая будет формировать новое событие, принимая во внимание ответ от Ollama.
def get_ollama_info(alert):
alert_output = {}
# Exit if the alert does not contain a source IP address.
if "srcip" not in alert["data"]:
return 0
# Request info from Ollama
data = query_ollama_api(alert["data"]["srcip"])
# Create alert
alert_output["ollama"] = {}
alert_output["integration"] = "custom-ollama"
alert_output["ollama"]["found"] = 0
alert_output["ollama"]["source"] = {}
alert_output["ollama"]["source"]["alert_id"] = alert["id"]
alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
alert_output["ollama"]["info"] = data
alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
alert_output["ollama"]["info"] = data
return alert_output
Вызов функции будет осуществляться следующим образом:
if __name__ == "__main__":
debug("# Starting")
alert_file_location = sys.argv[1]
# Ollama base domain
ollama_domain = sys.argv[3]
debug("# File location")
debug(alert_file_location)
# Load alerts for parsing JSON objects.
with open(alert_file_location) as alert_file:
alert = json.load(alert_file)
debug("# Processing alert")
debug(alert)
msg = get_ollama_info(alert)
# If a positive match is detected, send the event to the Wazuh Manager.
if msg:
send_event(msg, alert["agent"])
Полный скрипт
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client
# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)
def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
integration_logs = open(log_file, "a")
integration_logs.write(str(msg))
integration_logs.close()
def in_database(data, srcip):
result = data["src_ip"]
if result == 0:
return False
return True
def send_event(msg, agent=None):
if not agent or agent["id"] == "000":
string = "1:ollama:{0}".format(json.dumps(msg))
else:
string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
agent["id"],
agent["name"],
agent["ip"] if "ip" in agent else "any",
json.dumps(msg),
)
debug(string)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(socket_addr)
sock.send(string.encode())
sock.close()
def get_ollama_info(alert):
alert_output = {}
# Exit if the alert does not contain a source IP address.
if "srcip" not in alert["data"]:
return 0
# Request info from Ollama
data = query_ollama_api(alert["data"]["srcip"])
# Create alert
alert_output["ollama"] = {}
alert_output["integration"] = "custom-ollama"
alert_output["ollama"]["found"] = 0
alert_output["ollama"]["source"] = {}
alert_output["ollama"]["source"]["alert_id"] = alert["id"]
alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
alert_output["ollama"]["info"] = data
alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
alert_output["ollama"]["info"] = data
return alert_output
def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
client = Client(
host=ollama_host,
)
response = client.chat(model=ollama_model, messages=[
{
'role': 'user',
'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
},
],
)
return response.model_dump().get('message').get('content')
if __name__ == "__main__":
debug("# Starting")
alert_file_location = sys.argv[1]
# Ollama base domain
ollama_domain = sys.argv[3]
debug("# File location")
debug(alert_file_location)
# Load alerts for parsing JSON objects.
with open(alert_file_location) as alert_file:
alert = json.load(alert_file)
debug("# Processing alert")
debug(alert)
msg = get_ollama_info(alert)
# If a positive match is detected, send the event to the Wazuh Manager.
if msg:
send_event(msg, alert["agent"])
Создаем файл /var/ossec/integrations/custom-integration-ollama.py
с приведенным выше кодом.
Не забудьте установить соответствующие права на файл:
chmod 750 /var/ossec/integrations/custom-integration-ollama.py
chown root:wazuh /var/ossec/integrations/custom-integration-ollama.py
Теперь, когда скрипт интеграции для Ollama готов, необходимо завершить настройку интеграции с Wazuh:
Настройка ossec.conf
В файле /var/ossec/etc/ossec.conf
добавьте следующие строки:
<integration>
<name>custom-integration-ollama.py</name>
<hook_url>http://localhost:11434</hook_url>
<level>10</level>
<rule_id>100004,100005</rule_id>
<alert_format>json</alert_format>
</integration>
Интеграция будет срабатывать для правил с id 100004
и 100005
.
Настройка правила
В файле /var/ossec/etc/rules/local_rules.xml
добавьте следующие строки:
<group name="local,syslog,sshd,">
<rule id="100004" level="10">
<if_sid>5760</if_sid>
<match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
<description>sshd: Authentication failed from a public IP address > $(srcip).</description>
<group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
</rule>
<rule id="100005" level="10">
<if_sid>5710</if_sid>
<match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
<description>sshd: Authentication failed from a public IP address > $(srcip).</description>
<group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
</rule>
</group>
<group name="local,syslog,sshd,">
<rule id="100007" level="10">
<field name="ollama.srcip">\.+</field>
<description>[OLLAMA] IP address $(ollama.srcip) trying to connect to the network.</description>
<group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
</rule>
</group>
После этого, перезапустите службу Wazuh:
Если не используется docker compose
systemctl restart wazuh-manager.service
Если используется docker compose
docker compose stop
docker compose up -d --force-recreate
That’s it!
Now, when rules with IDs 100004
or 100005
are activated, the attacker’s IP address will be sent to Ollama, and subsequently forwarded to Wazuh via a Unix socket, along with more details about the attacker’s IP.
You can view the results in the Wazuh Dashboard, where events will be shown with the response from Ollama taken into account.
Смотрите также
- Применение RAG для работы с документацией Wazuh: Пошаговое руководство (Часть 2)
- Применение RAG для работы с документацией Wazuh: Пошаговое руководство (Часть 1)
- Усиление Wazuh с помощью Ollama: Повышение кибербезопасности (Часть 3)
- Усиление Wazuh с помощью Ollama: Повышение кибербезопасности (Часть 2)
- Усиление Wazuh с помощью Ollama: Повышение кибербезопасности (Часть 1)