Quick start

Complete quick start example demonstrating integration of Fluxtion into an application

Introduction

5 Minute tutorial to demonstrate stream data processing using Fluxtion. The goal is to read a sensor data stream for a set of rooms, calculate aggregate values per room and notify a user class when a room breaches set temperature criteria. The user class will send an SMS alert when notified, if a number has been registered.

To get benefit out of this tutorial you should have:

  • A passing understanding of stream processing

  • Intermediate Java coding skills combined with basic knowledge of git and maven

Requirements

  • Read room sensor temperature as a stream of csv records or as instances of SensorReading events.

  • Merge csv records and SensorReading instances into a single event stream for processing

  • The event stream can be infinite

  • For each room calculate the max and average temperature

  • Run a tumbling window, zeroing all room values every 3 readings

  • Register a user class with the stream processor to act as a controller of an external system

  • If a room has an average of > 60 and max of > 90 then

    • Log a warning

    • A user class(TempertureController) will attempt to send an SMS listing rooms to investigate

  • Register an SMS endpoint with the controller by sending a String as an event into the processor

  • The SMS number can be updated in realtime

Running the application

Clone the quickstart project and execute the sensorquickstart.jar in the dist directory, as shown below:

The application processes the file temperatureData.csv as an input in place of real sensor source. The last three records trigger an alert condition. No SMS endoint is registered so the controller is unable to send a message.

After reading the csv file SensorReading events are programatically sent to the processor, to register an SMS number and create an alert condition. In this case the controller can now send an SMS message.

The environmental variable fluxtion.cacheDIrectory sets the cache directory for outputs of the fluxtion compiler. Executing a second time sees a significant reduction in processing time as the classes in the cache directory are used and no compilation of the stream processor is required. Deleting the cache directory will force the application to recompile at the next start.

Solution description

Dependencies

Build a processor and stream events

The code below is from SensorMonitor builds a streaming processing engine and sends events to it.

A method reference is passed to the reuseOrBuild function on line 2 to build the graph. The two string parameters are used as the fully qualified name of the generated stream processing class. The call to reuseOrBuild checks the classpath for a class that matches the fully qualified name. If no class can be loaded for that fqn, then a new stream processor is generated.

Processing events

Once built the application can send CharEvent's from a file to the generated StaticEventProcessor using the utility method CharStreamer,stream() on line 5 or programmatically via the onEvent method on line 7. The processor will dispatch events within the execution graph to the correct instance.

Defining the calculation

The builder method constructs the processor on lines 14-31. Method references are used throughout to increase type safety, make refactoring easier and make ide's more productive in building event processors.

Integrated user classes

The builder refers to two helper instances that define the input and output datatypes. Lombok is used to reduce boilerplate code for getter/setters.

A user supplied mapping function converts the aggregated sensor data for all rooms and creates a collection of room names that require investigation:

A user supplied controller instance is registered with the stream processor. When the list of rooms to investigate is > 0, the list is pushed to the user controller class.

To register an SMS endpoint the controller class annotates a method to receive a String as an event. The processor will route any String to the controller class method.

Cached compilation

The application generates a solution in a cache directory, set with system property:

-Dfluxtion.cacheDirectory=fluxtion

The fluxtion cache directory contains three sub-directories:

  • classes - compiled classes generated by the Fluxtion compiler

  • resources - meta-data describing the processor

  • sources - The java source files used to generate the classes

The classes directory is added to the classpath by the reuseOrBuild method. If an event processor cannot be loaded with the fqn supplied then the classes generated by the ahead of time compiler are written to the classes directory.

Executing the jar a second time sees a significant reduction in execution time as the application loads the compiled processor from the first run. Using the cached compiled classes gives an almost instant response to the input event stream.

Deleting the cache directory will cause the regeneration and compilation of the solution.

Generated files

  • source - A CSV marshaller and stream processing solution are generated here.

    • The main entry point to the stream processor is RoomSensorSEP.java

    • The CVS marshaller is SensorReadingCsvDecoder0.java

  • classes - The sources are compiled in this directory, ready for subsequent executions

  • resources - Description of the generated process graph for rendering and non class resources required at runtime

    • An image describing the processing graph

    • A graphml that can be interactively explored with a netbeans plugin

    • Any user lambdas used in the processor are serialised for loading in the generated processor

Last updated

Was this helpful?