Tutorial 1: Event Aggregation: Streaming BTS
In this tutorial we'll learn how Blurr performs basic data aggregation. The following concepts will be introduced:
- The Blurr Transform Spec document (BTS)
- The basic blocks of a BTS: Header, Store, Identity and Aggregates
- How events are processed and aggregated one by one by a Block Aggregate
- How Identity and Dimensions are used to create new records
Try the code from this example by launching a Jupyter Notebook.
1. Events
Our sample application is a fairly simple game in which the player can either win or lose.
Users can play as many games as they want in one sitting, which we call a session. Each event has a `session_id` that identifies the session in which the game was played.
This app collects 2 types of events:
- `game_start`: sent when a user starts a new game.
- `game_end`: sent when a user finishes a game. Contains a `won` field that marks whether the user won the game (`1` for a win, `0` for a loss).
Example:
```
{
    "user_id": "09C1",                    # unique user identifier
    "session_id": "915D",                 # the session the game is played on
    "event_id": "game_start",             # type of the event
    "country" : "US",                     # demographic data
    "timestamp": "2018/03/04 09:01:03"    # time of the occurrence of the event
}
```
Events are stored as JSON entries, one per line (separated by a newline character `\n`):
```json
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:04:31" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:10:22" }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:11:03" }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:21:55" }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start", "timestamp": "2018/03/04 09:22:13" }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:25:24" }
```
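Each line is a self-contained JSON object, so the log can be read one line at a time with Python's standard `json` module. The following is a minimal sketch. The helper `read_events` is hypothetical, and the inline `LOG` string stands in for the tutorial's data file:

```python
import json
from collections import Counter

# Three of the sample lines, inlined so the example is self-contained
LOG = """\
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04" }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:11:03" }
"""

def read_events(text):
    """Hypothetical helper: parse newline-delimited JSON, one event per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

events = read_events(LOG)
print(events[0]["user_id"])                     # 09C1
print(Counter(e["event_id"] for e in events))   # Counter({'game_start': 2, 'game_end': 1})
```

Only `game_end` events carry the `won` property. Any logic that reads it should first check the event type.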
Our goal is to collect session statistics, such as the number of games a user played in a session, or the number of games won.
2. The Transformation
For the sequence of events listed above, we're interested in the number of games played and the number of games won, per player and session.
We will transform the original sequence of events into a series of records containing the desired information:
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
D043 | B6FA | 1 | 1 |
T8KA | 09C1 | 1 | 1 |
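As a cross-check of the expected table, the same counts can be computed in plain Python. This batch sketch groups all events at once, unlike Blurr's event-by-event processing, and only serves to confirm the numbers. The function `summarize` is hypothetical:

```python
from collections import defaultdict

# (session_id, user_id, event_id, won) for the eight sample events
EVENTS = [
    ("915D", "09C1", "game_start", None),
    ("915D", "09C1", "game_end", 1),
    ("915D", "09C1", "game_start", None),
    ("915D", "09C1", "game_end", 1),
    ("D043", "B6FA", "game_start", None),
    ("D043", "B6FA", "game_end", 1),
    ("T8KA", "09C1", "game_start", None),
    ("T8KA", "09C1", "game_end", 1),
]

def summarize(events):
    """Hypothetical batch version: count games played and won per (session_id, user_id)."""
    stats = defaultdict(lambda: {"games_played": 0, "games_won": 0})
    for session_id, user_id, event_id, won in events:
        row = stats[(session_id, user_id)]
        if event_id == "game_start":
            row["games_played"] += 1
        elif event_id == "game_end" and won == 1:
            row["games_won"] += 1
    return dict(stats)

for (session_id, user_id), row in summarize(EVENTS).items():
    print(session_id, user_id, row["games_played"], row["games_won"])
# 915D 09C1 2 2
# D043 B6FA 1 1
# T8KA 09C1 1 1
```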
To obtain this transformation, Blurr processes the events sequentially, one by one, using the following Blurr Transform Spec (BTS) file:
```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions

Stores:
  - Type: Blurr:Store:Memory
    Name: hello_world_store

Identity: source.user_id

Import:
  - { Module: dateutil.parser, Identifiers: [ parse ]}

Time: parse(source.timestamp)

Aggregates:
  - Type: Blurr:Aggregate:Block
    Name: session_stats
    Store: hello_world_store

    Dimensions:
      - Name: session_id
        Type: string
        Value: source.session_id

    Fields:
      - Name: games_played
        Type: integer
        When: source.event_id == 'game_start'
        Value: session_stats.games_played + 1

      - Name: games_won
        Type: integer
        When: source.event_id == 'game_end' and source.won == 1
        Value: session_stats.games_won + 1
```
Let's have a quick look at the five main blocks of this BTS: Header, Store, Identity, Time and Aggregates.
2.1. Header
```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions
```
`Type` and `Version` identify the capabilities of the BTS.
Later in this series of tutorials we'll introduce other types of BTS, such as the Window BTS. We'll also learn how BTSs are combined, which is why every BTS must have a unique `Name`.
2.2. Store
```yaml
Stores:
  - Type: Blurr:Store:Memory
    Name: hello_world_store
```
The output of a transformation is a collection of records persisted in a datastore. For this example we'll be using an in-memory datastore.
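Conceptually, an in-memory store is little more than a dictionary of records. The sketch below is a hypothetical stand-in, not Blurr's `Blurr:Store:Memory` implementation. The class `MemoryStore` and its methods are illustrative names. Its key layout mirrors the `identity/aggregate_name/session_id/` keys that the `blurr transform` command prints in section 4:

```python
class MemoryStore:
    """Hypothetical in-memory store: records kept in a dict keyed by a string."""

    def __init__(self):
        self._records = {}

    @staticmethod
    def key(identity, aggregate, dimension):
        # Mirrors the "identity/aggregate_name/dimension/" layout of the CLI output
        return f"{identity}/{aggregate}/{dimension}/"

    def get(self, identity, aggregate, dimension):
        # Returns None when no record exists for this key
        return self._records.get(self.key(identity, aggregate, dimension))

    def save(self, identity, aggregate, dimension, record):
        # Store a copy so later changes to `record` don't leak into the store
        self._records[self.key(identity, aggregate, dimension)] = dict(record)

store = MemoryStore()
store.save("09C1", "session_stats", "915D", {"games_played": 2, "games_won": 2})
print(store.get("09C1", "session_stats", "915D"))  # {'games_played': 2, 'games_won': 2}
print(store.get("09C1", "session_stats", "T8KA"))  # None
```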
2.3. Identity
Every BTS has an Identity, which is always a property of the events being processed. In our example, the Identity is the property `user_id`:
```yaml
Identity: source.user_id
```
In a BTS we can access the properties of the event being processed using the `source` keyword, as in `source.user_id` or `source.won`.
The Identity is the main dimension around which events are aggregated. At this stage, let's just think of the Identity as a mandatory field that is part of both the original events and the output.
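BTS expressions are Python, so one way to picture `source` is as an object whose attributes are the event's JSON properties. The sketch below assumes that model. It uses `types.SimpleNamespace` and `eval` to illustrate the idea and does not show how Blurr itself evaluates expressions:

```python
import json
from types import SimpleNamespace

line = '{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }'

# Assumed model: expose the event's properties as attributes of `source`
source = SimpleNamespace(**json.loads(line))

# Evaluate BTS-style expressions against that namespace (trusted input only)
identity = eval("source.user_id", {}, {"source": source})
won = eval("source.won", {}, {"source": source})
print(identity, won)  # 09C1 1
```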
2.4. Time
`Time` defines how the timestamp of each event is obtained. Here it parses the `timestamp` property with `parse`, which the `Import` block imports from `dateutil.parser`:
```yaml
Time: parse(source.timestamp)
```
Among other things, Blurr uses `Time` to generate the `_start_time` and `_end_time` values of each session record. We'll see in the next tutorial why this is critical to certain aggregation features.
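The BTS relies on `dateutil.parser.parse`, which infers the timestamp format. To check the timestamps without `dateutil`, you can also parse the fixed `2018/03/04 09:01:03` layout with the standard library. This sketch assumes that a session spans from its earliest event to its latest one. That assumption matches the `_start_time` and `_end_time` values the CLI prints for session `915D` in section 4:

```python
from datetime import datetime

# Timestamps of the four events in session 915D (from the sample data)
timestamps = [
    "2018/03/04 09:01:03",
    "2018/03/04 09:03:04",
    "2018/03/04 09:04:31",
    "2018/03/04 09:10:22",
]

# Standard-library alternative to dateutil's parse() for this fixed format
times = [datetime.strptime(ts, "%Y/%m/%d %H:%M:%S") for ts in timestamps]

# Assumed: the session spans from its earliest to its latest event
start_time, end_time = min(times), max(times)
print(start_time.isoformat())  # 2018-03-04T09:01:03
print(end_time.isoformat())    # 2018-03-04T09:10:22
```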
2.5. Aggregates
This is where the magic happens. Aggregates define the nature of the transformation. Our example has a single Aggregate of type Block Aggregate. Different types of Aggregates will be introduced in the next tutorials.
We'll learn how the transformation happens in the next section by examining the flow of data event by event.
3. Data Flow
Events are processed one by one, and then aggregated as defined in the Block Aggregate:
```yaml
Aggregates:
  - Type: Blurr:Aggregate:Block
    Name: session_stats
    Store: hello_world_store

    Dimensions:
      - Name: session_id
        Type: string
        Value: source.session_id

    Fields:
      - Name: games_played
        Type: integer
        When: source.event_id == 'game_start'
        Value: session_stats.games_played + 1

      - Name: games_won
        Type: integer
        When: source.event_id == 'game_end' and source.won == 1
        Value: session_stats.games_won + 1
```
To understand how the Block Aggregate aggregates data, we'll follow the sequence of events from the first section.
3.1. First Event: game_start
The first event is processed when the first user starts playing the game:
```json
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
```
Aggregates are calculated taking into account the historical series of events. In this case, `games_played` is increased by `1` every time a new game starts:
```yaml
- Name: games_played
  Type: integer
  When: source.event_id == 'game_start'
  Value: session_stats.games_played + 1
```
Whenever a `game_start` event is received, the `games_played` field of the existing `session_stats` record is increased by one.
You can always access a field in the previously saved record by using the name of the Aggregate and the name of the field, such as `session_stats.games_played` or `session_stats.games_won`.
Since this is the first event in the history, the following happens:
- A new record is created in the store with the default values for each field (`""` for `string`, `0` for `integer`).
- The event is processed, updating the record using the `Value` expressions of the fields whose `When` condition is true. The content of `Value` can be any Python expression.
The resulting record is added to the store:
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 1 | 0 |
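The steps described above can be sketched in Python. This is a hypothetical model, not Blurr's internals. The names `FIELDS` and `apply_event` are illustrative. The record starts from the type defaults, and the previously saved values are exposed as `session_stats`. Each field's `Value` is evaluated only when its `When` expression is true:

```python
from types import SimpleNamespace

# Hypothetical field table copied from the BTS: (name, default, When, Value)
FIELDS = [
    ("games_played", 0, "source.event_id == 'game_start'", "session_stats.games_played + 1"),
    ("games_won", 0, "source.event_id == 'game_end' and source.won == 1", "session_stats.games_won + 1"),
]

def apply_event(record, event):
    """Hypothetical update step: evaluate each field's When, then its Value if true."""
    names = {
        "source": SimpleNamespace(**event),          # the event being processed
        "session_stats": SimpleNamespace(**record),  # the previously saved record
    }
    updated = dict(record)
    for name, _default, when, value in FIELDS:
        if eval(when, {}, names):
            updated[name] = eval(value, {}, names)
    return updated

# No record exists yet: start from the defaults (0 for integer fields)
record = {"session_id": "915D"}  # dimension value taken from the event
record.update({name: default for name, default, _when, _value in FIELDS})

record = apply_event(record, {"user_id": "09C1", "session_id": "915D", "event_id": "game_start"})
print(record)  # {'session_id': '915D', 'games_played': 1, 'games_won': 0}
```

Because `and` short-circuits, the `games_won` condition never reads `source.won` on a `game_start` event, which has no `won` property.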
3.2. Second Event: game_end
The user from the first event wins the game:
```json
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```
Processing this event increases `games_won` in the existing record by one:
```yaml
- Name: games_won
  Type: integer
  When: source.event_id == 'game_end' and source.won == 1
  Value: session_stats.games_won + 1
```
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 1 | 1 |
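Note that the `When` condition compares `source.won` with the integer `1`, not the string `'1'`. Suppose the event's properties keep the types that standard JSON parsing produces, as they do with Python's `json` module. In that case a comparison with the string would never match:

```python
import json

event = json.loads('{ "event_id": "game_end", "won": 1 }')

# JSON numbers are decoded as Python ints
print(type(event["won"]).__name__)  # int
print(event["won"] == 1)            # True
print(event["won"] == '1')          # False: an int never compares equal to a str
```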
3.3. 3rd and 4th Events: user plays a new game
The same user plays and wins a new game in the same session:
```json
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```
After processing both events, `games_played` and `games_won` are each increased by one.
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
3.4. 5th and 6th Events: a new user plays a game
A second user starts a new game:
```json
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start" }
```
Previously we defined `source.user_id` as the Identity of the BTS:
```yaml
Identity: source.user_id
```
Here we introduce one of the roles of the Identity: whenever an event is received and the Identity value doesn't exist in the store (like when a new user plays a game), a new record is added:
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
D043 | B6FA | 1 | 0 |
After the `game_end` event is received, the record is updated with the win result:
```json
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1 }
```
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
D043 | B6FA | 1 | 1 |
3.5. 7th Event: a user starts a new session
After some time, the user decides to play again. From the game's perspective, this is a new session:
```json
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start" }
```
There's an element of the Aggregate we haven't covered yet: `Dimensions`.
```yaml
Dimensions:
  - Name: session_id
    Type: string
    Value: source.session_id
```
Dimensions are a key component of event aggregation. A Block Aggregate always contains a `Dimensions` section, which determines the record in the store that new events update.
The dimension fields are evaluated first for every event. If their values differ from those of the record the Block Aggregate is currently updating, the matching record is retrieved from the store. If no record is found in the store, a new record is created.
`source.session_id` is the value of the property `session_id` in the event being processed (`T8KA`). `session_stats.session_id` is the value of `session_id` in the last record saved for the same Identity, i.e. the last session played by the user (`915D`):
```python
source.session_id == session_stats.session_id
"T8KA" == "915D"  # False: a new record is created because T8KA doesn't exist in the store yet
```
As a result of the evaluation of `Dimensions`, a new record is created in the store:
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
D043 | B6FA | 1 | 1 |
T8KA | 09C1 | 1 | 0 |
3.6. 8th Event: game_end
The previous user finishes the game:
```json
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1 }
```
Since `session_id` is the same as in the last record saved for the same user (created after the previous event):
```python
source.session_id == session_stats.session_id
"T8KA" == "T8KA"  # True
```
No record is created. The last record for that user is updated instead:
session_id | user_id | games_played | games_won |
---|---|---|---|
915D | 09C1 | 2 | 2 |
D043 | B6FA | 1 | 1 |
T8KA | 09C1 | 1 | 1 |
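The record-selection rule from sections 3.5 and 3.6 can be put together in a short streaming sketch. It is a hypothetical model in plain Python, not Blurr's implementation, and the function name `run_block_aggregate` is illustrative. For each Identity, it keeps the record currently being updated and compares that record's `session_id` with the incoming event's. Only when they differ does it look up or create a record in the store. Replaying the eight sample events reproduces the final table:

```python
# Sample events: (user_id, session_id, event_id, won)
EVENTS = [
    ("09C1", "915D", "game_start", None),
    ("09C1", "915D", "game_end", 1),
    ("09C1", "915D", "game_start", None),
    ("09C1", "915D", "game_end", 1),
    ("B6FA", "D043", "game_start", None),
    ("B6FA", "D043", "game_end", 1),
    ("09C1", "T8KA", "game_start", None),
    ("09C1", "T8KA", "game_end", 1),
]

def run_block_aggregate(events):
    """Hypothetical streaming sketch of the session_stats Block Aggregate."""
    store = {}    # (identity, session_id) -> record
    current = {}  # identity -> record currently being updated
    for user_id, session_id, event_id, won in events:
        record = current.get(user_id)
        # Dimension check: source.session_id == session_stats.session_id ?
        if record is None or record["session_id"] != session_id:
            # Switch records: fetch from the store, or create one with defaults
            record = store.setdefault(
                (user_id, session_id),
                {"session_id": session_id, "user_id": user_id,
                 "games_played": 0, "games_won": 0},
            )
            current[user_id] = record
        # Field updates, each gated by its When condition
        if event_id == "game_start":
            record["games_played"] += 1
        if event_id == "game_end" and won == 1:
            record["games_won"] += 1
    return store

for record in run_block_aggregate(EVENTS).values():
    print(record["session_id"], record["user_id"], record["games_played"], record["games_won"])
# 915D 09C1 2 2
# D043 B6FA 1 1
# T8KA 09C1 1 1
```

Passing only the first seven events (`EVENTS[:7]`) leaves `T8KA` with `games_won` at `0`, matching the table in section 3.5.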
4. Previewing the transformation using Blurr CLI
We can preview the result of the transformation using the `blurr transform` command:
```
$ blurr transform --streaming-bts tutorial1-streaming-bts.yml tutorial1-data.log
["09C1/session_stats/915D/", {"_identity": "09C1", "_start_time": "2018-03-04T09:01:03", "_end_time": "2018-03-04T09:10:22", "games_played": 2, "games_won": 2, "session_id": "915D"}]
["09C1/session_stats/T8KA/", {"_identity": "09C1", "_start_time": "2018-03-04T09:22:13", "_end_time": "2018-03-04T09:25:24", "games_played": 1, "games_won": 1, "session_id": "T8KA"}]
["B6FA/session_stats/D043/", {"_identity": "B6FA", "_start_time": "2018-03-04T09:11:03", "_end_time": "2018-03-04T09:21:55", "games_played": 1, "games_won": 1, "session_id": "D043"}]
```
`transform` prints the result of the transformation in JSON format, which is slightly different from the table representation.
Each entry consists of an array with 2 items:
- An `identity/aggregate_name/session_id/` string. The Identity is represented by `user_id` in the tables.
- An object with the remaining values of the record.