Continuing the Series: Integrating a Wazuh Cluster with Ollama — Part 4. Configuration and Implementation
In the previous section, we explored the core principles of building integrations. Now, it’s time to bring all the pieces together and finalize the integration of Wazuh with Ollama.
Complete Code for Ollama Integration
Let’s start by installing the Ollama Python client into the Python environment bundled with Wazuh.
/var/ossec/framework/python/bin/pip3 install ollama
Earlier, I walked you through setting up Wazuh using Docker Compose, but simply launching the stock containers won’t suffice.
The stock image does not include the ollama Python package, so you’ll need to build a custom Wazuh image that installs it. That said, we won’t dive into the image rebuilding process in this post.
Now, let’s move on to coding. Feel free to pick your preferred IDE and development environment.
Start by creating the main integration file, custom-integration-ollama.py, and include the required imports.
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client
Next, let’s define a few global variables. Whether you choose to use them or not is entirely up to you.
# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)
The snippet above enables debug mode; outside of testing, it’s a good idea to set debug_enabled to False. Now, let’s create a function to handle debug logging.
def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        integration_logs = open(log_file, "a")
        integration_logs.write(str(msg))
        integration_logs.close()
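As a variation on the debug function above, here is a minimal sketch that takes the timestamp on every call (the global now is computed only once, when the script starts) and uses a with block so the file is closed even if the write fails. The name debug_to and its path and enabled parameters are hypothetical, introduced only so the example can run against a temporary file instead of the real integrations.log.

```python
import os
import tempfile
import time


def debug_to(path, msg, enabled=True):
    """Append a timestamped line to `path` when debugging is enabled."""
    if not enabled:
        return
    # Take the timestamp at call time rather than once at import time.
    stamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
    # The context manager closes the file even if write() raises.
    with open(path, "a") as log:
        log.write("{0}: {1}\n".format(stamp, msg))


# Usage against a throwaway file (hypothetical path, not the Wazuh log).
demo_log = os.path.join(tempfile.mkdtemp(), "integrations.log")
debug_to(demo_log, "# Starting")
debug_to(demo_log, "this line is skipped", enabled=False)
```

Whether this is worth adopting depends on how long the script runs; for a short-lived integration call, the single global timestamp is usually close enough.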
Additionally, let’s include a helper that checks whether the lookup data contains a source IP entry. The rest of the script does not call it, so it is optional.
def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True
To send events to Wazuh, we’ll use a Unix socket, as noted earlier.
The socket_addr variable stores the full path to the Wazuh Unix socket, typically located at /var/ossec/queue/sockets/queue.
def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()
Now, let’s break down the code a bit:
if not agent or agent["id"] == "000":
If no agent is specified or its ID is “000”, a message string is generated without agent details.
Otherwise, a string is formed that includes the agent’s ID, name, IP address, and the actual message (event).
string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
    agent["id"],
    agent["name"],
    agent["ip"] if "ip" in agent else "any",
    json.dumps(msg),
)
We create a socket using sock = socket(AF_UNIX, SOCK_DGRAM) and connect to it with sock.connect(socket_addr).
Then, we send the data using sock.send(string.encode()), first encoding the string into bytes. Don’t forget to close the socket with sock.close().
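To make the two message shapes easier to inspect without a running Wazuh manager, the string-building part of send_event can be separated from the socket call. The sketch below does only that; build_queue_message and its location parameter are hypothetical names, and the agent values are made up for illustration.

```python
import json


def build_queue_message(msg, agent=None, location="ollama"):
    """Return the string that send_event would write to the Wazuh queue socket."""
    payload = json.dumps(msg)
    # Events from the manager itself (no agent, or agent 000) carry no agent prefix.
    if not agent or agent.get("id") == "000":
        return "1:{0}:{1}".format(location, payload)
    # Events tied to an agent are prefixed with its id, name, and IP (or "any").
    return "1:[{0}] ({1}) {2}->{3}:{4}".format(
        agent["id"],
        agent["name"],
        agent.get("ip", "any"),
        location,
        payload,
    )


event = {"integration": "custom-ollama"}
manager_line = build_queue_message(event, {"id": "000", "name": "manager"})
agent_line = build_queue_message(event, {"id": "001", "name": "web01"})
print(manager_line)
print(agent_line)
```

Keeping the formatting in a pure function means it can be checked on a workstation, while the socket code only ever runs on the manager.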
Next, let’s move on to building a function to interact with the Ollama API.
def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
        host=ollama_host,
    )
    response = client.chat(model=ollama_model, messages=[
        {
            'role': 'user',
            'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
        },
    ],
    )
    return response.model_dump().get('message').get('content')
The function takes the following arguments:
- src_ip: The IP address for which we need to retrieve information.
- ollama_host: The Ollama address, defaulting to http://localhost:11434.
- ollama_model: The model we’ll use, set to llama3.2 by default.
Make sure the specified model is available in Ollama, or the function won’t work.
To fetch the model, simply run the following command:
ollama pull llama3.2
Line-by-line explanation of the function’s operation:
We create a client using the default host address or specify a different one if needed.
client = Client(
    host=ollama_host,
)
Next, we send a prompt to the chat, including the IP address retrieved from the security event.
response = client.chat(model=ollama_model, messages=[
    {
        'role': 'user',
        'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
    },
])
After that, we return the text of Ollama’s reply using response.model_dump().get('message').get('content').
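Because the srcip value comes from a log line and is interpolated straight into the prompt, it may be worth confirming that it really is an IP address first. The following is a minimal sketch using only the standard library; build_prompt and PROMPT_TEMPLATE are hypothetical names, and the validation step is an addition, not part of the original script.

```python
import ipaddress

# Same wording as the prompt used in query_ollama_api.
PROMPT_TEMPLATE = "Tell me more about IP {} in two sentences."


def build_prompt(src_ip):
    """Return the prompt for a valid IPv4/IPv6 address, or raise ValueError."""
    # ip_address() rejects anything that is not a literal IP address,
    # so arbitrary text from a log line cannot end up in the prompt.
    address = ipaddress.ip_address(src_ip.strip())
    return PROMPT_TEMPLATE.format(str(address))


print(build_prompt("203.0.113.7"))
```

The returned string could then be used as the 'content' value of the user message; a caller would need to decide whether to skip the alert or log the error when ValueError is raised.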
Next, we need to create a function that generates a new event, taking into account the response from Ollama.
def get_ollama_info(alert):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"])
    # Create alert
    alert_output["ollama"] = {}
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output
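The event structure built above can also be written as a single nested dictionary, which makes the final shape easier to see at a glance. This sketch assumes the same alert fields the script already reads; build_alert_output is a hypothetical helper, the Ollama reply is passed in as a plain string so no model is needed, and the sample alert values are invented.

```python
def build_alert_output(alert, info):
    """Build the enriched event from a Wazuh alert and Ollama's reply text."""
    srcip = alert["data"]["srcip"]
    return {
        "integration": "custom-ollama",
        "ollama": {
            "found": 0,  # kept as in the original script
            "source": {
                "alert_id": alert["id"],
                "rule": alert["rule"]["id"],
                "description": alert["rule"]["description"],
                "full_log": alert["full_log"],
                "srcip": srcip,
            },
            "srcip": srcip,
            "info": info,
        },
    }


# Invented sample alert containing only the fields the script reads.
sample_alert = {
    "id": "1700000000.12345",
    "rule": {"id": "100004", "description": "sshd: Authentication failed"},
    "full_log": "Failed password for root from 203.0.113.7 port 22 ssh2",
    "data": {"srcip": "203.0.113.7"},
}
enriched = build_alert_output(sample_alert, "Example reply text.")
print(enriched["ollama"]["srcip"])
```

The ollama.srcip key is the field that the detection rule later in this post matches on.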
The get_ollama_info function is called from the script’s entry point as follows:
if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base domain
    ollama_domain = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load alerts for parsing JSON objects.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert)
    # If a positive match is detected, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])
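In the entry point above, ollama_domain is read from sys.argv[3] but is never passed on, so query_ollama_api always uses its default host. The sketch below shows one way the arguments could be parsed with a fallback. It assumes the integrator passes the alert file path as the first argument and the hook_url as the third, as the script itself implies; parse_integration_args and DEFAULT_OLLAMA_HOST are hypothetical names.

```python
DEFAULT_OLLAMA_HOST = "http://localhost:11434"


def parse_integration_args(argv):
    """Return (alert_path, ollama_host) from the script's argument list."""
    if len(argv) < 2:
        raise SystemExit("usage: custom-integration-ollama.py <alert_file> [api_key] [hook_url]")
    alert_path = argv[1]
    # Fall back to the local default when no hook_url was supplied.
    if len(argv) > 3 and argv[3]:
        ollama_host = argv[3]
    else:
        ollama_host = DEFAULT_OLLAMA_HOST
    return alert_path, ollama_host


# Example argument lists (file paths are made up).
with_hook = parse_integration_args(
    ["custom-integration-ollama.py", "/tmp/alert.json", "", "http://ollama:11434"]
)
without_hook = parse_integration_args(["custom-integration-ollama.py", "/tmp/alert.json"])
print(with_hook, without_hook)
```

The returned host could then be handed to query_ollama_api through its ollama_host parameter, which would make the hook_url setting in ossec.conf take effect.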
Here is the complete script:
#!/var/ossec/framework/python/bin/python3
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
from ollama import Client

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# alert dictionary
alert = {}
# time now
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# log file for integrations logging
log_file = "{0}/logs/integrations.log".format(pwd)
# Wazuh socket address
socket_addr = "{0}/queue/sockets/queue".format(pwd)


def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        integration_logs = open(log_file, "a")
        integration_logs.write(str(msg))
        integration_logs.close()


def in_database(data, srcip):
    result = data["src_ip"]
    if result == 0:
        return False
    return True


def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = "1:ollama:{0}".format(json.dumps(msg))
    else:
        string = "1:[{0}] ({1}) {2}->ollama:{3}".format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()


def get_ollama_info(alert):
    alert_output = {}
    # Exit if the alert does not contain a source IP address.
    if "srcip" not in alert["data"]:
        return 0
    # Request info from Ollama
    data = query_ollama_api(alert["data"]["srcip"])
    # Create alert
    alert_output["ollama"] = {}
    alert_output["integration"] = "custom-ollama"
    alert_output["ollama"]["found"] = 0
    alert_output["ollama"]["source"] = {}
    alert_output["ollama"]["source"]["alert_id"] = alert["id"]
    alert_output["ollama"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["ollama"]["source"]["description"] = alert["rule"]["description"]
    alert_output["ollama"]["source"]["full_log"] = alert["full_log"]
    alert_output["ollama"]["source"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["srcip"] = alert["data"]["srcip"]
    alert_output["ollama"]["info"] = data
    return alert_output


def query_ollama_api(src_ip, ollama_host='http://localhost:11434', ollama_model='llama3.2'):
    client = Client(
        host=ollama_host,
    )
    response = client.chat(model=ollama_model, messages=[
        {
            'role': 'user',
            'content': 'Tell me more about IP {} in two sentences.'.format(src_ip),
        },
    ],
    )
    return response.model_dump().get('message').get('content')


if __name__ == "__main__":
    debug("# Starting")
    alert_file_location = sys.argv[1]
    # Ollama base domain
    ollama_domain = sys.argv[3]
    debug("# File location")
    debug(alert_file_location)
    # Load alerts for parsing JSON objects.
    with open(alert_file_location) as alert_file:
        alert = json.load(alert_file)
    debug("# Processing alert")
    debug(alert)
    msg = get_ollama_info(alert)
    # If a positive match is detected, send the event to the Wazuh Manager.
    if msg:
        send_event(msg, alert["agent"])
Create the file /var/ossec/integrations/custom-integration-ollama.py with the provided code.
Ensure you set the appropriate permissions for the file:
chmod 750 /var/ossec/integrations/custom-integration-ollama.py
chown root:wazuh /var/ossec/integrations/custom-integration-ollama.py
Now that the Ollama integration script is ready, complete the setup with Wazuh:
Configuring ossec.conf
Add the following lines to the /var/ossec/etc/ossec.conf file:
<integration>
  <name>custom-integration-ollama.py</name>
  <hook_url>http://localhost:11434</hook_url>
  <level>10</level>
  <rule_id>100004,100005</rule_id>
  <alert_format>json</alert_format>
</integration>
The integration will trigger for rules with IDs 100004 and 100005.
Configuring Rules
Add the following lines to the /var/ossec/etc/rules/local_rules.xml file:
<group name="local,syslog,sshd,">
  <rule id="100004" level="10">
    <if_sid>5760</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
  <rule id="100005" level="10">
    <if_sid>5710</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
<group name="local,syslog,sshd,">
  <rule id="100007" level="10">
    <field name="ollama.srcip">\.+</field>
    <description>[OLLAMA] IP address $(ollama.srcip) trying to connect to the network.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
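The long pattern in rules 100004 and 100005 is meant to match only public IPv4 addresses by excluding the private ranges 10.x.x.x, 172.16 to 172.31, and 192.168.x.x. When choosing test addresses for these rules, it can help to check them offline first. The sketch below is not part of Wazuh; it is a standalone helper (is_public_ipv4 and PRIVATE_NETWORKS are hypothetical names) that compares an address against the same three private networks. Unlike the regex, whose (10) alternative has no trailing dot and so appears to skip any address that merely starts with the digits 10 (for example 100.x.x.x), the sketch compares against the full network ranges.

```python
import ipaddress

# The three private IPv4 ranges the rule's pattern is meant to exclude.
PRIVATE_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def is_public_ipv4(value):
    """Return True for a valid IPv4 address outside the private ranges above."""
    try:
        address = ipaddress.IPv4Address(value)
    except ValueError:
        # Not a valid IPv4 literal (for example 999.1.1.1 or free text).
        return False
    return not any(address in network for network in PRIVATE_NETWORKS)


for candidate in ("8.8.8.8", "10.1.2.3", "172.20.0.5", "192.168.1.1", "100.1.1.1"):
    print(candidate, is_public_ipv4(candidate))
```

An address for which this returns True but that does not trigger the rule is a hint that the pattern needs adjusting for that range.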
Afterward, restart the Wazuh service:
If not using Docker Compose:
systemctl restart wazuh-manager.service
If using Docker Compose:
docker compose stop
docker compose up -d --force-recreate
That’s it!
Now, whenever rule 100004 or 100005 fires, the attacker’s IP address is sent to Ollama, and the model’s response is relayed back to Wazuh through the Unix socket.
The resulting events, enriched with Ollama’s response, can be viewed in the Wazuh Dashboard.