Time Windowed Lists in Go

January 21st 2018
by Cory Finger

Updated on October 5th 2018

In this post we're going to talk about a data structure that I call a time windowed list.

A time windowed list is a list that:

  • Holds a list of events that have occurred recently

  • Automatically deletes expired events from the list

This is an important data structure for real-time user analytics, because real-time processing is hard when the time window you're responsible for analyzing is unbounded.

Feel free to follow along with the Github repository associated with this post!


Why is this useful?

Let's say that you're segmenting your users based on their average number of actions. You want to determine if they are Power Users or Casual Users.

  • User A has been visiting the site for two years with 2 million events logged

  • User B has been visiting the site for two months with 300 events logged

If you were to analyze their entire history, it'd take a long time to analyze the events of User A. It'd take very little time to analyze the events of User B.

Depending on how this is done, your server might run out of RAM trying to load all of the events of User A or take even longer by paging through groups of them.

You might also draw the wrong conclusions. If you took the average daily events for User A over their whole history, you might classify them as a Power User because of their heavy overall usage. But most of that usage might be from a long time ago and say little about whether they are a Power User today.

The fact is that recent events are often much more valuable than historical events.

You can get many of the benefits of analytics tracking through recent events at a much lower cost than if you tried to do the same while including all historical events.

A time windowed list allows you to set a cut-off for the time period you care about, before worrying about your classifications or statistics. By making this trade-off, we can run queries on extremely inexpensive servers and get results that are more relevant to the present day than the full historical data would provide.

Why Golang?

Go (or Golang) is pretty fun, concise, memory-efficient and fast. I also like the potential gained through its approachability towards multi-threaded programming.

That combination allows us to get the most out of the small servers we'll be using for our analytics processing.

We're using small servers because I see a lot of benefit in more individuals and small companies being able to make use of these new technologies and methodologies. It can lead to the development of better products through more accurate representations of user usage.

Let's get started

We're going to break this code up into the following files:

  • time_windowed_list.go will have the implementation of our time windowed list.

  • main.go will have a short example of how to use the list.


Imports and Types

Let's create our time_windowed_list.go file:

package main

import (
  "time"
  "sort"
  "fmt"
)

type TimeWindowEntry interface {
  TStamp()      time.Time
}

type TimeWindow struct {
  Entries       []TimeWindowEntry
}

type TimeHash int64
type ByTimeHash []TimeHash
func (a ByTimeHash) Len() int           { return len(a) }
func (a ByTimeHash) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByTimeHash) Less(i, j int) bool { return a[i] < a[j] }

type TimeWindowedList struct {
  Windows       map[TimeHash]TimeWindow
  DurationType  time.Duration
  MaxDurations  int
}

The imports are pretty standard. Let's break these types down:

TimeWindowEntry

This is anything with a TStamp() method that returns a time.Time.

For our purposes, this TStamp() method represents when the event occurred and we'll use it to know when the item should be expired and removed from the list.
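As a minimal sketch of what satisfying this interface looks like, the PageView type below is a hypothetical event (it is not part of this post's code) that carries its own timestamp and returns it from TStamp(). The helper functions are also made up for the illustration.

```go
package main

import (
	"fmt"
	"time"
)

// TimeWindowEntry mirrors the interface defined in this post.
type TimeWindowEntry interface {
	TStamp() time.Time
}

// PageView is a hypothetical event type used only for illustration.
type PageView struct {
	Path     string
	ViewedAt time.Time
}

// TStamp reports when the page view occurred, which satisfies TimeWindowEntry.
func (p PageView) TStamp() time.Time { return p.ViewedAt }

// entryUnix accepts any TimeWindowEntry and returns its timestamp in Unix seconds.
func entryUnix(e TimeWindowEntry) int64 { return e.TStamp().Unix() }

// samplePageViewUnix builds a PageView at a fixed instant
// (21 January 2018, 08:00 UTC) and reads it back through the interface.
func samplePageViewUnix() int64 {
	pv := PageView{Path: "/pricing", ViewedAt: time.Unix(1516521600, 0)}
	return entryUnix(pv)
}

func main() {
	fmt.Println(samplePageViewUnix())
}
```

Any struct can be stored in the list this way, as long as it exposes the time at which the event happened.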

TimeWindow

This is a struct wrapping a slice of TimeWindowEntry items that represents a window of time.

For example, a TimeWindow might represent one hour. So every entry in the TimeWindow would have occurred within the same 1-hour period.

TimeHash

This is a representation of a specific window of time.

This implementation will use Unix time (int64). The equivalent using strings would be representing all the events occurring at 8:04, 8:22, and 8:59 with the string 8:00. The TimeHash 8:00 would then represent the window of time between 8:00 and 8:59.
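To make the 8:00 example concrete, here is a small sketch of how such a hash can be computed with time.Time.Truncate. The helpers hashFor and hourHashAt are names invented for this illustration, and the date is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// TimeHash mirrors the type from this post: the Unix time of a window's start.
type TimeHash int64

// hashFor rounds a timestamp down to a multiple of the window size
// and returns the start of that window as a TimeHash.
func hashFor(t time.Time, window time.Duration) TimeHash {
	return TimeHash(t.Truncate(window).Unix())
}

// hourHashAt hashes hour:minute on 21 January 2018 (UTC) into 1-hour windows.
func hourHashAt(hour, minute int) TimeHash {
	t := time.Date(2018, time.January, 21, hour, minute, 0, 0, time.UTC)
	return hashFor(t, time.Hour)
}

func main() {
	// 8:04, 8:22 and 8:59 share one window; 9:00 starts the next one.
	fmt.Println(hourHashAt(8, 4), hourHashAt(8, 22), hourHashAt(8, 59), hourHashAt(9, 0))
}
```

One thing to keep in mind: Truncate works on absolute time since the zero time, not on the wall-clock presentation, so in time zones whose offset is not a whole number of hours the window boundaries will not line up with the local clock hour.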

ByTimeHash

This is a convenience type used in order to make our own custom sorting mechanism for a list of TimeHash.

Learn more about this from the documentation for Go's sort package (see sort.Interface).
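Here is a short sketch of ByTimeHash in use. The sortedOldestFirst helper is hypothetical and only exists to show the sort on a few made-up values.

```go
package main

import (
	"fmt"
	"sort"
)

// TimeHash and ByTimeHash mirror the types from this post.
type TimeHash int64

type ByTimeHash []TimeHash

func (a ByTimeHash) Len() int           { return len(a) }
func (a ByTimeHash) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByTimeHash) Less(i, j int) bool { return a[i] < a[j] }

// sortedOldestFirst returns a sorted copy of hashes, smallest (oldest) first.
func sortedOldestFirst(hashes []TimeHash) []TimeHash {
	out := make([]TimeHash, len(hashes))
	copy(out, hashes)
	sort.Sort(ByTimeHash(out)) // the conversion gives the slice its sort methods
	return out
}

func main() {
	fmt.Println(sortedOldestFirst([]TimeHash{7200, 0, 3600}))
}
```

On Go 1.8 and later, sort.Slice with a less function would do the same job without the helper type. This post sticks with the sort.Interface approach.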

TimeWindowedList

This is the type we'll use externally to initialize and manage our time windowed list.

  • Windows is a map that we'll use to associate each TimeHash with the relevant TimeWindow. If this is confusing right now, don't worry too much. It'll become more clear as we write methods for it.

  • DurationType is the type of duration we'll use in our time limit. If this were set to time.Hour, we'd have some maximum number of hours for which we'd keep track of entries.

  • MaxDurations is the number of TimeWindows we should keep track of at any given time.

For example, if we wanted a time limit of 5 hours on this list:

timeWindowedList.Windows = make(map[TimeHash]TimeWindow)
timeWindowedList.DurationType = time.Hour
timeWindowedList.MaxDurations = 5

Constructor (Convenience)

func NewTimeWindowedList(durType time.Duration, maxDur int) TimeWindowedList {
  l := TimeWindowedList{}
  l.Windows = make(map[TimeHash]TimeWindow)
  l.DurationType = durType
  l.MaxDurations = maxDur

  return l
}

Let's start off by creating a constructor function to make it easier to use this new type we've made. This function simply returns an instance of the type with an empty Windows map and the duration settings you pass in.

Entry Expiration

func (wl *TimeWindowedList) ExpireOldEntries() {
  windowCount := len(wl.Windows)
  windowOverflow := windowCount - wl.MaxDurations

  if windowOverflow > 0 {
    windowTimes := make([]TimeHash, 0, len(wl.Windows))
    for windowTime := range wl.Windows {
      windowTimes = append(windowTimes, windowTime)
    }

    sort.Sort(ByTimeHash(windowTimes))
    for i := 0; i < windowOverflow; i++ {
      delete(wl.Windows, windowTimes[i])
    }
  }
}

Alright, so this is the hard part to get right. How are we going to expire items from the list?

Let's start by going through the algorithm:

  1. Check if we have too many Windows in our list. MaxDurations is the maximum number of TimeWindows we should ever care about. If we have more than that, we're obviously tracking too many things.

  2. If we have too many Windows, sort the windows (by TimeHash) and remove the oldest windows until we have the right amount.
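The two steps above can be traced on a tiny stand-alone example. This is a simplified sketch, not the method from this post: the map values are plain entry counts instead of TimeWindow structs, sort.Slice is swapped in for ByTimeHash to keep it short, and the names expireOldest and remainingKeys are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// expireOldest deletes the oldest keys until at most maxWindows remain.
// Keys stand in for TimeHash values; values are just entry counts.
func expireOldest(windows map[int64]int, maxWindows int) {
	overflow := len(windows) - maxWindows
	if overflow <= 0 {
		return // step 1: nothing to do when we are within the limit
	}

	keys := make([]int64, 0, len(windows))
	for k := range windows {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

	// step 2: the first `overflow` keys are the oldest windows
	for _, k := range keys[:overflow] {
		delete(windows, k)
	}
}

// remainingKeys expires a fixed set of seven hourly windows down to five
// and returns the surviving keys in ascending order.
func remainingKeys() []int64 {
	windows := map[int64]int{0: 3, 3600: 1, 7200: 4, 10800: 2, 14400: 5, 18000: 1, 21600: 2}
	expireOldest(windows, 5)

	keys := make([]int64, 0, len(windows))
	for k := range windows {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys
}

func main() {
	fmt.Println(remainingKeys())
}
```

With seven windows and a limit of five, the two oldest windows (0 and 3600) are dropped along with every entry inside them, without looking at any individual entry.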

Why didn't we use X algorithm…?

There are a number of ways we could handle expiration of old items.

For example, we could maintain a flat list of every item and iterate through it, filtering everything that is too old. Why did we choose this particular way to do it?

The reason is that filtering a flat list of items would be a bit too slow for our purposes. If we're adding entries at a high speed, iterating through a list of all of them can be a pretty expensive thing to do.
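For comparison, here is a rough sketch of the flat-list alternative described above. The event type and both functions are hypothetical. The point to notice is that filterRecent has to visit every stored event each time it runs, whether or not anything has expired.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical flat-list entry.
type event struct {
	at time.Time
}

// filterRecent returns the events strictly newer than cutoff.
// Its cost grows with the total number of events held.
func filterRecent(events []event, cutoff time.Time) []event {
	var kept []event
	for _, e := range events {
		if e.at.After(cutoff) {
			kept = append(kept, e)
		}
	}
	return kept
}

// countRecent builds n events spaced one minute apart, ending at a fixed
// "now", and counts those newer than windowMinutes ago.
func countRecent(n, windowMinutes int) int {
	now := time.Date(2018, time.January, 21, 12, 0, 0, 0, time.UTC)
	events := make([]event, 0, n)
	for i := 0; i < n; i++ {
		events = append(events, event{at: now.Add(-time.Duration(i) * time.Minute)})
	}
	cutoff := now.Add(-time.Duration(windowMinutes) * time.Minute)
	return len(filterRecent(events, cutoff))
}

func main() {
	fmt.Println(countRecent(1000, 60))
}
```

Here 1000 events are scanned to keep 60. Grouping entries into windows lets us throw away the other 940 a whole bucket at a time.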

There might be a faster algorithm out there that I don't know about yet.

If I find one, I'll be happy to update this blog post with it!

Why is this algorithm fast?

This algorithm attempts to exploit the fact that buckets don't need to be expired too often.

Let's say that your TimeWindows represent 1 hour and your entries generally occur sequentially (they don't constantly jump back and forth through time by more than an hour).

If that's true, this expiration will happen about once every hour and will defer most of the work to the background garbage collection. We can probably handle that without crashing any servers.

There's a tradeoff here - Items can be held in memory a bit longer than need be.

But, if your Add method is constantly being fired (by analytics events), items will stick around at most an hour before they are removed. Our future methods will deal with filtering out the leftovers!
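To get a feel for how rarely the expensive path runs, here is a small simulation under the assumptions above: 1-hour windows, entries arriving in order, one entry per minute. It is a simplified stand-in for the real list (window indexes instead of TimeHash values, and a linear scan for the oldest key instead of a sort), and simulate is a made-up name.

```go
package main

import "fmt"

// simulate adds one entry per minute for the given number of hours into
// 1-hour windows, keeping at most maxWindows of them. It returns how many
// entries were added and how many times an old window had to be removed.
func simulate(hours, maxWindows int) (adds, expirations int) {
	windows := map[int64]int{} // window index -> entry count
	for minute := 0; minute < hours*60; minute++ {
		key := int64(minute / 60) // stands in for a TimeHash
		windows[key]++
		adds++

		if len(windows) > maxWindows {
			// Entries arrive in order, so the overflow is always one window.
			oldest := key
			for k := range windows {
				if k < oldest {
					oldest = k
				}
			}
			delete(windows, oldest)
			expirations++
		}
	}
	return adds, expirations
}

func main() {
	adds, expirations := simulate(10, 5)
	fmt.Printf("%d adds, %d expirations\n", adds, expirations)
}
```

Over ten simulated hours, 600 adds trigger only five removals, one each time a new hour begins after the list is full. Every other add does nothing more than compare two integers.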

Adding Entries

func (wl *TimeWindowedList) Add(entry TimeWindowEntry) {
  entryTime := TimeHash(entry.TStamp().Truncate(wl.DurationType).Unix())

  if window, ok := wl.Windows[entryTime]; ok {
    wl.Windows[entryTime] = TimeWindow{Entries: append(window.Entries, entry)}
  } else {
    wl.Windows[entryTime] = TimeWindow{Entries: []TimeWindowEntry{entry}}
  }

  wl.ExpireOldEntries()
}

With that expiration code in place, adding entries is pretty simple!

All we need to do is:

  1. Create a TimeHash for the entry (to find the right TimeWindow)

  2. Check if the TimeWindow exists.

     2a. If the TimeWindow exists, append the entry to it.

     2b. If the TimeWindow does not exist, create one.

  3. Expire old entries to clear up some RAM, if necessary.
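The first two steps can be sketched on their own, leaving expiration out. This is an illustrative variant rather than the Add method itself: it stores bare time.Time values in a plain map, and the names addTo and bucketSizes are invented. It also leans on the fact that append accepts a nil slice, so the "does the window exist?" check collapses into a single line.

```go
package main

import (
	"fmt"
	"time"
)

// addTo places t into the bucket for its window, creating the bucket if needed.
func addTo(windows map[int64][]time.Time, t time.Time, size time.Duration) {
	key := t.Truncate(size).Unix()
	// A missing key yields a nil slice, and append on nil allocates a new one.
	windows[key] = append(windows[key], t)
}

// bucketSizes adds entries at 8:04, 8:22, 8:59 and 9:10 (UTC) and returns
// the number of buckets and the number of entries in the 8:00 bucket.
func bucketSizes() (buckets, inEight int) {
	windows := map[int64][]time.Time{}
	for _, hm := range [][2]int{{8, 4}, {8, 22}, {8, 59}, {9, 10}} {
		t := time.Date(2018, time.January, 21, hm[0], hm[1], 0, 0, time.UTC)
		addTo(windows, t, time.Hour)
	}
	eight := time.Date(2018, time.January, 21, 8, 0, 0, 0, time.UTC).Unix()
	return len(windows), len(windows[eight])
}

func main() {
	buckets, inEight := bucketSizes()
	fmt.Printf("%d buckets, %d entries in the 8:00 bucket\n", buckets, inEight)
}
```

The explicit if/else in the Add method does the same thing and spells out both cases, which some readers may find easier to follow.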

Listing Entries

func (wl *TimeWindowedList) All() []TimeWindowEntry {
  wl.ExpireOldEntries()
  startTime := time.Now().Add(-time.Duration(wl.MaxDurations) * wl.DurationType).Unix()

  var all []TimeWindowEntry
  for windowTime := range wl.Windows {
    window := wl.Windows[windowTime]

    for i := range window.Entries {
      entry := window.Entries[i]

      if entry.TStamp().Unix() > startTime {
        all = append(all, entry)
      }
    }
  }

  return all
}

Here we define a startTime that represents "the earliest timestamp we care about".

We loop through all TimeWindows and loop through all the Entries in those time windows.

For each entry, we determine whether or not to add it to the returned list. We do this by comparing its timestamp to the earliest timestamp we care about.

If the event happened before the startTime, we ignore it.

We do that because, as was mentioned in the Entry Expiration section, we still have some extra events in memory. We filter them out here so that we don't misrepresent the events.
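Here is a sketch of that cut-off check with a fixed "now" so the result is predictable. The functions recentOnly and keptCount are hypothetical, and the list is reduced to a plain slice of timestamps. The scenario assumes a quiet period: only a handful of windows exist, so the count-based expiration never fires, yet some entries are already older than the 5-hour limit.

```go
package main

import (
	"fmt"
	"time"
)

// recentOnly keeps the timestamps strictly newer than
// now - maxDurations*durationType, compared at one-second resolution.
func recentOnly(stamps []time.Time, now time.Time, durationType time.Duration, maxDurations int) []time.Time {
	startTime := now.Add(-time.Duration(maxDurations) * durationType).Unix()

	var kept []time.Time
	for _, s := range stamps {
		if s.Unix() > startTime {
			kept = append(kept, s)
		}
	}
	return kept
}

// keptCount filters five fixed timestamps against a 5-hour limit.
func keptCount() int {
	now := time.Date(2018, time.January, 21, 12, 30, 0, 0, time.UTC)
	stamps := []time.Time{
		now.Add(-9 * time.Hour),    // stale, but its window was never expired
		now.Add(-6 * time.Hour),    // stale
		now.Add(-5 * time.Hour),    // exactly at the cut-off: excluded, the check is strict
		now.Add(-4 * time.Hour),    // recent
		now.Add(-10 * time.Minute), // recent
	}
	return len(recentOnly(stamps, now, time.Hour, 5))
}

func main() {
	fmt.Println(keptCount())
}
```

Only the last two timestamps survive, even though all five would still be sitting in memory.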

Display List Contents (Convenience)

func (wl *TimeWindowedList) DisplayContents() {
  wl.ExpireOldEntries()
  startTime := time.Now().Add(-time.Duration(wl.MaxDurations) * wl.DurationType).Unix()

  fmt.Println("----")
  for windowTime := range wl.Windows {
    window := wl.Windows[windowTime]

    var allInBucket []TimeWindowEntry
    for i := range window.Entries {
      entry := window.Entries[i]

      if entry.TStamp().Unix() > startTime {
        allInBucket = append(allInBucket, entry)
      }
    }

    fmt.Printf("%d - %d\n", windowTime, len(allInBucket))
  }

  fmt.Printf("\nTotal in list: %d\n", len(wl.All()))
  fmt.Print("----\n\n")
}

This looks very similar to the All function we defined above. It displays the number of relevant items in each TimeWindow bucket and then the total number of relevant items in the entire list.

We're just creating it as a convenience method to try the list out in the next section!

Try it out

Let's define a main.go file with the following contents:

package main

import (
  "time"
  "fmt"
)

type MyEntry struct {
  HappenedAt    time.Time
}

func (e MyEntry) TStamp() time.Time {
  return e.HappenedAt
}

func main() {
  windowedList := NewTimeWindowedList(time.Second, 5)

  for range time.Tick(2 * time.Second) {
    newEntry := MyEntry{HappenedAt: time.Now()}
    windowedList.Add(newEntry)

    fmt.Printf("Current Time: %d\n", time.Now().Unix())
    windowedList.DisplayContents()
  }
}

This file defines a MyEntry type that represents an event.

  • We create a TimeWindowedList with a time limit of 5 seconds.

  • We add a new entry to it every 2 seconds

  • We print the current time and the contents of the time windowed list

Use go run *.go to run the code!

Create awesome things!

In future posts, I'll use and expand upon this data structure, along with Snowplow's Streaming Analytics to do some analysis of real-time user analytics!
