GitXplorerGitXplorer
a

eventstreamer

public
2 stars
6 forks
7 issues

Commits

List of commits on branch master.
Verified
4a662c8b1d503e764f5cf7e0d9a2bff7c5179bcd

Update README.md

aavodovnik committed 6 years ago
Unverified
72bba6bc7b4d07da61e204e262ebc560332152b1

Build file.

aavodovnik committed 6 years ago
Unverified
7cc71b4ea6813f05fa88d4878461d27dc1da53e4

Add appveyor build file.

aavodovnik committed 6 years ago
Verified
398bf14067d01da16477f08c06e1785154be506a

Update README.md

aavodovnik committed 6 years ago
Unverified
042e8e1dd47cd295e3888e03725224aa4b529722

Readme addition.

aavodovnik committed 6 years ago
Unverified
08f4251c7d4ca56e0c79308dc4493f3f27c667a8

Merge branch 'redis-fallback' of https://github.com/avodovnik/servicefabric-eventstreamer into redis-fallback

aavodovnik committed 6 years ago

README

The README file for this repository.

EventStreamer - Azure Function & Redis

This is the Redis implementation of the event streaming through Azure Functions and Redis.

Build status

What is it?

We're using an Azure Function to listen to events, coming into Event Hub in a cannonical format. Each event contains a DataPoint, which looks like this:

public class DataPoint
{
    public string Key { get; set; }
    public DateTime Timestamp { get; set; }
    public string DeviceId { get; set; }
    public string SessionId { get; set; }
    public string SensorType { get; set; }

    public List<string> Names { get; set; } = new List<string>();
    public List<string> Values { get; set; } = new List<string>();
}

The reason for this structure is simple: we believed that there are many use cases where we will be collecting similar structured data from sensors, and we knew there will be some similarities (i.e. SessionId, etc.). The Names and Values fields are split so that it makes processing them with something like Azure Stream Analytics easier, should we choose to use it in the future.

Events come in from multiple players, or uniquely identifiable streams. The function separates them based on SessionId. Because this is a specific implementation that is based on a real-world example of one of the partners we're working with, we've also made an assumption on a specific field being there - we build the Key based on SessionId and AL which is the field that we can assume exists.

This happens in the following snippet of code:

private static (DataPoint point, string message) SafelyConvertToDataPoint(byte[] data, ILogger log)
{
    try
    {
    ///     ...
        var key = $"{point.SessionId}:{point.Values[1]}";
    ///     ...
    } 
    /// ...
}

You can think of this function as a sort of demultiplexer. It looks at a continuous stream of data, and splits out multiple individual streams (in our case, per player).

What does it do with the data?

When data comes in, the function buffers them into Redis and tries to aggregate until it gets a full second of data; the reason here is simple, the customer uses sensors which emit data at 100 Hz. To make meaningful processing of this data possible, we need to aggregate it into a wider time period. To achieve this, we use the Task ProcessPlayerAsync(string playerId, (DataPoint point, string strRepresentation)[] messages, ILogger log) method, which looks at the buffer that we have available for each player, and makes a decision to push time forward or not. The method Task PushTimeAsync(string playerId, ILogger log), when called, then pops all the data from the buffer (Redis) queue, and aggregates the values within the DataPoint.

The result of this is an aggregated row that can be sent forward for processing, or used for calculations.

How to Run it?

The function needs two configuration settings to function properly:

  • incomingEventHub which is the connection string to the event hub onto which the events are being sent to, and
  • RedisConnectionString which of course, is the connection string of the Redis cache that is used as a buffer for the events inside Azure Function.

When deploying locally, edit the local.settings.json file, and add the above fileds into Values.

Note, even though they are connection strings, they don't go into the ConnectionStrings set of values, as that's reserved for SQL connections only.

When running on Azure, make sure to add the above settings into the Application Settings section of the Function.

Generating sample data

If you want to generate sample data, you can use the Streamer.CLI which is a part of this project's master branch (for now). The streamer generates random events that satisfy the above assumption. It allows streaming to, and listening from, an EventHub with multiple partitions, and ensures each player (for our definition of such an entity) is only ever streamed to a single partition.

You can run the streamer using the following command:

dotnet run stream --eh "event hub connection string" --num 5000 --interval 10 -s "98ef1baf-02d3-4122-8d54-02f83577d38f"

The stream tells the application to stream data into the EventHub, num defines the amount of events it produces, the interval specifies the frequency it simulates (i.e. 10 means 10ms, or 100 Hz), and the optional -s allows you to specify a Session ID (this way you can test without flooding Redis with too many new keys).

Known Problems

  1. At the moment, Redis is never cleaned, which means that we are getting a ton of unused keys. Issue #9.
  2. Sometimes we see timeout exceptions, but the recent addition of async code seems to have fixed that.