info@yenlo.com
deu
Menu
Enterprise Integration 13 min

Effiziente Verarbeitung großer Dateien mit Apache Camel (Teil 1)

Entfesseln Sie die Kraft von Apache Camel für die effiziente Verarbeitung großer Dateien! In diesem Blog führt Sie Ajanthan Eliyathamby, unser Integrationsspezialist, durch die Methoden XML-DSL und Java DSL.

Ajanthan Eliyathamby Integration Expert
Ajanthan Eliyathamby
Integrationsexperte
Apache Camel Messa kopie

Ein Leitfaden zur Verwendung von Java / XML-DSL und Spring Boot

Dateioperationen sind ein wesentlicher Bestandteil der Unternehmensintegration. Es handelt sich tatsächlich um eines der EIP-Muster, daher ist es sehr verbreitet und wir stoßen auf viele Formen und Anwendungsfälle.

Einer der anspruchsvolleren Anwendungsfälle tritt auf, wenn die Dateien groß sind, bis zu 10 Megabyte, und verarbeitet werden müssen. Für einen unserer Kunden haben wir einen Proof of Concept erstellt, der die effizienteste Möglichkeit zur Verarbeitung von 10-Megabyte-CSV-Dateien zeigt.

Wir haben das Dokument in zwei Teile aufgeteilt. Der erste Teil wird sich auf den XML-DSL-Ansatz konzentrieren und der zweite auf JAVA-DSL. Der gesamte Quellcode ist in der folgenden Yenlo-Bitbucket-URL zu finden.

Herausforderungen

Die Herausforderungen liegen oft im Speicherverbrauch (Heap-Größe) und in der Verarbeitungsgeschwindigkeit. Das Lesen der Nachricht im Speicher führt oft zu Problemen wie hoher Ressourcennutzung, Datenkorruption und langsamer Verarbeitung.

Für diesen POC haben wir ein Produkt aus dem Apache-Stack namens Camel ausgewählt. Dabei handelt es sich um ein Open-Source-Integrationsprodukt, das EIP unterstützt und ideal für diese Aufgabe geeignet ist. Warum? Aufgrund der Art und Weise, wie Camel eingerichtet ist, können wir seine domänenspezifischen Sprachmodelle verwenden, von denen es mehrere Optionen wie Java, XML, Groovy, YAML usw. gibt. Da wir die Auswahlmöglichkeiten haben, können wir die Leistungsanforderungen unserer Integration und das Wissen der Teams vergleichen und die DSL für die Entwicklung auswählen. Wenn es um das Dateilesen geht, ist die camel-Funktion des seitenweisen Lesens eine der Fähigkeiten, die viel Aufmerksamkeit erregen kann. Die Streaming-Option in Camel hilft uns, die Daten paginiert zu lesen und basierend auf dem festgelegten Paginierungswert zu verarbeiten. Dies hilft, die Speicherprobleme zu vermeiden, die während der Verarbeitung großer Dateien auftreten können, da die Datei nicht auf einmal vollständig in den Speicher geladen wird.

In diesem Artikel betrachten wir die beiden am häufigsten verwendeten DSLs: Java und XML-basierte Implementierung, und schließlich werden wir den Leistungsunterschied zwischen XML-DSL und Java-DSL vergleichen. Der Teil-1 wird die XML DSL-basierte Integration und der Teil-2 die Java DSL-basierte Integration enthalten.

Apache Camel Nachrichtenfluss

Im Apache Camel ist der Camel-Kontext der Container, der alle grundlegenden Komponenten enthält. Sobald die Route über DSLs konfiguriert und dem Camel-Kontext hinzugefügt wurde, wird die Route aktiv und beginnt mit der Verarbeitung von Nachrichten. In einem typischen Szenario wird die Nachricht durch jede in der Route definierte Konfiguration verarbeitet, wie z.B. Protokollierung der ursprünglichen Nachricht, Festlegen von Eigenschaften, Übersetzen der Nachricht, weitere Verarbeitung durch benutzerdefinierte Geschäftslogiken und schließlich Übergabe an das Endpunktziel.

