Continuing the Series: Integrating a Wazuh Cluster with Ollama — Part 4. Configuration and Implementation
In the previous section, we explored the core principles of building integrations. Now, it’s time to bring all the pieces together and finalize the integration of Wazuh with Ollama.
Prerequisites and Dependencies
Before implementing the integration, ensure you have the necessary environment ready.
Installing Required Dependencies
Let’s start by installing the essential dependencies required for Ollama.
/var/ossec/framework/python/bin/pip3 install ollama
Earlier in this series, I walked you through setting up Wazuh with Docker Compose, but simply launching the stock containers won't suffice. For the integration to work, you'll need to build a custom Wazuh image (so that, for example, the Python dependency above is available inside the manager container). That said, we won't dive into the image-rebuilding process in this post.
Building the Integration Script
Now, let’s move on to coding. Feel free to pick your preferred IDE and development environment.
Creating the Main Integration File
Start by creating the main integration file, custom-integration-ollama.py, and include the required imports.
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client
Next, let’s define a few global variables. Whether you choose to use them or not is entirely up to you.
# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)
Implementing Helper Functions
In production, it's a good idea to disable debug mode (set debug_enabled = False); it's enabled here to make testing easier. Now, let's create a function to handle debug logging.
def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        integration_logs = open(log_file, "a")
        integration_logs.write(str(msg))
        integration_logs.close()
Additionally, let's include a small helper that checks whether an IP address is already present in a local data structure. It's a placeholder here and isn't called by the script, but it's useful if you decide to add caching later.
def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True
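If you later want in_database to consult a real lookaside store, so repeated offenders don't trigger duplicate Ollama queries, a minimal file-backed sketch could look like this (the path and helper names are hypothetical and not part of the original script):

```python
import json
import os

CACHE_FILE = "/tmp/ollama_ip_cache.json"  # hypothetical location, not used by the original script

def load_cache(path=CACHE_FILE):
    # Return the cached {ip: description} mapping, or an empty dict.
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}

def save_cache(cache, path=CACHE_FILE):
    # Persist the mapping back to disk.
    with open(path, "w") as f:
        json.dump(cache, f)

cache = load_cache()
cache["203.0.113.7"] = "Sample description."
save_cache(cache)
```

Before calling the Ollama API you would then check `srcip in cache` and skip the request on a hit.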
To send events to Wazuh, we'll use a Unix socket, as noted earlier.
The socket_addr variable will store the full path to the Wazuh unix-socket, typically located at /var/ossec/queue/sockets/queue.
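A quick way to see how that path is derived: the double dirname in the pwd assignment walks two levels up from the script's own location, so everything resolves relative to the Wazuh root (the literal path below is illustrative):

```python
import os

# The script lives in /var/ossec/integrations/, so two dirname() calls
# walk up to the Wazuh root directory.
script_path = "/var/ossec/integrations/custom-integration-ollama.py"
pwd = os.path.dirname(os.path.dirname(script_path))
socket_addr = "{0}/queue/sockets/queue".format(pwd)
print(socket_addr)  # → /var/ossec/queue/sockets/queue
```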
def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()
Now, let’s break down the code a bit:
if not agent or agent["id"] == "000": - If no agent is specified or its ID is “000”, a message string is generated without agent details.
Otherwise, a string is formed that includes the agent’s ID, name, IP address, and the actual message (event).
string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
    agent["id"],
    agent["name"],
    agent["ip"] if "ip" in agent else "any",
    json.dumps(msg),
)
We create a socket using sock = socket(AF_UNIX, SOCK_DGRAM) and connect to it with sock.connect(socket_addr).
Then, we send the data using sock.send(string.encode()), first encoding the string into bytes. Don’t forget to close the socket with sock.close().
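To make the format concrete, here's the string the function produces for a hypothetical agent and message (sample values only):

```python
import json

# Hypothetical agent and message, for illustration only
msg = {"integration": "custom-ollama", "ollama": {"found": 0}}
agent = {"id": "001", "name": "web-01", "ip": "10.0.0.5"}

string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
    agent["id"],
    agent["name"],
    agent["ip"] if "ip" in agent else "any",
    json.dumps(msg),
)
print(string)
# → 1:[001] (web-01) 10.0.0.5->ollama:{"integration": "custom-ollama", "ollama": {"found": 0}}
```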
Next, let’s move on to building a function to interact with the Ollama API.
def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
        host=ollama_host,
    )
    response = client.chat(
        model=ollama_model,
        messages=[
            {
                'role': 'user',
                'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
            },
        ],
    )
    return response.model_dump().get('message').get('content')
The function takes the following arguments:
- src_ip: The IP address for which we need to retrieve information.
- ollama_host: The Ollama address, defaulting to http://localhost:11434.
- ollama_model: The model we'll use, llama3.2 by default. Make sure the specified model is available in Ollama, or the function won't work.
To fetch the model, simply run the following command:
ollama pull llama3.2
Line-by-line explanation of the function’s operation:
We create a client using the default host address or specify a different one if needed.
client = Client(
    host=ollama_host,
)
Next, we send a prompt to the chat, including the IP address retrieved from the security event.
response = client.chat(
    model=ollama_model,
    messages=[
        {
            'role': 'user',
            'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
        },
    ],
)
After that, we return the response from Ollama using response.model_dump().get('message').get('content').
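For reference, response.model_dump() converts the client's response object into a plain dict; the keys the script relies on look roughly like this (the values are illustrative, and a real response carries more fields):

```python
# Illustrative shape only; just the keys the script actually reads.
dump = {
    "model": "llama3.2",
    "message": {
        "role": "assistant",
        "content": "203.0.113.7 belongs to the TEST-NET-3 documentation range.",
    },
}
content = dump.get("message").get("content")
```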
Next, we need to create a function that generates a new event, taking into account the response from Ollama.
def get_ollama_info(alert, ollama_host='http://localhost:11434'):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"], ollama_host)
    # Create alert
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"] = {}
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output
The function will be called as follows:
if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base URL, passed by Wazuh from the <hook_url> option
    ollama_host = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load the alert and parse it as a JSON object.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert, ollama_host)
    # If enrichment succeeded, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])
Note that sys.argv[3] carries the <hook_url> value from ossec.conf, so the script talks to whichever Ollama address you configure there.
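If you want to inspect the enriched event offline, you can stub the Ollama call and build the same structure get_ollama_info produces (all sample values below are made up):

```python
import json

def query_ollama_stub(src_ip):
    # Stand-in for query_ollama_api, for offline testing only.
    return "Sample description of {}.".format(src_ip)

# Minimal alert, modeled on a Wazuh sshd alert (field values are made up)
alert = {
    "id": "1700000000.0000",
    "rule": {"id": "100004", "description": "sshd: auth failed from a public IP"},
    "full_log": "Failed password for root from 203.0.113.7 port 2222 ssh2",
    "data": {"srcip": "203.0.113.7"},
}

output = {
    "integration": "custom-ollama",
    "ollama": {
        "found": 0,
        "source": {
            "alert_id": alert["id"],
            "rule": alert["rule"]["id"],
            "description": alert["rule"]["description"],
            "full_log": alert["full_log"],
            "srcip": alert["data"]["srcip"],
        },
        "srcip": alert["data"]["srcip"],
        "info": query_ollama_stub(alert["data"]["srcip"]),
    },
}
print(json.dumps(output, indent=2))
```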
Here is the complete script:
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client
# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)
def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        integration_logs = open(log_file, "a")
        integration_logs.write(str(msg))
        integration_logs.close()

def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True

def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()
def get_ollama_info(alert, ollama_host='http://localhost:11434'):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"], ollama_host)
    # Create alert
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"] = {}
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output

def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
        host=ollama_host,
    )
    response = client.chat(
        model=ollama_model,
        messages=[
            {
                'role': 'user',
                'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
            },
        ],
    )
    return response.model_dump().get('message').get('content')

if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base URL, passed by Wazuh from the <hook_url> option
    ollama_host = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load the alert and parse it as a JSON object.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert, ollama_host)
    # If enrichment succeeded, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])
Create the file /var/ossec/integrations/custom-integration-ollama.py with the provided code.
Ensure you set the appropriate permissions for the file:
chmod 750 /var/ossec/integrations/custom-integration-ollama.py
chown root:wazuh /var/ossec/integrations/custom-integration-ollama.py
Now that the Ollama integration script is ready, complete the setup with Wazuh:
Configuring ossec.conf
Add the following lines to the /var/ossec/etc/ossec.conf file:
<integration>
  <name>custom-integration-ollama.py</name>
  <hook_url>http://localhost:11434</hook_url>
  <level>10</level>
  <rule_id>100004,100005</rule_id>
  <alert_format>json</alert_format>
</integration>
The integration will trigger for rules with IDs 100004 and 100005.
Configuring Rules
Add the following lines to the /var/ossec/etc/rules/local_rules.xml file:
<group name="local,syslog,sshd,">
  <rule id="100004" level="10">
    <if_sid>5760</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
  <rule id="100005" level="10">
    <if_sid>5710</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
<group name="local,syslog,sshd,">
  <rule id="100007" level="10">
    <field name="ollama.srcip">\.+</field>
    <description>[OLLAMA] IP address $(ollama.srcip) trying to connect to the network.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
Afterward, restart the Wazuh service:
If not using Docker Compose:
systemctl restart wazuh-manager.service
If using Docker Compose:
docker compose stop
docker compose up -d --force-recreate
That’s it!
Now, whenever rules with IDs 100004 or 100005 are activated, the attacker’s IP address will be forwarded to Ollama and subsequently relayed to Wazuh via a Unix socket.
The results can be viewed in the Wazuh Dashboard, where the new events will appear enriched with Ollama's response.
Series Navigation:
- Part 1: Introduction to Integration
- Part 2: Deploying Wazuh
- Part 3: Creating Integration
- Part 4: Configuration & Implementation (you are here)
- Part 5: Local Ollama in Wazuh Dashboard