By James Wing on 2017-01-16

Processing events from AWS CloudTrail is a vital security activity for many AWS users. CloudTrail reports on important security events like user logins and role assumption, "management events" from API calls that can change the security and structure of your account, and, more recently, "data events" from more routine data access to S3.

Apache NiFi is a great platform for processing a stream of CloudTrail events, providing low-latency handling and the flexibility to process events directly while also supporting a wide variety of downstream processing and reporting options:

  • Enrich events with account information, IP geolocation, machine learning algorithms, etc.
  • Identify events that require immediate action
  • Route events to storage and query systems like S3, Elasticsearch, Kinesis, etc.

Ways to Process CloudTrail Events

CloudTrail's basic work pattern is to batch JSON events together and write the batch as a single file to your designated S3 bucket. You can optionally receive SNS notifications of new files and forward files to CloudWatch Logs. There are many ways to design a CloudTrail event processing solution using these options:

  1. Read from S3 - Enumerate and read the event files stored in S3. This works great for processing older events, but can be a heavy process if you are not selective.
  2. SNS Events - Receive notifications from CloudTrail announcing new files, then read the event files from S3. This provides reasonably low-latency access to events as they come in.
  3. CloudWatch Logs - If you have routed your trail to CloudWatch Logs, you can use this as a reading source. This is a good way to get visibility and basic search on your events.

Apache NiFi can conveniently help you with all three of these scenarios, but here we will use the SNS solution to show off NiFi's low-latency capabilities.

Build an Enriched Events Table in Amazon Athena

As an example, let's build an Apache NiFi flow that processes events from CloudTrail, enriches them with IP geolocation information, and stores them in S3 optimized for the Amazon Athena query service. Breaking this down a bit:

  1. We configure CloudTrail to publish SNS notifications of new event files, and subscribe our SQS queue to the SNS topic (see the sketch after this list)
  2. Our NiFi flow will read from the SQS queue
  3. NiFi will load the referenced event data from S3
  4. We will do some basic geo-enrichment of IP addresses using NiFi's GeoEnrichIP processor
  5. The enriched event data will be stored in an S3 bucket accessible to Athena
  6. We will create an Athena table for queries against the enriched data
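Wiring the SNS topic to the SQS queue is a one-time setup step outside of NiFi. Here is a minimal sketch using the AWS CLI, where the topic ARN matches the sample notification shown later and the queue ARN is a placeholder for illustration (note that the queue's access policy must also allow the topic to send messages):

aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:123456789012:cloudtrail-general-file-notifications \
    --protocol sqs \
    --notification-endpoint arn:aws:sqs:us-east-1:123456789012:cloudtrail-notifications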

The diagram below illustrates our solution concept:

Concept illustration of Apache NiFi solution processing CloudTrail events to Athena

Outlining the NiFi Flow

I organized the Apache NiFi flow into Process Groups, which help divide up the logic of the flow and make it easier to understand. The resulting flow looks like this:

NiFi CloudTrail flow overview

The sections below look at selected details of each process group, but the high-level design closely matches the concepts in the previous diagram:

  • Receive CloudTrail notifications via SQS, and parse out references to S3 files containing events
  • Retrieve event files from S3, uncompress, and split individual events out of the container array
  • Enrich individual events, using IP geolocation as a sample
  • Write the enriched events back to S3, optimizing location and format for later Athena queries

Receiving CloudTrail Events via SNS, SQS, and S3

Apache NiFi can easily receive SQS messages using the GetSQS processor. The notifications sent by CloudTrail simply contain a reference to the S3 bucket and object key of new events posted to your bucket.

{
  "Type" : "Notification",
  "MessageId" : "0fb3eec2-4de3-5ab3-8666-be3a2b7b2801",
  "TopicArn" : "arn:aws:sns:us-east-1:123456789012:cloudtrail-general-file-notifications",
  "Message" : "{\"s3Bucket\":\"my-cloudtrail-log-bucket\",\"s3ObjectKey\":[\"CloudTrail-General/AWSLogs/123456789012/CloudTrail/us-east-1/2017/01/17/123456789012_CloudTrail_us-east-1_20170117T1835Z_5iMqahTJXI03NfgL.json.gz\"]}",
  "Timestamp" : "2017-01-17T18:37:12.746Z",
  "SignatureVersion" : "1",
  "Signature" : "...",
  "SigningCertURL" : "...",
  "UnsubscribeURL" : "..."
}
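On the NiFi side, GetSQS needs little configuration beyond the queue URL. A sketch of the relevant properties, with a placeholder queue URL, and AWS credentials configured however your environment requires:

GetSQS
    Queue URL               https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-notifications
    Region                  us-east-1
    Auto Delete Messages    true
    Batch Size              10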

NiFi can extract the message portion of the notification using an EvaluateJsonPath processor with the JSONPath expression $.Message.

Extracting SNS notification message with EvaluateJsonPath

This leaves us with a much more concise JSON referencing the new event files on S3.

{
    "s3Bucket":"my-cloudtrail-log-bucket",
    "s3ObjectKey":[
        "CloudTrail-General/AWSLogs/167566334345/CloudTrail/us-east-1/2017/01/17/167566334345_CloudTrail_us-east-1_20170117T1835Z_5iMqahTJXI03NfgL.json.gz"
    ]
}

Notice that s3ObjectKey is an array of keys, although only one is shown here. Amazon's documentation for CloudTrail notifications states that a single notification may reference many files. Handling this array properly takes a few steps in NiFi. First, we extract the S3 bucket name with another EvaluateJsonPath processor. Then we use SplitJson to break out the individual S3 object keys, splitting on $.s3ObjectKey. Each result of the split is a copy of the pre-split flowfile, but with content equal to one of the strings in the array. Finally, we transfer the S3 key from the content to the filename attribute using ExtractText with a regular expression of (.*).
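A sketch of the key settings for these three processors; the s3.bucket attribute name is simply this flow's choice, and ExtractText's dynamic filename property overwrites the standard filename attribute with the matched content:

EvaluateJsonPath
    Destination    flowfile-attribute
    s3.bucket      $.s3Bucket

SplitJson
    JsonPath Expression    $.s3ObjectKey

ExtractText
    filename    (.*)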

Receiving SQS messages and extracting message fields

Loading Events from S3

CloudTrail stores events in your S3 bucket in files containing a single JSON object, which itself contains a single attribute, Records, an array of CloudTrail event objects. We will build a NiFi process group that fetches these files from S3, un-gzips them, and splits the JSON Records array, yielding a stream of individual CloudTrail JSON event records. With the exception of the FetchS3Object processor, this continues the NiFi JSON work shown above using EvaluateJsonPath and SplitJson processors.

Loading CloudTrail files from S3 and splitting events
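For reference, each file CloudTrail writes is a single gzipped JSON object with this container structure (abbreviated; full event records are shown later):

{
    "Records": [
        { "eventVersion": "...", "eventName": "...", "sourceIPAddress": "..." },
        { "eventVersion": "...", "eventName": "...", "sourceIPAddress": "..." }
    ]
}

FetchS3Object picks the file location up from the attributes set earlier: its Bucket property can be set to ${s3.bucket}, and its default Object Key of ${filename} already matches. SplitJson splitting on $.Records then emits one flowfile per event.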

GeoIP Enrichment

Apache NiFi includes a built-in processor for geo-ip enrichment, GeoEnrichIP. We are going to use GeoEnrichIP with MaxMind's free GeoLite2 database, which will provide city and country information. Speaking of which...

This product includes GeoLite2 data created by MaxMind, available from http://www.maxmind.com.

To make this work in NiFi, we need a multiple-step flow that performs a few tasks:

  1. Extracts the IP address from the CloudTrail event record
  2. Gets the geographical information from GeoEnrichIP
  3. Updates the event to add the geographical results

GeoEnrichIP uses an input attribute, cloudtrail.sourceIPAddress in this case, and provides a set of output attributes based on that name:

Attribute                                                Sample Data
cloudtrail.sourceIPAddress.geo.latitude                  47.6102
cloudtrail.sourceIPAddress.geo.longitude                 -122.3043
cloudtrail.sourceIPAddress.geo.city                      Seattle
cloudtrail.sourceIPAddress.geo.subdivision.isocode.0     WA
cloudtrail.sourceIPAddress.geo.country                   United States
cloudtrail.sourceIPAddress.geo.country.isocode           US
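Configuration for GeoEnrichIP itself is minimal. A sketch, assuming the GeoLite2 City database has been downloaded to the NiFi host (the file path below is hypothetical):

GeoEnrichIP
    MaxMind Database File    /var/lib/nifi/GeoLite2-City.mmdb
    IP Address Attribute     cloudtrail.sourceIPAddress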

An ExecuteScript processor running ECMAScript/JavaScript (Script Engine set to ECMAScript) will let us easily add this information to each event record that receives an IP geolocation match:

var flowFile = session.get();
if (flowFile !== null) {

    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

    // Rewrite the flowfile content, adding a nested geo object to the event
    flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) {

        // Parse the CloudTrail event record from the flowfile content
        var inputJSON = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        var event = JSON.parse(inputJSON);

        // Copy the GeoEnrichIP output attributes into a nested geo object
        var geoIpAttribute = "cloudtrail.sourceIPAddress";
        event.geo = {
            "latitude": flowFile.getAttribute(geoIpAttribute + ".geo.latitude"),
            "longitude": flowFile.getAttribute(geoIpAttribute + ".geo.longitude"),
            "city": flowFile.getAttribute(geoIpAttribute + ".geo.city"),
            "subdivision_isocode": flowFile.getAttribute(geoIpAttribute + ".geo.subdivision.isocode.0"),
            "country": flowFile.getAttribute(geoIpAttribute + ".geo.country"),
            "country_isocode": flowFile.getAttribute(geoIpAttribute + ".geo.country.isocode")
        };

        // Serialize the updated event back out as the new flowfile content
        outputStream.write(JSON.stringify(event).getBytes(StandardCharsets.UTF_8));
    }));

    session.transfer(flowFile, REL_SUCCESS);
}

After ExecuteScript runs, our event records each contain an additional nested geo object like this:

"geo" : {
    "latitude" : "47.6102",
    "longitude" : "-122.3043",
    "city" : "Seattle",
    "subdivision_isocode" : "WA",
    "country" : "United States",
    "country_isocode" : "US"
}

Handling Events without IP Addresses

An important point in this process group is not only performing the geo-enrichment when we have IP addresses, but also smoothly handling events without them. Amazon publishes many events where internal services are acting, carrying values like "sourceIPAddress" : "AWS Internal". Our flow handles this in two ways:

  • Regex Matching for IP - Our RouteOnAttribute processor only sends records for geo-enrichment that match a simple IPv4 regex (a sketch of the expression follows below).
  • GeoEnrichIP Unmatched - The unmatched relationship of our GeoEnrichIP processor is routed out of the process group, so we do not try to add a geo: {...} section to events with no geolocation match.

GeoEnrichIP flow and error handling
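The routing test itself is a NiFi Expression Language matches() call against the extracted attribute. A minimal sketch of the RouteOnAttribute dynamic property (the property name is arbitrary, and the regex accepts anything shaped like a dotted quad rather than strictly validating IPv4):

hasIPAddress    ${cloudtrail.sourceIPAddress:matches('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')}

Flowfiles matching hasIPAddress continue to GeoEnrichIP; everything else leaves the process group un-enriched.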

Saving Bundles of Events to S3 for Athena

Apache NiFi has a PutS3Object processor for writing data to S3. But what exactly do we want to write? Amazon Athena can read JSON records natively, so we will mostly leave the CloudTrail events in their original JSON format. However, we want to package the events into compressed bundles and directory structures that Athena reads well. CloudTrail events can add up fast; bundling and compressing them greatly improves both storage efficiency in S3 and query efficiency through Athena. On the other hand, this is security event data, and we want events to pass through the flow quickly rather than pile up into very large sets. NiFi's MergeContent processor is our hero here, because it has settings for balancing exactly these criteria.

MergeContent processor properties

For this example, we'll set MergeContent to bundle up to 10,000 flowfiles, but to wait no more than 5 minutes before allowing a bundle to proceed. At least once every 5 minutes is the key here. In typical current processing based on CloudTrail's SNS notifications, we will probably bundle fewer than 10,000 events in a 5-minute period, especially outside of business hours, but this allows us to bundle larger quantities when processing a backlog or enumerating historical S3 event files.
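A sketch of the MergeContent properties that strike this balance; the values are this example's choices, and the newline demarcator produces one JSON event per line, a layout Athena's JSON SerDe reads directly:

MergeContent
    Merge Strategy               Bin-Packing Algorithm
    Minimum Number of Entries    1
    Maximum Number of Entries    10000
    Max Bin Age                  5 min
    Delimiter Strategy           Text
    Demarcator                   (a single newline)

MergeContent does not compress; one straightforward option is a CompressContent processor set to gzip between MergeContent and PutS3Object, since Athena reads gzipped JSON natively.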

Building the Athena Table

Now that we have our events in S3, we need to create a table in Athena to enable SQL queries against the events.

First, create a database in Athena to contain our table:

CREATE DATABASE IF NOT EXISTS cloudtrail;

Then create a table for CloudTrail events. The following schema contains many common event fields, but you may want to review the CloudTrail event content documentation for the complete list, especially for service- and API-specific fields. Our custom geo element has been added at the end.

CREATE EXTERNAL TABLE cloudtrail.event (
    eventID string,
    eventType string,
    eventTime string,
    eventSource string,
    eventName string,
    awsRegion string,
    sourceIPAddress string,
    userAgent string,
    userIdentity STRUCT<
        type: string,
        arn: string,
        principalId: string,
        accountId: string,
        userName: string,
        accessKeyId: string,
        invokedBy: string,
        identityProvider: string,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: string,
                creationDate: string
            >,
            sessionIssuer: STRUCT<
                type: string,
                principalId: string,
                arn: string,
                accountId: string,
                userName: string
            >
        >
    >,
    requestID string,
    errorCode string,
    errorMessage string,
    readOnly boolean,
    recipientAccountID string,
    sharedEventID string,
    vpcEndpointId string,
    eventVersion string,
    apiVersion string,
    geo STRUCT<
        latitude: string,
        longitude: string,
        city: string,
        subdivision_isocode: string,
        country: string,
        country_isocode: string
    >
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://my-cloudtrail-log-bucket/athena-security/events/';

Now we can run Athena queries against the records. Because Athena reads from S3 for each query, we dynamically pick up new events as our NiFi flow writes them to S3.

SELECT
  date_format(from_iso8601_timestamp(eventTime), '%Y-%m-%d') as "period",
  COUNT(*) as "events"
FROM
  cloudtrail.event
GROUP BY 1
ORDER BY 1 DESC;

Amazon Athena UI showing query results

And we can use our geolocation information to validate login activity:

SELECT
  eventType,
  date_format(from_iso8601_timestamp(eventTime) AT TIME ZONE 'America/Los_Angeles', '%Y-%m-%d %H:%i %a') as eventTime,
  userIdentity.userName,
  eventName,
  awsRegion,
  geo.city,
  geo.country
FROM
  cloudtrail.event
WHERE
  eventType = 'AwsConsoleSignIn'
ORDER BY
  eventTime DESC
LIMIT 10;

Amazon Athena UI showing query results

Flow Template

This sample flow is available as a NiFi Template, CloudTrail_Event_Processing_Starter_Kit.xml.