Go Patterns: Timers and Debouncers
Patterns for dealing with delayed code execution in the Go programming language
I’ve written before about building a network of Go processes working off a central events hub. Essentially a message queue with many producers on one end and consumers on the other.
These consumers do their work asynchronously - they do not block the event producers - and most of them perform tasks that are not directly user-facing, such as periodic backups, analytics gathering or cache purging.
In this article I’ll describe a few simple asynchronous patterns I’ve been using in these data consumers and how Go’s straightforward concurrency model makes their implementation trivial.
Basic assumptions
The code samples below assume that incoming events are being pushed into a Go channel. The different programs then listen on that channel and handle the events accordingly.
// This will depend on what kind of events you're dealing with.
// Here we'll assume that an event is a map of strings.
Events := make(chan map[string]string)
In my programs, events are broadcast to a ZMQ socket as JSON-encoded data structures, but the patterns laid out in this article should be applicable to any event stream, regardless of how said events make it into your program.
The event data I’ll be working with looks roughly like this:
{
    "account": "acme",
    "type": "update",
    "time": "2014-12-29 00:00:12"
}
Delayed trigger
The pattern here is to listen for events of a certain type and set off a delayed function that will execute after a set interval. All events of the same type arriving while the timer is running will be ignored.
I use this to listen for changes in a web-based template editor and trigger a git-based versioning routine, where I’m only interested in versioning changes within 30-second intervals instead of every time the user presses the “save” button.
Events come with an “account” attribute. Each account sets off its own trigger.
// A *done* channel will notify the central loop
// whenever a trigger is done.
// Each account sets off its own delayed trigger, so we'll use account names as keys.
doneChan := make(chan string)

// The interval
interval := time.Second * 30

// Hold references to running triggers.
// This data structure is global, but only accessed from the main *select* loop,
// which acts as a coordinator between concurrent goroutines.
triggers := make(map[string]bool)

// Main select loop
for {
    select {
    // A new event is received on the Events channel.
    case event := <-Events:
        // There's a new incoming message.
        // Get its "account" key and figure out whether
        // we already have a running trigger for it.
        accountKey := event["account"]
        // If no trigger is pending for this account, create one.
        // Otherwise do nothing.
        if !triggers[accountKey] {
            // Register trigger
            triggers[accountKey] = true
            // Schedule work function
            go func(account string, doneChan chan string) {
                // Sleep for the configured interval.
                time.Sleep(interval)
                // Do the actual work, whatever it is.
                work(account)
                // Let the main select loop know that we're done.
                doneChan <- account
            }(accountKey, doneChan)
        }
    // A trigger has finished running.
    // Here we un-register the account so the main loop is ready
    // to set off a new trigger when a new event comes in.
    case account := <-doneChan:
        log.Println("Trigger is done!", account)
        // Un-register the account's trigger
        delete(triggers, account)
    }
}
Fairly straightforward. When a new event comes in, I check whether a delayed trigger function has already been registered for the event’s account. If not, I register an anonymous function in a concurrent goroutine. The function sleeps for a given period of time before calling a work function, with this signature:
func work(account string) {
    // Do the work here
}
I could have used a timer channel instead, but in this case a time.Sleep in a goroutine does the job.
Timed aggregator
This pattern builds on delayed triggers.
The use case here is to buffer or aggregate or count events for the same account into in-memory data structures, and send those off to storage after a set interval.
Like in the “delayed trigger” example, the first event for a given account sets off its own timer. Subsequent events are counted up (or aggregated, depending on the use case) into a temporary data structure.
I’ve been using this pattern to gather analytics for different accounts and log them to different storage and analytics services while keeping my network request rate under control.
The key is that multiple timers should be able to run concurrently, because events for different accounts might arrive in any order and at any time.
In this example I’ll just count up occurrences of events for each account.
This time I’ll encapsulate the data aggregation logic into its own type.
type Counter struct {
    count    int
    account  string
    doneChan chan string
}

func (c *Counter) Increment() {
    c.count++
}

func (c *Counter) Submit() {
    // Send the current c.count to some 3rd party service,
    // log, database, etc.
    doSomeWork(c.count)
    // Notify the main loop when we're done.
    c.doneChan <- c.account
}
Now to the main select loop. Same as before, it listens to incoming events and keeps track of registered counters.
This time, however, we’ve extracted knowledge of how to increment counters and submit data into instances of the Counter struct.
// A *done* channel will notify the central loop
// whenever a counter is done submitting to a 3rd party service.
// We're buffering events by account, so we'll use unique account names as keys.
doneChan := make(chan string)

