info@yenlo.com
ned
Menu
Enterprise Integration 13 min

Efficiënte Verwerking van Grote Bestanden met Apache Camel (Deel 1)

Ontgrendel de kracht van Apache Camel voor efficiënte verwerking van grote bestanden! In deze blog gidst Ajanthan Eliyathamby, onze Integratiespecialist, je door de XML-DSL en Java DSL-methoden.

Ajanthan Eliyathamby Yenlo
Ajanthan Eliyathamby
Integratie Expert
Apache Camel Berichtenstroom

Een Gids Gebruikmakend van Java / XML-DSL en Spring Boot

Bestandsbewerkingen vormen een essentieel onderdeel van Enterprise Integratie. Het is eigenlijk een van de EIP-patronen, dus het is heel gebruikelijk en we komen veel verschillende vormen en gebruiksscenario’s tegen.

Een van de meer uitdagende gebruiksscenario’s is wanneer de bestanden groot zijn, tot wel 10 megabytes, en verwerkt moeten worden. Voor een van onze klanten hebben we een Proof of Concept gemaakt die de meest efficiënte manier liet zien om 10-megabyte CSV-bestanden te verwerken.

We hebben het document in twee delen verdeeld, het eerste deel zal zich richten op de XML-DSL benadering en het tweede deel op JAVA-DSL. Alle broncode is te vinden op deze Yenlo Bitbucket URL.

Uitdagingen

De uitdagingen liggen vaak in het geheugengebruik (heap-grootte) en de verwerkingssnelheid. Het in het geheugen lezen van het bericht leidt vaak tot problemen zoals hoog resourcegebruik, gegevenscorruptie en trage verwerking.

Voor dit POC hebben we een product uit de Apache-stack genaamd Camel geselecteerd, dit is een open-source integratieproduct dat EIP ondersteunt en bij uitstek geschikt is voor deze taak. Waarom? Omdat de manier waarop Camel is opgezet, ons in staat stelt gebruik te maken van zijn domeinspecifieke taalmodellen, waarbij we meerdere opties hebben zoals Java, XML, Groovy, YAML enzovoort. Omdat we de opties hebben om uit te kiezen, kunnen we de prestatiebehoeften van onze integratie vergelijken met de kennisexpertise van het team en de DSL voor de ontwikkeling selecteren. Als het gaat om het lezen van bestanden, is de camel-functie voor gepagineerd lezen een van de mogelijkheden die veel aantrekt. De streamingoptie in Camel helpt ons om de gegevens op een gepagineerde manier te lezen en te verwerken op basis van de paginawaarde die we hebben ingesteld. Dit helpt om de geheugenproblemen te voorkomen die kunnen optreden tijdens de verwerking van grote bestanden, omdat het bestand niet in één keer volledig in het geheugen wordt gelezen.

In dit artikel bekijken we de meest gebruikte twee DSL’s: Java en op XML gebaseerde implementatie en tot slot vergelijken we het prestatieverschil tussen XML-DSL en Java-DSL. Deel 1 bevat de XML DSL-gebaseerde integratie en Deel 2 bevat de Java DSL-gebaseerde integratie.

Apache Camel Berichtenstroom

In Apache Camel is de Camel-context de container die alle fundamentele componenten bevat. Zodra de Route is geconfigureerd via DSL’s en aan de Camel-context is toegevoegd, wordt de route actief voor het verwerken van berichten. In een typisch scenario wordt het bericht verwerkt via elke configuratie die is gedefinieerd in de route, zoals het loggen van het oorspronkelijke bericht, het instellen van eigenschappen, het vertalen van het bericht, verdere verwerking via aangepaste bedrijfslogica en uiteindelijk wordt het overgedragen aan het eindpunt.

Implementatie Gebruiksscenario

Zoals eerder vermeld, gaan we in dit artikel een bestandsverwerkingsgebruiksscenario bouwen in zowel XML-DSL als Java-DSL, die de twee meest gebruikte en kenmerkende DSL’s zijn voor Apache Camel-integraties.

Hieronder het diagram dat het gebruiksscenario illustreert dat we gaan implementeren. We proberen twee routes te implementeren:

Bestand naar Onderwerp Route

Onderwerp naar Rest API Route

XML-DSL Implementatie

Voordat we met de implementatie beginnen, is het beter om de manier waarop we de code schrijven als een herbruikbaar component te ontwerpen. Het onderstaande diagram toont hoe we de XML-DSL Apache Camel-implementatie als een herbruikbaar component kunnen maken met behulp van de Route-sjablonenfunctie van Apache Camel.

Opmerking: De functie voor unit tests is niet opgenomen in dit gedeelte van het artikel voor XML-DSL, maar zal worden behandeld onder Java-DSL.

Ons doel hier is om een project voor camel-file-route-templates te maken in Spring Boot en de componentklassen en routes te hergebruiken in het project camel-file-integration-one.