wp advanced api management guide
Whitepaper Leitfaden Für Die Erweiterte API-Verwaltung

Empfehlungen, Um Ihnen Zu Helfen, Ihre Eigene Architektur Zu Entwerfen Und Dabei Eine Auswahl Zu Treffen

Jetzt herunterladen

Implementierungsanwendungsfall

Wie bereits erwähnt, werden wir in diesem Artikel einen Dateiverarbeitungsanwendungsfall sowohl in XML-DSL als auch in Java-DSL erstellen, die beiden am häufigsten verwendeten und vorgestellten DSLs für Apache Camel Integrationen.

Unten sehen Sie das Diagramm, das den Anwendungsfall illustriert, den wir implementieren werden. Wir beabsichtigen, zwei Routen zu implementieren:

Route von Datei zu Thema

Route von Thema zu REST-API

XML-DSL Implementierung

Bevor wir mit der Implementierung beginnen, ist es besser, den Code als wiederverwendbare Komponente zu entwerfen. Das folgende Diagramm zeigt, wie wir die XML-DSL-Implementierung von Apache Camel als wiederverwendbare Komponente mithilfe der Routenvorlagenfunktion von Apache Camel erstellen können.

Hinweis: Die Funktion zum unit testing ist in diesem Abschnitt des Artikels für XML-DSL nicht enthalten, wird aber unter Java-DSL behandelt.

Unsere Absicht besteht darin, ein Projekt für die Kamel-Datei-Routenvorlagen in Spring Boot zu erstellen und die Komponentenklassen und Routen im Projekt für die Integration von Kamel-Dateien wiederzuverwenden.

Erklärung zum Projekt camel-file-route-templates

  • JsonAggregationStrategy.java

In diesem Kontext beinhaltet unser Ansatz zunächst das Extrahieren der Informationen aus der CSV-Datei, dann die Verarbeitung der Daten zur Erzeugung einer JSON-Ausgabe mit bestimmten Feldern. Darüber hinaus beabsichtigen wir, eine definierte Anzahl von Zeilen aus der Datei gleichzeitig zu gruppieren. Dieser Aggregationsprozess wird durch die in der application.yml-Datei unter der Eigenschaft „noOfLinesToReadAtOnce“ festgelegte Kamelkonfiguration erleichtert. Um diese Aggregation durchzuführen, werden wir die folgende Klasse verwenden.

package com.camel.file.process.templates.aggregate;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;

@Component
public class JsonAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        String body = null;
        if (!oldBody.startsWith("[")) {
            body = "[ " + oldBody + ", " + newBody + " ]";
        } else{
            body = oldBody.replace("]", "") + ", " + newBody + " ]";
        }
        oldExchange.getIn().setBody(body);
        return oldExchange;
    }
}
  • TimeGap.java

Das Hilfsprogramm „TimeGap.java“: Es wird verwendet, um die Verarbeitungszeit zu berechnen, was nützlich sein kann, um die Leistung zu vergleichen.

package com.camel.file.process.templates.utils;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;

@Component
public class TimeGap {
    public String calculateTimeDifference(Date startTime, Date endTime) {
        long diffInMillis = endTime.getTime() - startTime.getTime();
        long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMillis);
        long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMillis);
        long hours = TimeUnit.MILLISECONDS.toHours(diffInMillis);
        return String.format("%d hours, %d minutes, %d seconds %d milliseconds", hours, minutes, seconds, diffInMillis);
    }
}
  • FileRouteTemplateApplication.java
package com.camel.file.process.templates;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FileRouteTemplatesApplication {

    public static void main(String[] args) {
        SpringApplication.run(FileRouteTemplatesApplication.class, args);
    }

}

In der Datei resources/templates file-to-topic.xml: Diese Routenvorlage wurde entwickelt, um einen Dateistandort zu überwachen, den Inhalt der Datei abzurufen, ihn durch Streaming zu verarbeiten, ihn aufgrund von CSV-Linien zu segmentieren und ihn schließlich an ein Thema zu senden.

