Processed upon Arrival: Real-time Event Processing with CEP

Jochen Traunecker
Principal Solution Architect

“You have reached the maximum number of log-in attempts.” The message is simple and clear, but determining the number of log-in attempts can be very complicated. Log-in attempts can come from various sources such as portals or mobile applications (apps), and yet the count must be calculated as quickly as possible, i.e. in real time. Ideally, the calculation is complete before the next log-in attempt is made, so that the account is already blocked. Complex Event Processing (CEP) is a way of processing such events in real time. This article presents CEP using Siddhi as an example of a CEP engine and WSO2 CEP as an example of a CEP server.

We are surrounded by events all the time. Your cellphone vibrates to indicate an incoming text message, a warning tone signals that the doors of the train are closing, and countless lights flash in our cars to warn us about low fuel, freezing temperatures or an unbuckled seat belt. Embedded systems such as the ones in our cars have long been working in this event-oriented way. Here, the sources of events are usually sensors. The distance sensor reports your distance to the car ahead of you at regular intervals. By aggregating these events, the car can work out whether the distance is growing or shrinking. If the distance falls below a certain value, a signal can be sent to the braking system to initiate braking until the distance between the two cars is once again greater than the set value. Aggregating the events also makes it possible to determine how quickly the distance is shrinking and therefore to work out how hard to brake and whether to initiate emergency braking.

This is where Complex Event Processing (CEP) comes in. CEP makes it possible to continuously aggregate individual events or streams of events and to produce higher-order events from that. In the above example, higher-order events such as “initiate braking” or “accelerate” are produced from the simple distance events. CEP attempts to calculate patterns, relationships and data abstractions between the events, and to immediately react to that by sending another event.
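
As a rough illustration of this kind of aggregation, the following Java sketch derives higher-order events from raw distance readings. It is not how an embedded system or a CEP engine is actually implemented; the class name, the event names and the 30-metre threshold are assumptions chosen purely for the example.

```java
import java.util.ArrayList;
import java.util.List;

class DistanceAggregator {
    // Hypothetical threshold in metres below which braking is requested.
    static final double MIN_DISTANCE = 30.0;

    /** Derives higher-order events from consecutive raw distance readings. */
    static List<String> derive(double[] distances) {
        List<String> derived = new ArrayList<>();
        for (int i = 1; i < distances.length; i++) {
            // Comparing two consecutive readings tells us whether the gap is closing.
            boolean closing = distances[i] < distances[i - 1];
            if (distances[i] < MIN_DISTANCE && closing) {
                derived.add("INITIATE_BRAKING");
            } else if (distances[i] >= MIN_DISTANCE && !closing) {
                derived.add("ACCELERATE");
            }
        }
        return derived;
    }
}
```

For the readings 50, 40, 28, 25 and 35, the sketch emits two braking events followed by one acceleration event. A CEP engine expresses the same idea declaratively as a query instead of a hand-written loop.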

However, CEP systems are still relatively uncommon. In business applications such as CRM, ERP and HR, and in the integration of these systems, CEP is rarely used. Requirements that call for a real-time reaction are usually implemented as batch jobs that run repeatedly at short intervals. With Siddhi [1] and the WSO2 Complex Event Processor [2], there is now an opportunity for CEP to become established as a central component of IT infrastructure. Siddhi and WSO2 CEP are released under the Apache License 2.0, i.e. they are open source. Siddhi is a CEP engine written in Java that can be embedded directly into individual applications. WSO2 CEP builds on Siddhi, offering management functions and a number of adapters so that events can be received via various protocols and in various formats.

Event-driven Architecture and Big Data as Drivers
Because of the ever-increasing integration of systems, event-orientation is also used in traditional application fields such as order management. When an order is submitted, the order management system does not call each system (such as inventory management and payments) individually to inform it of the order, possibly blocking while it waits for an answer. Instead, it simply publishes an event to which the other systems can subscribe. These systems can, in turn, publish events themselves. This leads to looser coupling between the systems because a system only needs to know which events it is interested in, no matter which system published them. Likewise, the publishing system does not need to know which systems are interested in its events; it simply publishes them. The architecture pattern that follows this idea of system design is called event-driven architecture (EDA). A particular characteristic of an EDA is the asynchronicity inherent in the pattern. A more detailed description of this architecture pattern can be found in [3]. As part of an EDA, CEP is assigned the task of making the connections between events visible. For example, a number of expensive purchases made with the same credit card within a few minutes can be an event of interest because there is a chance that the purchases were made using stolen credit card details.
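
The publish/subscribe idea can be sketched in a few lines of Java. This is a minimal, in-memory illustration and not part of Siddhi or WSO2 CEP; the class EventBus and the event name used below are hypothetical. For brevity the sketch delivers events synchronously, whereas a real EDA would typically use an asynchronous message broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class EventBus {
    // Handlers registered per event type; the publisher never sees this list.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a handler for one event type. */
    void subscribe(String eventType, Consumer<String> handler) {
        subscribers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    /** Delivers the payload to every handler of the type and returns how many were notified. */
    int publish(String eventType, String payload) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(eventType, new ArrayList<>());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }
}
```

An order management system would call publish("OrderSubmitted", ...) once, while inventory management and payments each register their own handler with subscribe. Neither side needs to know the other.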

Unlike other solutions (see [4]), CEP offers the possibility of processing thousands of events virtually in real time, which is why CEP is often mentioned in the context of “big data”. In contrast to existing solutions for processing event streams such as Apache Storm, CEP systems offer the added advantage of a declarative query language.

Structure of the WSO2 CEP Server
Figure 1 shows the architecture of the WSO2 CEP server and the principal path of the events through the server.

Figure 1: Architecture of the WSO2 CEP server (Source: WSO2)

The events are received via the Input Event Adapter. Adapters are available for most of the established protocols such as HTTP, SOAP, JMS and email. A special WSO2 Event Adapter, which receives events via Thrift, is available for integration with other WSO2 products. A complete list of the existing adapters can be found in the documentation [5]. Furthermore, you can develop your own adapters for other protocols.

The Input Event Adapter passes the message on to an Event Builder. The Event Builder is responsible for translating the incoming events into the event format of the CEP server. The standard event formats supported are WSO2Event, XML, JSON, Map and Text. The Event Builders also support simple mapping operations: XPath is used for XML and JSONPath is used for JSON.

After leaving the Event Builder, the event reaches the Event Stream Manager, which manages all event streams and is the central hub for all events. This is also where the definitions of the event streams are managed. Event streams are always specified in a WSO2-specific format [6]. On the basis of these definitions, events are forwarded to the processing engine. Event streams can also be associated with in-flows and out-flows so that events can be received and published using various protocols and formats.

The Event Processor is responsible for executing the queries, whose definitions we’ll look at later. In the WSO2 CEP server, queries are part of an execution plan. A Siddhi engine is started for a query. This is also where the connection between the WSO2 CEP server and Siddhi becomes clear: Siddhi is the CEP engine used by the WSO2 CEP server.

The Event Stream Manager passes outgoing events on to an Event Formatter. This formatter can transform the internal WSO2 event format into other formats such as XML, JSON and Text, and can perform simple mappings analogous to those of the Event Builder.

Finally, the Output Event Adapter’s task is to send the outgoing events via various protocols. By default, the same protocols are supported as for the Input Event Adapter. Here too, other protocols can be supported by implementing a custom adapter.

Writing Queries
After having looked at the path taken by events through the server we now come to the most important part – writing queries.

Event definition is an important element. This specifies what attributes an event has. Unfortunately, the term “event definition” has two meanings in WSO2 CEP. The previous section presented the WSO2 Event Stream Definition. The event definition presented in this section is the event definition used internally by Siddhi. WSO2 CEP users need not worry about this because here, the definitions are automatically synchronized. However, it is important to present them here for the sake of completeness. Listing 1 shows a Siddhi event definition. Inside Siddhi, every stream is identified using a name.

define stream AccessLogEntry(clientip string, request string, ...)

Listing 1

Siddhi queries resemble SQL queries, except that they read from streams rather than tables. The stream to read from is named after “from”, “select” chooses the attributes to output, and events are written to a stream using “insert into”.

Siddhi supports a number of operators for selecting events. A filter places a condition on an attribute value, using relational operators such as ==, < and >. Individual conditions can be combined into complex conditions using the “and”, “or” and “not” operators. Listing 2 shows a selection based on an IP address and the HTTP verb.

from AccessLogEntry[clientip == '127.0.0.1' and verb == 'GET']
insert into LocalAccess clientip, verb, request;

Listing 2

Streams of events can, in theory, contain an infinite number of events. Siddhi offers so-called “windows” as a way of selecting a subset of these events. Siddhi supports windows that are defined by the number of events (e.g. the last 100 events), by time (e.g. all events within the last minute) or by an attribute value (e.g. all events with the same IP address). A detailed description of the windows is given in the documentation. Note that time-based windows use the time stamp at which Siddhi received the event. If you need a window calculated on the basis of the time stamp at which the event was created, newer Siddhi versions offer the External Time Window. The result of a window is a set of events, to which further operators can be applied. Output can be triggered either once all events of a window have been collected (batch window) or whenever a new event arrives (sliding window). Listing 3 shows a sliding window that gathers all events that arrived within the last minute.

from AccessLogEntry[response == '401']#window.time( 1 min )
select clientip, request, count(response) as requestNumber
group by clientip, request
insert into Unauthorized for all-events;

Listing 3
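
To make the sliding-window semantics more tangible, here is a plain Java sketch of a per-client count over a time window. It only mimics the idea of such a query and is not Siddhi's implementation; the class and method names are hypothetical. Unlike a window based on arrival time, the sketch takes the time stamp as a parameter, and it groups by client only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class FailedLoginWindow {
    private final long windowMillis;
    // Time stamps of failed attempts still inside the window, kept per client.
    private final Map<String, Deque<Long>> timestampsByClient = new HashMap<>();

    FailedLoginWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records a failed attempt and returns the number of attempts within the window ending at 'now'. */
    int recordFailure(String clientIp, long now) {
        Deque<Long> timestamps =
                timestampsByClient.computeIfAbsent(clientIp, k -> new ArrayDeque<>());
        timestamps.addLast(now);
        // Expire attempts that have slid out of the window.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= now - windowMillis) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }
}
```

An application could block an account as soon as recordFailure returns a value above its limit. This is the calculation from the introductory log-in example, which must finish before the next attempt arrives.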

There is also the option of combining a number of streams via a join operator. By default, the join is triggered when an event arrives on either of the streams. If this is not desired, the keyword “unidirectional” can be used to mark the one stream whose incoming events trigger the join. The join is executed as an INNER JOIN. Note that at least one stream must have a defined window. Listing 4 shows a join that is executed as soon as a TickEvent comes in.

from TickEvent[symbol == 'IBM']#window.length(2000) as t unidirectional
join NewsEvent#window.time(500) as n
on t.symbol == n.company
insert into JoinedStream;

Listing 4

Siddhi also offers the option of detecting patterns of events that occur one after another. The operators used for this are called “Pattern” and “Sequence”. The difference is that with Pattern, the matching events may be interrupted by other events, whereas with Sequence the events must follow each other exactly as specified. Listing 5 shows a pattern that recognizes repeat customers.

from every a1 = PizzaOrder
     -> a2 = PizzaOrder[custid == a1.custid]
insert into ReturningCustomers
   a1.custid as custid, a2.ts as ts;

Listing 5
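
The difference between Pattern and Sequence can be illustrated with a small Java sketch that works on a list of event names. It is a simplified model of the two semantics described above, not Siddhi's matching algorithm; the class and method names are hypothetical.

```java
import java.util.List;

class PatternVsSequence {
    /** Pattern semantics: 'second' occurs somewhere after 'first'; other events may lie in between. */
    static boolean matchesPattern(List<String> events, String first, String second) {
        int start = events.indexOf(first);
        return start >= 0 && events.subList(start + 1, events.size()).contains(second);
    }

    /** Sequence semantics: 'second' must be the event immediately following 'first'. */
    static boolean matchesSequence(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }
}
```

For the event list Login, Browse, Order, the pattern Login followed by Order matches because the Browse event in between is tolerated. The corresponding sequence does not match because Order does not directly follow Login.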

To speed up or to distribute queries, Siddhi offers the option of forming “Partitions”. To do so, the events are divided into Partitions based on certain criteria. Listing 6 shows an example in which the orders are divided based on their ID, so that the time between order and dispatch can be calculated.

define partition orderPartition by PizzaOrder.id, PizzaDone.oid, PizzaDelivered.oid
from PizzaOrder as o -> PizzaDone as p -> PizzaDelivered as d
insert into OrderTimes (p.ts-o.ts) as time2Prepare, (d.ts-p.ts) as time2Delivery
   partition by orderPartition

Listing 6

Forming a Partition is useful when a query can be calculated more quickly within a Partition than when used on the whole stream of events. This is the case with the query in Listing 6. In this case the partitioning also has the effect of making the query easier to design because it does not need to take into consideration any events from other orders. Furthermore, Partitions are a means of scaling because Partitions can be divided between various nodes.
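
The effect of partitioning can be sketched in plain Java by keeping one small state object per order ID. This only models the idea that each partition sees the events of a single order; it is not how Siddhi implements partitions, and the class OrderTimes with its method onEvent is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class OrderTimes {
    // State of one partition, i.e. of one order.
    private static final class State {
        long orderedAt = -1;
        long doneAt = -1;
    }

    private final Map<String, State> partitions = new HashMap<>();

    /** Feeds one event; returns {time2Prepare, time2Delivery} once the order is delivered, otherwise null. */
    long[] onEvent(String type, String orderId, long ts) {
        State state = partitions.computeIfAbsent(orderId, k -> new State());
        switch (type) {
            case "PizzaOrder":
                state.orderedAt = ts;
                return null;
            case "PizzaDone":
                state.doneAt = ts;
                return null;
            case "PizzaDelivered":
                partitions.remove(orderId); // the partition is finished
                if (state.orderedAt < 0 || state.doneAt < 0) {
                    return null; // incomplete order history, nothing to report
                }
                return new long[] {state.doneAt - state.orderedAt, ts - state.doneAt};
            default:
                return null;
        }
    }
}
```

Because the state is keyed by order ID, events of different orders can arrive interleaved without influencing each other. The same key could be used to route events to different nodes.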

Finally, event tables offer the option of temporarily saving events in a table. Events can be inserted into an event table, be deleted from it, and combined with incoming events using a join operation. Tables are always useful when events are to be held for processing for a long time (e.g. over days or weeks) and when the windows held in the memory are thereby pushed to their limits. Architecturally, the use of event tables should be thought through carefully because batch processing might be more suitable depending on the specific situation.

Siddhi without WSO2 CEP
Siddhi can also be operated outside the WSO2 CEP server. Listing 7 shows how Siddhi can be instantiated in a Java program. The SiddhiManager is the central component. This makes it possible to define streams and queries, and to send and receive events.

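// Imports (package names as in the Siddhi 2.x distribution; adjust to your version)
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.EventPrinter;

// Create Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
// Define the input stream and its attributes
siddhiManager.defineStream("define stream cseEventStream ( symbol string, price float, volume int )");
// Forward all events with a price of at least 50 to the stream StockQuote
siddhiManager.addQuery("from cseEventStream [ price >= 50 ] " +
   "select symbol, price " +
   "insert into StockQuote ;");
// Print every event that arrives on StockQuote
siddhiManager.addCallback("StockQuote", new StreamCallback() {
           @Override
           public void receive(Event[] events) {
               EventPrinter.print(events);
           }
       });
// Send one event into the input stream
InputHandler inputHandler = siddhiManager.getInputHandler("cseEventStream");
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
// Give the engine time to process the event before shutting down
Thread.sleep(500);
siddhiManager.shutdown();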

Listing 7

Example

For anyone who wants to try out Siddhi and WSO2 CEP, the access log example outlined in the introduction is available in three variants on GitHub [7]. The first two variants use WSO2 CEP, while the third uses Siddhi directly within a Java program. The two WSO2 CEP examples differ in that the first uses the graphical configuration screens, while the second provides XML configuration files that can be deployed directly on the server.

Conclusion

Siddhi provides access to CEP and thereby makes it possible to process events in real time. Siddhi can be used as part of the WSO2 Complex Event Processor or be integrated directly into your own applications. Which of the two variants is ultimately used depends on the requirements of the project. WSO2 CEP is the more suitable way to get started quickly with CEP because the barrier to entry is significantly lower thanks to the graphical administration screens.

References and Sources

[1] https://github.com/wso2/siddhi 

[2] http://wso2.com/products/complex-event-processor/ 

[3] http://www.eaipatterns.com/docs/EDA.pdf 

[4] http://srinathsview.blogspot.ch/2014/12/realtime-analytics-with-big-data-what.html 

[5] https://docs.wso2.com/display/CEP310/WSO2+Complex+Event+Processor+Documentation

[6] http://wso2.com/products/complex-event-processor/ 

[7] https://github.com/ungerts/siddhi-cep-example 