Uitleg over camel-file-route-templates project

  • JsonAggregationStrategy.java

In deze context houdt onze aanpak in dat we eerst de informatie uit het CSV-bestand halen, vervolgens gegevensverwerking uitvoeren om een JSON-uitvoer te genereren met gespecificeerde velden. Bovendien streven we ernaar een gedefinieerd aantal regels uit het bestand tegelijkertijd te groeperen. Dit aggregatieproces wordt vergemakkelijkt door de in de applicatie.yml-bestand gespecificeerde Camel-configuratie onder de eigenschap “noOfLinesToReadAtOnce”. Voor het uitvoeren van deze aggregatie zullen we de volgende klasse gebruiken.

package com.camel.file.process.templates.aggregate;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;

@Component
public class JsonAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        String body = null;
        if (!oldBody.startsWith("[")) {
            body = "[ " + oldBody + ", " + newBody + " ]";
        } else{
            body = oldBody.replace("]", "") + ", " + newBody + " ]";
        }
        oldExchange.getIn().setBody(body);
        return oldExchange;
    }
}
  • TimeGap.java

Utils-pakket met TimeGap.java: dit wordt gebruikt om de procesduur te berekenen, wat nuttig kan zijn voor het vergelijken van de prestaties.

package com.camel.file.process.templates.utils;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;

@Component
public class TimeGap {
    public String calculateTimeDifference(Date startTime, Date endTime) {
        long diffInMillis = endTime.getTime() - startTime.getTime();
        long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMillis);
        long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMillis);
        long hours = TimeUnit.MILLISECONDS.toHours(diffInMillis);
        return String.format("%d hours, %d minutes, %d seconds %d milliseconds", hours, minutes, seconds, diffInMillis);
    }
}
  • FileRouteTemplateApplication.java
package com.camel.file.process.templates;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FileRouteTemplatesApplication {

    public static void main(String[] args) {
        SpringApplication.run(FileRouteTemplatesApplication.class, args);
    }

}

Onder het resources/templatesbestand file-to-topic.xml: Deze routetemplate is ontworpen voor het bewaken van een bestandslocatie, het ophalen van de inhoud van het bestand, het verwerken ervan via streaming, het segmenteren ervan op basis van CSV-regels en het uiteindelijk publiceren naar een onderwerp.

<?xml version="1.0" encoding="UTF-8"?>
<!-- This is a common template for csv/text file processing and generating and publish to Topic -->
<routeTemplates xmlns="http://camel.apache.org/schema/spring">
    <routeTemplate id="file-to-topic">
        <templateBean name="jsonBean" type="#class:com.camel.file.process.templates.aggregate.JsonAggregationStrategy"
                      beanType="com.camel.file.process.templates.aggregate.JsonAggregationStrategy"/>
        <templateBean name="timeGapBean" type="#class:com.camel.file.process.templates.utils.TimeGap"
                      beanType="com.camel.file.process.templates.utils.TimeGap"/>
        <route id="{{file-to-topic.routeId}}">
            <from uri="{{file-to-topic.file.uri}}"/>
            <log message="Starting to process big file: ${header.CamelFileName} and ${header.camelFileLength} Bytes"
                 loggingLevel="INFO"/>
            <setProperty name="startTime">
                <simple>${date:now}</simple>
            </setProperty>
            <split streaming="true">
                <tokenize token="{{file-to-topic.file.token}}"
                          skipFirst="{{file-to-topic.file.noOfLinesToSkip}}"
                          group="{{file-to-topic.file.noOfLinesToReadAtOnce}}"/>
                <log message="Message Before Splitting: ${body}"/>
                <unmarshal>
                    <bindy type="Csv" classType="{{file-to-topic.mapperClass}}"/>
                </unmarshal>
                <split aggregationStrategy="{{jsonBean}}">
                    <simple>${body}</simple>
                    <bean beanType="{{file-to-topic.processorClass}}"/>
                    <marshal>
                        <json library="Jackson"/>
                    </marshal>
                    <log message="Message Sent after Processing: ${body}"/>
                </split>
                <log message="Message Sent after Splitting: ${body}" loggingLevel="INFO"/>
                <to uri="{{file-to-topic.endpoint.uri}}"/>
            </split>
            <setProperty name="endTime">
                <simple>${date:now}</simple>
            </setProperty>
            <log message="Done processing big file: ${header.CamelFileName}" loggingLevel="INFO"/>
            <to uri="bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})"/>
            <log message="Time difference: ${body}" loggingLevel="INFO"/>
        </route>
    </routeTemplate>
</routeTemplates>

Onder het resources/templatesbestand topic-to-rest.xml: Deze routetemplate is ontworpen om berichten uit het onderwerp op te halen en ze vervolgens door te sturen naar een backend, met de mogelijkheid tot heraflevering.

