What is real-time processing in our platform?

On top of Cloud of Things you can use the Advanced Rules engine (powered by Apama) to define business operations that process incoming data from devices or other data sources immediately. These user-defined operations can, for example, alert applications to new incoming data, create new operations based on the received data (such as sending an alarm when a sensor threshold is exceeded), or trigger operations on devices. The operation logic is implemented in Apama’s Event Processing Language (EPL).

Apama’s Event Processing Language consists of statements, which are organized into actions and monitors. Monitor files can be edited directly from within Cloud of Things using the Advanced Rules application. Alternatively, you can install Apama on your local machine and develop your applications with Software AG Designer, an Eclipse-based development environment. You can deploy your monitor files as Apama applications to Cloud of Things; see EPL Apps > Basic functionality in the Advanced Rules guide for more information.

For further information on using Apama’s Event Processing Language in Cloud of Things, refer to Using the Apama Event Processing Language (EPL) below and to the Advanced Rules guide.

What are the benefits of using real-time processing?

Cloud of Things’s real-time processing feature has the following benefits:

Using the Apama Event Processing Language (EPL)

Overview

The Apama Event Processing Language has a syntax similar to Java. In addition to simple flow control statements such as if, while, and for, users can write listeners with the on keyword to react to events.

Apama EPL is documented in the Apama documentation.
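
The following minimal sketch shows the flow control statements and an on listener working together. It does not use any Cloud of Things types; the Tick event and the FlowControlDemo monitor are hypothetical names introduced only for this illustration.

```epl
// Hypothetical event type, used only for this illustration.
event Tick {
    integer n;
}

monitor FlowControlDemo {
    action onload() {
        // Listener: the block runs for every Tick event this monitor receives.
        on all Tick() as t {
            if t.n > 3 {
                log "n is greater than 3" at INFO;
            } else {
                log "n is 3 or less" at INFO;
            }

            // while loop: count up to n.
            integer i := 0;
            while i < t.n {
                i := i + 1;
            }

            // for loop: iterate over a sequence literal.
            integer total := 0;
            integer x;
            for x in [1, 2, 3] {
                total := total + x;
            }
            log "i=" + i.toString() + " total=" + total.toString() at INFO;
        }

        // Route one Tick to this monitor so that the listener above fires.
        route Tick(5);
    }
}
```

The listener is set up before the event is routed, because a listener only matches events that arrive after it has been created.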

As an example, the following statement listens for new temperature sensor readings greater than a particular temperature:

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        Alarm alarm    := new Alarm;
        alarm.type     := "c8y_TemperatureAlert";
        alarm.source   := m.source;
        alarm.time     := currentTime;
        alarm.text     := "Temperature too high";
        alarm.status   := "ACTIVE";
        alarm.severity := "CRITICAL";
        send alarm to Alarm.SEND_CHANNEL;
    }
}

Here, Measurement is a predefined event type that carries the measurements. In this example, m is the received Measurement event. The listener filters for measurements of type c8y_TemperatureMeasurement, and the if statement checks the property c8y_TemperatureMeasurement.T.value, which is the reading of a temperature sensor in degrees Celsius (see Sensor library).
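
Note that getOrDefault returns a default-initialized value when a key is missing, so a measurement without the T series would be read as 0.0 in the listener above. Where that distinction matters, the lookup can be written with explicit hasKey checks. The following is a minimal sketch under that assumption; the monitor name InspectTemperature is hypothetical.

```epl
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;

monitor InspectTemperature {
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

        on all Measurement(type="c8y_TemperatureMeasurement") as m {
            // Only continue if the fragment is actually present.
            if m.measurements.hasKey("c8y_TemperatureMeasurement") {
                dictionary<string, MeasurementValue> fragment := m.measurements["c8y_TemperatureMeasurement"];
                // Only continue if the series "T" is actually present.
                if fragment.hasKey("T") {
                    MeasurementValue t := fragment["T"];
                    log "Temperature of " + m.source + ": " + t.value.toString() + " " + t.unit at INFO;
                }
            }
        }
    }
}
```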

Listeners such as the one above should be placed in the onload action of a monitor, and the file must contain using statements for the types used by the listener. Most of the Cloud of Things event types are in the package com.apama.cumulocity. The full list is provided below; for the sake of brevity, we will omit these statements from further examples:

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Error;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.SendSMS;
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
using com.apama.correlator.timeformat.TimeFormat;
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Response;

monitor ListenForHighTemperatures {
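    action onload() {
        // Subscribe to the measurement channel so that the listener receives Measurement events.
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        on all Measurement(type="c8y_TemperatureMeasurement") as e {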
            if e.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
                // handle the measurement
            }
        }
    }
}

How can I create derived data from EPL?

To create a new alarm or operation, create an instance of the relevant event type and use the send statement to send it to the relevant channel (defined with a constant on the event type). Assume that an alarm should be generated immediately if the temperature of a sensor exceeds a defined value. This is done with the following statement:

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        send Alarm("","c8y_TemperatureAlert",m.source,currentTime,"Temperature too high","ACTIVE","CRITICAL",1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
    }
}

Technically, this statement produces a new alarm event each time a temperature sensor reads more than 100 degrees Celsius and sends it to Cloud of Things.

How can I control devices from EPL?

Remote control with EPL is done by sending an operation event. Remote operations are targeted to a specific device. The following example illustrates switching a relay based on temperature readings:

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        send Operation("",m.source,"PENDING",{"c8y_Relay":<any>{"relayState":"CLOSED"}}) to Operation.SEND_CHANNEL;
    }
}
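
For readability, the same operation can also be built field by field, in the style of the first alarm example. The following is a minimal sketch that wraps the listener in a complete monitor; the monitor name CloseRelayOnHighTemperature is hypothetical, and the id field is left at its default (empty) value as in the positional form above.

```epl
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Operation;

monitor CloseRelayOnHighTemperature {
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

        on all Measurement(type="c8y_TemperatureMeasurement") as m {
            if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
                Operation op := new Operation;
                op.source := m.source;     // target the device that sent the measurement
                op.status := "PENDING";
                // The fragment is a nested dictionary, cast to any to fit the params type.
                op.params.add("c8y_Relay", <any> {"relayState": "CLOSED"});
                send op to Operation.SEND_CHANNEL;
            }
        }
    }
}
```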

How can I query data from EPL?

It may be required to query information from the Cloud of Things database as part of the ongoing event processing. This is supported by sending events and using listeners to wait for responses. Here is an example that shows how to summarize total sales for vending machines every hour. The sales report data created after a purchase is retrieved from the Cloud of Things database.

using com.apama.aggregates.count;

monitor SalesReport {
    event SalesReport {
        Event e;
        ManagedObject customer;
    }
    event SalesOutput {
        integer count;
        string customerId;
    }

    action onload() {

        // The listener below matches Event objects, so subscribe to the event channel.
        monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

        on all Event() as e {
            monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
            integer reqId := integer.getUnique();
            on all FindManagedObjectResponse(reqId=reqId) as mor and not FindManagedObjectResponseAck(reqId=reqId) {
                route SalesReport(e, mor.managedObject);
            }
            on FindManagedObjectResponseAck(reqId=reqId) {
                monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
            }
            send FindManagedObject(reqId,"",{"childAssetId":e.source}) to FindManagedObject.SEND_CHANNEL;
        }

        from sr in all SalesReport() within 3600.0 every 3600.0
            group by sr.customer.id
            select SalesOutput(count(), sr.customer.id) as sales {
            send Measurement("", "total_cust_trx", "customer_trx_counterId", currentTime,
                {
                    "total_cust_trx":{
                        "total":MeasurementValue(sales.count.toFloat(), "COUNT", new dictionary<string,any>)
                    }
                }, {"customer_id":<any> sales.customerId}) to Measurement.SEND_CHANNEL;
        }
    }
}

In the above example, we start by defining the SalesReport and SalesOutput events. SalesReport holds the Event and the ManagedObject that together identify a sale; SalesOutput holds the information we want to derive from a set of sales: the count and the customerId. We listen for Event objects and, for each one, send a FindManagedObject request to look up the ManagedObject that has the event source as a child asset (the customer). The resulting SalesReport objects are sent, via the route statement, into a stream query. The stream query fires every hour (3,600 seconds), aggregates the sales data per customer, and sends a new measurement representing the number of sales for that customer.
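
The request and response pattern on its own can be sketched more compactly. The following minimal example looks up the managed object for each incoming event and logs its name. It assumes that the second field of FindManagedObject is the ID of the managed object to look up and that no further query parameters are needed; the monitor name LookupEventSource is hypothetical.

```epl
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

monitor LookupEventSource {
    action onload() {
        monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
        // Subscribe once for all lookups made by this monitor.
        monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);

        on all Event() as e {
            // A unique request ID ties the responses to this particular request.
            integer reqId := integer.getUnique();

            // Handle every response until the acknowledgement ends the result set.
            on all FindManagedObjectResponse(reqId=reqId) as resp
                    and not FindManagedObjectResponseAck(reqId=reqId) {
                log "Event " + e.type + " came from " + resp.managedObject.name at INFO;
            }

            // Assumption: the second field is the ID of the managed object to look up.
            send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
        }
    }
}
```

Subscribing to the response channel once in onload avoids the subscribe and unsubscribe calls per request, at the cost of keeping the subscription for the lifetime of the monitor.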

How is real-time processing implemented?

Cloud of Things provides several processing modes for API requests:

CEP architecture

Example

Assume that location updates from cars should be monitored every second while the car is driving, but only be stored in the database once a minute for reporting purposes. This is done using the following monitor:

using com.apama.cumulocity.Event;
using com.apama.cumulocity.Measurement;

monitor SendEveryMinute {

    dictionary<string, Event> latestUpdates;

    action onload() {

        // The listener below matches Event objects, so subscribe to the event channel.
        monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
        on all Event() as e {
            if e.params.hasKey("c8y_LocationUpdate") {
                latestUpdates[e.source] := e;
            }
        }

        on all wait(60.0) {
            Event e;
            for e in latestUpdates.values() {
                send e to Event.SEND_CHANNEL;
            }
            latestUpdates.clear();
        }
    }
}