<?xml version="1.0" encoding="UTF-8"?>
<!-- This is a common template for csv/text file processing and generating and publish to Topic -->
<routeTemplates xmlns="http://camel.apache.org/schema/spring">
    <routeTemplate id="file-to-topic">
        <templateBean name="jsonBean" type="#class:com.camel.file.process.templates.aggregate.JsonAggregationStrategy"
                      beanType="com.camel.file.process.templates.aggregate.JsonAggregationStrategy"/>
        <templateBean name="timeGapBean" type="#class:com.camel.file.process.templates.utils.TimeGap"
                      beanType="com.camel.file.process.templates.utils.TimeGap"/>
        <route id="{{file-to-topic.routeId}}">
            <from uri="{{file-to-topic.file.uri}}"/>
            <log message="Starting to process big file: ${header.CamelFileName} and ${header.camelFileLength} Bytes"
                 loggingLevel="INFO"/>
            <setProperty name="startTime">
                <simple>${date:now}</simple>
            </setProperty>
            <split streaming="true">
                <tokenize token="{{file-to-topic.file.token}}"
                          skipFirst="{{file-to-topic.file.noOfLinesToSkip}}"
                          group="{{file-to-topic.file.noOfLinesToReadAtOnce}}"/>
                <log message="Message Before Splitting: ${body}"/>
                <unmarshal>
                    <bindy type="Csv" classType="{{file-to-topic.mapperClass}}"/>
                </unmarshal>
                <split aggregationStrategy="{{jsonBean}}">
                    <simple>${body}</simple>
                    <bean beanType="{{file-to-topic.processorClass}}"/>
                    <marshal>
                        <json library="Jackson"/>
                    </marshal>
                    <log message="Message Sent after Processing: ${body}"/>
                </split>
                <log message="Message Sent after Splitting: ${body}" loggingLevel="INFO"/>
                <to uri="{{file-to-topic.endpoint.uri}}"/>
            </split>
            <setProperty name="endTime">
                <simple>${date:now}</simple>
            </setProperty>
            <log message="Done processing big file: ${header.CamelFileName}" loggingLevel="INFO"/>
            <to uri="bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})"/>
            <log message="Time difference: ${body}" loggingLevel="INFO"/>
        </route>
    </routeTemplate>
</routeTemplates>

In der Datei resources/templates topic-to-rest.xml: Diese Routenvorlage ist so konzipiert, dass sie Nachrichten aus dem Thema abruft und sie anschließend an eine Backend weiterleitet, wobei auch die Möglichkeit einer erneuten Zustellung besteht.

<?xml version="1.0" encoding="UTF-8"?>
<!-- This is a common template for listening to a topic and publish to a REST endpoint -->
<routeTemplates xmlns="http://camel.apache.org/schema/spring">
    <routeTemplate id="topic-to-rest">
        <route id="{{topic-to-rest.routeId}}">
            <from uri="{{topic-to-rest.listener.uri}}"/>
            <throttle timePeriodMillis="{{topic-to-rest.receiver.throttle.lockPeriodMilliSeconds}}">
                <constant>{{topic-to-rest.receiver.throttle.requestCount}}</constant>
            </throttle>
            <setHeader name="Content-Type">
                <constant>application/json</constant>
            </setHeader>
            <setHeader name="Authorization">
                <constant>Bearer myToken {{topic-to-rest.receiver.token}}</constant>
            </setHeader>
            <onException>
                <exception>org.apache.camel.http.base.HttpOperationFailedException</exception>
                <onWhen>
                    <simple>${exception.statusCode} == 422</simple>
                </onWhen>
                <redeliveryPolicy maximumRedeliveries="{{topic-to-rest.receiver.reDelivery.attempts}}"
                                  redeliveryDelay="{{topic-to-rest.receiver.reDelivery.delay}}"/>
                <handled>
                    <constant>true</constant>
                </handled>
                <log message="HTTP error occurred with status ${exception.statusCode}. Response body: ${exception.message}"/>
                <to uri="{{topic-to-rest.receiver.reDelivery.deadLetterQueue}}"/>
            </onException>
            <to uri="{{topic-to-rest.receiver.uri}}"/>
            <choice>
                <when>
                    <simple>${header.CamelHttpResponseCode} == 200</simple>
                    <log message="Message Successfully sent to Rest Endpoint and Received status code: ${header.CamelHttpResponseCode}"/>
                </when>
            </choice>
        </route>
    </routeTemplate>
