Two key features of WSO2 Streaming Integrator

WSO2 Streaming Integrator (SI) is a streaming data processing server that integrates and acts on streaming data. It can be used for real-time ETL and for working with streaming messaging systems such as Kafka. In this blog, I will highlight two key features of the WSO2 Streaming Integrator: transformation and correlation. Let’s start by setting up the development environment.

Developing in Streaming Integrator / Streaming Integrator Tooling

Visit the WSO2 website and download the SI Tooling. You can start the tooling by running tooling.sh inside the bin directory. Once the tooling has started, you can open the console by opening http://localhost:9390/editor in your browser. In this editor you can develop and test your Siddhi query.

Showcase: Transforming and Correlation

Transformation

To show this functionality, I considered a scenario where the SI receives user-related data through an HTTP endpoint, merges the first and last name, and stores the result as the full name along with the rest of the user data.

Below is the JSON payload that will be sent to the SI:

yenlo_blog_wso2-streaming-integrator_figure-1

In the editor at http://localhost:9390/editor, I developed the following Siddhi query:

yenlo_blog_wso2-streaming-integrator_figure-2

Key takeaways from the above example

  • @source: defines how the SI receives data as events. In my example I used HTTP. Other protocols, such as TCP and Kafka, are also supported. After the data is received, it is converted into a Siddhi event and passed to a stream for processing. In the above example I exposed the endpoint http://localhost:8005/UserInfoAPI using the url attribute.
  • @map: converts the input data into Siddhi events. In this example I mapped the JSON attributes to the corresponding Siddhi attributes. fail.on.missing.attribute is set to true by default. I have set it to false, so the Siddhi query still works even if the input JSON has missing attributes.
  • @sink: publishes events arriving in the stream to external endpoints (such as email, TCP, Kafka, HTTP, et cetera). In my example, I used a sink of type log.

yenlo_blog_wso2-streaming-integrator_figure-3
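Since the query itself is only available as an image, here is a minimal sketch of what a Siddhi app with these three annotations could look like. The app name, stream names, attribute names (firstName, lastName, email) and JSONPath expressions are assumptions made for illustration, as is the receiver.url parameter name, which is how I recall the Siddhi HTTP source being configured. It is not a copy of the original query.

```siddhi
@App:name('UserInfoApp')
@App:description('Receives user data over HTTP and merges first and last name into a full name.')

-- HTTP source: JSON payloads posted to this URL become events on UserInputStream.
-- Attribute names and JSONPath expressions are hypothetical.
@source(type='http', receiver.url='http://localhost:8005/UserInfoAPI',
    @map(type='json', fail.on.missing.attribute='false',
        @attributes(firstName='$.firstName', lastName='$.lastName', email='$.email')))
define stream UserInputStream (firstName string, lastName string, email string);

-- Log sink: every event arriving on UserOutputStream is written to the console.
@sink(type='log', prefix='UserInfo')
define stream UserOutputStream (fullName string, email string);

-- Transformation: concatenate first and last name, pass the remaining data through.
@info(name='MergeNames')
from UserInputStream
select str:concat(firstName, ' ', lastName) as fullName, email
insert into UserOutputStream;
```

The custom @attributes mapping is used here so that a flat JSON object can be posted directly. Without it, the JSON mapper expects its own default event envelope.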

Converting to XML

In some cases, JSON needs to be converted to XML. This can be done by setting the map type to xml: @map(type='xml').

yenlo_blog_wso2-streaming-integrator_figure-4

The output is shown below.

yenlo_blog_wso2-streaming-integrator_figure-5
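As a rough sketch of the XML variant, the only change compared to a JSON-in, log-out app is the @map on the sink. The stream and attribute names below are hypothetical, and attaching an XML mapper to a log sink is assumed to behave as described in this section.

```siddhi
@App:name('UserInfoXmlApp')

-- Input is still JSON received over HTTP (attribute names are hypothetical).
@source(type='http', receiver.url='http://localhost:8005/UserInfoAPI',
    @map(type='json', fail.on.missing.attribute='false',
        @attributes(firstName='$.firstName', lastName='$.lastName', email='$.email')))
define stream UserInputStream (firstName string, lastName string, email string);

-- The sink mapper decides the output format: events are rendered as XML before logging.
@sink(type='log', @map(type='xml'))
define stream UserXmlStream (fullName string, email string);

from UserInputStream
select str:concat(firstName, ' ', lastName) as fullName, email
insert into UserXmlStream;
```

Keeping the format conversion in the sink mapper means the query itself stays unchanged whether the output is JSON, XML or plain text.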

Deploying to Server

Until now, all our examples ran in the SI Tooling. The next step is to deploy our Siddhi application on the SI server. To do this, visit the WSO2 website, download SI, navigate to the /bin folder and start the server by running server.sh.

In SI Tooling, click Deploy to start deploying to the server, as shown below:

yenlo_blog_wso2-streaming-integrator_figure-6

Select the Siddhi apps and the SI server where you want to deploy the Siddhi app. I am deploying my apps in the SI running on my local machine.

yenlo_blog_wso2-streaming-integrator_figure-7

Below are the logs that confirm that the Siddhi apps are deployed and executed. Note: stop the SI Tooling if you are running it on the same machine as the SI server.

yenlo_blog_wso2-streaming-integrator_figure-8

Correlation

Data streaming into SI can be correlated using patterns or sequences. Siddhi supports two types of pattern correlation: counting patterns and logical patterns. In my next examples I will demonstrate both.

Counting Pattern

A counting pattern matches multiple events against the same matching condition. The number of events matched per condition can be restricted using condition postfixes.

Input Stream

Consider the following single stream that contains three events. Our goal is to find the event that has the latest ‘enrolment’ for a particular country, city and reference number, where we consider the previous ‘enrolment’ event to be an error.

yenlo_blog_wso2-streaming-integrator_figure-9

In the code below (line 24), we correlate every input event e1 with the subsequent event e2 based on country, city and reference. A filter condition selects events whose what attribute is ‘enrolment’, and a postfix condition sets the number of matching events to a minimum of 1 with no maximum. The event with the latest enrolment (what) is considered successful and the previous ones are considered errors. The agent is taken from the who attribute of the last matching event, and the time is extracted from datetime using a JavaScript function.

yenlo_blog_wso2-streaming-integrator_figure-10
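For readers who cannot see the image, the counting pattern described above could be sketched roughly as follows. The stream names, the getTime function name, the output attribute names and the assumed datetime format ('yyyy-MM-dd HH:mm:ss') are all hypothetical. Only the attributes country, city, reference, what, who and datetime come from the description above.

```siddhi
@App:name('EnrolmentCorrelationApp')

-- Script function (hypothetical name): returns the time part of a datetime
-- string that is assumed to look like '2020-05-08 10:15:30'.
define function getTime[javascript] return string {
    var datetime = data[0];
    return datetime.split(' ')[1];
};

define stream EventStream (country string, city string, reference string,
                           what string, who string, datetime string);

@sink(type='log')
define stream ResultStream (country string, city string, reference string,
                            agent string, eventTime string);

-- Counting pattern: e1 is an enrolment event, e2 collects one or more later
-- enrolment events (<1:> = minimum 1, no maximum) with the same
-- country, city and reference.
from every e1=EventStream[what == 'enrolment']
    -> e2=EventStream[what == 'enrolment' and country == e1.country
                      and city == e1.city and reference == e1.reference]<1:>
select e1.country, e1.city, e1.reference,
       e2[last].who as agent,
       getTime(e2[last].datetime) as eventTime
insert into ResultStream;
```

Here e2[last] refers to the most recent event collected by the counting condition, which is how the latest enrolment is picked over the earlier one.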

As shown in the input, there are three events, two of which belong to the same country, city and reference. The selection query is executed only once, as only the first and third events meet the condition. As defined in our goal, we consider the third event with enrolment (what) to be a success and the first one to be an error.

Below is the result.

yenlo_blog_wso2-streaming-integrator_figure-11

Logical Pattern

A logical pattern matches or correlates events through logical relationships.

In the example below, the goal is to find the event whose datetime is 1 day later among the events received within a 5-minute timeframe for a particular country, city and reference.

Input Stream

yenlo_blog_wso2-streaming-integrator_figure-12

Siddhi Query

yenlo_blog_wso2-streaming-integrator_figure-13

Output Result

yenlo_blog_wso2-streaming-integrator_figure-14

Published May 8, 2020

Kashmira Ray

Kashmira has more than 4 years of experience working in Java/.Net development, with expert knowledge of relational databases and product development. It is her passion to develop state-of-the-art ‘products’ using Java/.Net and web services.
