Commit 4d12db3

MODLD-901: kafka listener saving ImportResultEvents
1 parent 6ef3e19 commit 4d12db3

43 files changed: +1394 −45 lines

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -405,7 +405,7 @@
       <configOptions>
         <java8>true</java8>
         <useTags>true</useTags>
-        <dateLibrary>java</dateLibrary>
+        <dateLibrary>java8</dateLibrary>
         <interfaceOnly>true</interfaceOnly>
         <useSpringBoot3>true</useSpringBoot3>
         <requestMappingMode>api_interface</requestMappingMode>
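
For context on this one-line change: with the OpenAPI generator's dateLibrary switched from java to java8, date-time properties in the spec are generated as java.time types rather than the legacy date representation. A hedged illustration of the kind of accessor this produces (not the actual generated DTO; the class and field names here are made up):

import java.time.OffsetDateTime;

// Illustrative sketch of a generated DTO field under dateLibrary=java8.
public class GeneratedDtoSketch {

  // Date-time spec properties map to java.time types such as OffsetDateTime,
  // which is why the ObjectMapper configuration later in this commit
  // registers the JavaTimeModule.
  private OffsetDateTime completedDate;

  public OffsetDateTime getCompletedDate() {
    return completedDate;
  }

  public void setCompletedDate(OffsetDateTime completedDate) {
    this.completedDate = completedDate;
  }
}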

src/main/java/org/folio/linked/data/imprt/batch/job/processor/Rdf2LdProcessor.java

Lines changed: 1 addition & 1 deletion
@@ -9,8 +9,8 @@
 import java.util.Set;
 import lombok.extern.log4j.Log4j2;
 import org.folio.linked.data.imprt.domain.dto.ResourceWithLineNumber;
-import org.folio.linked.data.imprt.model.FailedRdfLine;
 import org.folio.linked.data.imprt.model.RdfLineWithNumber;
+import org.folio.linked.data.imprt.model.entity.FailedRdfLine;
 import org.folio.linked.data.imprt.repo.FailedRdfLineRepo;
 import org.folio.rdf4ld.service.Rdf4LdService;
 import org.springframework.batch.core.configuration.annotation.StepScope;

src/main/java/org/folio/linked/data/imprt/batch/job/writer/LdKafkaSender.java

Lines changed: 4 additions & 4 deletions
@@ -4,7 +4,7 @@

 import java.util.Set;
 import lombok.extern.log4j.Log4j2;
-import org.folio.linked.data.imprt.domain.dto.ImportOutput;
+import org.folio.linked.data.imprt.domain.dto.ImportOutputEvent;
 import org.folio.linked.data.imprt.domain.dto.ResourceWithLineNumber;
 import org.folio.spring.tools.kafka.FolioMessageProducer;
 import org.springframework.batch.core.configuration.annotation.StepScope;

@@ -20,11 +20,11 @@
 public class LdKafkaSender implements ItemWriter<Set<ResourceWithLineNumber>> {

   private final Long jobInstanceId;
-  private final FolioMessageProducer<ImportOutput> importOutputFolioMessageProducer;
+  private final FolioMessageProducer<ImportOutputEvent> importOutputFolioMessageProducer;
   private final Integer chunkSize;

   public LdKafkaSender(@Value("#{jobInstanceId}") Long jobInstanceId,
-                       @Qualifier("importOutputMessageProducer") FolioMessageProducer<ImportOutput> producer,
+                       @Qualifier("importOutputMessageProducer") FolioMessageProducer<ImportOutputEvent> producer,
                        @Value("${mod-linked-data-import.output-chunk-size}") Integer chunkSize) {
     this.jobInstanceId = jobInstanceId;
     this.importOutputFolioMessageProducer = producer;

@@ -36,7 +36,7 @@ public LdKafkaSender(@Value("#{jobInstanceId}") Long jobInstanceId,
   public void write(Chunk<? extends Set<ResourceWithLineNumber>> chunk) {
     var messages = chunked(chunk.getItems().stream().flatMap(Set::stream), chunkSize)
       .map(resourcesWithLineNumbers ->
-        new ImportOutput(resourcesWithLineNumbers).jobInstanceId(jobInstanceId))
+        new ImportOutputEvent(resourcesWithLineNumbers).jobInstanceId(jobInstanceId))
       .toList();
     importOutputFolioMessageProducer.sendMessages(messages);
   }
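
The statically imported chunked helper used in write() is not part of this commit's diff. A minimal sketch of what such a stream-partitioning utility could look like, purely as an assumption about its contract (split a stream into consecutive fixed-size batches); the real helper may differ:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public final class StreamChunkSketch {

  private StreamChunkSketch() {
  }

  // Hypothetical chunked(stream, size): materializes the stream and splits it
  // into consecutive sublists of at most chunkSize elements each.
  public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    var items = stream.toList();
    var chunks = new ArrayList<List<T>>();
    for (int i = 0; i < items.size(); i += chunkSize) {
      chunks.add(items.subList(i, Math.min(i + chunkSize, items.size())));
    }
    return chunks.stream();
  }
}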

src/main/java/org/folio/linked/data/imprt/config/ObjectMapperConfig.java

Lines changed: 5 additions & 1 deletion
@@ -4,6 +4,7 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;

@@ -14,11 +15,14 @@ public class ObjectMapperConfig {
   @Bean
   @Primary
   public ObjectMapper objectMapper() {
-    return new ObjectMapper()
+    var objectMapper = new ObjectMapper()
       .setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
       .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, true)
+      .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
       .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
       .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+    objectMapper.registerModule(new JavaTimeModule());
+    return objectMapper;
   }

 }
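
A standalone sketch of what the two new lines change in practice, assuming jackson-datatype-jsr310 is on the classpath (the new import implies it): without JavaTimeModule, serializing a java.time value fails outright; with it, and with WRITE_DATES_AS_TIMESTAMPS disabled, values come out as ISO-8601 strings.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.LocalDateTime;

public class JavaTimeDemo {

  public static void main(String[] args) throws Exception {
    var mapper = new ObjectMapper()
        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    mapper.registerModule(new JavaTimeModule());

    // Prints "2024-01-01T12:00:00" (an ISO-8601 string). Without the module
    // this call would throw InvalidDefinitionException, and without disabling
    // WRITE_DATES_AS_TIMESTAMPS it would emit a numeric array instead.
    System.out.println(mapper.writeValueAsString(LocalDateTime.of(2024, 1, 1, 12, 0)));
  }
}
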
src/main/java/org/folio/linked/data/imprt/config/kafka/ListenerConfiguration.java

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+package org.folio.linked.data.imprt.config.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.function.Supplier;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.folio.linked.data.imprt.domain.dto.ImportResultEvent;
+import org.folio.spring.tools.kafka.FolioKafkaProperties;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+@Configuration
+public class ListenerConfiguration {
+
+  @Bean
+  @ConfigurationProperties("folio.kafka")
+  public FolioKafkaProperties folioKafkaProperties() {
+    return new FolioKafkaProperties();
+  }
+
+  @Bean
+  public ConcurrentKafkaListenerContainerFactory<String, ImportResultEvent> importResultEventListenerContainerFactory(
+    ConsumerFactory<String, ImportResultEvent> importResultEventConsumerFactory
+  ) {
+    return concurrentKafkaBatchListenerContainerFactory(importResultEventConsumerFactory, false);
+  }
+
+  @Bean
+  public ConsumerFactory<String, ImportResultEvent> importResultEventConsumerFactory(ObjectMapper mapper,
+                                                                                     KafkaProperties properties) {
+    return errorHandlingConsumerFactory(ImportResultEvent.class, mapper, properties);
+  }
+
+  private <V> ConcurrentKafkaListenerContainerFactory<String, V> concurrentKafkaBatchListenerContainerFactory(
+    ConsumerFactory<String, V> consumerFactory, boolean batch) {
+    var factory = new ConcurrentKafkaListenerContainerFactory<String, V>();
+    factory.setBatchListener(batch);
+    factory.setConsumerFactory(consumerFactory);
+    return factory;
+  }
+
+  private <V> ConsumerFactory<String, V> errorHandlingConsumerFactory(Class<V> clazz,
+                                                                      ObjectMapper mapper,
+                                                                      KafkaProperties kafkaProperties) {
+    var properties = new HashMap<>(kafkaProperties.buildConsumerProperties(null));
+    Supplier<Deserializer<String>> keyDeserializer = StringDeserializer::new;
+    Supplier<Deserializer<V>> valueDeserializer = () ->
+      new ErrorHandlingDeserializer<>(new JsonDeserializer<>(clazz, mapper));
+    return new DefaultKafkaConsumerFactory<>(properties, keyDeserializer, valueDeserializer);
+  }
+}
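
A minimal illustration of why the value deserializer above is wrapped in ErrorHandlingDeserializer: a malformed payload is contained instead of crashing the listener container in an endless poll-fail loop. The event type and payloads below are stand-ins:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class DeserializerDemo {

  // Hypothetical stand-in for the generated ImportResultEvent DTO.
  public record DemoEvent(String ts) {
  }

  public static void main(String[] args) {
    var delegate = new JsonDeserializer<>(DemoEvent.class, new ObjectMapper());
    var deserializer = new ErrorHandlingDeserializer<>(delegate);

    // Valid JSON deserializes as usual.
    System.out.println(deserializer.deserialize("topic",
        "{\"ts\":\"1700000000\"}".getBytes(StandardCharsets.UTF_8)));

    // Malformed JSON: the bare JsonDeserializer would throw here; the wrapper
    // captures the error (yielding null when no failure function is set), so a
    // poison-pill record cannot stall the consumer.
    System.out.println(deserializer.deserialize("topic",
        "not-json".getBytes(StandardCharsets.UTF_8)));
  }
}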

src/main/java/org/folio/linked/data/imprt/config/kafka/ProducerConfiguration.java

Lines changed: 8 additions & 8 deletions

@@ -6,7 +6,7 @@
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.folio.linked.data.imprt.domain.dto.ImportOutput;
+import org.folio.linked.data.imprt.domain.dto.ImportOutputEvent;
 import org.folio.spring.tools.kafka.FolioMessageProducer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.context.annotation.Bean;

@@ -23,24 +23,24 @@ public class ProducerConfiguration {
   private final TopicProperties topicProperties;

   @Bean
-  public FolioMessageProducer<ImportOutput> importOutputMessageProducer(
-      KafkaTemplate<String, ImportOutput> importOutputMessageTemplate) {
+  public FolioMessageProducer<ImportOutputEvent> importOutputMessageProducer(
+      KafkaTemplate<String, ImportOutputEvent> importOutputMessageTemplate) {
     var producer = new FolioMessageProducer<>(importOutputMessageTemplate, topicProperties::getLinkedDataImportOutput);
-    producer.setKeyMapper(ImportOutput::getTs);
+    producer.setKeyMapper(ImportOutputEvent::getTs);
     return producer;
   }

   @Bean
-  public KafkaTemplate<String, ImportOutput> importOutputMessageTemplate(
-      ProducerFactory<String, ImportOutput> importOutputMessageProducerFactory) {
+  public KafkaTemplate<String, ImportOutputEvent> importOutputMessageTemplate(
+      ProducerFactory<String, ImportOutputEvent> importOutputMessageProducerFactory) {
     return new KafkaTemplate<>(importOutputMessageProducerFactory);
   }

   @Bean
-  public ProducerFactory<String, ImportOutput> importOutputMessageProducerFactory(ObjectMapper objectMapper) {
+  public ProducerFactory<String, ImportOutputEvent> importOutputMessageProducerFactory(ObjectMapper objectMapper) {
     var properties = new HashMap<>(kafkaProperties.buildProducerProperties(null));
     Supplier<Serializer<String>> keySerializer = StringSerializer::new;
-    Supplier<Serializer<ImportOutput>> valueSerializer = () -> new JsonSerializer<>(objectMapper);
+    Supplier<Serializer<ImportOutputEvent>> valueSerializer = () -> new JsonSerializer<>(objectMapper);
     return new DefaultKafkaProducerFactory<>(properties, keySerializer, valueSerializer);
   }
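
Since the producer factory builds its JsonSerializer from the application's primary ObjectMapper, the date handling configured in ObjectMapperConfig applies to outgoing event payloads as well. A hedged sketch under that assumption (the DTO is a stand-in):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class ProducerSerializationDemo {

  // Hypothetical stand-in for a generated event DTO with a date-time field.
  public record DemoEvent(OffsetDateTime startedDate) {
  }

  public static void main(String[] args) {
    var mapper = new ObjectMapper()
        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    mapper.registerModule(new JavaTimeModule());

    try (var serializer = new JsonSerializer<DemoEvent>(mapper)) {
      var bytes = serializer.serialize("topic",
          new DemoEvent(OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC)));
      // Prints {"startedDate":"2024-01-01T12:00:00Z"}: ISO-8601 on the wire.
      System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
  }
}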

src/main/java/org/folio/linked/data/imprt/integration/kafka/ImportResultEventListener.java

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+package org.folio.linked.data.imprt.integration.kafka;
+
+import static java.util.Optional.ofNullable;
+import static org.folio.linked.data.imprt.util.KafkaUtils.handleForExistedTenant;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.logging.log4j.Level;
+import org.folio.linked.data.imprt.domain.dto.ImportResultEvent;
+import org.folio.linked.data.imprt.integration.kafka.handler.KafkaMessageHandler;
+import org.folio.linked.data.imprt.service.tenant.LinkedDataImportTenantService;
+import org.folio.linked.data.imprt.service.tenant.TenantScopedExecutionService;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.retry.RetryContext;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class ImportResultEventListener {
+
+  private static final String LISTENER_ID = "mod-linked-data-import-import-result-event-listener";
+  private static final String CONTAINER_FACTORY = "importResultEventListenerContainerFactory";
+  private final LinkedDataImportTenantService linkedDataTenantService;
+  private final TenantScopedExecutionService tenantScopedExecutionService;
+  private final KafkaMessageHandler<ImportResultEvent> importResultEventHandler;
+
+  @KafkaListener(
+    id = LISTENER_ID,
+    containerFactory = CONTAINER_FACTORY,
+    groupId = "#{folioKafkaProperties.listener['import-result-event'].groupId}",
+    concurrency = "#{folioKafkaProperties.listener['import-result-event'].concurrency}",
+    topicPattern = "#{folioKafkaProperties.listener['import-result-event'].topicPattern}")
+  public void handleImportOutputEvent(ConsumerRecord<String, ImportResultEvent> consumerRecord) {
+    var event = consumerRecord.value();
+    handleForExistedTenant(consumerRecord, event.getTs(), linkedDataTenantService, log, this::handleRecord);
+  }
+
+  private void handleRecord(ConsumerRecord<String, ImportResultEvent> consumerRecord) {
+    log.info("Processing import result event with Job ID {} and ts {}",
+      consumerRecord.value().getJobInstanceId(), consumerRecord.value().getTs());
+    var event = consumerRecord.value();
+    tenantScopedExecutionService.executeAsyncWithRetry(
+      consumerRecord.headers(),
+      retryContext -> runRetryableJob(event, retryContext),
+      ex -> logFailedEvent(event, ex, false)
+    );
+  }
+
+  private void runRetryableJob(ImportResultEvent event, RetryContext retryContext) {
+    ofNullable(retryContext.getLastThrowable())
+      .ifPresent(ex -> logFailedEvent(event, ex, true));
+    importResultEventHandler.handle(event);
+  }
+
+  private void logFailedEvent(ImportResultEvent event, Throwable ex, boolean isRetrying) {
+    var logLevel = isRetrying ? Level.INFO : Level.ERROR;
+    log.log(logLevel, "Failed to handle import result event with id {}. Retrying: {}",
+      event.getTs(), isRetrying, ex);
+  }
+}
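
TenantScopedExecutionService itself is outside this commit's diff; the call above only implies its shape (retry the job, invoke a failure callback once retries are exhausted). Purely as an illustration of that shape, a hedged sketch built on Spring Retry's RetryTemplate, with made-up retry settings:

import java.util.function.Consumer;
import org.apache.kafka.common.header.Headers;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;

public class TenantScopedExecutionSketch {

  // Made-up policy: 3 attempts with a fixed 1-second backoff.
  private final RetryTemplate retryTemplate = RetryTemplate.builder()
      .maxAttempts(3)
      .fixedBackoff(1000)
      .build();

  public void executeAsyncWithRetry(Headers headers,
                                    Consumer<RetryContext> job,
                                    Consumer<Exception> failureHandler) {
    // A real implementation would also restore the tenant context carried in
    // the Kafka headers and hand the work to an async executor.
    try {
      retryTemplate.execute(context -> {
        job.accept(context);
        return null;
      });
    } catch (RuntimeException ex) {
      failureHandler.accept(ex);
    }
  }
}
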
src/main/java/org/folio/linked/data/imprt/integration/kafka/handler/ImportResultEventHandler.java

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+package org.folio.linked.data.imprt.integration.kafka.handler;
+
+import static org.folio.linked.data.imprt.batch.job.Parameters.FILE_URL;
+
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.folio.linked.data.imprt.domain.dto.ImportResultEvent;
+import org.folio.linked.data.imprt.model.entity.BatchJobExecutionParams;
+import org.folio.linked.data.imprt.model.mapper.ImportResultEventMapper;
+import org.folio.linked.data.imprt.repo.BatchJobExecutionParamsRepo;
+import org.folio.linked.data.imprt.repo.ImportResultEventRepo;
+import org.folio.linked.data.imprt.service.file.FileService;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class ImportResultEventHandler implements KafkaMessageHandler<ImportResultEvent> {
+
+  private static final String FILE_URL_NOT_FOUND_MESSAGE =
+    "Job parameter [fileUrl] for jobInstanceId [%s] is not found. RDF line reading failed.";
+
+  private final FileService fileService;
+  private final ImportResultEventRepo importResultEventRepo;
+  private final ImportResultEventMapper importResultEventMapper;
+  private final BatchJobExecutionParamsRepo batchJobExecutionParamsRepo;
+
+  @Override
+  public void handle(ImportResultEvent importResultEvent) {
+    var entity = importResultEventMapper.toEntity(importResultEvent);
+    if (!entity.getFailedRdfLines().isEmpty()) {
+      var fileUrl = getFileUrl(importResultEvent.getJobInstanceId());
+      entity.getFailedRdfLines().forEach(failedLine ->
+        failedLine.setFailedRdfLine(fileUrl.map(url -> fileService.readLineFromFile(url, failedLine.getLineNumber()))
+          .orElse(FILE_URL_NOT_FOUND_MESSAGE.formatted(importResultEvent.getJobInstanceId()))
+        )
+      );
+    }
+    importResultEventRepo.save(entity);
+  }
+
+  private Optional<String> getFileUrl(Long jobInstanceId) {
+    return batchJobExecutionParamsRepo.findByJobInstanceIdAndParameterName(jobInstanceId, FILE_URL)
+      .map(BatchJobExecutionParams::getParameterValue);
+  }
+}
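
The enrichment above hinges on Optional.map(...).orElse(...): when the fileUrl job parameter is found, the failed line's original text is re-read from the source file by line number; otherwise the formatted placeholder message is stored. A self-contained illustration of that pattern (names and values are illustrative):

import java.util.Optional;

public class OptionalEnrichmentDemo {

  static String resolveLine(Optional<String> fileUrl, long jobInstanceId, long lineNumber) {
    return fileUrl
        // Present: produce the enriched value (here a dummy read).
        .map(url -> "line %s of %s".formatted(lineNumber, url))
        // Absent: fall back to the placeholder, as the handler does.
        .orElse("Job parameter [fileUrl] for jobInstanceId [%s] is not found.".formatted(jobInstanceId));
  }

  public static void main(String[] args) {
    System.out.println(resolveLine(Optional.of("s3://bucket/import.rdf"), 42L, 7L));
    System.out.println(resolveLine(Optional.empty(), 42L, 7L));
  }
}
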
src/main/java/org/folio/linked/data/imprt/integration/kafka/handler/KafkaMessageHandler.java

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+package org.folio.linked.data.imprt.integration.kafka.handler;
+
+public interface KafkaMessageHandler<T> {
+
+  void handle(T message);
+}
src/main/java/org/folio/linked/data/imprt/model/entity/BatchJobExecutionParams.java

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+package org.folio.linked.data.imprt.model.entity;
+
+import jakarta.persistence.Entity;
+import jakarta.persistence.Id;
+import jakarta.persistence.IdClass;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@Entity
+@IdClass(BatchJobExecutionParams.BatchJobExecutionParamsId.class)
+@EqualsAndHashCode(of = {"jobExecutionId", "parameterName"})
+public class BatchJobExecutionParams {
+
+  @Id
+  private Long jobExecutionId;
+  @Id
+  private String parameterName;
+  private String parameterValue;
+
+  @Data
+  public static class BatchJobExecutionParamsId implements Serializable {
+    private Long jobExecutionId;
+    private String parameterName;
+  }
+}
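
Because the composite key is declared with @IdClass, a lookup by the full key passes an instance of the id class as the primary key. A hedged sketch of such a lookup via plain JPA (illustrative; the commit's own repositories use Spring Data query methods instead):

import jakarta.persistence.EntityManager;
import org.folio.linked.data.imprt.model.entity.BatchJobExecutionParams;

public class CompositeKeyLookupSketch {

  private final EntityManager entityManager;

  public CompositeKeyLookupSketch(EntityManager entityManager) {
    this.entityManager = entityManager;
  }

  // find(entityClass, primaryKey) accepts an instance of the @IdClass when
  // the entity declares a composite key; the setters come from Lombok's @Data.
  public BatchJobExecutionParams findParam(Long jobExecutionId, String parameterName) {
    var id = new BatchJobExecutionParams.BatchJobExecutionParamsId();
    id.setJobExecutionId(jobExecutionId);
    id.setParameterName(parameterName);
    return entityManager.find(BatchJobExecutionParams.class, id);
  }
}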
