Knowing 'who is who' with Wazuh and WhoisXML API


WhoisXML API is a domain data provider that offers a range of domain-related services such as domain research, monitoring, IP geolocation, and threat intelligence.
By integrating WhoisXML API with Wazuh, you can enrich Wazuh's security alerts with domain-related data such as domain ownership, availability, and historical records. Combined with Wazuh's threat detection and response capabilities, this extra context helps you identify and respond to security incidents faster.
Whether you're in the cybersecurity, financial, or e-commerce industry, integrating WhoisXML API with Wazuh can help you improve your security posture and stay ahead of emerging threats.

This article shows how to connect Wazuh to the WhoisXML API through the Wazuh integrator. The following topics are covered:

  • Configuring the integrator tool for custom integration.

  • Developing a Python script that can analyze alerts and use the WhoisXML API to verify the IP address provided in the log.

Setting up the Rule

In this case, we aim to produce alerts that include WhoisXML information whenever a public IP address fails SSH authentication against a network endpoint.
To achieve this, we create a rule in the /var/ossec/etc/rules/local_rules.xml file on the Wazuh manager. The rule triggers an alert when an SSH authentication attempt from a public IP address fails.

<!-- User Failed Authentication from Public IPv4 -->
<group name="local,syslog,sshd,">
  <rule id="100004" level="10">
    <if_sid>5760</if_sid>
    <match type="pcre2">\b(?!10\.|192\.168\.|172\.(2[0-9]|1[6-9]|3[0-1])\.|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address > $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
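
As a rough cross-check of what rule 100004 is meant to select, the following sketch classifies addresses with Python's standard ipaddress module instead of a regular expression. It treats only the three RFC 1918 ranges as private, which is an assumption about the rule's intent, and the name is_public_ipv4 is hypothetical and not part of Wazuh.

```python
import ipaddress

# Assumption: "private" means the three RFC 1918 ranges only.
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def is_public_ipv4(text):
    """Return True for a valid IPv4 address outside the RFC 1918 ranges."""
    try:
        addr = ipaddress.IPv4Address(text)
    except ipaddress.AddressValueError:
        # Not a valid dotted-quad address (for example, an octet above 255)
        return False
    return not any(addr in net for net in RFC1918_NETWORKS)


if __name__ == "__main__":
    for sample in ("116.237.130.82", "10.1.2.3", "172.31.0.1", "192.168.0.5"):
        print(sample, is_public_ipv4(sample))
```

Running a handful of known private and public addresses through a helper like this is a quick way to decide which test logs should and should not trigger the rule.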

When an alert is triggered by rule ID 100004, the Wazuh integration script will initiate a request to the WhoisXML API endpoint to retrieve details regarding the IP address specified in the alert.

Setting up a custom integration

To configure the integration with WhoisXML, you need to modify the ossec.conf configuration file of the Wazuh manager by adding the integration block with the following content:

  <integration>
    <name>custom-whois.py</name>
    <hook_url>https://www.whoisxmlapi.com/whoisserver/WhoisService</hook_url>
    <api_key>YOUR-OWN-API-KEY</api_key>
    <level>10</level>
    <rule_id>100004</rule_id>
    <alert_format>json</alert_format>
  </integration>

Here are the integration block parameters along with their descriptions:

  • name: The name of the custom script used for integration. All custom scripts should start with the prefix custom-

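  • hook_url: The API URL provided by WhoisXML

  • api_key: The API key that authorizes requests to the WhoisXML API

  • level: Filters alerts by rule level, so that only alerts at this level or higher are passed to the integration

  • rule_id: Sets the rules that trigger the integration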

  • alert_format: Indicates the format in which the script receives the alerts (JSON format is recommended). If this parameter is not set, the script will receive the alerts in full_log format
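
Judging from the script later in this article, Wazuh runs the integration with the alert file path, the API key, and the hook URL as positional arguments, optionally followed by the word debug. The sketch below shows one way to unpack them. IntegrationArgs and parse_integration_args are hypothetical names used only for illustration.

```python
from typing import NamedTuple, Sequence


class IntegrationArgs(NamedTuple):
    alert_file: str   # path to the temporary file holding the alert
    api_key: str      # value of <api_key> in ossec.conf
    hook_url: str     # value of <hook_url> in ossec.conf
    debug: bool       # True when a fourth argument "debug" is present


def parse_integration_args(argv: Sequence[str]) -> IntegrationArgs:
    """Unpack the positional arguments, assuming the order described above."""
    if len(argv) < 4:
        raise ValueError(
            "expected: <script> <alert_file> <api_key> <hook_url> [debug]"
        )
    debug = len(argv) > 4 and argv[4] == "debug"
    return IntegrationArgs(argv[1], argv[2], argv[3], debug)


if __name__ == "__main__":
    demo = ["custom-whois.py", "/tmp/alert.json", "YOUR-OWN-API-KEY",
            "https://www.whoisxmlapi.com/whoisserver/WhoisService"]
    print(parse_integration_args(demo))
```

Reading hook_url from the arguments, rather than repeating the URL inside the script, would keep ossec.conf as the single place where the endpoint is configured.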

Adding the integration Script

We will create a file named custom-whois.py in the /var/ossec/integrations/ directory of the Wazuh server.

Here's the full Python script:

#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.
# Template by WhatDoesKmean

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
except ImportError:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

def main(args):
    debug("# Starting")
    # Read args
    alert_file_location = args[1]
    apikey = args[2]
    debug("# API Key")
    debug(apikey)
    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request whois info
    msg = request_whois_info(json_alert,apikey)
    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])

def debug(msg):
    # Only print and log when debug mode is enabled
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        with open(log_file, "a") as f:
            f.write(str(msg))

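def collect(data):
  # Pull the fields of interest from the WhoisRecord. A missing field
  # becomes "NotFound" instead of raising a KeyError.
  domainName = data.get('domainName', 'NotFound')
  registrarName = data.get('registrarName', 'NotFound')
  contactEmail = data.get('contactEmail', 'NotFound')
  domainNameExt = data.get('domainNameExt', 'NotFound')
  estimatedDomainAge = data.get('estimatedDomainAge', 'NotFound')
  registryData = data.get('registryData', {}).get('strippedText', 'NotFound')

  return domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData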

def in_database(data, srcip):
  # The lookup counts as a hit when the record carries a non-empty domainName
  result = data.get('domainName')
  if not result:
    return False
  return True

def query_api(domainName, apikey):
  params = {"domainName": domainName, "apiKey": apikey, "outputFormat":"JSON"}
  url = "https://www.whoisxmlapi.com/whoisserver/WhoisService"
  response = requests.get(url, params=params)

  if response.status_code == 200:
      json_response = response.json()["WhoisRecord"]
      if "estimatedDomainAge" not in json_response:
        json_response["estimatedDomainAge"] = "NotFound"

      data = json_response
      return data
  else:
      alert_output = {}
      alert_output["whois"] = {}
      alert_output["integration"] = "custom-whois"
      json_response = response.json()
      debug("# Error: The whois encountered an error")
      alert_output["whois"]["error"] = response.status_code
      alert_output["whois"]["description"] = json_response["errors"][0]["detail"]
      send_event(alert_output)
      sys.exit(0)

def request_whois_info(alert, apikey):
    alert_output = {}
    # If there is no source IP address in the alert, there is nothing to look up.
    if "srcip" not in alert.get("data", {}):
        return None

    # Request info using whois API
    data = query_api(alert["data"]["srcip"], apikey)
    # Create alert
    alert_output["whois"] = {}
    alert_output["integration"] = "custom-whois"
    alert_output["whois"]["found"] = 0
    alert_output["whois"]["source"] = {}
    alert_output["whois"]["source"]["alert_id"] = alert["id"]
    alert_output["whois"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["whois"]["source"]["description"] = alert["rule"]["description"]
    alert_output["whois"]["source"]["full_log"] = alert["full_log"]
    alert_output["whois"]["source"]["srcip"] = alert["data"]["srcip"]
    srcip = alert["data"]["srcip"]

    # Check if whois has any info about the srcip
    if in_database(data, srcip):
        alert_output["whois"]["found"] = 1
    # Info about the IP found in whois
    if alert_output["whois"]["found"] == 1:
        domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData = collect(data)

        # Populate JSON Output object with whois request
        alert_output["whois"]["domainName"] = domainName
        alert_output["whois"]["registrarName"] = registrarName
        alert_output["whois"]["contactEmail"] = contactEmail
        alert_output["whois"]["domainNameExt"] = domainNameExt
        alert_output["whois"]["estimatedDomainAge"] = estimatedDomainAge
        alert_output["whois"]["registryData"] = registryData

        debug(alert_output)

    return(alert_output)

def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:whois:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->whois:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        f = open(log_file, 'a')
        f.write(str(msg) + '\n')
        f.close()

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise

The script above extracts the source IP from an alert and queries the WhoisXML API for that IP's WHOIS data.
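
The message that send_event writes to the manager's queue socket can be inspected without a running manager. The sketch below reproduces only the string formatting from the script. build_queue_message is a hypothetical helper name, and the agent values in the example are made up.

```python
import json


def build_queue_message(msg, agent=None):
    """Format an event the same way the script's send_event does."""
    payload = json.dumps(msg)
    if not agent or agent["id"] == "000":
        # Event attributed to the manager itself
        return "1:whois:{0}".format(payload)
    # Event attributed to a specific agent
    return "1:[{0}] ({1}) {2}->whois:{3}".format(
        agent["id"], agent["name"], agent.get("ip", "any"), payload
    )


if __name__ == "__main__":
    event = {"integration": "custom-whois", "whois": {"found": 0}}
    # Hypothetical agent, for illustration only
    print(build_queue_message(event, {"id": "001", "name": "web01"}))
```

Keeping the formatting in a pure function like this makes it possible to unit test the output separately from the socket handling.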

Now we need to set the file's owner and group to root:wazuh and grant execute permissions:
chmod 750 /var/ossec/integrations/custom-whois.py
chown root:wazuh /var/ossec/integrations/custom-whois.py

To apply the changes, proceed with restarting the Wazuh manager:
systemctl restart wazuh-manager
or
/var/ossec/bin/wazuh-control restart

Adding the custom Rule

Now we can generate an alert from the WhoisXML data returned for a public IP address whose SSH authentication failed.

To achieve this, we need to add a custom rule to the file /var/ossec/etc/rules/local_rules.xml and restart the manager for the changes to take effect.

<group name="local,syslog,sshd,">
  <rule id="100005" level="10">
    <field name="whois.domainName">\.+</field>
    <description>IP address $(whois.domainName) trying to connect to your network.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
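
Rule 100005 refers to whois.domainName because nested keys of the JSON event sent by the integration are addressed with dotted names. The sketch below mimics that lookup in plain Python to show which events would carry the field. get_field is a hypothetical helper, and the sample event values are made up for illustration.

```python
def get_field(event, dotted_name):
    """Resolve a dotted field name such as 'whois.domainName' in a nested dict."""
    current = event
    for key in dotted_name.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # field absent, so a rule on it would not match
        current = current[key]
    return current


if __name__ == "__main__":
    # Hypothetical events shaped like the script's alert_output
    found = {"integration": "custom-whois",
             "whois": {"found": 1, "domainName": "116.237.130.82"}}
    not_found = {"integration": "custom-whois", "whois": {"found": 0}}
    print(get_field(found, "whois.domainName"))
    print(get_field(not_found, "whois.domainName"))
```

An event where the lookup found nothing has no whois.domainName key, so under this reading it would not trigger rule 100005.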

To test the integration, we can use the following sample logs:
May 3 15:46:00 TEST_HOST sshd[321]: Failed password for whatdoeskmean from 116.237.130.82 port 8000 ssh2

May 3 16:58:00 TEST_HOST sshd[123]: Failed password for whatdoeskmean from 5.255.99.205 port 9997 ssh2
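
Before injecting the samples, it can help to confirm which user, source IP, and port each line carries. The sketch below parses the lines with a simple regular expression. The pattern and the name parse_failed_password are assumptions made for illustration and are not the decoder Wazuh uses.

```python
import re

# Assumed pattern for sshd "Failed password" lines, for illustration only
FAILED_PASSWORD = re.compile(
    r"Failed password for (?:invalid user )?(\S+) "
    r"from (\d{1,3}(?:\.\d{1,3}){3}) port (\d+)"
)


def parse_failed_password(line):
    """Return (user, srcip, port) for a failed-password line, else None."""
    match = FAILED_PASSWORD.search(line)
    if match is None:
        return None
    user, srcip, port = match.groups()
    return user, srcip, int(port)


if __name__ == "__main__":
    sample = ("May 3 15:46:00 TEST_HOST sshd[321]: Failed password for "
              "whatdoeskmean from 116.237.130.82 port 8000 ssh2")
    print(parse_failed_password(sample))
```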

To test these rules, the logs can be injected into an endpoint that is registered with the Wazuh manager.

To inject logs on a Linux endpoint:
echo "May 03 15:46:00 TEST_HOST sshd[321]: Failed password for whatdoeskmean from 116.237.130.82 port 8000 ssh2" >> /var/log/secure

After the logs are injected, we can observe the outcome of the log test on the Wazuh dashboard:

WhoisXML integration alert (triggered by rule 100004):

Now, you know! 😉
