Photo by Max Muselmann on Unsplash
Unleashing the Power of Wazuh: Uniting Forces with Google Bard
Step-By-Step
Imagine a security monitoring system that not only detects threats but also enriches them with AI in real-time, using advanced natural language processing capabilities. In my previous article I explored the power of integrating OpenAI's ChatGPT with Wazuh...
Today, I'm thrilled to introduce you to the next frontier of intelligent security operations: Integrating Google Bard in Wazuh.
In this new article, I'll be sharing the exciting potential of incorporating a cutting-edge natural language processing tool that offers unparalleled contextual understanding and semantic analysis. By integrating Google Bard with Wazuh, security teams can unlock a new level of threat intelligence.
Step 1
We need to create a rule that generates an alert when a non-private IP has attempted to log into our server. This allows us to distinguish malicious insiders and those attempting to gain access from outside the network.
Open the Wazuh manager local rules file /var/ossec/etc/rules/local_rules.xml
and add the below block:
<!-- User Failed Authentication from Public IPv4 -->
<group name="local,syslog,sshd,">
<rule id="100004" level="10">
<if_sid>5760</if_sid>
<match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
<description>sshd: Authentication failed from a public IP address > $(srcip).</description>
<group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
</rule>
</group>
The <match></match>
block of the rule specifies that we want to perform a REGEX search to "detect" an IP address within the log.
Step 2
The below Python script takes the source IP that triggered our rule and sends it to the Google Bard endpoint to get IPโs information and insights.
#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.
# Google Bard Integration template by @WhatDoesKmean
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
try:
import requests
from requests.auth import HTTPBasicAuth
except Exception as e:
print("No module 'requests' found. Install: pip install requests")
sys.exit(1)
# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
print(pwd)
#exit()
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)
def main(args):
debug("# Starting")
# Read args
alert_file_location = args[1]
apikey = args[2]
debug("# API Key")
debug(apikey)
debug("# File location")
debug(alert_file_location)
# Load alert. Parse JSON object.
with open(alert_file_location) as alert_file:
json_alert = json.load(alert_file)
debug("# Processing alert")
debug(json_alert)
# Request gbard info
msg = request_gbard_info(json_alert,apikey)
# If positive match, send event to Wazuh Manager
if msg:
send_event(msg, json_alert["agent"])
def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
print(msg)
f = open(log_file,"a")
f.write(str(msg))
f.close()
def collect(data):
srcip = data['srcip']
output = data['output']
return srcip, output
def in_database(data, srcip):
result = data['srcip']
if result == 0:
return False
return True
def query_api(srcip, apikey):
# Calling Google Bard API Endpoint
headers = { 'Authorization': 'Bearer ' + apikey,
'Content-Type': 'text/plain' }
json_data = { 'input': 'Can you provide information about the IP address ' + srcip + '?' }
response = requests.post('https://api.bardapi.dev/chat', headers=headers, json=json_data)
if response.status_code == 200:
ip = {"srcip": srcip}
new_json = {}
new_json = response.json()
new_json.update(ip)
json_response = new_json
#json_response = response.json()
data = json_response
return data
else:
alert_output = {}
alert_output["gbard"] = {}
alert_output["integration"] = "custom-gbard"
json_response = response.json()
debug("# Error: The gbard encountered an error")
alert_output["gbard"]["error"] = response.status_code
alert_output["gbard"]["description"] = json_response["errors"][0]["detail"]
send_event(alert_output)
exit(0)
def request_gbard_info(alert, apikey):
alert_output = {}
# If there is no source ip address present in the alert. Exit.
if not "srcip" in alert["data"]:
return(0)
# Request info using gbard API
data = query_api(alert["data"]["srcip"], apikey)
# Create alert
alert_output["gbard"] = {}
alert_output["integration"] = "custom-gbard"
alert_output["gbard"]["found"] = 0
alert_output["gbard"]["source"] = {}
alert_output["gbard"]["source"]["alert_id"] = alert["id"]
alert_output["gbard"]["source"]["rule"] = alert["rule"]["id"]
alert_output["gbard"]["source"]["description"] = alert["rule"]["description"]
alert_output["gbard"]["source"]["full_log"] = alert["full_log"]
alert_output["gbard"]["source"]["srcip"] = alert["data"]["srcip"]
srcip = alert["data"]["srcip"]
# Check if gbard has any info about the srcip
if in_database(data, srcip):
alert_output["gbard"]["found"] = 1
# Info about the IP found in gbard
if alert_output["gbard"]["found"] == 1:
srcip, output = collect(data)
# Populate JSON Output object with gbard request
alert_output["gbard"]["srcip"] = srcip
alert_output["gbard"]["output"] = output
debug(alert_output)
return(alert_output)
def send_event(msg, agent = None):
if not agent or agent["id"] == "000":
string = '1:gbard:{0}'.format(json.dumps(msg))
else:
string = '1:[{0}] ({1}) {2}->gbard:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))
debug(string)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(socket_addr)
sock.send(string.encode())
sock.close()
if __name__ == "__main__":
try:
# Read arguments
bad_arguments = False
if len(sys.argv) >= 4:
msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
else:
msg = '{0} Wrong arguments'.format(now)
bad_arguments = True
# Logging the call
f = open(log_file, 'a')
f.write(str(msg) + '\n')
f.close()
if bad_arguments:
debug("# Exiting: Bad arguments.")
sys.exit(1)
# Main function
main(sys.argv)
except Exception as e:
debug(str(e))
raise
This script will be saved in the /var/ossec/integrations/
path of the Wazuh Manager as custom-gbard.py
The file execution permissions can be changed by the chmod command. Also, don't forget to use the chown command to change the file ownership as well.
In my case:chmod 750 /var/ossec/integrations/custom-gbard.py
chown root:wazuh /var/ossec/integrations/custom-gbard.py
Step 3
Now it's time to update the Wazuh manager configuration file /var/ossec/etc/ossec.conf
using the integration block below:
<!-- Google Bard Integration -->
<integration>
<name>custom-gbard.py</name>
<hook_url>https://api.bardapi.dev/chat</hook_url>
<api_key>YOUR-OWN-API-KEY</api_key>
<level>10</level>
<rule_id>100004</rule_id>
<alert_format>json</alert_format>
</integration>
This instructs the Wazuh Manager to call the Google Bard API endpoint anytime our rule id (100004), is triggered. You need to replace the <api_key>
block with your own.
Register for a FREE API key at https://www.bardapi.dev/app
Step 4
Now, we need to capture the response sent back to the Wazuh Manager so we can observe the information gathered by our Google Bard integration.
Open the Wazuh Manager local rules file at /var/ossec/etc/rules/local_rules.xml
and add the block below:
<!-- Google Bard IP capture rule -->
<group name="local,syslog,sshd,">
<rule id="100009" level="10">
<field name="gbard.srcip">\.+</field>
<description>IP address $(gbard.srcip) trying to connect to your network.</description>
<group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
</rule>
</group>
Lastly, restart the Wazuh Manager.
Now, you know! ๐