Analytics Data Integrity - It's tough

I've been very interested in the possibility of building more useful applications by analyzing how users are moving through the applications we build, how they interact with them, and how that changes as the application evolves.

Over the past few weeks I've been trying to build a system for analyzing user behavior through my streaming Snowplow Analytics stack.

It's been pretty tough.

This is something I've wanted to do for a while. But I've always been overwhelmed by the sheer complexity of managing and manipulating the large streams of data.

How can I handle the large volumes of data in an efficient way? What if I lose some data? What if I duplicate the data? What if these issues lead my coworkers to draw false conclusions about our users?

These are all very valid questions. Questions which I am probably not the most well-equipped in the world to answer.

But - We all have to start somewhere. We can only spend so much time lying around, waiting for someone else to build our dream system. No better time than the present to get our hands dirty and learn a thing. Do something interesting.

Plus, it's fun.

Here, I've written down some of the issues that have come up and how I've been solving them so far.

Helpful Resources

The analytics stack I'm working with runs in real time through a Kinesis Stream, using the streaming Snowplow Analytics setup that I've described how to build previously.

I've also started a Github project that uses all of these techniques. It isn't currently well documented or easy to use, but if it proves useful, I'll be documenting and testing it more thoroughly in the future!

There is a great, in-depth book called Streaming Systems for those who want to learn more about processing large amounts of analytical data, at scale!

Event Re-Ordering

One thing that I've been wanting to build is a real-time user segmentation system and user funnel tracking system.

This means that the order of events matters. If we want to see a list of people who've read Part 1 - The Snowplow Collector and then read Part 2 - The Snowplow Stream Enrichment Process, we need to make sure that the events are processed in the correct order.

We want to prevent this scenario:

Event 2
Event 3
Event 1
Event 4

If the order gets mixed up, the funnels will be incorrect and we could draw the wrong conclusions about how people are flowing through the website.
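
As a concrete illustration of why order matters, here is a minimal Go sketch of the kind of funnel check I have in mind. The Event type, its fields, and completedFunnel are purely illustrative - they aren't part of Snowplow or of my project's actual code.

package funnel

// Event is a simplified stand-in for a Snowplow enriched event -
// just enough fields for this example.
type Event struct {
  UserID string
  Page   string
}

// completedFunnel walks a user's events in the order we processed them
// and reports whether they viewed Part 1 before Part 2.
func completedFunnel(events []Event) bool {
  seenPart1 := false
  for _, evt := range events {
    switch evt.Page {
    case "Part 1 - The Snowplow Collector":
      seenPart1 = true
    case "Part 2 - The Snowplow Stream Enrichment Process":
      if seenPart1 {
        return true
      }
    }
  }
  return false
}

Because the check depends on processing order, it quietly returns the wrong answer whenever the Part 2 event happens to be processed before the Part 1 event.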

At-least-once messages

The first thing to understand is that Kinesis has at-least-once message guarantees. This means that, if a message makes it to Kinesis, we will always receive it at least once.

It also means that we might receive the same message more than once.

This affects us in an interesting way:

  • Data can be duplicated
  • Event order can be messed up through data duplication

This is what that might look like:

Event 1
Event 3
Event 2
Event 2
Event 1
Event 4

We'll handle the actual data duplication later in this article.

But, the thing to note here is that the Kinesis system itself cannot guarantee ordering. Snowplow also can't really do anything to fix that, from their end.

Derived Timestamp VS Collector Timestamp

Another thing to note is that the Snowplow Trackers themselves can't ensure that the data is received by the Snowplow Collector in the correct order.

This isn't really a flaw in the Snowplow Tracker so much as an inconvenience caused by the intricacies of networking on the internet.

The Snowplow Javascript Tracker, for example, will make a series of network calls to the Snowplow Collector.

These calls happen asynchronously and the network may drop or slow down any given request. If an analytics event were dropped, it would need to be retried. By the time it's retried, another analytics event, that technically happened at a later time, might be received by the collector.

Event 2 - Happened at 1:03 (Derived). Received at 1:03 (Collector)
Event 1 - Happened at 1:02 (Derived). Received at 1:04 (Collector)

The gist of this is that, due to the constraints of the physical world, the Collector Timestamp is not a good measure of the actual time an event occurred. Nor is the order of events pushed to the Kinesis stream guaranteed to correspond to the order in which they actually happened.

None of these issues is likely to be solved any time soon.

Luckily, Snowplow has added another timestamp to event records, called the Derived Timestamp. It uses all of the information available on the event to make a best guess at when the event actually occurred.

In the case of the Javascript Tracker, it can make use of a client-side timestamp that can be recorded at the time of execution.
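
To make the distinction concrete, here is a small Go sketch of sorting events by the Derived Timestamp rather than the Collector Timestamp. The Event struct and its field names are my own simplification, not Snowplow's enriched event schema.

package events

import (
  "sort"
  "time"
)

// A cut-down event carrying both timestamps, for illustration only.
type Event struct {
  CollectorTstamp time.Time // when the collector received the event
  DerivedTstamp   time.Time // Snowplow's best guess at when it happened
}

// sortByDerived orders events by when they (probably) actually occurred,
// not by when they happened to arrive at the collector.
func sortByDerived(events []Event) {
  sort.Slice(events, func(i, j int) bool {
    return events[i].DerivedTstamp.Before(events[j].DerivedTstamp)
  })
}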

You can read more about the Derived Timestamp from the Snowplow team by taking a look at their blog.


They often post a lot of useful information about the decisions they've made throughout building the system. Definitely a good read!


We need a compromise. With these constraints, we can still make a reasonably accurate solution: buffer the incoming events for a short window of time, sort the buffer by the Derived Timestamp, and then process the sorted events in order.

bufferedEvents = []

for 1.minute {
  nextEvent = getNextEvent()
  bufferedEvents.append(nextEvent)
}

sort(bufferedEvents)

... process this sorted list of events and repeat ...

An event that arrives more than a minute late can still slip through out of order, but this corrects the vast majority of the mix-ups caused by the network and by Kinesis itself.

Data De-duplication

That leaves the duplicates. If we count events naively, every duplicate inflates our numbers:

Event 1 - Viewed Page
Event 1 - Viewed Page (Duplicate)
Event 1 - Viewed Page (Duplicate)
Event 2 - Viewed Page

Viewed Pages Count: 4

Only two pages were actually viewed, but we counted four. Luckily, Snowplow provides an enrichment called event_fingerprint to help us spot these duplicates.

This enrichment can be found in the Snowplow documentation.


The event_fingerprint is essentially a hash built from the fields of the event:

fingerprint := md5(evt.UserId + evt.NetworkUserId + evt.SeAction + ...)

Two events with the same fingerprint are, for our purposes, the same event. So when an event arrives, we check whether we've seen its fingerprint before. If we have, we skip it; otherwise, we process it:

... receive event ...

if seenBefore {
  ... skip ...
} else {
  ... process ...
}

We also don't need to remember every fingerprint forever. Duplicates tend to arrive close together, so we only need to keep fingerprints in recent memory - those seen within the past hour or so.

seenFingerprints = {}

for evt in events {
  if seenFingerprints[evtFingerprint] {
    continue
  }

  ... process event ...

  seenFingerprints[evtFingerprint] = true
}

An in-memory map like this covers most needs. But it disappears whenever the application restarts, so we can also persist it:

seenFingerprints = {}

for evt in events {
  if seenFingerprints[evtFingerprint] {
    continue
  }

  ... process event ...

  seenFingerprints[evtFingerprint] = true
}

saveToDisk(seenFingerprints)

I already use Terraform for a ton of stuff, so it's easy for me to spin up a small Redis instance. Redis makes a nice place to keep the recently seen fingerprints outside of the application itself:

if !seenFingerprints[fingerprint] && !inRedis(fingerprint) {
  ... process event ...
  seenFingerprints[fingerprint] = true
}

...

saveToRedis(seenFingerprints).ForOneHour()

One more thing about crashes: the application will go down at some point - not can, will. To make sure it comes back up on its own, I run it under Supervisor, which restarts the process automatically when it dies.

A nice little article about installing Supervisor on an Ubuntu server, written by DigitalOcean, can be found here.
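
For reference, a minimal Supervisor program definition could look something like this - the program name, binary path, and log locations are hypothetical, so adjust them to wherever your consumer actually lives:

[program:snowplow-consumer]
command=/usr/local/bin/snowplow-consumer
directory=/opt/snowplow-consumer
autostart=true
autorestart=true
stdout_logfile=/var/log/snowplow-consumer.out.log
stderr_logfile=/var/log/snowplow-consumer.err.log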

Pessimistic Kinesis Checkpointing

Most libraries that help us read from a Kinesis stream have built-in tools for a process called checkpointing.

Checkpointing is essentially a way to keep track of the last record that we successfully processed so that if our server crashes, we can start from where we left off.

It often uses a backend like Redis or DynamoDB to persist the checkpoint information in case of a server crash.

More on the process of Kinesis Checkpointing can be found here.


Though, in practice, a lot of the libraries that allow us to consume Kinesis have checkpointing built in, so we don't have to deal with it manually.

I'm going to actually be a little pessimistic and only checkpoint every 5 minutes.

... do all processing ...

every.5.minutes {
  ... save latest position in Kinesis stream ...
}
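
Here is a rough Go sketch of that idea. The checkpointEvery helper and its latest and save callbacks are hypothetical stand-ins for whatever your Kinesis consumer library exposes for reading and persisting a position - they are not a real library API.

package consumer

import "time"

// checkpointEvery periodically saves the most recent sequence number we
// have processed. latest returns the last sequence number handled by the
// processing loop; save persists it (to DynamoDB, Redis, etc.).
func checkpointEvery(interval time.Duration, latest func() string, save func(string) error, stop <-chan struct{}) {
  ticker := time.NewTicker(interval)
  defer ticker.Stop()

  for {
    select {
    case <-ticker.C:
      if seq := latest(); seq != "" {
        // Failing to checkpoint isn't fatal - worst case we
        // re-process a few minutes of events after a crash.
        _ = save(seq)
      }
    case <-stop:
      return
    }
  }
}

It would run alongside the processing loop, something like go checkpointEvery(5*time.Minute, latestSequence, saveCheckpoint, stop).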

If my application crashes at any point during that 5 minutes, it will start from the beginning of the 5 minute period.

This restart would theoretically cause the application to re-process events in the case of a crash.

But, due to the duplication prevention that we programmed earlier, we should be fine in most cases!

Event Corruption

This is all well and good - But if the data is corrupted and causes consistent crashing of our application, it could create a constant start-fail-restart loop that causes our pipeline to halt.

Snowplow actually takes care of most of this for us - Both the collector and the enrichment process filter out corrupted data before it hits our application.

That being said - our code probably isn't perfect. The application could still crash because of a bug or after a future update.

We should make sure that we have some way to monitor the system and be alerted if it stays crashed for an extended period of time.

... if the application crashes 5 times in a row, email me ...

We could also set up a process for logging and then skipping problematic events. But that could prove confusing, especially in the case of a large bug in the application.

I'd recommend this process when the codebase has matured and become well-tested:

err = processEvent(evt)

if err != nil {
  logEventBroken(evt)
}

... continue processing after skipping event ...
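
Expanded slightly into Go, that might look like the loop below. The SnowplowEvent type here is a cut-down stand-in, and processEvent is a placeholder for whatever your application actually does with an event.

package consumer

import "log"

// SnowplowEvent is a stand-in for the enriched event type used elsewhere
// in the application; only the ID matters for this example.
type SnowplowEvent struct {
  EventID string
}

// processAll logs and skips events that fail to process, rather than
// letting one bad event halt the whole pipeline.
func processAll(events []SnowplowEvent, processEvent func(SnowplowEvent) error) {
  for _, evt := range events {
    if err := processEvent(evt); err != nil {
      logEventBroken(evt, err)
      continue
    }
  }
}

// logEventBroken records enough detail to investigate the event later.
func logEventBroken(evt SnowplowEvent, err error) {
  log.Printf("skipping broken event %s: %v", evt.EventID, err)
}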

It'd be annoying to come back after a weekend to find that we lost days worth of data!

Application State Persistence

I've been doing all of these things for one main goal:

Real-time Usage Tracking

But, we've already brought up the fact that the application can crash at any given time.

We can't keep a representation of the user in memory and append to it because we might lose that data in the case of a crash. We want to save it somewhere and retrieve it when needed.

... process a bunch of events ...

persistUsageHistory(userId, usageHistory)

... Or, in the event of a crash or reload ...

usageHistory = retrieveUsageHistory(userId)

So far I've found that ElasticSearch is a good place to store our aggregation data for later retrieval.

If a user hasn't been seen, or hasn't been seen in a while, we can load the stored data for that user out from ElasticSearch extremely quickly and then manipulate the user in memory upon future events.

We can persist the changes made to the in-memory user object periodically as it changes. We can remove that user from memory if they haven't been seen in a while - to prevent overloading of the server.
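
Here is a rough sketch of that pattern in Go. The UsageHistory fields are made up for the example, and the load and save callbacks stand in for whatever store you use (Elasticsearch, in my case) - none of this is a real client API.

package users

import (
  "sync"
  "time"
)

// UsageHistory is whatever aggregate we keep per user - page views,
// funnel progress, and so on. Simplified here.
type UsageHistory struct {
  PageViews int
  LastSeen  time.Time
}

// UserCache keeps recently active users in memory and falls back to a
// persistent store for everyone else.
type UserCache struct {
  mu    sync.Mutex
  users map[string]*UsageHistory

  load func(userID string) (*UsageHistory, error) // read from the store
  save func(userID string, h *UsageHistory) error // write to the store
}

func NewUserCache(load func(string) (*UsageHistory, error), save func(string, *UsageHistory) error) *UserCache {
  return &UserCache{users: map[string]*UsageHistory{}, load: load, save: save}
}

// Get returns the in-memory history for a user, loading it from the
// store the first time we see them (or after they have been evicted).
func (c *UserCache) Get(userID string) (*UsageHistory, error) {
  c.mu.Lock()
  defer c.mu.Unlock()

  if h, ok := c.users[userID]; ok {
    return h, nil
  }
  h, err := c.load(userID)
  if err != nil {
    return nil, err
  }
  if h == nil {
    h = &UsageHistory{}
  }
  c.users[userID] = h
  return h, nil
}

// Flush persists every cached user, then evicts the ones we haven't seen
// for a while so memory use stays bounded.
func (c *UserCache) Flush(idle time.Duration) {
  c.mu.Lock()
  defer c.mu.Unlock()

  for id, h := range c.users {
    _ = c.save(id, h)
    if time.Since(h.LastSeen) > idle {
      delete(c.users, id)
    }
  }
}

Flush would be called from a ticker every minute or so, and the processing loop would update LastSeen as events come in.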

This also gives us the benefit of using Kibana for data visualization. Kibana is a super powerful dashboarding software for data exploration.

The dashboard also lets us see at a glance whether our data looks alright and whether the system is working.

Bonus: Parallel Computing

It can be tough to keep up with the large amounts of data hitting our application at fast speeds. If we're too slow, we'll fall behind and lose the benefit of being able to act on the data quickly.

I've been using the Go programming language, with goroutines and channels everywhere they make sense. This lets me take full advantage of the server my application runs on, without the complexity often associated with threads.

userManager := NewUserManager()
userManager.AddEvent(evt)

... in userManager ...

func NewUserManager() UserManager {
  ...

  // All events flow through a single channel and are handled by one
  // goroutine, so user state is never touched from two places at once.
  um.Events = make(chan SnowplowEvent)
  go um.ProcessLoop()

  return um
}

// ProcessLoop runs in its own goroutine, pulling events off the channel
// and applying them one at a time.
func (um *UserManager) ProcessLoop() {
  for {
    evt := <-um.Events
    ...
  }
}

// AddEvent just hands the event to the processing goroutine.
func (um *UserManager) AddEvent(evt SnowplowEvent) {
  um.Events <- evt
}

It is likely that one day I'll need to shard the data and distribute the processing amongst multiple servers. But no company I've used this with currently has enough data flowing through its analytics system, or intense enough real-time processing, to warrant that complexity.

I feel as though most companies are in a similar position - they have analytics data, but not enough to need distributed computing. So this should be a good starting point for exploring their analytics.

I may one day make a tutorial for using this in a distributed system.

But for now, this is it!

Have fun!

Go gain some insights from how your users are using your products!

You might find that some assumptions you've been making are incorrect and that these usage patterns will help you to develop better products and features.

I'll probably be writing more articles about this in the future. So, if you're interested in learning more about behavioral analytics or predictive analytics - subscribe to the blog!