</routeTemplates>
  • pom.xml

Beachten Sie die Beispieldatei pom.xml unter https://bitbucket.org/yenlo/yenlo_camel/src/master/xml-dsl/camel-file-route-templates/pom.xml

Ein wichtiger Teil in der pom.xml ist:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <goals>
                <goal>jar</goal>
            </goals>
            <phase>package</phase>
            <configuration>
                <classifier>library</classifier>
            </configuration
        </execution>
    </executions>
</plugin>

Das ist erforderlich, um das Projekt so zu verpacken, dass es als Bibliotheksprojekt verwendet werden kann.

Führen Sie den Befehl mvn clean install aus: Dadurch wird die Jar-Datei im lokalen Maven-Repository bereitgestellt, damit sie in unserem nächsten Projekt wiederverwendet werden kann.

Wenn Sie die Haupt-Jar-Datei, die von Spring Boot generiert wurde, extrahieren und ansehen, sehen Sie die Projektstruktur wie unten dargestellt.

Wenn wir dies als Abhängigkeit angeben, können die Klassen nicht wiederverwendet werden und es werden Fehler für nicht gefundene Klassen angezeigt. Deshalb verwenden wir die Konfiguration maven-jar-plugin, die eine Jar-Datei mit dem Namen -library.jar generiert. Wenn Sie diese Jar-Datei extrahieren:

Diese eignet sich nun als Abhängigkeit.

Hinweis: Es wurden auch einige zusätzliche Abhängigkeiten zu pom.xml hinzugefügt, die für die Ausführung der Routenvorlagen benötigt werden. Da wir diese im gemeinsamen Projekt einschließen, müssen wir diese im nächsten Projekt nicht hinzufügen.

Erklärung zum camel-file-integration-one-Projekt

In diesem Projekt verwenden wir die allgemeinen Vorlagen, die im vorherigen Projekt erstellt wurden, und werden die Routen daraus erstellen.

  • pom.xml
    Siehe die pom.xml unter https://bitbucket.org/yenlo/yenlo_camel/src/master/xml-dsl/camel-file-integration-one/pom.xml

Der wichtige Teil, der beachtet werden muss, befindet sich im Abschnitt Abhängigkeit, der das vorherige Vorlagenprojekt als Abhängigkeit hinzufügt.

<dependency>
    <groupId>com.camel.file.process.templates</groupId>
    <artifactId>camel-file-route-templates</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <classifier>library</classifier>
</dependency>

