Logstash

In Applied NSM we wrote quite a bit about both Logstash and Snorby. Recently, a reader of this blog asked whether there is a way to pivot from Snorby events to your Bro logs in Logstash. It turns out to be quite easy.

To start, you'll need a functional instance of Snorby, plus Logstash with Bro logs (or any other relevant parsed PSTR-type data) feeding into it. In this case, we'll assume that the Logstash dashboard is reachable at http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json.

If you've played around with Snorby, you've probably noticed the lookup sources when you click on an IP in an event. There are default sources, but you can also add additional sources. Those are configured from the Administration tab at the top right.

Figure 1: Lookup Sources Option in Snorby

At this time you're only allowed to use two variables, ${ip} and ${port}, and from my testing, you can only use each of them once in a given lookup URL. Normally this isn't an issue. If you are researching an IP with your favorite intel source, for instance, you only feed the IP into the URL once. However, if you need to feed it in twice, only the first ${ip} is filled in and the second is left blank. This becomes an issue with parsed Bro logs in Logstash.

Though not immediately obvious, Logstash allows for you to control the search from the URL as such:

http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json?query=id.orig_h:192.168.1.75

Test it out!

In Snorby, the lookup source for this would be:

http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json?query=id.orig_h:${ip}

However, let's assume you wanted to find logs where 192.168.1.75 appears as either the source or the destination address:

http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json?query=id.orig_h:192.168.1.75%20OR%20id.resp_h:192.168.1.75

That search is perfectly valid in Logstash, and the URL functions as expected. However, if you build the matching lookup source in Snorby by using ${ip} in both places, you'll notice that only the first ${ip} is filled in, and the value for id.resp_h is left blank. For that reason, I recommend the simpler method of querying the message field (essentially the unanalyzed raw log). We'll also add ${port} to narrow the results further.

http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json?query=message:(${ip} AND ${port})

That lookup source will look for any instance of the IP and the matching port within the log. A word of warning: there is a small chance of an unexpected hit with this method, because it literally looks for that IP address and that port number as separate strings anywhere in the message, in any order. Hypothetically, you could have an odd log that contains the selected IP, but where the "port" is actually the response_body_len or some other integer field. That would be extremely unlikely.
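To see why the message-based lookup holds up where the paired one breaks, here is a small Python model of the substitution behaviour described above. It is only a sketch of the reported behaviour (first occurrence filled, later ones left blank), not Snorby's actual code, and the function name expand_like_snorby is made up for illustration.

```python
BASE = "http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json"


def expand_like_snorby(template, ip, port):
    """Model of the behaviour described in the post, NOT Snorby's real code:
    the first use of each variable is filled in, later uses end up blank."""
    out = template
    for placeholder, value in (("${ip}", ip), ("${port}", str(port))):
        out = out.replace(placeholder, value, 1)  # fill the first occurrence
        out = out.replace(placeholder, "")        # any repeat is left blank
    return out


# Lookup source that needs ${ip} twice (source OR destination address).
paired = BASE + "?query=id.orig_h:${ip}%20OR%20id.resp_h:${ip}"
# Lookup source that uses each variable once against the raw message.
by_message = BASE + "?query=message:(${ip}%20AND%20${port})"

print(expand_like_snorby(paired, "192.168.1.75", 80))
print(expand_like_snorby(by_message, "192.168.1.75", 80))
```

The first URL ends with an empty id.resp_h term, while the second carries both values, which is why the message field is the safer choice here.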

You'll notice that the lookup source defaults to the past 24 hours and displays the entire log. To change this, we have to use a slightly different method: a "scripted dashboard". The two Logstash searches below both look for traffic that contains "91.189.92.152" and "80". However, there are a few differences.

http://192.168.1.12:9292/index.html#/dashboard/file/logstash.json?query=message:(91.189.92.152%20AND%2080)

http://192.168.1.12:9292/index.html#/dashboard/script/logstash.js?query=message:(91.189.92.152%20AND%2080)&fields=@timestamp,id.orig_h,id.orig_p,id.resp_h,id.resp_p,_type,host,uri

If you test both of these, the difference is immediate. The second URL lets you customize the output fields, which is essential. You'll also notice that the second URL points at /dashboard/script/logstash.js instead of /dashboard/file/logstash.json. "Scripted dashboards" are written entirely in JavaScript, which allows full control over the output. While we're adding custom fields, let's also say that we want to look at the past 7 days (&from=7d), and that those 7 days should be measured by the timestamp collected rather than by ingestion time (&timefield=@timestamp).

http://192.168.1.12:9292/index.html#/dashboard/script/logstash.js?query=message:(91.189.92.152%20AND%2080)&from=7d&timefield=@timestamp&fields=@timestamp,id.orig_h,id.orig_p,id.resp_h,id.resp_p,_type,host,uri

As we did before, let's add that as a lookup source in Snorby with the following URL:

http://192.168.1.12:9292/index.html#/dashboard/script/logstash.js?query=message:(${ip}%20AND%20${port})&from=7d&timefield=@timestamp&fields=@timestamp,id.orig_h,id.orig_p,id.resp_h,id.resp_p,_type,host,uri
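If you end up maintaining several of these URLs, it can help to generate them rather than edit them by hand. The Python sketch below assembles a scripted dashboard URL from the same pieces used above (query, from, timefield, fields). The helper name scripted_url and its defaults are my own, not part of Logstash or Kibana, and it only covers the parameters shown in this post.

```python
from urllib.parse import quote

SCRIPT_DASHBOARD = "http://192.168.1.12:9292/index.html#/dashboard/script/logstash.js"

FIELDS = ["@timestamp", "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p",
          "_type", "host", "uri"]


def scripted_url(query, fields, span="7d", timefield="@timestamp",
                 base=SCRIPT_DASHBOARD):
    """Build a scripted dashboard URL (hypothetical helper for illustration)."""
    # Encode spaces as %20 but leave the query syntax and Snorby's
    # ${ip}/${port} variables untouched.
    encoded = quote(query, safe=":()${}")
    return "%s?query=%s&from=%s&timefield=%s&fields=%s" % (
        base, encoded, span, timefield, ",".join(fields))


# A concrete search, and the template to paste into Snorby.
print(scripted_url("message:(91.189.92.152 AND 80)", FIELDS))
print(scripted_url("message:(${ip} AND ${port})", FIELDS))
```

Changing the time window or the displayed columns then becomes a one-argument change, for example scripted_url(query, FIELDS, span="24h"), assuming the dashboard accepts that span value.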

 

Figure 2: Note the Special URL Syntax in this LogStash Example

To summarize, here are some of the possible lookup sources I've mentioned, with the advanced lookup being my recommendation for these purposes:

Figure 3: Three Possible Lookup Sources for Pivoting to Logstash

Bro is one of the best things to happen to network security monitoring in a long time. However, most organizations don't have an ideal way to parse and view Bro logs. One option is to peruse Bro logs via something like Splunk, but with high throughput you'll be paying a pretty penny, since Splunk is priced based upon the amount of data ingested. Another popular (and free) solution is ELSA. However, while ELSA is extremely fast at data ingestion and searches, it currently has limitations on the number of fields that can be parsed due to its use of Sphinx. On top of that, ELSA requires searches with very specific terminology, and doesn't easily do wildcard searches without additional transforms.

This is where Logstash comes in. Logstash is an excellent tool for managing any type of event or log, and can easily parse just about anything you can throw at it. I say "easily" because once you're over the learning curve of writing your first Logstash configuration, creating additional configurations comes much more easily. In this guide I will talk about how you can use Logstash to parse logs from Bro 2.2. The examples shown here only demonstrate parsing methods for the http.log and ssl.log files, but the download links at the end of the post provide files for parsing all of Bro's log types.

If you want to follow along, then know that this guide assumes a few things. First, we'll be parsing "out-of-the-box" Bro 2.2 logs, which means you'll need an "out-of-the-box" Bro 2.2 installation. If you don't already have a Bro system, the easiest route to get up and running would normally be Security Onion, but as of this writing, Security Onion uses Bro 2.1 (although I'm sure this will change soon). In the meantime, reference Bro.org's documentation on installation and setup. Next, you'll need to download the latest version of Logstash. I tested version 1.2.2 for this article, with Logstash and Bro running on a single Ubuntu 12.04 system.

TLDR: You can download a complete Logstash configuration file for all Bro 2.2 log files and fields here.

Creating a Logstash Configuration

Let's get started by creating a master configuration file. Logstash relies on this file to decide how logs should be handled. For our purposes, we will create a file called bro-parse.conf, which should be placed in the same directory as the Logstash JAR file. It is made up of three main sections: input, filter, and output. Below is the basic outline for a Logstash configuration file:

input {
  ...
}

filter {
  ...
}

output {
  ...
}

Input

The input section of the Logstash configuration determines which logs should be ingested, and how. There are numerous plug-ins that can be used to ingest logs, such as a TCP socket, terminal standard input, a Twitter API feed, and more. We are going to use the "file" plug-in to ingest Bro logs. This plug-in reads a log file line by line in near real time. By default, it checks the files it is watching for new lines every second (stat_interval) and looks for new files matching the path every 15 seconds (discover_interval). Both intervals are configurable.

With the ingestion method identified, we need to provide the path to the log files we want to parse in the "path" field as well as a unique name for them in the "type" field. With the following configuration, the input section of the Logstash configuration is complete and will ingest "/opt/bro2/logs/current/http.log" and "/opt/bro2/logs/current/ssl.log", and will give them "type" names appropriately.

input {
  file {
    type => "BRO_httplog"
    path => "/opt/bro2/logs/current/http.log"
  }  
  file {
    type => "BRO_SSLlog"
    path => "/opt/bro2/logs/current/ssl.log"
  }
}
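If you're curious what "reading a file line by line in near real time" amounts to, the Python sketch below shows the general idea: remember how far you have read, and on each poll return only the complete lines appended since then. This is a simplified model under my own assumptions, not the file plug-in's implementation, and the function name read_new_lines is hypothetical.

```python
def read_new_lines(path, offset=0):
    """Return (lines, new_offset) for complete lines appended after offset.

    Simplified model of tailing a log file; a partially written last line
    is left alone until a later poll sees its newline.
    """
    with open(path, "rb") as handle:
        handle.seek(offset)
        chunk = handle.read()
    end = chunk.rfind(b"\n") + 1  # 0 when no complete line has arrived yet
    text = chunk[:end].decode("utf-8", errors="replace")
    lines = text.split("\n")[:-1]  # drop the empty piece after the final newline
    return lines, offset + end
```

A caller would keep the returned offset and pass it back in on the next poll, for example once per second against /opt/bro2/logs/current/http.log.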

 

Filter

The filter section is where you'll need to get creative. This section of the Logstash configuration takes the log data from the input section and decides how that data is parsed. It lets you specify which log lines to keep, which to discard, and how to identify the individual fields in each log file. We will use conditionals as the framework for creating these filters, which are essentially just if-then-else statements.

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

For this filter, we're going to use a nested conditional statement. First, we want to discard the first few lines of the Bro log files, since this is just header information that we don't need. These lines begin with the "#" sign, so we can configure our conditional to discard any log line beginning with "#" using the "drop" option. That part is trivial, but then it gets tricky, because we have to instruct Logstash on how to recognize each field in the log file. This can involve a bit of legwork, since you need to analyze the log format and determine what the fields will be called and which delimiters are used. Luckily, I've done a lot of that legwork for you. Continuing with our example, we can begin by looking at the Bro 2.2 http.log and ssl.log files, which contain 27 and 19 fields to parse respectively, delimited by tabs:

Figure 1: Bro 2.2 http.log

Figure 2: Bro 2.2 ssl.log
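Rather than counting columns by eye, you can pull the field names straight out of a log's header. The Python sketch below assumes Bro's default ASCII log format, where one of the "#" header lines starts with "#fields" followed by the tab-separated field names. The function name bro_field_names and the shortened sample header are mine, for illustration only.

```python
def bro_field_names(lines):
    """Return the field names from the '#fields' header line, or [] if absent."""
    for line in lines:
        if line.startswith("#fields"):
            # The first tab-separated item is the '#fields' tag itself.
            return line.rstrip("\n").split("\t")[1:]
        if not line.startswith("#"):
            break  # header is over; stop at the first data line
    return []


# Shortened, made-up header in the style of a Bro 2.2 http.log.
sample_header = [
    "#separator \\x09\n",
    "#path\thttp\n",
    "#fields\tts\tuid\tid.orig_h\tid.orig_p\tid.resp_h\tid.resp_p\n",
    "#types\ttime\tstring\taddr\tport\taddr\tport\n",
]

print(bro_field_names(sample_header))
```

Running it against the real http.log and ssl.log (for example with open(path) as the lines argument) should give you the 27 and 19 names mentioned above, assuming an unmodified Bro 2.2 install.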

The way these fields are parsed can affect performance as the amount of data you collect scales upward, but depending on your hardware, that only matters in extreme cases. To guarantee that all fields are parsed correctly, I used non-greedy regular expressions. Logstash supports "Grok" patterns, but I've found bugs when using specific or repetitive Grok patterns. Instead, I've taken the regex translation of the Grok patterns and written it directly in Oniguruma syntax. In testing, this has proven much more reliable and has produced no "random" errors. The resulting filter looks like this:

filter {

if [message] =~ /^#/ {
  drop {  }
} else {  

# BRO_httplog ######################
  if [type] == "BRO_httplog" {
      grok { 
        match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<trans_depth>(.*?))\t(?<method>(.*?))\t(?<host>(.*?))\t(?<uri>(.*?))\t(?<referrer>(.*?))\t(?<user_agent>(.*?))\t(?<request_body_len>(.*?))\t(?<response_body_len>(.*?))\t(?<status_code>(.*?))\t(?<status_msg>(.*?))\t(?<info_code>(.*?))\t(?<info_msg>(.*?))\t(?<filename>(.*?))\t(?<tags>(.*?))\t(?<username>(.*?))\t(?<password>(.*?))\t(?<proxied>(.*?))\t(?<orig_fuids>(.*?))\t(?<orig_mime_types>(.*?))\t(?<resp_fuids>(.*?))\t(?<resp_mime_types>(.*))" ]
      }
  }
# BRO_SSLlog ######################
  if [type] == "BRO_SSLlog" {
    grok { 
      match => [ "message", "(?<ts>(.*?))\t(?<uid>(.*?))\t(?<id.orig_h>(.*?))\t(?<id.orig_p>(.*?))\t(?<id.resp_h>(.*?))\t(?<id.resp_p>(.*?))\t(?<version>(.*?))\t(?<cipher>(.*?))\t(?<server_name>(.*?))\t(?<session_id>(.*?))\t(?<subject>(.*?))\t(?<issuer_subject>(.*?))\t(?<not_valid_before>(.*?))\t(?<not_valid_after>(.*?))\t(?<last_alert>(.*?))\t(?<client_subject>(.*?))\t(?<client_issuer_subject>(.*?))\t(?<cert_hash>(.*?))\t(?<validation_status>(.*))" ]
    }
  }
 }
}

As you can see in the filter, I've taken each field (starting with the timestamp, ts) and written an expression that matches it. To make sure that all fields are captured correctly, I've used the general non-greedy regex ".*?". After each field, I have a "\t", representing the tab delimiter that exists between fields. This can be optimized by making more specific field declarations with more precise regular expressions. For instance, an epoch timestamp will never contain letters, so why use a wildcard that matches them? Once you have the filter complete, you can move on to the easy part, the output.
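Typing out 27 capture groups by hand is error prone, so here is a Python sketch that generates a pattern string in the same shape as the ones in the filter above from a list of field names. The function name build_grok_pattern and the overrides argument are my own additions. The tighter timestamp expression in the example is only an illustration of the optimization idea and has not been tested in Logstash.

```python
def build_grok_pattern(fields, overrides=None):
    """Build a '(?<name>(regex))\\t...' pattern string for a Logstash grok match.

    Every field gets the non-greedy '.*?' except the last, which takes the
    rest of the line with '.*'. 'overrides' maps a field name to a more
    specific regex (hypothetical convenience, not a Logstash feature).
    """
    overrides = overrides or {}
    parts = []
    for index, name in enumerate(fields):
        is_last = index == len(fields) - 1
        default = ".*" if is_last else ".*?"
        parts.append("(?<%s>(%s))" % (name, overrides.get(name, default)))
    # Join with a literal backslash-t, as written in the config file.
    return r"\t".join(parts)


print(build_grok_pattern(["ts", "uid", "id.orig_h"]))
# Same fields, but with a digits-only expression for the epoch timestamp.
print(build_grok_pattern(["ts", "uid", "id.orig_h"], {"ts": r"\d+\.\d+"}))
```

The output is meant to be pasted between the quotes of the match line, with one call per Bro log type.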

Output

The output section of the Logstash configuration determines where ingested events are supposed to go. There are many output options in Logstash, but we are going to be sending them to Elasticsearch. Elasticsearch is the powerful search and analytics platform behind Logstash. To specify the output, we'll just add the following at the end of the Logstash configuration:

output {
  elasticsearch { embedded => true }
}

That concludes how to build a Logstash configuration that will ingest your Bro logs, exclude the lines we don't want, parse the individual data fields correctly, and output them to Elasticsearch. The only thing left to do is get them on the screen. To do that, we'll launch Logstash by entering the following command in a terminal, specifying the Logstash JAR file and the configuration file we just created:

java -jar logstash-1.2.2-flatjar.jar agent -f bro-parse.conf -- web

That might take a few seconds. To verify that everything is running correctly, open another terminal and run:

netstat -l | grep 9292

Once you can see that port 9292 is listening, that means that Logstash should be ready to rock.
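If you'd rather script that check (for example, to wait for Logstash before opening the browser), the Python sketch below does the equivalent by attempting a TCP connection. The function name port_is_listening is hypothetical, and the defaults simply mirror the port used in this post.

```python
import socket


def port_is_listening(host="127.0.0.1", port=9292, timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print("Logstash web interface reachable:", port_is_listening())
```

Note that this only tells you the port accepts connections, not that the dashboard itself has finished loading.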

Figure 3: Verifying Logstash is Running

Now you should be able to open a web browser and go to http://127.0.0.1:9292. Once there you'll probably only see the Kibana dashboard, but from there you can open the pre-built Logstash dashboard and see your Bro logs populating!

Figure 4: Bro Logs in Logstash

Logstash uses the Kibana GUI for browsing logs. The combination of Elasticsearch, Logstash, and Kibana in one package makes for the easiest Bro logging solution you can find. The most basic function that we now have is search. Searches allow for the use of wildcards or entire search terms. For instance, searching for "oogle.com" will probably give you 0 results. However, searching for "*oogle.com" is likely to give you exactly what you expect: any visits to Google-hosted domains. Search will also find full search terms (single terms or uniquely grouped terms between specific delimiters) without the need for a wildcard. For instance, if you search specifically for "plus.google.com", that is likely to return the results you would expect.
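The difference between those searches is easier to see with a toy model. The Python sketch below treats each hostname as a single indexed term and requires a query to match the whole term unless it contains a wildcard. This is only an analogy under that assumption. Elasticsearch's real analysis and query parsing are more involved, and the function name term_matches and the sample terms are made up.

```python
from fnmatch import fnmatchcase


def term_matches(query, term):
    """Toy model: a query must match the entire term; '*' matches any run of characters."""
    return fnmatchcase(term, query)


# Made-up terms standing in for values of the host field.
host_terms = ["plus.google.com", "www.google.com", "example.org"]

for query in ("oogle.com", "*oogle.com", "plus.google.com"):
    hits = [term for term in host_terms if term_matches(query, term)]
    print(query, "->", hits)
```

In this model "oogle.com" finds nothing because it is not a complete term, while the wildcard version and the full hostname both find what you would expect.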

To specify the logs you'd like to view by timestamp, there is a "timepicker" at the top right.

Figure 5: Logstash Timepicker

You can take advantage of the parsing of individual fields by generating statistics for the unique values associated with each field. This can be done by simply viewing a Bro log and clicking a field name in the left column of the screen. You can also see more complete visualizations from that window by clicking "terms". For example, the pie chart below is one that I generated that indicates how many records exist in each of the Bro logs I'm parsing.

Figure 6: Examining Bro Log Sums

As another example, let's filter down to just SSL logs. Under the "Fields" panel, click "type" to reveal the variations of log types. Then, click the magnifying glass on "BRO_SSLlog". Now you have only Bro SSL logs, as well as a new field list representing only the fields seen in the SSL events that are currently present. If you only want to see certain fields displayed, you can click their check boxes in the order you want them displayed. If you later want them rearranged, just move them with the left and right arrows in the column headers of the event display. Below is an example of sorting those SSL logs by timestamp, where the fields displayed are ts, server_name, uid, issuer_subject, and subject.

Figure 7: Sorting Bro SSL Logs

To remove the BRO_SSLlog filter, open the "filtering" panel at the top of the page and remove that additional filter. Doing so reverts to all data types, but keeps the fields you selected.

This guide only scratches the surface of the types of analysis you can do with Logstash. When you combine a powerful network logging tool like Bro and a powerful log analysis engine like Logstash, the possibilities are endless. I suggest you play around with customizing the front end and perusing the logs. If you somehow mess up badly enough or need to "reset" your data, you can stop Logstash in the terminal and remove the data/ directory that was created in the same location as the Logstash JAR file. I've created a config file that you can use to parse all of the Bro 2.2 log files. You can download that file here.

UPDATE - December 18, 2013

As per G Porter's request, I've generated a new Logstash Bro configuration that is tailored to work with the most recent Security Onion update. That update marked the deployment of Bro 2.2 to Security Onion, and if you compare it to an "out-of-the-box" Bro 2.2 deployment, there are a few additions that I've accounted for.

You can download the Security Onion specific Logstash Bro 2.2 configuration here.