QRadar REST APIs with Logstash

Posted on 14 July 2022

Introduction

In this tutorial, we will learn how to build ETL pipelines using Logstash to programmatically fetch raw data from QRadar REST APIs, apply processing, and output into various formats and destinations.

Note: This tutorial assumes you have admin access to a live QRadar deployment. For the purpose of this tutorial, I am using QRadar Community Edition. Please follow my step-by-step guide - How to install IBM QRadar CE V7.3.3 on VirtualBox to get a basic QRadar deployment up and running in your lab environment.

Note: This tutorial also assumes you have some experience with Logstash. Please refer to A Practical Introduction to Logstash for a quick refresher.

Pre-requisites

  • QRadar with admin access

    I am using QRadar CE V7.3.3 as described above.

  • QRadar API Token

    On QRadar, the API Token is also known as a SEC Token and must be generated by the admin on the QRadar Console. Please refer here for a quick walkthrough.

  • Logstash

    I am using Logstash 8.1.3 on a CentOS 7 Linux VM.

    For more information about installing Logstash on your OS, please refer to Installing Logstash.

  • Elasticsearch

    I am using Elasticsearch 8.3.2 on a CentOS 7 Linux VM.

    For more information about installing Elasticsearch on your OS, please refer to Installing Elasticsearch.

  • MongoDB

    I am using MongoDB Community Edition 5.0.8 on a CentOS 7 Linux VM.

    For more information about installing MongoDB Community Edition on your OS, please refer to Install MongoDB Community Edition.

  • MongoDB output plugin for Logstash

    Install the plugin using the logstash-plugin utility:

    /usr/share/logstash/bin/logstash-plugin install --version=3.1.5 logstash-output-mongodb

    Note: I installed version 3.1.5 as I came across a known bug with the latest version. Your mileage may vary. Please review the plugin’s GitHub repo prior to installation and usage.

ETL & Logstash

According to IBM:

ETL, which stands for extract, transform and load, is a data integration process that combines data from multiple data sources into a single, consistent data store that is loaded into a data warehouse or other target system.

ETL provides the foundation for data analytics and machine learning workstreams. Through a series of business rules, ETL cleanses and organizes data in a way which addresses specific business intelligence needs, like monthly reporting, but it can also tackle more advanced analytics, which can improve back-end processes or end user experiences.

Why would we need to perform ETL operations on QRadar data?

One common use-case is to build reports and dashboards on external Business Intelligence (BI) tools and platforms. While QRadar comes with in-built reporting and dashboarding capabilities, it is often desirable to fuse and correlate data from various sources to generate further insights. In a SOC, this is typically done manually by harnessing reports generated by multiple systems (such as SIEM, SOAR, EDR, and Vulnerability Management, among many others). This can easily become a tiresome and repetitive approach to SOC reporting, especially when the same reports and dashboards must be produced and delivered on a daily, weekly, and/or monthly basis.

With a well-defined, automated approach to reporting in place, SOC teams can focus on other critical activities, such as writing better detection rules, fine-tuning, and troubleshooting. This is where Logstash comes in.

According to Elastic:

Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite “stash.”

By leveraging the capabilities of Logstash, we can easily fetch data from QRadar, dynamically transform as per our reporting requirements, and output into a variety of destinations (including files and databases).

Logstash Pipeline Configuration

According to Logstash documentation:

The Logstash event processing pipeline has three stages: inputs → filters → outputs. Inputs generate events, filters modify them, and outputs ship them elsewhere. Inputs and outputs support codecs that enable you to encode or decode the data as it enters or exits the pipeline without having to use a separate filter.

Example #1: QRadar Rules to STDOUT

We will start with a simple goal to retrieve all the Rules deployed on QRadar and print them out to the standard output (STDOUT).

Input

Our goal in the input stage is to fetch raw JSON data from the QRadar Rules REST API endpoint. This involves making an HTTP request to the QRadar Console by supplying a valid SEC Token as a Header parameter. To achieve this, we will leverage the Logstash Http_poller input plugin.

Note: Unlike the MongoDB output plugin, the Http_poller input plugin is available by default and does not require manual installation.

Note: Use the command /usr/share/logstash/bin/logstash-plugin list to display all the installed plugins.

input {
        http_poller 
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_rules_url => {
                    method => get
                    url => "https://192.168.56.144/api/analytics/rules"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}

Let us go line-by-line in the above snippet and discuss the various configuration options.

  • schedule is specified to indicate how often Logstash polls the given URL. In the above snippet, we have used { cron => "* * * * *" } which indicates that Logstash must poll the QRadar Rules API endpoint URL once every minute.

  • ssl_verification_mode is specified to indicate if Logstash must verify the server certificates. In the above snippet, we have used "none" which indicates that Logstash must not verify the QRadar Console certificate. For better security, certificate verification should be enabled in production environments.

  • urls is specified to describe the URLs and their associated options. It is important to note that multiple URLs can be specified in one configuration file, if desired. Each URL specified in the configuration file requires a "name" which can be used to distinguish the outputs. In the above snippet, we have one URL configuration (qradar_rules_url) in which we specify method as get, url as "https://192.168.56.144/api/analytics/rules", and headers as { SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac" }.

Note: The complete QRadar API URL is provided on the QRadar Interactive API Documentation page corresponding to the endpoint.
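
Before wiring the endpoint into Logstash, it can be useful to confirm that the URL and SEC Token work on their own. Below is a minimal sketch using curl, reusing the same Console address and token as above; -k skips certificate verification, mirroring ssl_verification_mode => "none".

curl -k -H "SEC: 4150d602-11ba-4d55-b3de-b6ebfe8b93ac" -H "Accept: application/json" "https://192.168.56.144/api/analytics/rules"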

Filter

Our goal in the filter stage is to limit the fields that are returned by the QRadar REST API endpoint. To achieve this, we will leverage the Prune filter plugin.

filter {
        prune {
            whitelist_names => ["^id$","^name$","^creation_date$","^enabled$"]
        }
}
  • whitelist_names is specified to indicate the fields that must be included in the output event. Note that the field names must be provided as an array of regular expressions. In the above snippet, we have specified the id, name, creation_date, and enabled fields to be included in the output event.

Note: Please refer to this section about the QRadar Rules API endpoint in my blog post titled QRadar REST APIs with Python to learn more about the QRadar Rules API endpoint including its returned fields, parameters, and JSON response.

Note: You can also choose to leverage the whitelist_values, blacklist_names, and blacklist_values configuration options.
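
For instance, a sketch of a prune filter that excludes fields instead of including them might look like the following (the field names here are only illustrative):

filter {
        prune {
            blacklist_names => ["^origin$","^owner$"]
        }
}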

Output

Our goal in the output stage is to simply print the processed event to the standard output (STDOUT). To achieve this, we will leverage the Stdout output plugin.

output {
        stdout {}
}
  • Although not set in the above snippet, the codec configuration option can be specified to control how the output event is encoded. The default value is rubydebug.
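
For example, a minimal sketch of the same output stage emitting newline-delimited JSON instead, using the json_lines codec that ships with Logstash:

output {
        stdout { codec => json_lines }
}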

Running the Configuration

We can combine the above snippets to create the below configuration file.

input {
        http_poller 
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_rules_url => {
                    method => get
                    url => "https://192.168.56.144/api/analytics/rules"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}
filter {
        prune {
            whitelist_names => ["^id$","^name$","^creation_date$","^enabled$"]
        }
}
output {
        stdout {}
}

As mentioned in the Specifying Pipelines section in A Practical Introduction to Logstash:

The easiest way to start Logstash is to have Logstash create a single pipeline based on a single configuration file that we specify through the -f command line parameter.

Assuming the above configuration file is saved as qradar-rules.conf, we can run it with Logstash using the command:

logstash -f /root/logstash-blog/qradar-rules.conf

Note: Please ensure that you specify the full path to the .conf file. By default, Logstash will attempt to find the .conf file in /usr/share/logstash/.

Note: If logstash is not found in the path, try using /usr/share/logstash/bin/logstash instead.
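
Note: You can optionally check the configuration file for syntax errors before starting the pipeline by passing the --config.test_and_exit flag, as sketched below:

/usr/share/logstash/bin/logstash -f /root/logstash-blog/qradar-rules.conf --config.test_and_exit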

The output from Logstash is seen below. It has been truncated, as representing all the Rules would require far too many lines.

{
               "id" => 100295,
             "name" => "Local L2R LDAP Server Scanner",
          "enabled" => true,
    "creation_date" => 1146812962422
}
{
               "id" => 100296,
             "name" => "First-Time User Access to Critical Asset",
          "enabled" => true,
    "creation_date" => 1440696183560
}
{
               "id" => 100297,
             "name" => "Malware or Virus Clean Failed",
          "enabled" => true,
    "creation_date" => 1280932510492
}
{
               "id" => 100302,
             "name" => "Excessive Failed Logins to Compliance IS",
          "enabled" => false,
    "creation_date" => 1123776255889
}
{
               "id" => 100303,
             "name" => "Auditing Services Changed on Compliance Host",
          "enabled" => false,
    "creation_date" => 1279294472002
}
.
.
.

This approach of using STDOUT as the output destination is valuable when developing and debugging Logstash configurations.

Example #2: QRadar Log Sources to MongoDB

In the previous section, we managed to make an API request to fetch QRadar Rules, whitelist required fields, and output to STDOUT.

In this section, we will take it a step further. Here, our goal is to fetch and persist all the Log Sources on QRadar as BSON documents within a MongoDB database collection.

Input

Our goal in the input stage is to fetch raw JSON data from the QRadar Log Sources REST API endpoint. Similar to the previous example, we will leverage the Logstash Http_poller input plugin to make an HTTP request to the QRadar Console by supplying a valid SEC Token as a Header parameter.

input {
        http_poller
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_log_sources_url => {
                    method => get
                    url => "https://192.168.56.144/api/config/event_sources/log_source_management/log_sources"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}

The configuration options in the above snippet are exactly the same as the previous example. The only change made is in the urls option, in which we specify url as "https://192.168.56.144/api/config/event_sources/log_source_management/log_sources".

Filter

We have multiple goals in the filter stage.

One goal is similar to the previous example - we want to limit the fields that are returned by the QRadar REST API endpoint. To achieve this, we will leverage the Prune filter plugin.

The other goal is to craft the output event with the exact fields required by MongoDB. One such field is _id.

According to MongoDB documentation:

In MongoDB, each document stored in a collection requires a unique _id field that acts as a primary key. If an inserted document omits the _id field, the MongoDB driver automatically generates an ObjectId for the _id field.

In our case, an API request to the QRadar Log Sources REST API endpoint returns multiple fields in the JSON response including a unique ID for each Log Source. We need to add a field called _id to the output event with the value of the unique Log Source ID for each Log Source. To achieve this, we will leverage the Mutate filter plugin.

Note: A complete list of returned fields is provided on the QRadar Interactive API Documentation page corresponding to the endpoint.

filter {
        mutate {
                add_field => {
                    "_id" => "%{[id]}"
                }
        }
        prune {
                whitelist_names => ["^@timestamp$","^_id$","^name$","^description$","^creation_date$","^enabled$"]
        }
}
  • add_field is specified to add a new field to the output event. In the above snippet, we have specified "_id" as the new field to be added which contains the value in the Log Source ID field "%{[id]}" from the input event.

  • whitelist_names is specified to indicate the fields that must be included in the output event. Note that the field names must be provided as an array of regular expressions. In the above snippet, we have specified the @timestamp, _id, name, description, creation_date, and enabled fields to be included in the output event.

Output

Our goal in the output stage is to persist the processed event to a specific collection within a MongoDB database. To achieve this, we will leverage the Mongodb output plugin as mentioned in the pre-requisites. We will also print the event to the standard output (STDOUT) for debugging purposes. For this, we will leverage the Stdout output plugin.

output {
        stdout {}
        mongodb {
            id => "my_mongodb_plugin_id"
            collection => "qradar_log_sources"
            database => "qradar"
            uri => "mongodb://localhost:27017"
        }
}
  • id is specified to add a unique ID to the plugin configuration. In the above snippet, we have specified id as "my_mongodb_plugin_id". This is optional, but recommended.

  • collection is specified to indicate the MongoDB collection to store the documents. In the above snippet, we have specified collection as "qradar_log_sources". If the collection does not exist, it is automatically created.

  • database is specified to indicate the MongoDB database containing the collection of documents. In the above snippet, we have specified database as "qradar". If the database does not exist, it is automatically created.

  • uri is specified to indicate the MongoDB connection string used to connect to the MongoDB server. In the above snippet, we have specified uri as "mongodb://localhost:27017".
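
If your MongoDB server requires authentication, the credentials can be embedded in the connection string. A minimal sketch follows, assuming a hypothetical logstash_writer user and password:

output {
        mongodb {
            id => "my_mongodb_plugin_id"
            collection => "qradar_log_sources"
            database => "qradar"
            uri => "mongodb://logstash_writer:changeme@localhost:27017/qradar"
        }
}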

Running the Configuration

We can combine the above snippets to create the below configuration file.

input {
        http_poller
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_log_sources_url => {
                    method => get
                    url => "https://192.168.56.144/api/config/event_sources/log_source_management/log_sources"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}
filter {
        mutate {
                add_field => {
                    "_id" => "%{[id]}"
                }
        }
        prune {
                whitelist_names => ["^@timestamp$","^_id$","^name$","^description$","^creation_date$","^enabled$"]
        }
}
output {
        stdout {}
        mongodb {
            id => "my_mongodb_plugin_id"
            collection => "qradar_log_sources"
            database => "qradar"
            uri => "mongodb://localhost:27017"
        }
}

Assuming the above configuration file is saved as qradar-log-sources.conf, we can run it with Logstash using the command:

logstash -f /root/logstash-blog/qradar-log-sources.conf

Note: Please ensure that you specify the full path to the .conf file. By default, Logstash will attempt to find the .conf file in /usr/share/logstash/.

Note: If logstash is not found in the path, try using /usr/share/logstash/bin/logstash instead.

The output from Logstash is seen below. It has been truncated, as representing all the Log Sources would require far too many lines.

{
          "enabled" => true,
       "@timestamp" => 2022-06-05T12:49:00.191668Z,
      "description" => "WindowsAuthServer Device",
    "creation_date" => 1550780844476,
             "name" => "Experience Center: WindowsAuthServer @ EC: TIGER-PC",
              "_id" => "1462"
}
{
          "enabled" => true,
       "@timestamp" => 2022-06-05T12:49:00.191702Z,
      "description" => "WindowsAuthServer device",
    "creation_date" => 1550780906185,
             "name" => "Experience Center: WindowsAuthServer @ EC: MachineA",
              "_id" => "1512"
}
{
          "enabled" => true,
       "@timestamp" => 2022-06-05T12:49:00.191769Z,
      "description" => "AWS CloudTrail",
    "creation_date" => 1549879441512,
             "name" => "Experience Center: AWS Syslog @ 192.168.0.17",
              "_id" => "912"
}
{
          "enabled" => true,
       "@timestamp" => 2022-06-05T12:49:00.191801Z,
      "description" => "Cisco IronPort",
    "creation_date" => 1552586738421,
             "name" => "Experience Center: Cisco IronPort @ 192.168.0.15",
              "_id" => "1112"
}
.
.
.

Similarly, we can verify that the data was stored in MongoDB by connecting to the server using the MongoDB Shell (mongosh). The outputs of various queries are seen below.

> use qradar
switched to db qradar

> show collections
qradar_log_sources

> db.qradar_log_sources.countDocuments()
21

> db.qradar_log_sources.findOne()
{
    _id: '1262',
    description: 'WindowsAuthServer device',
    creation_date: Long("1540394721928"),
    enabled: true,
    name: 'Experience Center: WindowsAuthServer @ 172.16.0.4',
    '@timestamp': '"2022-06-05T14:18:01.445500Z"'
}

> db.qradar_log_sources.find({description: "WindowsAuthServer device"})
[
  {
    _id: '1262',
    description: 'WindowsAuthServer device',
    creation_date: Long("1540394721928"),
    enabled: true,
    name: 'Experience Center: WindowsAuthServer @ 172.16.0.4',
    '@timestamp': '"2022-06-05T14:18:01.445500Z"'
  },
  {
    _id: '1562',
    description: 'WindowsAuthServer device',
    creation_date: Long("1550780938011"),
    enabled: true,
    name: 'Experience Center: WindowsAuthServer @ EC: MachineB',
    '@timestamp': '"2022-06-05T14:18:01.446069Z"'
  },
  {
    _id: '1512',
    description: 'WindowsAuthServer device',
    creation_date: Long("1550780906185"),
    enabled: true,
    name: 'Experience Center: WindowsAuthServer @ EC: MachineA',
    '@timestamp': '"2022-06-05T14:18:01.446357Z"'
  }
]

As seen in the above snippets, we can now perform queries, aggregations, and other operations on our BSON documents within the MongoDB database collection. Furthermore, we can integrate MongoDB with Business Intelligence (BI) platforms to produce automated reports and dashboards.
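
For example, a quick aggregation in mongosh that counts Log Sources by their enabled status might look like the following sketch:

> db.qradar_log_sources.aggregate([ { $group: { _id: "$enabled", count: { $sum: 1 } } } ])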

Example #3: QRadar Offenses to Elasticsearch

In the previous section, we managed to fetch QRadar Log Sources by making an API request, add a new _id field, and persist JSON data records as BSON documents within a MongoDB database collection.

In this section, we will focus on a more complex goal. Here, our goal is to capture a subset of SSH login violations from all the Offenses generated on QRadar and ship only those Offenses to an Elasticsearch index. The desired Offenses contain the phrase “Bad Username” within their description fields.

Input

Our goal in the input stage is to fetch raw JSON data from the QRadar Offenses REST API endpoint. Similar to the previous examples, we will leverage the Logstash Http_poller input plugin to make an HTTP request to the QRadar Console by supplying a valid SEC Token as a Header parameter.

input {
        http_poller
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_rules_url => {
                    method => get
                    url => "https://192.168.56.144/api/siem/offenses"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}

The configuration options in the above snippet are exactly the same as the previous examples. The only change made is in the urls option, in which we specify url as "https://192.168.56.144/api/siem/offenses".

Filter

Similar to the previous example, we have multiple goals in the filter stage.

First of all, we want to limit the Offenses to the desired subset of SSH login violations. As mentioned above, these Offenses contain the phrase “Bad Username” within their description fields. To achieve this, we will leverage a conditional statement using the regexp (=~) comparison operator. In this manner, only those events that match the criteria are allowed through. The remaining events hit the else block. Since we are not interested in the other Offenses, we simply ignore (or drop) them. To achieve this, we will leverage the Drop filter plugin.

Next, we can define all the required transformations on the event.

One goal is to convert the start_time timestamp from the default format (milliseconds since the UNIX epoch) to a more human readable format (ISO 8601). To achieve this, we will leverage the Date filter plugin.

Since our example revolves around capturing SSH login violations, it is valuable to capture the username associated with each Offense in a separate field. To achieve this, we will leverage the Mutate filter plugin. Similarly, we will leverage the same plugin to modify the description field to include the username.

The final transformation goal is similar to the previous examples - we want to limit the fields that are returned by the QRadar REST API endpoint. To achieve this, we will leverage the Prune filter plugin.

Note: A complete list of returned fields is provided on the QRadar Interactive API Documentation page corresponding to the endpoint.

filter {
        if [description] =~ "Bad Username" {
            date {
                match => ["start_time", "UNIX_MS"]
                target => "start_time"
            }
            mutate {
                add_field => {
                    "username" => "%{[offense_source]}"
                }
                replace => {
                    "description" => "Bad Username Detected - %{offense_source}"
                }
            }
            prune {
                whitelist_names => ["^id$","^magnitude$","^start_time$","^username$","^description$","^categories$"]
            }
        }
        else {
           drop {}
        }
}
  • match and target are used in conjunction to parse a timestamp value and store it into a target field. In the above snippet, we have specified "start_time" to be parsed as "UNIX_MS" (milliseconds since the UNIX epoch). By default, the plugin will output the timestamp in ISO 8601 format. Since we mentioned the same field name ("start_time") in target, the value will simply be overwritten.

Note: According to Logstash documentation, the @timestamp field of the event is updated if target is not specified alongside match.

  • add_field is specified to add a new field to the output event. In the above snippet, we have specified "username" as the new field which contains the value of "offense_source" from the input event.

  • replace is specified to replace the value of an existing field, or add the field if it does not exist. In the above snippet, we have specified "description" with a new value of "Bad Username Detected - %{offense_source}" in which the %{offense_source} is substituted with the actual username associated with the Offense.

  • whitelist_names is specified to indicate the fields that must be included in the output event. Note that the field names must be provided as an array of regular expressions. In the above snippet, we have specified the id, magnitude, start_time, username, description, and categories fields to be included in the output event.

Output

Our goal in the output stage is to persist the processed event to a specific index within Elasticsearch. To achieve this, we will leverage the Elasticsearch output plugin. We will also print the event to the standard output (STDOUT) for debugging purposes. For this, we will leverage the Stdout output plugin.

Note: Unlike the MongoDB output plugin, the Elasticsearch output plugin is available by default and does not require manual installation.

Note: Use the command /usr/share/logstash/bin/logstash-plugin list to display all the installed plugins.

output {
        stdout {}
        elasticsearch {
            index => "bad-username-offenses"
            document_id => "%{[id]}"
            hosts => "https://127.0.0.1:9200"
            user => "elastic"
            password => "luKCzUWSLiL=Ah7rUanu"
            cacert => "/etc/elasticsearch/certs/http_ca.crt"
        }
}
  • index is specified to indicate the Elasticsearch index to store the documents. In the above snippet, we have specified index as "bad-username-offenses". If the index does not exist, it is automatically created.

  • document_id is specified to indicate the value to be used as document ID for documents in the Elasticsearch index. In the above snippet, we have specified that the value in the Offense ID field ("%{[id]}") must be used as the document ID.

  • hosts is specified to indicate the address of the Elasticsearch server. In the above snippet, we have specified hosts as "https://127.0.0.1:9200".

  • user is specified to indicate the username to be used for authentication to the Elasticsearch cluster. In the above snippet, we have specified user as "elastic".

Note: According to Elastic, it is not recommended to use the elastic superuser unless full access to the cluster is absolutely required. On self-managed deployments, it is advised to use the elastic user to create users that have the minimum necessary roles or privileges for their activities. A sketch of creating such a least-privileged user is shown after this list.

  • password is specified to indicate the password to be used for authentication to the Elasticsearch cluster. In the above snippet, we have specified password as "luKCzUWSLiL=Ah7rUanu".

  • cacert is specified to indicate the full path of the .cer or .pem file to validate the Elasticsearch server’s certificate. In the above snippet, we have specified cacert as "/etc/elasticsearch/certs/http_ca.crt".
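
As noted above, a dedicated, least-privileged user is preferable to the elastic superuser for day-to-day indexing. Below is a minimal sketch using the Elasticsearch security APIs via curl; the role name, user name, password, and privilege list are assumptions to adapt to your environment.

> curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic:luKCzUWSLiL=Ah7rUanu -X PUT "https://localhost:9200/_security/role/qradar_offenses_writer" -H "Content-Type: application/json" -d '{"indices":[{"names":["bad-username-offenses"],"privileges":["create_index","write"]}]}'

> curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic:luKCzUWSLiL=Ah7rUanu -X PUT "https://localhost:9200/_security/user/logstash_qradar" -H "Content-Type: application/json" -d '{"password":"a-strong-password","roles":["qradar_offenses_writer"]}'

The new user and password can then replace the elastic credentials in the elasticsearch output block.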

Running the Configuration

We can combine the above snippets to create the below configuration file.

input {
        http_poller
        {
            schedule => { cron => "* * * * *" }
            ssl_verification_mode => "none"
            urls => {
                qradar_rules_url => {
                    method => get
                    url => "https://192.168.56.144/api/siem/offenses"
                    headers => {
                        SEC => "4150d602-11ba-4d55-b3de-b6ebfe8b93ac"
                    }
                }
            }
        }
}
filter {
        if [description] =~ "Bad Username" {
            date {
                match => ["start_time", "UNIX_MS"]
                target => "start_time"
            }
            mutate {
                add_field => {
                    "username" => "%{[offense_source]}"
                }
                replace => {
                    "description" => "Bad Username Detected - %{offense_source}"
                }
            }
            prune {
                whitelist_names => ["^id$","^magnitude$","^start_time$","^username$","^description$","^categories$"]
            }
        }
        else {
           drop {}
        }
}
output {
        stdout {}
        elasticsearch {
            index => "bad-username-offenses"
            document_id => "%{[id]}"
            hosts => "https://127.0.0.1:9200"
            user => "elastic"
            password => "luKCzUWSLiL=Ah7rUanu"
            cacert => "/etc/elasticsearch/certs/http_ca.crt"
        }
}

Assuming the above configuration file is saved as qradar-offenses.conf, we can run it with Logstash using the command:

logstash -f /root/logstash-blog/qradar-offenses.conf

Note: Please ensure that you specify the full path to the .conf file. By default, Logstash will attempt to find the .conf file in /usr/share/logstash/.

Note: If logstash is not found in the path, try using /usr/share/logstash/bin/logstash instead.

The output from Logstash is seen below. It has been truncated, as representing all the Offenses would require far too many lines.

{
             "id" => 16,
    "description" => "Bad Username Detected - pepsi",
       "username" => "pepsi",
     "start_time" => 2022-07-13T18:53:47.388Z,
     "categories" => [
        [0] "SSH Login Failed"
    ],
      "magnitude" => 4
}
{
             "id" => 15,
    "description" => "Bad Username Detected - paratha1",
       "username" => "paratha1",
     "start_time" => 2022-07-13T18:53:30.326Z,
     "categories" => [
        [0] "SSH Login Failed"
    ],
      "magnitude" => 4
}
{
             "id" => 14,
    "description" => "Bad Username Detected - paratha",
       "username" => "paratha",
     "start_time" => 2022-07-13T18:52:55.233Z,
     "categories" => [
        [0] "SSH Login Failed"
    ],
      "magnitude" => 4
}
.
.
.

Similarly, we can verify that the data was stored in Elasticsearch by making an API request to the Search API using curl. The output of the API request is seen below. It has been truncated, as representing all the Offenses would require far too many lines.

> curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic:luKCzUWSLiL=Ah7rUanu https://localhost:9200/bad-username-offenses/_search
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 7,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "bad-username-offenses",
        "_id": "16",
        "_score": 1,
        "_source": {
          "id": 16,
          "description": "Bad Username Detected - pepsi",
          "username": "pepsi",
          "start_time": "2022-07-13T18:53:47.388Z",
          "categories": [
            "SSH Login Failed"
          ],
          "magnitude": 4
        }
      },
      {
        "_index": "bad-username-offenses",
        "_id": "15",
        "_score": 1,
        "_source": {
          "id": 15,
          "description": "Bad Username Detected - paratha1",
          "username": "paratha1",
          "start_time": "2022-07-13T18:53:30.326Z",
          "categories": [
            "SSH Login Failed"
          ],
          "magnitude": 4
        }
      },
      {
        "_index": "bad-username-offenses",
        "_id": "14",
        "_score": 1,
        "_source": {
          "id": 14,
          "description": "Bad Username Detected - paratha",
          "username": "paratha",
          "start_time": "2022-07-13T18:52:55.233Z",
          "categories": [
            "SSH Login Failed"
          ],
          "magnitude": 4
        }
      },
      .
      .
      .
    ]
  }
}

As seen in the above snippet, we have our Offenses stored in an Elasticsearch index. We can now perform queries, aggregations, and other operations on our data. Furthermore, like MongoDB, we can integrate Elasticsearch with Business Intelligence (BI) platforms to produce automated reports and dashboards. A quick way to start visualizing Elasticsearch data is with Kibana.
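
For example, a terms aggregation that counts the captured Offenses per username might look like the following sketch (this assumes Elasticsearch's default dynamic mapping, which exposes the text field username under the username.keyword sub-field):

> curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic:luKCzUWSLiL=Ah7rUanu -H "Content-Type: application/json" "https://localhost:9200/bad-username-offenses/_search?size=0" -d '{"aggs":{"offenses_per_username":{"terms":{"field":"username.keyword"}}}}'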

Conclusion

In this tutorial, we learnt how to develop ETL pipelines with Logstash to programmatically fetch raw data from QRadar REST APIs, apply processing, and output into various formats and destinations. To summarize:

We started by introducing ETL (extract, transform and load) and explained how it enables SOC teams to ingest data from different sources, fuse and correlate data, and produce actionable reports and dashboards. We also introduced Logstash, an open-source data pipeline that can help us achieve our ETL goals.

Then, we began our journey to understand Logstash pipeline configurations with three examples.

In the first example, we fetched all the Rules deployed on QRadar and routed them to the standard output (STDOUT). Here, in the input stage, we leveraged the Http_poller input plugin to make the REST API request to QRadar and fetch raw JSON data. In the filter stage, we leveraged the Prune filter plugin to whitelist only the required fields, and in the output stage, we leveraged the Stdout output plugin to print the processed event to STDOUT.

In the second example, we fetched all the Log Sources onboarded on QRadar and persisted them to a MongoDB database collection. Here, in the input stage, similar to the previous example, we leveraged the Http_poller input plugin to make the REST API request to QRadar and fetch raw JSON data. In the filter stage, we leveraged the Mutate filter plugin to add a new field (_id) to the output event. We also leveraged the Prune filter plugin to whitelist only the required fields. In the output stage, we leveraged the Mongodb output plugin to store the events as BSON documents within a MongoDB database collection. We connected to the MongoDB server using mongosh and ran a few queries to confirm that the data was properly persisted.

In the third example, we fetched all the Offenses created on QRadar and persisted them to an Elasticsearch index. Here, in the input stage, similar to the previous examples, we leveraged the Http_poller input plugin to make the REST API request to QRadar and fetch raw JSON data. In the filter stage, we leveraged conditional statements to limit the Offenses to a subset of SSH login violations. Then, we leveraged the Date filter plugin to parse the start_time timestamp and convert it from Unix time to ISO 8601. We also leveraged the Mutate filter plugin to capture the username associated with each Offense in a separate field, and to modify the description field to include the username. We also leveraged the Prune filter plugin to whitelist only the required fields. In the output stage, we leveraged the Elasticsearch output plugin to store the events as documents within an Elasticsearch index. To verify that the data was properly persisted, we sent a GET request to the Elasticsearch Search API using curl to fetch all the Offenses.

Using the examples discussed in this tutorial, you can easily write new Logstash configurations and leverage the wide range of available plugins to perform all kinds of ETL operations. In the SOC, you can modify these examples to fetch data from your other systems (such as SIEM, SOAR, EDR, and Vulnerability Management, among many others) and integrate your destinations with Business Intelligence (BI) tools and platforms to automate SOC reporting.

I hope you enjoyed reading this tutorial. Please reach out via email if you have any questions or comments.

Beginner
QRadar
SIEM
IBM
Security
Tutorial
VM
VirtualBox
Logstash
Elasticsearch
API
Data-Analysis
ETL
ELK
Elastic
MongoDB
NoSQL