<?xml version="1.0" encoding="UTF-8"?>
<!-- This is a common template for listening to a topic and publish to a REST endpoint -->
<routeTemplates xmlns="http://camel.apache.org/schema/spring">
    <routeTemplate id="topic-to-rest">
        <route id="{{topic-to-rest.routeId}}">
            <from uri="{{topic-to-rest.listener.uri}}"/>
            <throttle timePeriodMillis="{{topic-to-rest.receiver.throttle.lockPeriodMilliSeconds}}">
                <constant>{{topic-to-rest.receiver.throttle.requestCount}}</constant>
            </throttle>
            <setHeader name="Content-Type">
                <constant>application/json</constant>
            </setHeader>
            <setHeader name="Authorization">
                <constant>Bearer myToken {{topic-to-rest.receiver.token}}</constant>
            </setHeader>
            <onException>
                <exception>org.apache.camel.http.base.HttpOperationFailedException</exception>
                <onWhen>
                    <simple>${exception.statusCode} == 422</simple>
                </onWhen>
                <redeliveryPolicy maximumRedeliveries="{{topic-to-rest.receiver.reDelivery.attempts}}"
                                  redeliveryDelay="{{topic-to-rest.receiver.reDelivery.delay}}"/>
                <handled>
                    <constant>true</constant>
                </handled>
                <log message="HTTP error occurred with status ${exception.statusCode}. Response body: ${exception.message}"/>
                <to uri="{{topic-to-rest.receiver.reDelivery.deadLetterQueue}}"/>
            </onException>
            <to uri="{{topic-to-rest.receiver.uri}}"/>
            <choice>
                <when>
                    <simple>${header.CamelHttpResponseCode} == 200</simple>
                    <log message="Message Successfully sent to Rest Endpoint and Received status code: ${header.CamelHttpResponseCode}"/>
                </when>
            </choice>
        </route>
    </routeTemplate>
</routeTemplates>
  • pom.xml

Zie de voorbeeld pom.xml op https://bitbucket.org/yenlo/yenlo_camel/src/master/xml-dsl/camel-file-route-templates/pom.xml

Hierin is een belangrijk onderdeel in de pom.xml:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <goals>
                <goal>jar</goal>
            </goals>
            <phase>package</phase>
            <configuration>
                <classifier>library</classifier>
            </configuration
        </execution>
    </executions>
</plugin>

Dit is nodig om het project zo te verpakken dat het als een bibliotheekproject kan worden gebruikt.

Voer mvn clean install uit: hiermee wordt de jar in de lokale maven-repo geïmplementeerd zodat we deze kunnen hergebruiken in ons volgende project.

Wanneer u de hoofd-jar die door Spring Boot is gegenereerd, uitpakt, ziet u de projectstructuur zoals hieronder.

Als we dit als afhankelijkheid opgeven, zullen de klassen niet kunnen worden hergebruikt en zal het fouten geven zoals ‘klasse niet gevonden’. Daarom hebben we de configuratie voor maven-jar-plugin, die een jar met -library.jar zal genereren. Als u dat jar-bestand uitpakt:

Dit is nu geschikt om als afhankelijkheid te fungeren.

Opmerking: er zijn ook enkele extra afhankelijkheden toegevoegd aan pom.xml die nodig zijn voor het uitvoeren van de routetemplates. Omdat we deze in het gemeenschappelijke project opnemen, hoeven we deze niet toe te voegen in het volgende project.

Uitleg over camel-file-integration-one project

Het belangrijkste onderdeel om op te merken is het gedeelte afhankelijkheid, dat het vorige sjabloonproject toevoegt als een afhankelijkheid.

  • pom.xml
    Zie de pom.xml op https://bitbucket.org/yenlo/yenlo_camel/src/master/xml-dsl/camel-file-integration-one/pom.xml

Het belangrijkste onderdeel om op te merken is het gedeelte afhankelijkheid, dat het vorige sjabloonproject toevoegt als een afhankelijkheid.

<dependency>
    <groupId>com.camel.file.process.templates</groupId>
    <artifactId>camel-file-route-templates</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <classifier>library</classifier>
</dependency>

