Een handleiding met Java / XML-DSL en Spring Boot
In onze vorige blog in deze serie over efficiënte verwerking van grote bestanden met Apache Camel hebben we ons gericht op XML-DSL. Nu zullen we de Java DSL gebruiken voor de integratie en de prestaties tussen beide vergelijken.
Implementatie van Java-DSL
Net zoals we hebben gedaan bij de XML-DSL-implementatie, zullen we eerst naar de projectstructuur kijken op een herbruikbare manier. Zie de onderstaande afbeelding die uitlegt hoe dit is gestructureerd voor deze voorbeeldimplementatie.
Het sjabloonproject zal ook de Unit Test bevatten, met behulp van “camel-test-spring-junit5”.
Toelichting op het camel-file-templates-project
Zoals vermeld in de vorige blog, houdt onze aanpak in dat we eerst de informatie uit het CSV-bestand extraheren, vervolgens gegevensverwerking uitvoeren om een JSON-uitvoer te genereren met gespecificeerde velden. Bovendien streven we ernaar een gedefinieerd aantal regels uit het bestand tegelijk te groeperen. Dit aggregatieproces wordt vergemakkelijkt door de Camel-configuratie gespecificeerd in het application.yml-bestand onder de eigenschap “noOfLinesToReadAtOnce.” Om deze aggregatie uit te voeren, zullen we de volgende klasse gebruiken.
package com.integration.camel.file.common.route.aggregate;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;
@Component
public class JsonAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
String body = null;
if (!oldBody.startsWith("[")) {
body = "[ " + oldBody + ", " + newBody + " ]";
} else{
body = oldBody.replace("]", "") + ", " + newBody + " ]";
}
oldExchange.getIn().setBody(body);
return oldExchange;
}
}
In het ‘utils’-gedeelte hebben we TimeGap.java. Dit wordt gebruikt om de verwerkingstijd te berekenen, wat handig kan zijn voor het vergelijken van de prestaties.
package com.integration.camel.file.common.route.utils;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
@Component
public class TimeGap {
public String calculateTimeDifference(Date startTime, Date endTime) {
long diffInMillis = endTime.getTime() - startTime.getTime();
long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMillis);
long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMillis);
long hours = TimeUnit.MILLISECONDS.toHours(diffInMillis);
return String.format("%d hours, %d minutes, %d seconds, %d milliseconds", hours, minutes, seconds, diffInMillis);
}
}
Vervolgens hebben we onder ’templates’ 2 soorten sjablonen:
FileToTopicRouteTemplate.java
Dit route-sjabloon is ontworpen voor het monitoren van een bestandslocatie, het ophalen van de inhoud van het bestand, het verwerken ervan via streaming, segmentatie op basis van CSV-regels en het uiteindelijk publiceren ervan naar een topic.
package com.integration.camel.file.common.route.templates;
import com.integration.camel.file.common.route.aggregate.JsonAggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;
import org.apache.camel.model.dataformat.JsonLibrary;
/*
* This Route Template created for the reuse of a typical File To Topic Route.
*/
public class FileToTopicRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public FileToTopicRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() throws Exception {
String routeId = camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}");
String fileUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.uri}}");
String token = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.token}}");
int noOfLinesToReadAtOnce = Integer.parseInt(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.noOfLinesToReadAtOnce}}"));
boolean skipFirstLine = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.skipFirstLine}}"));
boolean streaming = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.streaming}}"));
String inputMapperClassName = camelContext.resolvePropertyPlaceholders("{{file-to-topic.mapperClass}}");
String processorClass = camelContext.resolvePropertyPlaceholders("{{file-to-topic.processorClass}}");
String endpointUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.endpoint.uri}}");
routeTemplate("file-to-topic-route-template")
.templateBean("timeGapBean")
.typeClass("com.integration.camel.file.common.route.utils.TimeGap")
.end()
.from(fileUri)
.routeId(routeId)
.log(LoggingLevel.INFO, "Starting to process the file: ${header.CamelFileName} and ${header.camelFileLength} Bytes")
.setProperty("startTime", simple("${date:now}"))
.split(body().tokenize(token, noOfLinesToReadAtOnce , skipFirstLine)).streaming(streaming)
.log(LoggingLevel.DEBUG, "Message in process after the initial split: ${body}")
.unmarshal().bindy(BindyType.Csv, Class.forName(inputMapperClassName))
.split(body(), new JsonAggregationStrategy())
.log(LoggingLevel.DEBUG, "Message in process after the individual split: ${body}")
.bean(processorClass)
.marshal().json(JsonLibrary.Jackson)
.log(LoggingLevel.DEBUG, "Message in process at the end of individual split: ${body}")
.end()
.log(LoggingLevel.DEBUG, "Message will be sent after aggregation: ${body}")
.to(endpointUri)
.end()
.setProperty("endTime", simple("${date:now}"))
.log(LoggingLevel.INFO, "Done processing the file: ${header.CamelFileName}")
.to("bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})")
.log(LoggingLevel.INFO,"Time Taken to complete the route " + routeId + " on ${date:now} ${body}");
}
}
Aanbevelingen om je te helpen jouw eigen architectuur te ontwerpen en daarbij een selectie te maken
Nu downloadenTopicToRestRouteTemplate.java
Dit route-sjabloon is ontworpen om berichten op te halen uit het topic en ze vervolgens door te sturen naar een backend, terwijl het ook de mogelijkheid biedt tot opnieuw verzenden.
package com.integration.camel.file.common.route.templates;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.http.base.HttpOperationFailedException;
/*
* This Route Template created for the reuse of a typical Topic To Rest Route.
*/
public class TopicToRestRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public TopicToRestRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() {
String routeId = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.routeId}}");
String listenerUri = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.listener.uri}}");
String lockPeriodMilliSeconds = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.throttle.lockPeriodMilliSeconds}}");
String requestCount = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.throttle.requestCount}}");
String receiverToken = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.token}}");
String receiverDelay = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.delay}}");
String deadLetterQueue = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.deadLetterQueue}}");
String receiverUri = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.uri}}");
String reDeliveryAttempts = camelContext.resolvePropertyPlaceholders("{{topic-to-rest.receiver.reDelivery.attempts}}");
routeTemplate("topic-to-rest-route-template")
.from(listenerUri)
.routeId(routeId)
.throttle(constant(requestCount)).timePeriodMillis(lockPeriodMilliSeconds)
.onException(HttpOperationFailedException.class)
.onWhen(simple("${exception.statusCode} == 422"))
.maximumRedeliveries(reDeliveryAttempts)
.redeliveryDelay(receiverDelay)
.handled(true)
.log(LoggingLevel.ERROR, "HTTP error occurred with status ${exception.statusCode}. Response body: ${exception.message}")
.to(deadLetterQueue)
.end()
.setHeader("Content-Type", constant("application/json"))
.setHeader("Authorization", constant(receiverToken))
.to(receiverUri)
.choice()
.when(simple("${header.CamelHttpResponseCode} == 200"))
.log("Message Successfully sent to Rest Endpoint and Received status code: ${header.CamelHttpResponseCode}")
.endChoice();
}
}
application.yml. Omdat dit een sjabloonproject is dat als bibliotheek wordt gebruikt, zullen de test/resources de benodigde configuraties bevatten..
camel:
springboot:
name: camel-file-templates
Onder “test” zullen we InputCsvMapper.java en InputCsvProcessor.java hebben, deze klassen worden gebruikt voor testdoeleinden.
package com.integration.camel.file.common.route.mapper;
import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;
@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {
@DataField(pos = 1, columnName = "id")
private int id;
@DataField(pos = 2, columnName = "firstname")
private String firstName;
@DataField(pos = 3, columnName = "lastname")
private String lastName;
@DataField(pos = 4, columnName = "email")
private String email;
@DataField(pos = 5, columnName = "email2")
private String email2;
@DataField(pos = 6, columnName = "profession")
private String profession;
}
Aangezien ons doel is om de payload te wijzigen voordat deze naar de backend wordt verzonden, gebruiken we de InputCsvProcessor om hieraan tegemoet te komen.
package com.integration.camel.file.common.route.process;
import com.integration.camel.file.common.route.mapper.InputCsvMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;
import org.springframework.stereotype.Component;
@Component
public class InputCsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);
JsonObject jsonObject = new JsonObject();
jsonObject.put("updatedId", csvRecord.getId());
jsonObject.put("updateName", csvRecord.getFirstName());
exchange.getIn().setBody(jsonObject);
}
}
Daarna hebben we de FileToTopicRouteTest.java die de geïmplementeerde unit-test bevat voor een positief scenario in de FileToTopicRouteTemplate.
package com.integration.camel.file.common.route;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@CamelSpringBootTest
@EnableAutoConfiguration
@ActiveProfiles("test")
class FileToTopicRouteTest {
@EndpointInject("mock:activemq:topic:camel.testtopic")
MockEndpoint mockEndpoint;
@Configuration
static class TestConfig {
@Bean
RoutesBuilder route() {
return new RouteBuilder() {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId("file-to-topic-route-test");
}
};
}
}
@Test
void verifyTheMessageCountReceived_Success() throws InterruptedException {
mockEndpoint.setExpectedMessageCount(7);
mockEndpoint.assertIsSatisfied();
}
}
Vervolgens het configuratiebestand voor test – application-test.yml
file-to-topic:
routeId: "file-to-topic-route"
file:
uri: "file:src/test/resources?noop=true&delay=20000&antInclude=data_test_*.csv"
token: "\n"
noOfLinesToSkip: 1
noOfLinesToReadAtOnce: 2
skipFirstLine: true
streaming: true
mapperClass: "com.integration.camel.file.common.route.mapper.InputCsvMapper"
processorClass: "com.integration.camel.file.common.route.process.InputCsvProcessor"
endpoint:
uri: "mock:activemq:topic:camel.testtopic"
data_test_1.csv is het bestand dat zal worden gebruikt voor de integratie-uitvoering en zal de onderstaande headers bevatten.
id,firstname,lastname,email,email2,profession
Tot slot de pom.xml, hier zoals eerder vermeld tijdens de XML-DSL-implementatie, moet de bibliotheek worden gebouwd met behulp van de maven-jar-plugin.
Raadpleeg de pom.xml op https://bitbucket.org/yenlo/yenlo_camel/src/master/java-dsl/camel-file-templates/pom.xml.
Voer mvn clean install uit om de artefacten naar het lokale repository te implementeren, zodat ze gebruikt kunnen worden in het volgende project.
Uitleg over het camel-file-integration-sample-project
InputCsvMapper.java en InputCsvProcessor.java: InputCsvMapper is degene die de csv-attributen in kaart brengt en de InputCsvProcessor zal helpen om de payload te wijzigen voordat deze naar de backend wordt verzonden.
package com.integration.camel.sample.mapper;
import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;
@Data
@CsvRecord(separator = ",")
public class InputCsvMapper {
@DataField(pos = 1, columnName = "id")
private int id;
@DataField(pos = 2, columnName = "firstname")
private String firstName;
@DataField(pos = 3, columnName = "lastname")
private String lastName;
@DataField(pos = 4, columnName = "email")
private String email;
@DataField(pos = 5, columnName = "email2")
private String email2;
@DataField(pos = 6, columnName = "profession")
private String profession;
}
package com.integration.camel.sample.process;
import com.integration.camel.sample.mapper.InputCsvMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.json.JsonObject;
import org.springframework.stereotype.Component;
@Component
public class InputCsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
InputCsvMapper csvRecord = exchange.getIn().getBody(InputCsvMapper.class);
JsonObject jsonObject = new JsonObject();
jsonObject.put("updatedId", csvRecord.getId());
jsonObject.put("updateName", csvRecord.getFirstName());
exchange.getIn().setBody(jsonObject);
}
}
Routebuilders gedefinieerd onder het pakket ‘builders’, waar we de routesjablonen zullen laden voor ons daadwerkelijke integratiegebruik.
FileToTopicRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FileToTopicRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}"));
}
}
TopicToRestRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.FileToTopicRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FileToTopicRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new FileToTopicRouteTemplate(camelContext));
templatedRoute("file-to-topic-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}"));
}
}
TopicToRestRoute.java
package com.integration.camel.sample.builders;
import com.integration.camel.file.common.route.templates.TopicToRestRouteTemplate;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicToRestRoute extends RouteBuilder {
@Autowired
CamelContext camelContext;
@Override
public void configure() throws Exception {
camelContext.addRoutes(new TopicToRestRouteTemplate(camelContext));
templatedRoute("topic-to-rest-route-template").routeId(camelContext.resolvePropertyPlaceholders("{{topic-to-rest.routeId}}"));
}
}
Configuratie van de eigenschappen in application.yml
camel:
springboot:
name: camel-file-integration-sample
logging:
level:
org:
apache:
camel: DEBUG
spring:
activemq:
broker-url: "tcp://XXXXXXXX:61616"
user: XXXXXX
password: XXXXXX
file-to-topic:
routeId: "file-to-topic-route"
file:
uri: "file:src/main/resources?noop=true&delay=20000&antInclude=file_*.csv"
token: "\n"
noOfLinesToSkip: 1
noOfLinesToReadAtOnce: 2
skipFirstLine: true
streaming: true
mapperClass: "com.integration.camel.sample.mapper.InputCsvMapper"
processorClass: "com.integration.camel.sample.process.InputCsvProcessor"
endpoint:
uri: "activemq:topic:camel.testtopic"
topic-to-rest:
routeId: "topic-to-rest-route"
listener:
uri: "activemq:topic:camel.testtopic"
receiver:
uri: "https://XXXXXXXXXXXXX"
token: "12323444552211"
reDelivery:
attempts: 3
delay: 5000
deadLetterQueue: "activemq:queue:dead-letter"
throttle:
lockPeriodMilliSeconds: 10000
requestCount: 1
data_2.csv is het bestand dat gebruikt zal worden voor de integratie-uitvoering en zal de volgende koppen hebben:
id, voornaam, achternaam, e-mail, e-mail2, beroep
De pom.xml kan worden gevonden op https://bitbucket.org/yenlo/yenlo_camel/src/master/java-dsl/camel-file-integration-sample/pom.xml. Zoals eerder vermeld kan een test worden geïmplementeerd onder test, omdat we al dezelfde soort testen hebben uitgevoerd in het sjabloonproject, zullen we deze hier overslaan.
Voer “mvn clean package” uit en voer vervolgens de toepassing uit met “java -jar target/camel-file-integration-sample-1.0.0-SNAPSHOT.jar”.
Dat is het… we hebben dezelfde functionaliteit geïmplementeerd met behulp van de Java-DSL.
Prestatievergelijking tussen XML-DSL en Java-DSL
Het is altijd een vraag: welke moeten we kiezen? Gebaseerd op de implementatie of het ontwikkelingsproces, hoewel we vinden dat de XML-DSL gemakkelijker te begrijpen zal zijn voor een niet-ontwikkelaar, als we rekening houden met de juiste verpakking en ontwikkeling van herbruikbare code, dan is Java-DSL beter.
Zoals ik al tijdens de implementatie van XML-DSL heb genoemd, wanneer we de verpakking overwegen: Java-DSL is beter dan XML-DSL, omdat wanneer we de verpakking doen met XML-DSL, we ook wat extra configuratie moeten doen om de XML-route-sjablonen naar Camel Context te laden vanuit de afhankelijkheidsprojecten.
Verder om de prestaties tussen de twee methoden te vergelijken, voor dit voorbeeldgebruik, heb ik een test gedaan voor een bestand van 10 MB en de resultaten zijn als volgt:
Based on the results we can conclude that, if we have the Java Expertise in the Team, it is better to go with the Java-DSL Implementation, which will help to us maintain the performance along with the support of Unit, Integration Testing and Debugging capabilities.
Appendix
Bij het gebruik van Idempotent Consumer zijn er verschillende opties om te gebruiken als een Idempotent Repository. Hier hebben we de File Idempotent Repository gebruikt. Voor andere opties kunt u de link raadplegen op https://camel.apache.org/components/4.0.x/eips/idempotentConsumer-eip.html. In deze methode, wanneer de route wordt geïnitieerd, zal deze eerst het bestand lezen en een track.txt-bestand persistent in een map van de pod die zal worden gekoppeld aan een NFS-locatie. Vervolgens wordt bij de start van de verwerking de bestandsnaam die moet worden verwerkt in het gedeelde bestand geschreven. Wanneer een andere pod probeert hetzelfde bestand te beluisteren, wordt het overgeslagen omdat het record al bestaat.
Bij deze techniek bestaat de mogelijkheid dat het bestand meerdere keren wordt gelezen, om dit te voorkomen is het altijd beter om een willekeurige vertraging te hebben voordat de idempotente consument wordt gebruikt. Zo kunnen we ervoor zorgen dat het bestand niet opnieuw wordt verwerkt.
Hieronder staat de FileToTopicRouteTemplate.java aangepast na toevoeging van de FileIdempotentRepository.
package com.integration.camel.file.common.route.templates;
import com.integration.camel.file.common.route.aggregate.JsonAggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.support.processor.idempotent.FileIdempotentRepository;
import java.io.File;
/*
* This Route Template created for the reuse of a typical File To Topic Route.
*/
public class FileToTopicRouteTemplate extends RouteBuilder {
private final CamelContext camelContext;
public FileToTopicRouteTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public void configure() throws Exception {
String routeId = camelContext.resolvePropertyPlaceholders("{{file-to-topic.routeId}}");
String fileUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.uri}}");
String token = camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.token}}");
int noOfLinesToReadAtOnce = Integer.parseInt(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.noOfLinesToReadAtOnce}}"));
boolean skipFirstLine = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.skipFirstLine}}"));
boolean streaming = Boolean.parseBoolean(camelContext.resolvePropertyPlaceholders("{{file-to-topic.file.streaming}}"));
String inputMapperClassName = camelContext.resolvePropertyPlaceholders("{{file-to-topic.mapperClass}}");
String processorClass = camelContext.resolvePropertyPlaceholders("{{file-to-topic.processorClass}}");
String endpointUri = camelContext.resolvePropertyPlaceholders("{{file-to-topic.endpoint.uri}}");
String trackFile = camelContext.resolvePropertyPlaceholders("{{file-to-topic.trackFile}}");
routeTemplate("file-to-topic-route-template")
.templateBean("timeGapBean")
.typeClass("com.integration.camel.file.common.route.utils.TimeGap")
.end()
.from(fileUri)
.delay(simple("${random(1000,5000)}"))
.idempotentConsumer(header("CamelFileName"),
FileIdempotentRepository.fileIdempotentRepository(new File(trackFile)))
.routeId(routeId)
.log(LoggingLevel.INFO, "Starting to process the file: ${header.CamelFileName} and ${header.camelFileLength} Bytes")
.setProperty("startTime", simple("${date:now}"))
.split(body().tokenize(token, noOfLinesToReadAtOnce , skipFirstLine)).streaming(streaming)
.log(LoggingLevel.DEBUG, "Message in process after the initial split: ${body}")
.unmarshal().bindy(BindyType.Csv, Class.forName(inputMapperClassName))
.split(body(), new JsonAggregationStrategy())
.log(LoggingLevel.DEBUG, "Message in process after the individual split: ${body}")
.bean(processorClass)
.marshal().json(JsonLibrary.Jackson)
.log(LoggingLevel.DEBUG, "Message in process at the end of individual split: ${body}")
.end()
.log(LoggingLevel.DEBUG, "Message will be sent after aggregation: ${body}")
.to(endpointUri)
.end()
.setProperty("endTime", simple("${date:now}"))
.log(LoggingLevel.INFO, "Done processing the file: ${header.CamelFileName}")
.to("bean:{{timeGapBean}}?method=calculateTimeDifference(${exchangeProperty.startTime},${exchangeProperty.endTime})")
.log(LoggingLevel.INFO,"Time Taken to complete the route " + routeId + " on ${date:now} ${body}");
}
}
Dat concludeert onze discussie over Efficiënte verwerking van grote bestanden met Apache Camel. Tijdens het eerste deel van deze serie hebben we de uitdagingen in de verwerking van grote bestanden behandeld en hoe we deze kunnen implementeren met de Apache Camel XML-DSL en hier in Deel 2 hebben we de implementatie behandeld met Java DSL en de vergelijking tussen beide implementaties Java DSL en XML DSL.
Ik vertrouw erop dat deze reeks blogs u voldoende informatie heeft gegeven om uw reis naar bestandsverwerking met Camel te beginnen. Ik kijk ernaar uit om in een andere blog opnieuw verbinding te maken. Blijf op de hoogte voor meer inzichten en updates.