WSO2 Streaming Integrator (SI) is a streaming data processing server that integrates streaming data and takes action on it. It can be used for real-time ETL and for working with streaming messaging systems such as Kafka. In this blog, I will highlight two key features of the WSO2 Streaming Integrator: transformation and correlation. Let’s start by setting up the development environment.
Developing in Streaming Integrator / Streaming Integrator Tooling
Visit the WSO2 website and download the SI Tooling. Start the tooling by running tooling.sh inside the bin directory. Once the tooling has started, open the editor at http://localhost:9390/editor in your browser. In this editor you can develop and test your Siddhi queries.
Showcase: Transforming and Correlation
To show this functionality, I considered a scenario where the SI receives user-related data through an HTTP endpoint, merges the first and last name into a full name, and stores it along with the rest of the user data.
Below is the JSON payload that will be sent to the SI:
In the editor at http://localhost:9390/editor, I developed the following Siddhi query:
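As a minimal sketch of how such a payload could be built and posted from a client, the Python below prepares a JSON POST request. The field names (firstName, lastName, email, country) and the helper names build_request and send are assumptions for illustration, not the original payload; the URL is the endpoint used in this example.

```python
import json
import urllib.request

# Hypothetical user record; the field names are assumptions for illustration.
payload = {
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "country": "NL",
}


def build_request(url, data):
    """Build a JSON POST request without sending it."""
    body = json.dumps(data).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status


request = build_request("http://localhost:8005/UserInfoAPI", payload)
```

Calling send(request) only succeeds while the Siddhi app is running and listening on that URL.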
Key takeaways from the above example
- @source: defines how the SI receives data as events. In my example I used HTTP. Other transports, such as TCP and Kafka, are also supported. After the data is received, it is converted into a Siddhi event and passed to a stream for processing. In the above example I exposed the endpoint http://localhost:8005/UserInfoAPI using the receiver.url attribute.
- @map: converts the input data into Siddhi events. In this example I mapped the JSON attributes to the corresponding Siddhi attributes. fail.on.missing.attribute is set to true by default. I have set it to false, so the Siddhi query still works even if the input JSON has missing attributes.
- @sink: publishes events arriving in the stream to external endpoints (such as email, TCP, Kafka, and HTTP). In my example, I used a sink of type log.
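The flow described by these three annotations can be mimicked in plain Python to make the data path concrete. This is only an illustrative sketch, not Siddhi code: the attribute names and the functions map_json, transform, and log_sink are hypothetical stand-ins for the mapper, the query, and the log sink.

```python
import json

# Attribute names are hypothetical; the original stream definition may differ.
ATTRIBUTES = ("firstName", "lastName", "email", "country")


def map_json(raw):
    """Mimic a mapper that tolerates missing attributes: absent keys become None."""
    data = json.loads(raw)
    return {name: data.get(name) for name in ATTRIBUTES}


def transform(event):
    """Merge first and last name into fullName and keep the remaining attributes."""
    parts = [event["firstName"], event["lastName"]]
    full_name = " ".join(p for p in parts if p)
    rest = {k: v for k, v in event.items() if k not in ("firstName", "lastName")}
    return {"fullName": full_name, **rest}


def log_sink(event):
    """Stand-in for a log sink: print the event and return it."""
    print(event)
    return event


# The email attribute is missing on purpose to show the tolerant mapping.
raw_input = '{"firstName": "John", "lastName": "Doe", "country": "NL"}'
result = log_sink(transform(map_json(raw_input)))
```

Because the mapper fills missing attributes with None instead of rejecting the event, the record without an email still reaches the sink.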
Converting to XML
In some cases, JSON needs to be converted to XML. This can be done by setting the map type to xml: @map(type='xml').
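To illustrate what such a conversion produces, here is a small Python sketch that serializes an event as XML. The events/event wrapper elements and the attribute names are assumptions about the output shape; the actual output of the Siddhi XML mapper may differ.

```python
import xml.etree.ElementTree as ET


def event_to_xml(event):
    """Serialize one event as <events><event>...</event></events> (assumed layout)."""
    root = ET.Element("events")
    node = ET.SubElement(root, "event")
    for name, value in event.items():
        child = ET.SubElement(node, name)
        if value is not None:
            child.text = str(value)
    return ET.tostring(root, encoding="unicode")


# Hypothetical event produced by the name-merging query.
xml_text = event_to_xml({"fullName": "John Doe", "country": "NL"})
print(xml_text)
```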
The output can be seen as below.
Deploying to Server
Until now, all our examples were running on the SI Tooling. The next step is to deploy our Siddhi application on the SI server. To do this, visit the WSO2 website, download SI, and start the server by running server.sh from the bin directory.
Click Deploy from SI Tooling to start deploying to server, as shown below:
Select the Siddhi apps and the SI server where you want to deploy the Siddhi app. I am deploying my apps in the SI running on my local machine.
Below are the logs that confirm the Siddhi apps are deployed and executed. Note: Please stop the SI Tooling if you are running it on the same machine as the SI server.
Data streaming into the SI can be correlated using patterns or sequences. Siddhi supports two types of pattern correlation: counting patterns and logical patterns. In the next examples I will demonstrate both.
A counting pattern matches multiple events received for the same matching condition. The number of events matched per condition can be restricted using condition postfixes.
Consider the following single stream that contains three events. Our goal is to find the event that has the latest ‘enrolment’ for a particular country, city and reference number, where we consider the previous ‘enrolment’ event to be an error.
As shown in the input, there are three events, two of which belong to the same country, city and reference. The selection query is executed only once, because only the first and third events meet the condition. As defined in our goal, we consider the third event with enrolment(what) to be a success, and the first one is considered an error.
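The matching logic described above can be sketched in plain Python. This is not Siddhi pattern syntax, only an illustration of the correlation: the function correlate_enrolments, the id field, and the sample values are hypothetical, while country, city, reference, and what follow the attributes named in the text.

```python
def correlate_enrolments(events):
    """Pair each enrolment with the previous enrolment for the same key.

    Returns a list of (error_event, success_event) tuples, where the
    earlier event is treated as the error and the later one as the success.
    """
    last_seen = {}
    matches = []
    for event in events:
        if event["what"] != "enrolment":
            continue
        key = (event["country"], event["city"], event["reference"])
        if key in last_seen:
            matches.append((last_seen[key], event))
        last_seen[key] = event
    return matches


# Three events in arrival order; the first and third share the same key.
events = [
    {"id": 1, "country": "NL", "city": "Amsterdam", "reference": "R1", "what": "enrolment"},
    {"id": 2, "country": "DE", "city": "Berlin", "reference": "R2", "what": "enrolment"},
    {"id": 3, "country": "NL", "city": "Amsterdam", "reference": "R1", "what": "enrolment"},
]
matches = correlate_enrolments(events)
```

With this input there is exactly one match: event 1 is reported as the error and event 3 as the success.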
Below is the result.
A logical pattern matches events and correlates them through logical relationships.
In the example below, the goal is to find the event whose datetime is one day later than that of the other events received within a 5-minute timeframe for a particular country, city and reference.
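A plain-Python sketch of this correlation follows. It is not Siddhi syntax, and it rests on two assumptions: that "one day later" means at least one day after the datetime of an earlier event, and that the 5-minute timeframe applies to arrival times. The function find_later_events and the fields id and arrival are hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # timeframe in which events are correlated
ONE_DAY = timedelta(days=1)     # required difference between event datetimes
KEY_FIELDS = ("country", "city", "reference")


def find_later_events(events):
    """Return events whose datetime is at least one day after that of an
    earlier-arrived event with the same key, arriving within the window."""
    matches = []
    for i, first in enumerate(events):
        for second in events[i + 1:]:
            same_key = all(first[k] == second[k] for k in KEY_FIELDS)
            gap = second["arrival"] - first["arrival"]
            in_window = timedelta(0) <= gap <= WINDOW
            if same_key and in_window and second["datetime"] - first["datetime"] >= ONE_DAY:
                matches.append(second)
    return matches


base = {"country": "NL", "city": "Amsterdam", "reference": "R1"}
events = [
    {**base, "id": 1, "arrival": datetime(2020, 5, 1, 10, 0), "datetime": datetime(2020, 5, 1, 9, 0)},
    {**base, "id": 2, "arrival": datetime(2020, 5, 1, 10, 2), "datetime": datetime(2020, 5, 2, 9, 0)},
    # Arrives more than 5 minutes after both earlier events, so it is not correlated.
    {**base, "id": 3, "arrival": datetime(2020, 5, 1, 10, 10), "datetime": datetime(2020, 5, 3, 9, 0)},
]
later = find_later_events(events)
```

Only event 2 is returned here: it arrives two minutes after event 1 and carries a datetime exactly one day later.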