// Currently running counters, keyed by account.
// This will be a global data structure,
// but access to it is mediated by the central *select* loop. No threading issues.
counters := make(map[string]*Counter)

// Per-counter interval
interval := time.Second * 30

for {
    select {
    case event := <-Events:
        // There's a new incoming message.
        // Get its "account" key and figure out whether
        // we already have a counter for it.
        accountKey := event["account"]
        counter := counters[accountKey]
        if counter == nil { // no counter yet. Create one.
            // Instantiate a counter for this account,
            // passing the account name and the global "done" channel.
            counter = &Counter{doneChan: doneChan, account: accountKey}
            // First hit.
            counter.Increment()
            // Schedule submitting the count data after a period of time.
            // time.AfterFunc will run the passed function in a concurrent goroutine.
            time.AfterFunc(interval, counter.Submit)
            // Register the counter
            counters[accountKey] = counter
        } else {
            // A new event has arrived for a counter that is already
            // registered and running. Increment it.
            counter.Increment()
        }
    case account := <-doneChan:
        // A counter is done submitting data.
        // Un-register it.
        log.Println("unregister:", account)
        delete(counters, account)
    }
}
You’ll notice that now I’ve chosen to set off delayed function execution by using the handy time.AfterFunc function. We’ll revisit this in the next example.
Debounced trigger
In this example I want to run some routine only after bursts of the same event type have subsided.
I use this pattern to purge an account’s caches whenever data is updated, while avoiding unnecessary cache purges when multiple data updates happen in quick succession (such as sudden traffic spikes or batched data uploads).
Here, however, we’ll build on the previous example to make it send count data to 3rd party services only after a lull in event activity.
We only need to change the implementation of the main select loop, leveraging the fact that time.AfterFunc returns an instance of time.Timer.
// We'll keep track of the timers returned by time.AfterFunc,
// keyed by account, alongside the counters map from the previous example.
timers := make(map[string]*time.Timer)

for {
    select {
    case event := <-Events:
        // There's a new incoming message.
        // Get its "account" key and figure out whether
        // we already have a counter for it.
        accountKey := event["account"]
        counter := counters[accountKey]
        if counter == nil { // no counter yet. Create one.
            // Instantiate a counter for this account,
            // passing the account name and the global "done" channel.
            counter = &Counter{doneChan: doneChan, account: accountKey}
            // First hit.
            counter.Increment()
            // Register the counter
            counters[accountKey] = counter
            // Schedule submitting the count data after a period of time
            // and register the returned time.Timer instance.
            timers[accountKey] = time.AfterFunc(interval, counter.Submit)
        } else {
            // A new event has arrived for a counter that is already
            // registered and running. Increment it.
            counter.Increment()
            // Here we also "debounce" the pending timer by calling Reset() on it.
            timers[accountKey].Reset(interval)
        }
    case account := <-doneChan:
        // A counter is done submitting data.
        // Un-register both the counter and its timer.
        log.Println("unregister:", account)
        delete(counters, account)
        delete(timers, account)
    }
}
As you can see, this pattern is very similar to the previous one (so much so that industrious programmers might be tempted to generalise them into a single configurable implementation).
The main difference is that we make sure to defer execution of already-scheduled work by calling time.Timer.Reset(), which restarts the timer with a fresh duration:
timers[accountKey].Reset(interval)
Notes on working with concurrent goroutines
These 3 examples are taken from real programs I’ve been running in production. I’ve omitted code, such as error handling, for the sake of clarity; but there are a few lines of code that should help make concurrent Go code more robust.
Goroutines might fail or run into fatal error conditions. The following addition to the Counter.Submit() function used in this article handles these cases gracefully and prevents rogue goroutines from crashing the main program.
func (c *Counter) Submit() {
    // Defer a recovery function that will handle fatal errors
    // and make sure the main program is left in a consistent state.
    defer func() {
        if err := recover(); err != nil {
            log.Println(c.account, "Goroutine failed:", err)
            // Make sure we un-register this counter in the main loop
            // so we can try again.
            c.doneChan <- c.account
        }
    }()
    // Send the current c.count to some 3rd party service,
    // log, database, etc.
    doSomeWork(c.count)
    // Notify the main loop when we're done.
    c.doneChan <- c.account
}
Here you can read more about this trick and its caveats.
This short article is an excellent introduction into working with timers in Go.