En vervolgens het deel dat de sjablonen in de Camel Context laadt. Dit is een van de overheads die wordt ondervonden bij de implementatie van het gemeenschappelijke XML-DSL-project. Hoewel de sjablonenmap aan het afhankelijkheids-JAR-bestand is toegevoegd, wordt deze niet geladen naar de Camel Context bij gebruik in het camel-file-integration-one project. Om dit te overwinnen, gebruiken we de maven-dependency-plugin.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
        <execution>
            <id>unpack-dependency</id>
            <phase>generate-resources</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <artifactItem>
                        <groupId>com.camel.file.process.templates</groupId>
                        <artifactId>camel-file-route-templates</artifactId>
                        <version>1.0.0-SNAPSHOT</version>
                        <type>jar</type>
                        <overWrite>true</overWrite>
                        <outputDirectory>${basedir}/src/main/resources/common-templates</outputDirectory>
                        <includes>BOOT-INF/classes/templates/*.xml</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
  • Pojo/InputCsvMapper.java : Dit wordt gebruikt om de kopteksten van het csv-bestand in kaart te brengen, zodat we het kunnen verwerken met transformaties voor specifieke velden.
package com.camel.file.process.camelintegrationone.pojo;

import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {

    @DataField(pos = 1, columnName = "id")
    private int id;

    @DataField(pos = 2, columnName = "firstname")
    private String firstName;

    @DataField(pos = 3, columnName = "lastname")
    private String lastName;

    @DataField(pos = 4, columnName = "email")
    private String email;

    @DataField(pos = 5, columnName = "email2")
    private String email2;

    @DataField(pos = 6, columnName = "profession")
    private String profession;

}
  • Process/InputCsvProcessor.java : Deze implementatie zal het bericht transformeren naar een gedefinieerd formaat.
package com.camel.file.process.camelintegrationone.process;

import com.camel.file.process.camelintegrationone.pojo.InputCsvMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;

@Slf4j
public class InputCsvProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {

        InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);

        JsonObject jsonObject = new JsonObject();
        jsonObject.put("updatedId", csvRecord.getId());
        jsonObject.put("updateName", csvRecord.getFirstName());

        exchange.getIn().setBody(jsonObject);
    }
}
  • route-builder.xml : Builders die de routebouwers bevatten.


<?xml version="1.0" encoding="UTF-8"?>
<templatedRoutes id="camel" xmlns="http://camel.apache.org/schema/spring">
    <templatedRoute routeTemplateRef="file-to-topic"/>
    <templatedRoute routeTemplateRef="topic-to-rest"/>
</templatedRoutes>
  • De waarden van deze 4 elementen moeten worden ingesteld in de application.yml
camel:
  springboot:
    name: camel-file-integration-one
    routes-include-pattern: classpath:common-templates/**/templates/*.xml,classpath:builders/*.xml,classpath:templates/*.xml

logging:
  level:
    org:
      apache:
        camel: DEBUG

spring:
  activemq:
    broker-url: "tcp://XXXXXXXXX:61616"
    user: XXXXXX
    password: XXXXXX

file-to-topic:
  routeId: "file-to-topic-route"
  file:
    uri: "file:src/main/resources?noop=true&delay=20000&antInclude=file_*.csv"
    token: "\n"
    noOfLinesToSkip: 1
    noOfLinesToReadAtOnce: 2
  mapperClass: "com.camel.file.process.camelintegrationone.pojo.InputCsvMapper"
  processorClass: "com.camel.file.process.camelintegrationone.process.InputCsvProcessor"
  endpoint:
    uri: "activemq:topic:camel.testtopic"

topic-to-rest:
  routeId: "topic-to-rest-route"
  listener:
    uri: "activemq:topic:camel.testtopic"
  receiver:
    uri: "https://run.mocky.io/v3/c18b3268-7472-4061-8132-1ba9dc15c3dd"
    #uri: "https://mock.codes/422"
    token: "12323444552211"
    reDelivery:
      attempts: 3
      delay: 5000
      deadLetterQueue: "activemq:queue:dead-letter"
    throttle:
      lockPeriodMilliSeconds: 10000
      requestCount: 1

Nadat mvn clean package is uitgevoerd, kunnen we zien dat de afhankelijkheidssjablonen worden geladen naar de gemeenschappelijke sjablonenmap.

Ook de gebouwde target/classes-map zal deze sjablonen bevatten. Omdat we onze application.yml hebben geconfigureerd met het “routes-include-pattern”.

camel:
  springboot:
    name: camel-file-integration-one
    routes-include-pattern: classpath:common-templates/**/templates/*.xml,classpath:builders/*.xml,classpath:templates/*.xml

Het zal de sjablonen laden naar de Camel Context wanneer we de Spring Boot applicatie starten.

Voer het typische Spring Boot startcommando uit: java -jar target/camel-file-integration-one-1.0.0-SNAPSHOT.jar

Dat concludeert onze bespreking van de XML-DSL-implementatie, waarbij we de uitdagingen hebben aangepakt die zich voordoen bij het integreren van de verwerking van grote bestanden. We hebben onderzocht hoe we deze uitdagingen kunnen aanpakken met behulp van de Apache Camel DSL – XML-DSL. In onze aankomende blog zullen we ingaan op de implementatie van de Java DSL voor dezelfde use case, en zullen we ook een prestatievergelijking maken tussen de twee benaderingen: XML-DSL en Java DSL. Hou ons in de gaten!

ned
Sluiten