Part 2 - Snowplow Stream Enrichment

In the previous post, we discussed how to set up the Scala Stream Collector to collect analytics events secured through HTTPS and publish them to a Kinesis Stream.

In this post we're going to jump into setting up Snowplow Stream Enrichment through Terraform.

The Stream Enrich process will allow you to take the raw events coming through your Snowplow Collector and add useful metadata and structure to them. This'll help to segment your audience when you're analyzing the data in the future!

It may be helpful to check out the GitHub repo associated with this series while following along! If you're just starting with this chapter, it may also be helpful to check out the rest of the series first.

Directory Structure

If you've followed along with the previous post, you're already on your way to having your Snowplow Stream Enrich setup complete. We're going to follow a lot of the same structure for this part of the process.

At the end of this post, the final directory structure will look something like this:

- iam-users
- kinesis-stream
- snowplow-collector
- snowplow-enrich
    - ami.tf
    - config.hocon.tpl
    - main.tf
    - resolver.js.tpl
    - security_groups.tf
    - variables.tf
- main.tf
...

Let's jump into what each of these files does! They'll all look pretty similar to their counterparts in our snowplow-collector module.

ami.tf

data "aws_ami" "ubuntu" {
  most_recent = true

  filter {
    name   = "name"
    values = ["ubuntu/images/hvm-ssd/ubuntu-xenial-16.04-amd64-server-*"]
  }

  filter {
    name   = "virtualization-type"
    values = ["hvm"]
  }

  owners = ["099720109477"] # Canonical
}

We're going to use Ubuntu for our enrichment host. This data source finds the latest Ubuntu 16.04 (Xenial) AMI published by Canonical for your region.

Nothing too special about this file - Let's move on to the next one!
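Before we do, one optional aside: if you'd like to confirm which image was resolved, a hypothetical module output (not part of the original setup, and one you'd re-export from the root configuration to see it) can surface the AMI ID:

# Hypothetical module output: surfaces the AMI ID the data source resolved,
# so you can sanity-check it after a plan/apply.
output "enrich-ami-id" {
  value = "${data.aws_ami.ubuntu.id}"
}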

security_groups.tf

resource "aws_security_group" "snowplow-enrich" {
  name        = "snowplow-enrich"
  description = "Our SSH inbound, and all outbound traffic"

  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["${chomp(var.machine_ip)}/32"]
  }

  egress {
    from_port = 0
    to_port = 0
    protocol = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

Notice that we're only opening inbound port 22 (SSH) for this enrichment process. Unlike the collector, it doesn't need to accept HTTP connections; it reads its input straight from Kinesis.

We also leave all outbound connections open so that the server can download the Stream Enrich application and reach the Kinesis API and Iglu repositories.
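If the wide-open egress bothers you, a more restrictive (hypothetical) variant could replace the egress block above. Ports 80 and 443 cover the application download, the Iglu repositories, and the Kinesis API, but depending on your VPC you may also need to allow DNS, NTP, or other services, so treat this as a sketch rather than a drop-in replacement:

  # Hypothetical replacement for the open egress block: HTTP and HTTPS only.
  egress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }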

variables.tf

variable "stream_in" { type = "string" }
variable "good_stream_out" { type = "string" }
variable "bad_stream_out" { type = "string" }
variable "enrich_version" { default = "0.13.0"}

variable "machine_ip" { type = "string" }
variable "key_pair_name" { type = "string" }
variable "key_pair_loc" { type = "string" }
variable "aws_region" { type = "string" }

variable "operator_access_key" { type = "string" }
variable "operator_secret_key" { type = "string" }

This file is best explained by walking through what each of the variables represents (a sample set of placeholder values follows the list):

  • stream_in is the Kinesis stream of raw events coming from our Snowplow Collector. This will be the same as the good output stream that we set up in our collector.

  • good_stream_out is the Kinesis stream where we'll push events that were handled by our enrichment correctly. These are the events that were in an expected format and not too large to process.

  • bad_stream_out is the Kinesis stream where we'll push events that were either in an unreadable format or were too large to process effectively. Administrators can use this to keep an eye on their incoming events and make sure that nothing unexpected is happening.

  • enrich_version allows us to select which version of Snowplow Stream Enrich we'd like to install.

  • machine_ip is the IP of your current machine. We're using this so that we can lock SSH down and make it inaccessible to the outside world. It may be a little overkill, but it'll secure our users' data just a bit more.

  • key_pair_name is the name of the SSH key pair that we'd like to secure the server with.

  • key_pair_loc is the location of the key pair (PEM file) on our hard drive.

  • aws_region is the region in which the Kinesis streams and the Snowplow enrichment process are hosted.

  • operator_access_key and operator_secret_key are the credentials the enrichment process will use to communicate with the AWS Kinesis API.
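Most of these variables get wired up automatically from the root main.tf later in this post: the stream names come from the kinesis-stream modules, the operator keys from iam-users, and machine_ip from an HTTP lookup. The only values you supply by hand are the root-level ones. A hypothetical terraform.tfvars for the root module might look like this, with placeholder values you'd swap for your own:

# Hypothetical terraform.tfvars for the root module (placeholder values only).
aws_region        = "us-east-1"
key_pair_name     = "snowplow-keypair"
key_pair_location = "~/.ssh/snowplow-keypair.pem"
ssl_acm_arn       = "arn:aws:acm:us-east-1:123456789012:certificate/example"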

Enrichment Config files

This is where the majority of the customization for our enrichment process takes place. These templates let us point the enrichment at custom schema resolvers and write to different kinds of streams.

We won't go to that depth in this post, but it's a good foundation for further customization!

config.hocon.tpl

# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'kafka' for reading Thrift-serialized records from a Kafka topic
  # 'nsq' for reading Thrift-serialized records from a Nsq topic
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = kinesis

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
  # 'nsq' for writing enriched events to one Nsq topic and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
  sink = kinesis

  # AWS credentials
  # If both are set to 'default', use the default AWS credentials provider chain.
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    accessKey = "${access-key}"
    secretKey = "${secret-key}"
  }

  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "${stream-in}"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "${good-stream-out}"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "${bad-stream-out}"

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = "user_ipaddress"
    }

    kinesis {
      # Region where the streams are located
      region = "${aws-region}"

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = TRIM_HORIZON

      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: The time needs to be specified in UTC.
      initialTimestamp = "{{initialTimestamp}}"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 10000
      }
    }

    # Kafka configuration
    kafka {
      brokers = "{{kafkaBrokers}}"

      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # config for nsq
    nsq {
      # Channel name for nsq source
      # If more than one application is reading from the same NSQ topic at the same time,
      # all of them must have the same channel name
      rawChannel = "{{nsqSourceChannelName}}"

      # Host name for nsqd
      host = "{{nsqHost}}"

      # TCP port for nsqd, 4150 by default
      port = 4150

      # Host name for lookupd
      lookupHost = "{{lookupHost}}"

      # HTTP port for nsqlookupd, 4161 by default
      lookupPort = 4161
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 300000
      recordLimit = 200 # Not supported by Kafka; will be ignored
      timeLimit = 5000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
    appName = "SnowplowEnrich-${stream-in}"
  }
}

We're using this as a Terraform template file so we can fill in the details of the streams we've set up: the input stream, the output streams, and the access keys the enrichment process will use to read from and write to them.

resolver.js.tpl

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Iglu Central - GCP Mirror",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://mirror01.iglucentral.com"
          }
        }
      }
    ]
  }
}

This sets us up to resolve JSON schemas against the official Iglu Central repository (and its GCP mirror). You can modify this file to add your own Iglu repositories if you use custom JSON schemas in your enrichment process.

snowplow-enrich/main.tf

Here's where our module comes together and creates the server we'll use to host the Snowplow enrichment process!

# Configuration files
data "template_file" "enrich-config" {
    template = "${file("${path.module}/config.hocon.tpl")}"

    vars {
        stream-in         = "${var.stream_in}"
        good-stream-out   = "${var.good_stream_out}"
        bad-stream-out    = "${var.bad_stream_out}"
        aws-region        = "${var.aws_region}"
        access-key        = "${var.operator_access_key}"
        secret-key        = "${var.operator_secret_key}"
    }
}

data "template_file" "resolver-config" {
    template = "${file("${path.module}/resolver.js.tpl")}"
}

# EC2 Server
resource "aws_instance" "enrich" {
  ami             = "${data.aws_ami.ubuntu.id}"
  instance_type   = "t2.micro"
  key_name        = "${var.key_pair_name}"

  lifecycle {
    ignore_changes = ["ami"]
  }

  security_groups = [
    "${aws_security_group.snowplow-enrich.name}",
  ]

  tags {
    Name = "snowplow-enrich"
  }

  provisioner "remote-exec" {
    inline = [
      "sudo apt-get update",
      "sudo apt-get install -y unzip openjdk-8-jdk",
    ]

    connection {
      type          = "ssh"
      user          = "ubuntu"
      private_key   = "${file("${var.key_pair_loc}")}"
    }
  }

  provisioner "remote-exec" {
    inline = [
      "cat <<FILEXXX > /home/ubuntu/config.hocon",
      "${data.template_file.enrich-config.rendered}",
      "FILEXXX",
      "cat <<FILEXXX > /home/ubuntu/resolver.js",
      "${data.template_file.resolver-config.rendered}",
      "FILEXXX",
      "wget http://dl.bintray.com/snowplow/snowplow-generic/snowplow_stream_enrich_${var.enrich_version}.zip",
      "unzip snowplow_stream_enrich_${var.enrich_version}.zip",
      "echo \"@reboot  java -jar /home/ubuntu/snowplow-stream-enrich-${var.enrich_version}.jar --config /home/ubuntu/config.hocon --resolver file:/home/ubuntu/resolver.js &> /home/ubuntu/enrich.log\" | crontab -",
      "nohup java -jar /home/ubuntu/snowplow-stream-enrich-${var.enrich_version}.jar --config /home/ubuntu/config.hocon --resolver file:/home/ubuntu/resolver.js &> /home/ubuntu/enrich.log &",
      "sleep 1"
    ]

    connection {
      type          = "ssh"
      user          = "ubuntu"
      private_key   = "${file("${var.key_pair_loc}")}"
    }
  }
}

We first render the template files we created above, filling in the variables they need for string interpolation, so that we can write them to the server.

We create a t2.micro Ubuntu server with our key pair to secure the SSH connection.

There's a lifecycle.ignore_changes on the AMI so that Terraform doesn't try to replace the server whenever a new Ubuntu AMI comes out (which the data source would otherwise pick up).

security_groups attaches the firewall rules we defined earlier, and tags.Name gives the instance a friendly name so we can identify it in our list of EC2 instances.

Our first remote-exec provisioner installs unzip and Java 8 (OpenJDK) on the server so that we can run the enrichment application.

The second remote-exec provisioner:

  1. Writes our configuration files to the hard drive

  2. Downloads the Snowplow Stream Enrich application and unzips it

  3. Adds a line to our crontab file that starts the enrichment application when the server boots

  4. Starts the enrichment application and detaches it from our SSH session via nohup so that we can safely close the connection (a small sketch below shows one way to find the server again later).
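If you want to SSH back in later and tail enrich.log, one option (not part of the original module) is to expose the instance's public IP as a module output, which you could then re-export from the root configuration:

# Hypothetical module output: the enrichment instance's public IP, handy for
# `ssh -i <your .pem> ubuntu@<ip>` when you want to check enrich.log.
output "enrich-public-ip" {
  value = "${aws_instance.enrich.public_ip}"
}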

main.tf

Everything is in place for our module! Let's instantiate it within our root main.tf file, alongside the rest of the existing Snowplow stack.

# Build necessary IAM roles
module "snowplow-users" { source = "iam-users" }

# Set up Kinesis Streams
module "analytics-good" {
  source = "kinesis-stream"
  name = "AnalyticsCollector-Good"
}

module "analytics-bad" {
  source = "kinesis-stream"
  name = "AnalyticsCollector-Bad"
}

module "analytics-enrich-good" {
  source = "kinesis-stream"
  name = "AnalyticsEnriched-Good"
}

module "analytics-enrich-bad" {
  source = "kinesis-stream"
  name = "AnalyticsEnriched-Bad"
}

# Get local machine's IP
data "http" "my-ip" {
  url = "http://icanhazip.com"
}

module "snowplow-collector" {
  source              = "snowplow-collector"
  aws_region          = "${var.aws_region}"
  machine_ip          = "${data.http.my-ip.body}"
  key_pair_name       = "${var.key_pair_name}"
  key_pair_loc        = "${var.key_pair_location}"
  operator_access_key = "${module.snowplow-users.operator-access-key}"
  operator_secret_key = "${module.snowplow-users.operator-secret-key}"

  good_stream_name    = "${module.analytics-good.stream-name}"
  bad_stream_name     = "${module.analytics-bad.stream-name}"
  ssl_acm_arn         = "${var.ssl_acm_arn}"
}

module "snowplow-enrich" {
  source              = "snowplow-enrich"
  aws_region          = "${var.aws_region}"
  machine_ip          = "${data.http.my-ip.body}"
  key_pair_name       = "${var.key_pair_name}"
  key_pair_loc        = "${var.key_pair_location}"
  operator_access_key = "${module.snowplow-users.operator-access-key}"
  operator_secret_key = "${module.snowplow-users.operator-secret-key}"

  stream_in           = "${module.analytics-good.stream-name}"
  good_stream_out     = "${module.analytics-enrich-good.stream-name}"
  bad_stream_out      = "${module.analytics-enrich-bad.stream-name}"
}

The additions we've made to this file are:

  • Create the analytics-enrich-good and analytics-enrich-bad Kinesis streams (the optional outputs sketch below shows one way to expose their names)

  • Instantiate the snowplow-enrich module we've created with the proper variables.
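Optionally, and this is a sketch rather than something from the original repo, you could expose the enriched stream names at the root level so the sink we build next can find them easily:

# Hypothetical root-level outputs exposing the enriched streams for downstream consumers.
output "enriched-good-stream" {
  value = "${module.analytics-enrich-good.stream-name}"
}

output "enriched-bad-stream" {
  value = "${module.analytics-enrich-bad.stream-name}"
}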

Moving forward

In our next post, we'll create a Snowplow Sink that will let us start crunching the data that's moving through our new analytics stack!

In future posts we'll also detail how to create some applications that can read from these Kinesis streams and use them to analyze and act on your analytics data in real time.
