Ein Leitfaden mit Java / XML-DSL und Spring Boot
In unserem vorherigen Blog in dieser Serie, „Effiziente Verarbeitung großer Dateien mit Apache Camel„, haben wir uns auf XML-DSL konzentriert. Jetzt werden wir die Java-DSL für die Integration verwenden und die Leistung zwischen den beiden vergleichen.
Java-DSL Implementation
Genau wie wir es für die XML-DSL-Implementierung getan haben, werden wir zuerst die Projektstruktur auf eine wiederverwendbare Weise betrachten. Siehe das folgende Bild, das erklärt, wie dies für diese Beispielimplementierung strukturiert ist.
Das Vorlagenprojekt wird auch den Unit-Test enthalten, der „camel-test-spring-junit5“ verwendet.
Erläuterung des camel-file-templates-Projekts
Wie bereits im vorherigen Blog erwähnt, umfasst unser Ansatz zunächst das Extrahieren von Informationen aus der CSV-Datei, dann die Durchführung der Datenverarbeitung, um eine JSON-Ausgabe mit spezifischen Feldern zu generieren. Darüber hinaus streben wir danach, eine definierte Anzahl von Zeilen aus der Datei gleichzeitig zu gruppieren. Dieser Aggregationsprozess wird durch die in der application.yml-Datei unter dem Attribut „noOfLinesToReadAtOnce“ angegebene Camel-Konfiguration erleichtert. Um diese Aggregation durchzuführen, werden wir die folgende Klasse verwenden.
package com.integration.camel.file.common.route.aggregate;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;
@Component
public class JsonAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
String body = null;
if (!oldBody.startsWith("[")) {
body = "[ " + oldBody + ", " + newBody + " ]";
} else{
body = oldBody.replace("]", "") + ", " + newBody + " ]";
}
oldExchange.getIn().setBody(body);
return oldExchange;
}
}
Unter „utils“ haben wir TimeGap.java. Dies wird verwendet, um die Verarbeitungszeit zu berechnen, was nützlich sein kann, um die Leistung zu vergleichen.
package com.integration.camel.file.common.route.utils;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
@Component
public class TimeGap {
public String calculateTimeDifference(Date startTime, Date endTime) {
long diffInMillis = endTime.getTime() - startTime.getTime();
long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMillis);
long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMillis);
long hours = TimeUnit.MILLISECONDS.toHours(diffInMillis);
return String.format("%d hours, %d minutes, %d seconds, %d milliseconds", hours, minutes, seconds, diffInMillis);
}
}
Dann haben wir unter Vorlagen zwei Arten von Vorlagen:
FileToTopicRouteTemplate.java
Diese Routenvorlage ist so konzipiert, dass sie einen Dateispeicherort überprüft, den Inhalt der Datei abruft, diesen über Streaming verarbeitet, ihn basierend auf CSV-Regeln segmentiert und ihn schließlich an ein Thema veröffentlicht.
package com.integration.camel.file.common.route.templates;
import com.integration.camel.file.common.route.aggregate.JsonAggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;
import org.apache.camel.model.dataformat.JsonLibrary;
/*
* This Route Template created for the reuse of a typical File To Topic Route.
*/
public class FileToTopicRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public FileToTopicRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() throws Exception {
String routeId = camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}");
String fileUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.uri}}");
String token = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.token}}");
int noOfLinesToReadAtOnce = Integer.parseInt(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.noOfLinesToReadAtOnce}}"));
boolean skipFirstLine = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.skipFirstLine}}"));
boolean streaming = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.streaming}}"));
String inputMapperClassName = camelContext.resolvePropertyPlaceholders("{{file-to-topic.mapperClass}}");
String processorClass = camelContext.resolvePropertyPlaceholders("{{file-to-topic.processorClass}}");
String endpointUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.endpoint.uri}}");
routeTemplate("file-to-topic-route-template")
.templateBean("timeGapBean")
.typeClass("com.integration.camel.file.common.route.utils.TimeGap")
.end()
.from(fileUri)
.routeId(routeId)
.log(LoggingLevel.INFO, "Starting to process the file: ${header.CamelFileName} and ${header.camelFileLength} Bytes")
.setProperty("startTime", simple("${date:now}"))
.split(body().tokenize(token, noOfLinesToReadAtOnce , skipFirstLine)).streaming(streaming)
.log(LoggingLevel.DEBUG, "Message in process after the initial split: ${body}")
.unmarshal().bindy(BindyType.Csv, Class.forName(inputMapperClassName))
.split(body(), new JsonAggregationStrategy())
.log(LoggingLevel.DEBUG, "Message in process after the individual split: ${body}")
.bean(processorClass)
.marshal().json(JsonLibrary.Jackson)
.log(LoggingLevel.DEBUG, "Message in process at the end of individual split: ${body}")
.end()
.log(LoggingLevel.DEBUG, "Message will be sent after aggregation: ${body}")
.to(endpointUri)
.end()
.setProperty("endTime", simple("${date:now}"))
.log(LoggingLevel.INFO, "Done processing the file: ${header.CamelFileName}")
.to("bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})")
.log(LoggingLevel.INFO,"Time Taken to complete the route " + routeId + " on ${date:now} ${body}");
}
}
Empfehlungen, Um Ihnen Zu Helfen, Ihre Eigene Architektur Zu Entwerfen Und Dabei Eine Auswahl Zu Treffen
Jetzt herunterladenTopicToRestRouteTemplate.java
Diese Routenvorlage ist so konzipiert, dass sie Nachrichten aus dem Thema abruft und diese dann an eine Backend-Anwendung weiterleitet, wobei die Möglichkeit der erneuten Bereitstellung besteht.
package com.integration.camel.file.common.route.templates;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.http.base.HttpOperationFailedException;
/*
* This Route Template created for the reuse of a typical Topic To Rest Route.
*/
public class TopicToRestRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public TopicToRestRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() {
String routeId = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.routeId}}");
String listenerUri = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.listener.uri}}");
String lockPeriodMilliSeconds = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.throttle.lockPeriodMilliSeconds}}");
String requestCount = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.throttle.requestCount}}");
String receiverToken = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.token}}");
String receiverDelay = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.delay}}");
String deadLetterQueue = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.deadLetterQueue}}");
String receiverUri = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.uri}}");
String reDeliveryAttempts = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.attempts}}");
routeTemplate("topic-to-rest-route-template")
.from(listenerUri)
.routeId(routeId)
.throttle(constant(requestCount)).timePeriodMillis(lockPeriodMilliSeconds)
.onException(HttpOperationFailedException.class)
.onWhen(simple("${exception.statusCode} == 422"))
.maximumRedeliveries(reDeliveryAttempts)
.redeliveryDelay(receiverDelay)
.handled(true)
.log(LoggingLevel.ERROR, "HTTP error occurred with status ${exception.statusCode}. Response body: ${exception.message}")
.to(deadLetterQueue)
.end()
.setHeader("Content-Type", constant("application/json"))
.setHeader("Authorization", constant(receiverToken))
.to(receiverUri)
.choice()
.when(simple("${header.CamelHttpResponseCode} == 200"))
.log("Message Successfully sent to Rest Endpoint and Received status code: ${header.CamelHttpResponseCode}")
.endChoice();
}
}
application.yml. Da dies ein Vorlagenprojekt ist, das als Bibliothek verwendet wird, enthält test/resources die erforderlichen Konfigurationen.
camel:
springboot:
name: camel-file-templates
Unter „test“ werden wir InputCsvMapper.java und InputCsvProcessor.java haben, diese Klassen werden für Testzwecke verwendet.
package com.integration.camel.file.common.route.mapper;
import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;
@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {
@DataField(pos = 1, columnName = "id")
private int id;
@DataField(pos = 2, columnName = "firstname")
private String firstName;
@DataField(pos = 3, columnName = "lastname")
private String lastName;
@DataField(pos = 4, columnName = "email")
private String email;
@DataField(pos = 5, columnName = "email2")
private String email2;
@DataField(pos = 6, columnName = "profession")
private String profession;
}
Code für InputCsvMapper.java und InputCsvProcessor.java ist hier enthalten.
package com.integration.camel.file.common.route.process;
import com.integration.camel.file.common.route.mapper.InputCsvMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;
import org.springframework.stereotype.Component;
@Component
public class InputCsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);
JsonObject jsonObject = new JsonObject();
jsonObject.put("updatedId", csvRecord.getId());
jsonObject.put("updateName", csvRecord.getFirstName());
exchange.getIn().setBody(jsonObject);
}
}
Dann FileToTopicRouteTest.java, der den Unit-Test enthält, der für ein positives Szenario in der FileToTopicRouteTemplate implementiert wurde.
package com.integration.camel.file.common.route;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@CamelSpringBootTest
@EnableAutoConfiguration
@ActiveProfiles("test")
class FileToTopicRouteTest {
@EndpointInject("mock:activemq:topic:camel.testtopic")
MockEndpoint mockEndpoint;
@Configuration
static class TestConfig {
@Bean
RoutesBuilder route() {
return new RouteBuilder() {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId("file-to-topic-route-test");
}
};
}
}
@Test
void verifyTheMessageCountReceived_Success() throws InterruptedException {
mockEndpoint.setExpectedMessageCount(7);
mockEndpoint.assertIsSatisfied();
}
}
Dann die Konfigurationsdatei für den Test – application-test.yml
file-to-topic:
routeId: "file-to-topic-route"
file:
uri: "file:src/test/resources?noop=true&delay=20000&antInclude=data_test_*.csv"
token: "\n"
noOfLinesToSkip: 1
noOfLinesToReadAtOnce: 2
skipFirstLine: true
streaming: true
mapperClass: "com.integration.camel.file.common.route.mapper.InputCsvMapper"
processorClass: "com.integration.camel.file.common.route.process.InputCsvProcessor"
endpoint:
uri: "mock:activemq:topic:camel.testtopic"
Die Testdatendatei: data_test_1.csv, die die folgenden Header enthalten wird.
id,firstname,lastname,email,email2,profession
Schließlich die pom.xml-Datei, hier wie bereits während der XML-DSL-Implementierung erwähnt, muss die Bibliothek mit dem maven-jar-plugin erstellt werden..
Überprüfen Sie die pom.xml unter https://bitbucket.org/yenlo/yenlo_camel/src/master/java-dsl/camel-file-templates/pom.xml
Führen Sie „mvn clean install“ aus, um die Artefakte im lokalen Repository bereitzustellen, die im nächsten Projekt verwendet werden sollen.
Erläuterung des camel-file-integration-sample-Projekts
InputCsvMapper.java und InputCsvProcessor.java: InputCsvMapper ist derjenige, der die CSV-Attribute zuordnet, und der InputCsvProcessor wird dabei helfen, die Payload zu ändern, bevor sie an das Backend gesendet wird.
package com.integration.camel.sample.mapper;
import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;
@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {
@DataField(pos = 1, columnName = "id")
private int id;
@DataField(pos = 2, columnName = "firstname")
private String firstName;
@DataField(pos = 3, columnName = "lastname")
private String lastName;
@DataField(pos = 4, columnName = "email")
private String email;
@DataField(pos = 5, columnName = "email2")
private String email2;
@DataField(pos = 6, columnName = "profession")
private String profession;
}
package com.integration.camel.sample.process;
import com.integration.camel.sample.mapper.InputCsvMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;
import org.springframework.stereotype.Component;
@Component
public class InputCsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);
JsonObject jsonObject = new JsonObject();
jsonObject.put("updatedId", csvRecord.getId());
jsonObject.put("updateName", csvRecord.getFirstName());
exchange.getIn().setBody(jsonObject);
}
}
Route Builder, die unter „builders“ definiert sind, wo wir die Routenvorlagen für unsere eigentliche Integrationsverwendung laden werden.
FileToTopicRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FileToTopicRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}"));
}
}
TopicToRestRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FileToTopicRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}"));
}
}
TopicToRestRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.TopicToRestRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicToRestRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new TopicToRestRouteTemplate(camelContext));
templatedRoute("topic-to-rest-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{topic-to-rest.routeId}}"));
}
}
Das Konfigurieren der Eigenschaften in application.yml
camel:
springboot:
name: camel-file-integration-sample
logging:
level:
org:
apache:
camel: DEBUG
spring:
activemq:
broker-url: "tcp://XXXXXXXX:61616"
user: XXXXXX
password: XXXXXX
file-to-topic:
routeId: "file-to-topic-route"
file:
uri: "file:src/main/resources?noop=true&delay=20000&antInclude=file_*.csv"
token: "\n"
noOfLinesToSkip: 1
noOfLinesToReadAtOnce: 2
skipFirstLine: true
streaming: true
mapperClass: "com.integration.camel.sample.mapper.InputCsvMapper"
processorClass: "com.integration.camel.sample.process.InputCsvProcessor"
endpoint:
uri: "activemq:topic:camel.testtopic"
topic-to-rest:
routeId: "topic-to-rest-route"
listener:
uri: "activemq:topic:camel.testtopic"
receiver:
uri: "https://XXXXXXXXXXXXX"
token: "12323444552211"
reDelivery:
attempts: 3
delay: 5000
deadLetterQueue: "activemq:queue:dead-letter"
throttle:
lockPeriodMilliSeconds: 10000
requestCount: 1
data_2.csv ist die Datei, die für die Integrationsausführung verwendet wird und die Header wie unten aufgeführt hat:
id, vorname, nachname, e-mail, e-mail2, beruf
Die pom.xml finden Sie unter https://bitbucket.org/yenlo/yenlo_camel/src/master/java-dsl/camel-file-integration-sample/pom.xml
Wie bereits erwähnt, kann ein Test unter test implementiert werden, da wir bereits dieselben Arten von Tests im Vorlagenprojekt durchgeführt haben, werden wir dies hier überspringen.
Führen Sie „mvn clean package“ aus und starten Sie dann die Anwendung mit „java -jar target/camel-file-integration-sample-1.0.0-SNAPSHOT.jar“
Das war’s… wir haben dieselbe Funktionalität auch mit der Java-DSL implementiert.
Leistungsvergleich zwischen XML-DSL und Java-DSL
Es ist immer eine Frage: Welche sollte man wählen? basierend auf der Implementierung oder dem Entwicklungsprozess, obwohl wir glauben, dass die XML-DSL für einen Nicht-Entwickler leichter zu verstehen ist, wenn wir jedoch den richtigen Verpackungs- und wiederverwendbaren Codeentwicklungsprozess berücksichtigen, ist die Java-DSL besser. Wie ich bereits während der Implementierung der XML-DSL erwähnt habe, wenn wir das Verpackungsaspekt betrachten: Java-DSL ist besser als XML-DSL, da wir beim Verpacken mit XML-DSL auch einige zusätzliche Konfigurationen vornehmen müssen, um die XML-Routenvorlagen aus den Abhängigkeitsprojekten in den Camel-Kontext zu laden.
Weiterhin haben wir die Leistung zwischen den beiden Methoden verglichen, für diesen Beispiel-Use-Case habe ich einen Test mit einer 10 MB-Datei durchgeführt und die Ergebnisse sind wie folgt.
Basierend auf den Ergebnissen können wir schlussfolgern, dass es besser ist, sich für die Java-DSL-Implementierung zu entscheiden, wenn wir die Java-Expertise im Team haben, was uns hilft, die Leistung zusammen mit der Unterstützung von Unit-, Integrations-Tests und Debugging-Fähigkeiten aufrechtzuerhalten.
Anhang
Bei der Verwendung des Idempotent Consumer gibt es verschiedene Optionen, die als Idempotent Repository verwendet werden können. Hier haben wir das File Idempotent Repository verwendet. Für andere Optionen siehe den Link auf https://camel.apache.org/components/4.0.x/eips/idempotentConsumer-eip.html. In dieser Methode wird beim Starten der Route zuerst die Datei gelesen und eine track.txt-Datei in einem Verzeichnis des Pods gespeichert, die mit einem NFS-Ort verbunden ist. Dann schreibt es beim Starten der Verarbeitung den Dateinamen, der verarbeitet werden soll, in die freigegebene Datei. Wenn ein anderer Pod versucht, dieselbe Datei zu konsumieren, wird sie übersprungen, da der Eintrag bereits vorhanden ist.
Bei dieser Technik besteht die Möglichkeit, dass die Datei mehrmals gelesen wird. Um dies zu vermeiden, ist es immer besser, eine zufällige Verzögerung hinzuzufügen, bevor der idempotente Verbraucher verwendet wird. Dann können wir sicher sein, dass die Datei nicht erneut verarbeitet wird.
Nachfolgend finden Sie der angepasste FileToTopicRouteTemplate.java nach Hinzufügen des FileIdempotentRepository.
package com.integration.camel.file.common.route.templates;
import com.integration.camel.file.common.route.aggregate.JsonAggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.support.processor.idempotent.FileIdempotentRepository;
import java.io.File;
/*
* This Route Template created for the reuse of a typical File To Topic Route.
*/
public class FileToTopicRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public FileToTopicRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() throws Exception {
String routeId = camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}");
String fileUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.uri}}");
String token = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.token}}");
int noOfLinesToReadAtOnce = Integer.parseInt(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.noOfLinesToReadAtOnce}}"));
boolean skipFirstLine = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.skipFirstLine}}"));
boolean streaming = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.streaming}}"));
String inputMapperClassName = camelContext.resolvePropertyPlaceholders("{{file-to-topic.mapperClass}}");
String processorClass = camelContext.resolvePropertyPlaceholders("{{file-to-topic.processorClass}}");
String endpointUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.endpoint.uri}}");
String trackFile = camelContext.resolvePropertyPlaceholders("{{file-to-topic.trackFile}}");
routeTemplate("file-to-topic-route-template")
.templateBean("timeGapBean")
.typeClass("com.integration.camel.file.common.route.utils.TimeGap")
.end()
.from(fileUri)
.delay(simple("${random(1000,5000)}"))
.idempotentConsumer(header("CamelFileName"),
FileIdempotentRepository.fileIdempotentRepository(new File(trackFile)))
.routeId(routeId)
.log(LoggingLevel.INFO, "Starting to process the file: ${header.CamelFileName} and ${header.camelFileLength} Bytes")
.setProperty("startTime", simple("${date:now}"))
.split(body().tokenize(token, noOfLinesToReadAtOnce , skipFirstLine)).streaming(streaming)
.log(LoggingLevel.DEBUG, "Message in process after the initial split: ${body}")
.unmarshal().bindy(BindyType.Csv, Class.forName(inputMapperClassName))
.split(body(), new JsonAggregationStrategy())
.log(LoggingLevel.DEBUG, "Message in process after the individual split: ${body}")
.bean(processorClass)
.marshal().json(JsonLibrary.Jackson)
.log(LoggingLevel.DEBUG, "Message in process at the end of individual split: ${body}")
.end()
.log(LoggingLevel.DEBUG, "Message will be sent after aggregation: ${body}")
.to(endpointUri)
.end()
.setProperty("endTime", simple("${date:now}"))
.log(LoggingLevel.INFO, "Done processing the file: ${header.CamelFileName}")
.to("bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})")
.log(LoggingLevel.INFO,"Time Taken to complete the route " + routeId + " on ${date:now} ${body}");
}
}
Das schließt unsere Diskussion über die effiziente Verarbeitung großer Dateien mit Apache Camel ab. Im Teil 1 dieser Serie haben wir die Herausforderungen bei der Verarbeitung großer Dateien behandelt und wie wir sie mit der Apache Camel XML-DSL implementieren können, und hier in Teil 2 haben wir die Implementierung mit Java-DSL behandelt und den Vergleich zwischen den beiden Implementierungen Java-DSL und XML-DSL durchgeführt.
Ich bin zuversichtlich, dass diese Blogserie Ihnen ausreichend Informationen für Ihre Reise zur Dateiverarbeitung mit Camel gegeben hat. Ich freue mich auf eine erneute Verbindung in einem anderen Blog. Bleiben Sie dran für weitere Einblicke und Updates.