Und dann der Teil, der die Vorlagen im Camel-Kontext lädt. Dies ist eine der Overheads, die bei der Implementierung des gemeinsamen XML-DSL-Projekts auftreten. Auch wenn der Templates-Ordner zur Abhängigkeits-JAR-Datei hinzugefügt wird, wird er im camel-file-integration-one-Projekt nicht in den Camel-Kontext geladen. Um dies zu umgehen, verwenden wir das maven-dependency-plugin.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
        <execution>
            <id>unpack-dependency</id>
            <phase>generate-resources</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <artifactItem>
                        <groupId>com.camel.file.process.templates</groupId>
                        <artifactId>camel-file-route-templates</artifactId>
                        <version>1.0.0-SNAPSHOT</version>
                        <type>jar</type>
                        <overWrite>true</overWrite>
                        <outputDirectory>${basedir}/src/main/resources/common-templates</outputDirectory>
                        <includes>BOOT-INF/classes/templates/*.xml</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
  • Pojo/InputCsvMapper.java: Dies wird verwendet, um die CSV-Dateikopfzeilen zuzuordnen, damit wir sie mit Transformationen für bestimmte Felder verarbeiten können.
package com.camel.file.process.camelintegrationone.pojo;

import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {

    @DataField(pos = 1, columnName = "id")
    private int id;

    @DataField(pos = 2, columnName = "firstname")
    private String firstName;

    @DataField(pos = 3, columnName = "lastname")
    private String lastName;

    @DataField(pos = 4, columnName = "email")
    private String email;

    @DataField(pos = 5, columnName = "email2")
    private String email2;

    @DataField(pos = 6, columnName = "profession")
    private String profession;

}
  • Process/InputCsvProcessor.java: Diese Implementierung transformiert die Nachricht in ein definiertes Format.
package com.camel.file.process.camelintegrationone.process;

import com.camel.file.process.camelintegrationone.pojo.InputCsvMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;

@Slf4j
public class InputCsvProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {

        InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);

        JsonObject jsonObject = new JsonObject();
        jsonObject.put("updatedId", csvRecord.getId());
        jsonObject.put("updateName", csvRecord.getFirstName());

        exchange.getIn().setBody(jsonObject);
    }
}
  • route-builder.xml : Builder, die die Routenbuilder enthalten.


<?xml version="1.0" encoding="UTF-8"?>
<templatedRoutes id="camel" xmlns="http://camel.apache.org/schema/spring">
    <templatedRoute routeTemplateRef="file-to-topic"/>
    <templatedRoute routeTemplateRef="topic-to-rest"/>
</templatedRoutes>
  • Die Werte der oben genannten 4 Elemente müssen in der application.yml festgelegt werden.
camel:
  springboot:
    name: camel-file-integration-one
    routes-include-pattern: classpath:common-templates/**/templates/*.xml,classpath:builders/*.xml,classpath:templates/*.xml

logging:
  level:
    org:
      apache:
        camel: DEBUG

spring:
  activemq:
    broker-url: "tcp://XXXXXXXXX:61616"
    user: XXXXXX
    password: XXXXXX

file-to-topic:
  routeId: "file-to-topic-route"
  file:
    uri: "file:src/main/resources?noop=true&delay=20000&antInclude=file_*.csv"
    token: "\n"
    noOfLinesToSkip: 1
    noOfLinesToReadAtOnce: 2
  mapperClass: "com.camel.file.process.camelintegrationone.pojo.InputCsvMapper"
  processorClass: "com.camel.file.process.camelintegrationone.process.InputCsvProcessor"
  endpoint:
    uri: "activemq:topic:camel.testtopic"

topic-to-rest:
  routeId: "topic-to-rest-route"
  listener:
    uri: "activemq:topic:camel.testtopic"
  receiver:
    uri: "https://run.mocky.io/v3/c18b3268-7472-4061-8132-1ba9dc15c3dd"
    #uri: "https://mock.codes/422"
    token: "12323444552211"
    reDelivery:
      attempts: 3
      delay: 5000
      deadLetterQueue: "activemq:queue:dead-letter"
    throttle:
      lockPeriodMilliSeconds: 10000
      requestCount: 1

Sobald mvn clean package ausgeführt wird, können wir beobachten, dass die Abhängigkeitsvorlagen im Ordner common-templates geladen werden.

Auch im gebauten Zielordner (target/classes) werden diese Vorlagen vorhanden sein. Da wir unsere application.yml mit dem „routes-include-pattern“ konfiguriert haben.

camel:
  springboot:
    name: camel-file-integration-one
    routes-include-pattern: classpath:common-templates/**/templates/*.xml,classpath:builders/*.xml,classpath:templates/*.xml

Es wird die Vorlagen in den Camel-Kontext laden, wenn wir die Spring Boot-Anwendung starten.

Führen Sie den typischen Spring Boot Startbefehl aus: java -jar target/camel-file-integration-one-1.0.0-SNAPSHOT.jar

Damit schließen wir unsere Diskussion zur XML-DSL-Implementierung ab, in der wir die Herausforderungen behandelt haben, die bei der Integration der Verarbeitung großer Dateien auftreten können. Wir haben erkundet, wie man diese Herausforderungen mithilfe der Apache Camel DSL – XML-DSL angehen kann. In unserem kommenden Blogbeitrag werden wir uns mit der Implementierung der Java DSL für denselben Anwendungsfall befassen und auch einen Leistungsvergleich zwischen den beiden Ansätzen durchführen: XML-DSL und Java DSL. Bleiben Sie dran!

deu
Schließen
Was ist auf unserer Speisekarte