Batch processing

Provide batch processing to nodes

Introduction

The goal is to provide a batch life-cycle that nodes can integrate with.

Nodes may want to process events as part of a larger transaction. For example, a set of bank transfer events may make up a single transaction that prints a confirmation on completion. The confirmation node could be tied to a batch life-cycle.
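As a rough illustration of the bank transfer scenario, the sketch below buffers transfers and only produces a confirmation when the batch ends. It is plain Java with no Fluxtion dependency so that it can be run on its own. TransferEvent, TransferConfirmationNode and the confirmation message format are hypothetical names chosen for this example, and the comments mark where the Fluxtion callbacks would presumably attach.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical event type, not part of Fluxtion.
final class TransferEvent {
    final String account;
    final long amountCents;

    TransferEvent(String account, long amountCents) {
        this.account = account;
        this.amountCents = amountCents;
    }
}

// Hypothetical node: buffers transfers and confirms once per batch.
final class TransferConfirmationNode {
    private final List<TransferEvent> pending = new ArrayList<>();
    private final List<String> confirmations = new ArrayList<>();

    // In a Fluxtion graph this method would be the event handler.
    void onTransfer(TransferEvent event) {
        pending.add(event);
    }

    // In a Fluxtion graph this method would be the batch-end callback.
    void batchEnd() {
        if (pending.isEmpty()) {
            return; // nothing to confirm for an empty batch
        }
        long totalCents = 0;
        for (TransferEvent event : pending) {
            totalCents += event.amountCents;
        }
        confirmations.add("Confirmed " + pending.size()
                + " transfers, total " + totalCents + " cents");
        pending.clear(); // the next batch starts from a clean state
    }

    // Read-only view of the confirmations produced so far.
    List<String> confirmations() {
        return Collections.unmodifiableList(confirmations);
    }
}
```

No confirmation is produced while transfers are still arriving; a single message appears once batchEnd is called, which is the behaviour the transaction example asks for.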

Fluxtion provides two annotations that configure batch processing:

Annotation       Behaviour
@OnBatchPause    Notifies the node that more messages are expected but there is a pause in the message flow.
@OnBatchEnd      Notifies the node that the batch has completed.

The generated SEP implements the BatchHandler interface. Applications can call the BatchHandler methods and the SEP will handle all dispatch logic to nodes on the execution graph. Batch handlers are invoked in reverse topological order.
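To make the ordering concrete, here is a minimal sketch of reverse topological dispatch. It is not the generated code: a generated SEP calls each node directly, as the SampleProcessor listing in the example shows. BatchAware, NamedNode and ReverseOrderDispatcher are hypothetical names used only for illustration. The dispatcher receives nodes in topological order, dependencies first, and walks that list backwards for batch callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback contract for this sketch only.
interface BatchAware {
    void batchPause();
    void batchEnd();
}

// Hypothetical node that records each callback it receives in a shared log.
final class NamedNode implements BatchAware {
    private final String name;
    private final List<String> log;

    NamedNode(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void batchPause() {
        log.add(name + ".batchPause");
    }

    @Override
    public void batchEnd() {
        log.add(name + ".batchEnd");
    }
}

// Holds nodes in topological order (dependencies first) and
// delivers batch callbacks from the last node back to the first.
final class ReverseOrderDispatcher {
    private final List<BatchAware> topologicalOrder;

    ReverseOrderDispatcher(List<? extends BatchAware> topologicalOrder) {
        this.topologicalOrder = new ArrayList<>(topologicalOrder);
    }

    void batchPause() {
        for (int i = topologicalOrder.size() - 1; i >= 0; i--) {
            topologicalOrder.get(i).batchPause();
        }
    }

    void batchEnd() {
        for (int i = topologicalOrder.size() - 1; i >= 0; i--) {
            topologicalOrder.get(i).batchEnd();
        }
    }
}
```

With two nodes registered in the order dataHandler, batchNode, the dependent batchNode is notified first and dataHandler second.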

Example

The example shows two nodes that declare batchPause and batchEnd methods using the annotations described above. Because batch invocations are in reverse topological order, BatchNode receives each callback before the DataHandler it depends on.

The code for the example is located here.

The node classes

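public class DataHandler {

    // Entry point for DataEvent instances pushed into the SEP
    @EventHandler
    public void dataEvent(DataEvent event) {}

    // Called when the message flow pauses but more messages are expected
    @OnBatchPause
    public void batchPause() {}

    // Called when the batch has completed
    @OnBatchEnd
    public void batchEnd() {}
}

public class BatchNode {

    // Parent node in the execution graph
    private final DataHandler handler;

    public BatchNode(DataHandler handler) {
        this.handler = handler;
    }

    // Called when the message flow pauses but more messages are expected
    @OnBatchPause
    public void batchPause() {}

    // Called when the batch has completed
    @OnBatchEnd
    public void batchEnd() {}
}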

The generated SEP

As described, the SEP's BatchHandler interface methods invoke the node batch methods. The child node does not declare any batch methods, so it is not included in batch callbacks.

public class SampleProcessor implements EventHandler, BatchHandler, Lifecycle {

  //Node declarations
  private final DataHandler dataHandler_1 = new DataHandler();
  private final BatchNode batchNode_3 = new BatchNode(dataHandler_1);
  private final ChildNode childNode_5 = new ChildNode(batchNode_3);
  //code omitted for clarity

  public void handleEvent(DataEvent typedEvent) {
    //Default, no filter methods
    dataHandler_1.dataEvent(typedEvent);
    childNode_5.recalculate();
    //event stack unwind callbacks
    afterEvent();
  }

  @Override
  public void batchPause() {
    batchNode_3.batchPause();
    dataHandler_1.batchPause();
  }

  @Override
  public void batchEnd() {
    batchNode_3.batchEnd();
    dataHandler_1.batchEnd();
  }
}
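From the application side, driving the batch life-cycle could look like the sketch below. To keep it runnable without Fluxtion on the classpath, it declares a local stand-in for BatchHandler with the two methods shown in the generated code, and a hypothetical RecordingProcessor in place of SampleProcessor. BatchFeeder and its chunking scheme are likewise assumptions. The feeder pushes events chunk by chunk, signals a pause between chunks, and signals the end of the batch once every chunk has been delivered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in with the two batch methods seen in the generated SEP.
interface BatchHandler {
    void batchPause();
    void batchEnd();
}

// Hypothetical processor that records every call it receives.
final class RecordingProcessor implements BatchHandler {
    final List<String> calls = new ArrayList<>();

    void handleEvent(String event) {
        calls.add("event:" + event);
    }

    @Override
    public void batchPause() {
        calls.add("batchPause");
    }

    @Override
    public void batchEnd() {
        calls.add("batchEnd");
    }
}

// Hypothetical application code that owns the batch boundaries.
final class BatchFeeder {

    static void feed(RecordingProcessor processor, List<List<String>> chunks) {
        for (int i = 0; i < chunks.size(); i++) {
            for (String event : chunks.get(i)) {
                processor.handleEvent(event);
            }
            // More chunks are expected, so signal a pause rather than the end.
            if (i < chunks.size() - 1) {
                processor.batchPause();
            }
        }
        // All chunks delivered: the batch is complete.
        processor.batchEnd();
    }

    public static void main(String[] args) {
        RecordingProcessor processor = new RecordingProcessor();
        feed(processor, Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c")));
        for (String call : processor.calls) {
            System.out.println(call);
        }
    }
}
```

The application decides where a batch pauses and ends; the processor only forwards those signals to the nodes that declared batch methods.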

png

The png generated by the Fluxtion ESC:
