d3cep

name=D3CEP | version=0.1 | accountable=Gustavo L.B. Baptista | depdency= DDS with Dynamic Topics (XTypes)

D3CEP, is a Distributed Event Processing middleware that is executed on the SDDL Core. It allows the dynamic deployment of Event Processing Agents (EPA) on a computer cluster or cloud computing nodes.

Each of such agents execute one or more CEP rules, which are Event-Condition-Actions (ECA) rules. These rules describe sub-patterns of events to be detected from the agent’s input events. Many agents can be logically connected to each other in an Event Processing Network (EPN) to compose a global processing logic, in which each agent’s output produces more abstract events that are consumed by other agent’s input.

The communication paradigm used in DCEP systems is an important design decision, which impacts key properties such as response time, scalability, reliability, availability and maintainability.

In a message-centric communication paradigm, the data exchange unit is the message, and the communication is performed by point-to-point connections. Examples of message-centric communication infrastructures are message queues systems, pub/sub infrastructures, etc. The role of the communication infrastructure is to ensure that messages are delivered correctly. In a Data-Centric communication paradigm, the exchange unit is the data itself. A data-centric infrastructure is aware and manages the following aspects of data: structure, content, constraints over structure and access to the data. There is a notion of a Global Shared Data Space, similar to a tuple space, which has the structure and instances of data. Nodes read and write to this space, and the role of the infrastructure is to ensure that all participants have a consistent and up to date view of the data.

The main difference about message-centric and data-centric infrastructures is how state is managed. Therefore, in message-centric communication, messages about data are sent and state is managed by communication peers. The middleware has no access to the messages contents. In a data-centric communication, data is directly dealt with, instead of sending messages about data. The middleware manages, consolidates and keeps state.

D3CEP uses a Data-Centric communication paradigm, so EPAs, producers and consumers don’t need to manage state. A reduction in the coupling among all entities (e.g. producers, consumers, rules, etc) is achieved. The data-centric nature of D3CEP provides an optimized performance for the routing of events among clustered processing nodes that execute EPAs, distributing the processing of an EPN to achieve scalability, and also optimizes the management of state among EPAs for the processing of distributed CEP rules. In order to achieve Data-Centric communication model, an implementation of the OMG DDS specification is used.

The Dynamic aspect of D3CEP provides flexibility in the definition and re-definition of event types, rules and situations of interest at execution time, without the need to stop a monitoring system. D3CEP requires the usage of a DDS product that implements the OMG DDS XTypes standard specification.

1.1. Main Execution Steps

1- The D3CEP service is initialized in the execution nodes, which periodically send heartbeats announcing their availability

2 – The Administrador creates Event Types

3 – The Administrador creates an EPN

4 – The Administrator maps the deployment of EPAs to Execution Nodes

5 – The service deploys the EPAs into the Execution Nodes

6 – EPAs become publishers and subscribers of the types used by the deployed rules

This section explains how the D3CEP system is used. First an example scenario is presented, than, how each of the components for the execution steps presented above are configured and started to implement the example.

The usage example is based on a Fire Detection Scenario.

2.1. Fire Detection Example Scenario

  • Smoke and Temperature Sensors

Smoke and temperature sensors are installed in the rooms monitored by the system. They produce elementary events into the DDS domain.

The smoke sensors produce the SmokeSensor event which contains the roomId, the sensorId, and a flag indicating whether smoke has been detected.

The temperature sensors produce the TemperatureSensor event, which contains the roomId, the sensorId, and the temperature measured in the room.

  • Smoke Detected EPA

This EPA consumes SmokeSensor events and when a measurement contains an indication that smoke has been detected, it generates the SmokeDetected event, which cointains the roomId and the sensorId where smoke has been detected.

  • High Temperature EPA

This EPA consumes TemperatureSensor events, and when a measurement contains an indication that temperature has passed a determined treshhold, it generates the HighTemperature event, which contains the roomId, the sensorId, and the measured temperature.

  • Fire Detected EPA

This EPA consumes SmokeDetected and HighTemperature events, and when both events have been received from sensors on the same room, it generates a FireDetected event, which contains the roomId, the smokeSensorId, the temperatureSensorId and the measured temperature.

2.2. Implementing the Fire Detection Scenario
  • 1- Run the D3CEP Processing Node Service in each Core Processing Node

First, each Processing Node (PN) of the core network needs to run a daemon process which runs the D3CEP service, allowing each node to passively become a container of D3CEP entities and to run an internal [http://esper.codehaus.org/tutorials/tutorial/tutorial.html|ESPER]] CEP engine. A settings file is passed to the service with the properties of the node.

The following file contains sample settings for the node 1 to be used for executing the example. Other equivalent properties files must be defined for each existent node and service instance, e.g. pn2.properties and so on.

pn1.properties
id=1
name=NodeA
kind=PNDaemon
info=Node A PNDaemon
enabled=true
machineName=VM1
networkAddress=191.168.2.118
environmentData=
periodicityMs=10000
pnServiceLogLevel=INFO
cepEngineLogLevel=INFO
  • 2- Run the Metadata Administrator Application

Then, the declaration of the event types involved in the processing needs to be inserted into the D3CEP domain. The Metadata Administrator application should be run, passing an XML file with the event types to be declared.

The following XML file declares the event types used in the Fire Detection Example:

metadataFireDetection.xml
<?xml version="1.0" encoding="UTF-8"?>
<metadata-descriptions>
 
    <metadata name="SmokeSensor" kind="EventType" info="Sent by smoke sensors" enabled="true">
        <members>
            <member name="sensorId"      type="String"  size="1024" isKey="true"  info="The id of the smoke sensor"/>
            <member name="roomId"        type="String"  size="1024" isKey="false" info="The id of the room where the smoke sensor is located"/>
            <member name="smokeDetected" type="boolean" size="0"    isKey="false" info="Flag indicating whether the smoke sensor has detected smoke"/>
        </members>
    </metadata>
 
    <metadata name="SmokeDetected" kind="EventType" info="Event generated when smoke is detected" enabled="true">
        <members>
            <member name="sensorId"      type="String"  size="1024" isKey="true"  info="The id of the smoke sensor"/>
            <member name="roomId"        type="String"  size="1024" isKey="false" info="The id of the room where the smoke sensor is located"/>
        </members>
    </metadata>
 
    <metadata name="TemperatureSensor" kind="EventType" info="Sent by temperature sensors" enabled="true">
        <members>
            <member name="sensorId"      type="String"  size="1024" isKey="true"  info="The id of the smoke sensor"/>
            <member name="roomId"        type="String"  size="1024" isKey="false" info="The id of the room where the smoke sensor is located"/>
            <member name="temperature"   type="float"   size="0"    isKey="false" info="The measured temperature"/>
        </members>
    </metadata>
 
    <metadata name="HighTemperature" kind="EventType" info="Event generated when high temperature is detected" enabled="true">
        <members>
            <member name="sensorId"      type="String"  size="1024" isKey="true"  info="The id of the smoke sensor"/>
            <member name="roomId"        type="String"  size="1024" isKey="false" info="The id of the room where the smoke sensor is located"/>
            <member name="temperature"   type="float"   size="0"    isKey="false" info="The measured temperature"/>
        </members>
    </metadata>
 
    <metadata name="FireDetected" kind="EventType" info="Event generated when fire is detected" enabled="true">
        <members>
            <member name="roomId"              type="String"  size="1024" isKey="true" info="The id of the room where the fire was detected"/>
            <member name="smokeSensorId"       type="String"  size="1024" isKey="false"  info="The id of the smoke sensor"/>
            <member name="temperatureSensorId" type="String"  size="1024" isKey="false"  info="The id of the temperature sensor"/>
            <member name="temperature"         type="float"   size="0"    isKey="false" info="The measured temperature"/>
        </members>
    </metadata>
 
</metadata-descriptions>
  • 3- Run the D3CEP Administrator Application

Finally, the D3CEP Administrator Application can be used to define the Processing State, which is the description of the CEP entities that implement the example. An XML file is used to define the Event Processing Networks (EPN), which is composed by a set of Event Processing Agents (EPA) which exchange primitive and complex events among each other.

The mapping for the deployment of each EPA to the Processing Nodes is also defined in this file.

The D3CEP Administrator application then publishes all entities and deployment mappings into the D3CEP DDS domain those entities. The PNs receive the deployment state, and deploy the EPAs assigned to them.

The following XML file implements the Fire Detection example:

deploymentStateFireDetection.xml
<?xml version="1.0" encoding="UTF-8"?>
 
<d3cep-admin>
 
	<deployment-state name="D3CEP_DeploymentState" id="1" info="D3CEP Deployment State" enabled="true">
 
	   <epns>
	      <epn name="D3CEP_EPN" id="1" info="D3CEP EPN" enabled="true">
	        <epas>
	            <epa id="1" name="SmokeDetected_EPA" info="Monitors SmokeSensor events and generates SmokeDetected events when smoke is detected" enabled="true">
	               <event-types>
	                   <event-type>SmokeSensor</event-type>
	                   <event-type>SmokeDetected</event-type>
	               </event-types>
	               <rule id = "1" name="SmokeDetected" info="Monitors SmokeSensor events and generaes SmokeDetected events" enabled="true">
	                  <statements>
	                     <statement order = "1" id="1" name="D3CEP_Statement" info="Monitors SmokeSensor events and generaes SmokeDetected events" enabled="true">
	                        <text>                    
	                            <![CDATA[ 
	                            INSERT INTO SmokeDetected(sensorId,roomId) Select s.sensorId, s.roomId from SmokeSensor.win:time(? sec) s where s.smokeDetected = true
	                            ]]>
	                        </text>
	                        <publish-event-types></publish-event-types>
	                        <subscribe-event-types>
	                        	<event-type>SmokeSensor</event-type>
	                        </subscribe-event-types>
	                        <parameters>
		                        <parameter id = "1" order="1" valueRep="60" type="Integer" name="timeWindowSec" info="The size, in seconds, of the time window considered by the rule" enabled="true"/>
	                        </parameters>
	                     </statement> 
	                     <statement order = "2" id="2" name="D3CEP_Statement" info="Publishes SmokeDetected events" enabled="true">
	                        <text>                    
	                            <![CDATA[ 
	                            Select * from SmokeDetected
	                            ]]>
	                        </text>
	                        <publish-event-types>
	                        	<event-type>SmokeDetected</event-type>
	                        </publish-event-types>
	                        <subscribe-event-types></subscribe-event-types>
	                        <parameters>
	                        </parameters>
	                     </statement>
	                  </statements>
	               <external-functions/>
	               </rule>
	            </epa>
	            <epa id = "2" name="HighTemperature_EPA" info="Monitors TemperatureSensor events and generates HighTemperature events when high temperature is detected" enabled="true">
	               <event-types>
	                   <event-type>TemperatureSensor</event-type>
	                   <event-type>HighTemperature</event-type>
	               </event-types>
	               <rule id = "2" name="HighTemperature" info="Monitors TemperatureSensor events and generates HighTemperature events" enabled="true">
	                  <statements>
	                     <statement order = "1" id="3" name="D3CEP_Statement" info="Monitors TemperatureSensor events and generates HighTemperature events" enabled="true">
	                        <text>                    
	                            <![CDATA[ 
	                            INSERT INTO HighTemperature(sensorId,roomId,temperature) Select s.sensorId, s.roomId, s.temperature from TemperatureSensor.win:time(? sec) s where s.temperature > ?
	                            ]]>
	                        </text>
	                        <publish-event-types></publish-event-types>
	                        <subscribe-event-types>
	                        	<event-type>TemperatureSensor</event-type>
	                        </subscribe-event-types>
	                        <parameters>
		                       <parameter id = "2" order="1" valueRep="60" type="Integer" name="timeWindowSec"    info="The size, in seconds, of the time window considered by the rule" enabled="true"/>
		                       <parameter id = "3" order="2" valueRep="40" type="Integer" name="temperatureLimit" info="The temperature limit to consider a high temperature" enabled="true"/>
	                        </parameters>
	                     </statement>
	                     <statement order = "2" id="4" name="D3CEP_Statement" info="Publishes HighTemperature events" enabled="true">
	                        <text>                    
	                            <![CDATA[ 
	                            Select * from HighTemperature
	                            ]]>
	                        </text>
	                        <publish-event-types>
	                        	<event-type>HighTemperature</event-type>
	                        </publish-event-types>
	                        <subscribe-event-types></subscribe-event-types>
	                        <parameters>
	                        </parameters>
	                     </statement>
	               	  </statements>
	               <external-functions/>
	               </rule>
	            </epa>
	            <epa id = "3" name="FireDetected_EPA" info="Monitors SmokeDetected and HighTemperature events and generates FireDetected events" enabled="true">
	               <event-types>
	                   <event-type>SmokeDetected</event-type>
	                   <event-type>HighTemperature</event-type>
	                   <event-type>FireDetected</event-type>
	               </event-types>
	               <rule id = "3" name="FireDetected" info="Monitors SmokeDetected and HighTemperature events and generates FireDetected events" enabled="true">
	                  <statements>
	                     <statement order = "1" id="5" name="D3CEP_Statement" info="Monitors SmokeDetected and HighTemperature events and generates FireDetected events" enabled="true">
	                        <text>           
	                            <![CDATA[ 
	                            INSERT INTO FireDetected(roomId,smokeSensorId,temperatureSensorId,temperature) 
	                            SELECT s.roomId, s.sensorId, h.sensorId, h.temperature
	                            FROM SmokeDetected.std:unique(roomId) as s, HighTemperature.std:unique(roomId) as h 
	                            WHERE s.roomId = h.roomId
	                            ]]>
	                        </text>
	                        <publish-event-types></publish-event-types>
	                        <subscribe-event-types>
	                        	<event-type>SmokeDetected</event-type>
	                        	<event-type>HighTemperature</event-type>
	                        </subscribe-event-types>
	                        <parameters>
	                        </parameters>
	                     </statement>
	                     <statement order = "2" id="6" name="D3CEP_Statement" info="Publishes FireDetected events" enabled="true">
	                        <text>                    
	                            <![CDATA[ 
	                            Select * from FireDetected
	                            ]]>
	                        </text>
	                        <publish-event-types>
	                        	<event-type>FireDetected</event-type>
	                        </publish-event-types>
	                        <subscribe-event-types></subscribe-event-types>
	                        <parameters>
	                        </parameters>
	                     </statement>
	               	  </statements>
	               <external-functions/>
	               </rule>
	            </epa>
	        </epas>
	      </epn>
	   </epns>
 
	   <pns-deployments>
	      <deployment id="1" name="ProcessingNodeDeployment1" info="Processing Node Deployment 1" enabled = "true" processing-node-id="1">
	         <deployed-epas>
	            <deployed-epa name="SmokeDetected_EPA"/>
	         </deployed-epas>
	      </deployment>
	      <deployment id="2" name="ProcessingNodeDeployment2" info="Processing Node Deployment 2" enabled = "true" processing-node-id="2">
	         <deployed-epas>
	            <deployed-epa name="HighTemperature_EPA"/>
	         </deployed-epas>
	      </deployment>
	      <deployment id="3" name="ProcessingNodeDeployment3" info="Processing Node Deployment 3" enabled = "true" processing-node-id="3">
	         <deployed-epas>
	            <deployed-epa name="FireDetected_EPA"/>
	         </deployed-epas>
	      </deployment>
	   </pns-deployments>
 
	</deployment-state>
 
</d3cep-admin>
  • 4- Run the Subscriber_FireDetection Application

A sample subscriber application is implemented inside the D3CEP_Administrator module. It subscribes to SmokeDetected, HighTemperature and FireDetected events and receives notifications from the EPAs deployed in the D3CEP Processing Nodes, when they are detected.

  • 5- Run the Publisher_Sensors Application

A sample publisher application is implemented inside the D3CEP_Administrator module. It publishes SmokeSensor and TemperatureSensor events, with values that trigger the detection of the rules of the EPAs (e.g. they publish a smoke sensor reading indicating detected smoke, and a temperature reading with a high temperature value, both for the same roomId).

  • 6- Check the console of the Subscriber_FireDetection Application

The console of the subscriber application should print the received events: SmokeDetected, HighTemperature and FireDetected.

Descrever a arquitetura do componente. As principais classes e relações que sustentam o componente.

  • getService() Returns Service
  • myMethod() Returns Service

Detalhes de implementação, por exemplo, descrevendo as principais rotinas e os seus fluxos de execução, por onde o desenvolvedor deveria olhar para modificar esse componente.

G. Baptista, M. Endler, Data-Centric and Dynamic Distributed Complex Event Processing for Large-Scale Situation Detection (titulo provisório), Tese de Doutorado, Departamento de Informática, PUC-Rio, 2014 (em breve)

  • d3cep.txt
  • Last modified: 2017/07/21 03:08
  • (external edit)