Analytics Event Collection and Storage with Apache Flume NG
A look at how Plumbee collects and stores analytics events from its gaming platform using Apache Flume NG.
At Plumbee we take a data-driven approach to analysing our business and hold strongly to the belief that you can never have enough data. As a result, a significant proportion of our workforce is dedicated to the analysis of events from sources as diverse as Customer Relationship Management (CRM), advertising, database writes and Remote Procedure Calls (RPCs). On any given day we collect approximately 450 million events, adding 166GB of compressed data to our existing 87TB warehouse, all of which is stored in Amazon's cloud-based storage service, S3 (Simple Storage Service).
In a previous blog post we discussed our approach to logging analytics events, represented as JSON, into Amazon's Simple Queue Service (SQS), a highly scalable, reliable and fast cloud-based message queue. In this post we cover the consumer side of that story, focusing on how we process and store those events in S3 using Apache Flume NG.
What is Flume NG?
Apache Flume NG (Next Generation) is an open source project written in Java which uses the architectural paradigm of streaming data flows for moving events between different systems. Flume's basic architectural model revolves around three core components: sources, channels and sinks. Events are consumed by a source component and temporarily stored in a channel before being picked up by a sink, which then stores the event in an external repository (a final destination) or forwards it to another agent for further processing. When sources, channels and sinks are chained together they define a flow in which events move between each hop/component using a transactional model to guarantee message delivery. Flume also supports the concept of interceptors, which allow events to be manipulated in-flight as they move between source components and downstream channels.
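To make the transactional hand-off concrete, here is a minimal sketch of a custom sink (the class name and the logging behaviour are purely illustrative, not one of the components we describe below). It takes a single event from its channel inside a transaction and rolls the transaction back on failure, so the event stays in the channel and is redelivered:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

/**
 * Minimal sink illustrating the transactional hop: events are only removed
 * from the channel when the transaction commits; a rollback puts them back,
 * which is how Flume guarantees delivery between hops.
 */
public class LoggingSink extends AbstractSink {

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        // Nothing available: tell the sink runner to back off for a while.
        tx.commit();
        return Status.BACKOFF;
      }
      // A real sink would write to its external repository here (e.g. S3).
      System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
      tx.commit();
      return Status.READY;
    } catch (Throwable t) {
      // Roll back so the event remains in the channel and is redelivered.
      tx.rollback();
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      tx.close();
    }
  }
}
```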
The next sections describe in more detail the source, channel, and sink we use to support our use case.
SQS Source
To retrieve events from SQS we created a custom Flume SQS source plugin which uses batch API calls, long polling and a back-off policy to improve efficiency and reduce costs. With Amazon you pay for what you use, so the fewer calls you make, the less you pay.
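The sketch below shows the general shape of such a source rather than our production plugin; the class name and the queueUrl configuration property are illustrative, and it assumes the AWS SDK for Java. It batch-receives up to 10 messages using long polling, hands them to the channel as a single batch, and only deletes them from SQS once the channel has accepted them (so an unprocessed batch simply reappears after the visibility timeout):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

/**
 * Simplified SQS source: receives up to 10 messages per call using long
 * polling, hands them to the channel processor in a single batch, and only
 * deletes them from the queue once the channel has accepted them.
 */
public class SqsSource extends AbstractSource implements Configurable, PollableSource {

  private AmazonSQS sqs;
  private String queueUrl;

  @Override
  public void configure(Context context) {
    queueUrl = context.getString("queueUrl"); // illustrative property name
    sqs = new AmazonSQSClient();              // credentials/region resolved from the environment
  }

  @Override
  public Status process() throws EventDeliveryException {
    // Batch receive (max 10) with long polling to keep the number of
    // billable SQS requests down.
    List<Message> messages = sqs.receiveMessage(
        new ReceiveMessageRequest(queueUrl)
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20))
        .getMessages();

    if (messages.isEmpty()) {
      // Empty queue: tell the Flume runner to back off before polling again.
      return Status.BACKOFF;
    }

    List<Event> events = new ArrayList<Event>(messages.size());
    for (Message message : messages) {
      events.add(EventBuilder.withBody(message.getBody(), StandardCharsets.UTF_8));
    }

    // Put the whole batch into the channel in one transaction. If this fails,
    // the messages are not deleted and SQS redelivers them once the
    // visibility timeout expires.
    getChannelProcessor().processEventBatch(events);

    // Acknowledge the messages with a single batch delete call.
    List<DeleteMessageBatchRequestEntry> entries =
        new ArrayList<DeleteMessageBatchRequestEntry>(messages.size());
    for (int i = 0; i < messages.size(); i++) {
      entries.add(new DeleteMessageBatchRequestEntry(
          String.valueOf(i), messages.get(i).getReceiptHandle()));
    }
    sqs.deleteMessageBatch(new DeleteMessageBatchRequest(queueUrl).withEntries(entries));

    return Status.READY;
  }
}
```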
Our plugin is heavily instrumented (building on Flume's built-in MonitoredCounterGroup framework) to provide us with metrics for measuring how the plugin is performing. These metrics can be queried using JConsole or (if enabled) exposed via a Jetty web server running inside the agent. An example of the web server output can be found below:
To provide better visibility into these metrics, we also publish them to Amazon's CloudWatch service via a custom Flume MonitorService plugin. We have alarms on key metrics to notify us when things look abnormal and require our attention.
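A CloudWatch publisher of this kind essentially polls the agent's counter MBeans on a schedule and pushes the values out. The sketch below shows the general idea; it is not our implementation, the class name, namespace and polling interval are placeholders, and a real plugin would batch datapoints rather than sending them one at a time:

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;

/**
 * Polls the agent's counter MBeans once a minute and pushes the numeric
 * attributes to CloudWatch. Namespace and interval are illustrative.
 */
public class CloudWatchMonitorService implements MonitorService {

  private final AmazonCloudWatch cloudWatch = new AmazonCloudWatchClient();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private String namespace;

  @Override
  public void configure(Context context) {
    namespace = context.getString("namespace", "FlumeAgent");
  }

  @Override
  public void start() {
    scheduler.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // component -> (attribute -> value), e.g. "SOURCE.sqs" -> "EventReceivedCount" -> "12345"
        Map<String, Map<String, String>> metrics = JMXPollUtil.getAllMBeans();
        for (Map.Entry<String, Map<String, String>> component : metrics.entrySet()) {
          for (Map.Entry<String, String> attribute : component.getValue().entrySet()) {
            try {
              double value = Double.parseDouble(attribute.getValue());
              cloudWatch.putMetricData(new PutMetricDataRequest()
                  .withNamespace(namespace)
                  .withMetricData(new MetricDatum()
                      .withMetricName(component.getKey() + "." + attribute.getKey())
                      .withValue(value)));
            } catch (NumberFormatException ignored) {
              // Skip non-numeric attributes such as component type or start time strings.
            }
          }
        }
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    scheduler.shutdown();
  }
}
```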
With regard to efficiency, we aim to operate our Flume nodes at 90% batch efficiency (BatchEfficiencyPercentage); in other words, for every batch request we make to SQS we receive, on average, 9 out of a maximum of 10 messages. Unlike our producers, which try to get rid of data as fast as possible to mitigate potential data loss when servers are terminated, our consumers can simply back off to achieve maximum efficiency and reduced cost.
Event Partition Interceptor
To access our data efficiently we partition our events in S3 by date, type and sub_type. This partitioning allows us to:
Process our data more easily. The event type and sub_type form part of the S3 key (file path), giving our processing tools upfront knowledge of what type of processing should be applied to the data without the need to deserialize the content.
Only read relevant data. Without partitioning, if we wanted to retrieve a list of all transactions made in the week commencing 2014-06-09, we'd need to read approximately 1TB of data, deserialize the JSON events and discard 98.6% of them to get at the mere 15MB of data we're actually interested in.
Achieve higher levels of compression.
To support our partitioning logic we created a custom Flume interceptor that deserializes the JSON event, inspects its structure and assigns it to a partition, which is recorded in the event's message headers. When the event is later transferred from the File channel to the HDFS sink, those headers are used to construct the S3 location the data should be written to.
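A simplified version of such an interceptor looks roughly like the following; the class name and JSON field names are illustrative, and it assumes Jackson for the JSON parsing:

```java
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Inspects the JSON body of each event and records its partition
 * (type, sub_type, timestamp) as message headers.
 */
public class PartitionInterceptor implements Interceptor {

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public void initialize() {
    // No set-up required.
  }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    try {
      JsonNode json = mapper.readTree(event.getBody());
      headers.put("type", json.path("type").asText("unknown"));
      headers.put("sub_type", json.path("sub_type").asText("unknown"));
      // The HDFS sink's time-based path escapes read the "timestamp" header (milliseconds).
      headers.put("timestamp",
          String.valueOf(json.path("timestamp").asLong(System.currentTimeMillis())));
    } catch (Exception e) {
      // Route unparsable events to a catch-all partition rather than losing them.
      headers.put("type", "malformed");
      headers.put("sub_type", "malformed");
      headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // Nothing to release.
  }

  /** Required so the interceptor can be referenced from the agent configuration. */
  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new PartitionInterceptor();
    }

    @Override
    public void configure(Context context) {
      // No configuration needed for this sketch.
    }
  }
}
```

With these headers in place, the HDFS sink's path can be built from header and time escapes (for example %{type}/%{sub_type}/%Y-%m-%d); the exact key layout we use in S3 is of course specific to our warehouse.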
Infrastructure & File Channel
Within our data collection infrastructure we have several Amazon Elastic Compute Cloud (EC2) instances provisioned via CloudFormation