Skip to content

Commit 6ef3e19

Browse files
authored
MODLD-901: preserve initial RDF line number (#46)
1 parent 8f6fbf1 commit 6ef3e19

File tree

15 files changed

+199
-80
lines changed

15 files changed

+199
-80
lines changed
src/main/java/org/folio/linked/data/imprt/batch/job/mapper/LineNumberCapturingMapper.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.folio.linked.data.imprt.batch.job.mapper;
2+
3+
import org.folio.linked.data.imprt.model.RdfLineWithNumber;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.springframework.batch.item.file.LineMapper;
6+
7+
public class LineNumberCapturingMapper implements LineMapper<RdfLineWithNumber> {
8+
9+
@Override
10+
public @NotNull RdfLineWithNumber mapLine(@NotNull String line, int lineNumber) {
11+
return new RdfLineWithNumber(lineNumber, line);
12+
}
13+
}
14+

src/main/java/org/folio/linked/data/imprt/batch/job/processor/Rdf2LdProcessor.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
package org.folio.linked.data.imprt.batch.job.processor;
22

33
import static java.nio.charset.StandardCharsets.UTF_8;
4+
import static java.util.stream.Collectors.toCollection;
45
import static org.folio.linked.data.imprt.batch.job.Parameters.CONTENT_TYPE;
56

67
import java.io.ByteArrayInputStream;
8+
import java.util.LinkedHashSet;
79
import java.util.Set;
810
import lombok.extern.log4j.Log4j2;
9-
import org.folio.ld.dictionary.model.Resource;
11+
import org.folio.linked.data.imprt.domain.dto.ResourceWithLineNumber;
1012
import org.folio.linked.data.imprt.model.FailedRdfLine;
13+
import org.folio.linked.data.imprt.model.RdfLineWithNumber;
1114
import org.folio.linked.data.imprt.repo.FailedRdfLineRepo;
1215
import org.folio.rdf4ld.service.Rdf4LdService;
13-
import org.jetbrains.annotations.NotNull;
1416
import org.springframework.batch.core.configuration.annotation.StepScope;
1517
import org.springframework.batch.item.ItemProcessor;
1618
import org.springframework.beans.factory.annotation.Value;
19+
import org.springframework.lang.NonNull;
20+
import org.springframework.lang.Nullable;
1721
import org.springframework.stereotype.Component;
1822

1923
@Log4j2
2024
@Component
2125
@StepScope
22-
public class Rdf2LdProcessor implements ItemProcessor<String, Set<Resource>> {
26+
public class Rdf2LdProcessor implements ItemProcessor<RdfLineWithNumber, Set<ResourceWithLineNumber>> {
2327

2428
private static final String EMPTY_RESULT = "Empty result returned by rdf4ld library";
2529
private final Long jobInstanceId;
@@ -37,29 +41,36 @@ public Rdf2LdProcessor(@Value("#{jobInstanceId}") Long jobInstanceId,
3741
this.failedRdfLineRepo = failedRdfLineRepo;
3842
}
3943

40-
4144
@Override
42-
public Set<Resource> process(@NotNull String rdfLine) {
43-
log.trace("Processing RDF line of contentType[{}]: {}", contentType, rdfLine);
45+
@Nullable
46+
@SuppressWarnings("java:S2638")
47+
public Set<ResourceWithLineNumber> process(@NonNull RdfLineWithNumber rdfLineWithNumber) {
48+
var rdfLine = rdfLineWithNumber.getContent();
49+
var lineNumber = rdfLineWithNumber.getLineNumber();
50+
log.trace("Processing RDF line #{} of contentType[{}]: {}", lineNumber, contentType, rdfLine);
4451
try {
4552
var is = new ByteArrayInputStream(rdfLine.getBytes(UTF_8));
4653
var result = rdf4LdService.mapBibframe2RdfToLd(is, contentType);
4754
if (result.isEmpty()) {
48-
log.debug(EMPTY_RESULT + ", saving FailedRdfLine. JobInstanceId [{}]", jobInstanceId);
49-
saveFailedLine(rdfLine, EMPTY_RESULT);
55+
log.debug(EMPTY_RESULT + ", saving FailedRdfLine. JobInstanceId [{}], line #{}", jobInstanceId, lineNumber);
56+
saveFailedLine(lineNumber, rdfLine, EMPTY_RESULT);
5057
return null;
5158
}
52-
return result;
59+
return result.stream()
60+
.map(resource -> new ResourceWithLineNumber(lineNumber, resource))
61+
.collect(toCollection(LinkedHashSet::new));
5362
} catch (Exception e) {
54-
log.warn("Exception during processing RDF line, saving FailedRdfLine. JobInstanceId [{}]", jobInstanceId);
55-
saveFailedLine(rdfLine, e.getMessage());
63+
log.warn("Exception during processing RDF line #{}, saving FailedRdfLine. JobInstanceId [{}]", lineNumber,
64+
jobInstanceId);
65+
saveFailedLine(lineNumber, rdfLine, e.getMessage());
5666
return null;
5767
}
5868
}
5969

60-
private void saveFailedLine(String rdfLine, String message) {
70+
private void saveFailedLine(long lineNumber, String rdfLine, String message) {
6171
var frl = new FailedRdfLine()
6272
.setJobInstanceId(jobInstanceId)
73+
.setLineNumber(lineNumber)
6374
.setDescription(message)
6475
.setFailedRdfLine(rdfLine);
6576
failedRdfLineRepo.save(frl);

src/main/java/org/folio/linked/data/imprt/batch/job/writer/LdKafkaSender.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import java.util.Set;
66
import lombok.extern.log4j.Log4j2;
7-
import org.folio.ld.dictionary.model.Resource;
87
import org.folio.linked.data.imprt.domain.dto.ImportOutput;
8+
import org.folio.linked.data.imprt.domain.dto.ResourceWithLineNumber;
99
import org.folio.spring.tools.kafka.FolioMessageProducer;
1010
import org.springframework.batch.core.configuration.annotation.StepScope;
1111
import org.springframework.batch.item.Chunk;
@@ -17,7 +17,7 @@
1717
@Log4j2
1818
@Component
1919
@StepScope
20-
public class LdKafkaSender implements ItemWriter<Set<Resource>> {
20+
public class LdKafkaSender implements ItemWriter<Set<ResourceWithLineNumber>> {
2121

2222
private final Long jobInstanceId;
2323
private final FolioMessageProducer<ImportOutput> importOutputFolioMessageProducer;
@@ -33,9 +33,10 @@ public LdKafkaSender(@Value("#{jobInstanceId}") Long jobInstanceId,
3333

3434

3535
@Override
36-
public void write(Chunk<? extends Set<Resource>> chunk) {
36+
public void write(Chunk<? extends Set<ResourceWithLineNumber>> chunk) {
3737
var messages = chunked(chunk.getItems().stream().flatMap(Set::stream), chunkSize)
38-
.map(resources -> new ImportOutput(resources).jobInstanceId(jobInstanceId))
38+
.map(resourcesWithLineNumbers ->
39+
new ImportOutput(resourcesWithLineNumbers).jobInstanceId(jobInstanceId))
3940
.toList();
4041
importOutputFolioMessageProducer.sendMessages(messages);
4142
}

src/main/java/org/folio/linked/data/imprt/config/BatchConfig.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import java.io.File;
99
import java.util.Set;
1010
import javax.sql.DataSource;
11-
import org.folio.ld.dictionary.model.Resource;
11+
import org.folio.linked.data.imprt.batch.job.mapper.LineNumberCapturingMapper;
1212
import org.folio.linked.data.imprt.batch.job.processor.Rdf2LdProcessor;
1313
import org.folio.linked.data.imprt.batch.job.writer.LdKafkaSender;
14+
import org.folio.linked.data.imprt.domain.dto.ResourceWithLineNumber;
15+
import org.folio.linked.data.imprt.model.RdfLineWithNumber;
1416
import org.springframework.batch.core.Job;
1517
import org.springframework.batch.core.Step;
1618
import org.springframework.batch.core.configuration.annotation.StepScope;
@@ -21,9 +23,8 @@
2123
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
2224
import org.springframework.batch.core.step.builder.StepBuilder;
2325
import org.springframework.batch.core.step.tasklet.Tasklet;
24-
import org.springframework.batch.item.file.FlatFileItemReader;
2526
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
26-
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
27+
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
2728
import org.springframework.beans.factory.annotation.Value;
2829
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
2930
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -121,13 +122,13 @@ public TaskExecutor processFileTaskExecutor(
121122
@Bean
122123
public Step processFileStep(JobRepository jobRepository,
123124
PlatformTransactionManager transactionManager,
124-
FlatFileItemReader<String> rdfLineItemReader,
125+
SynchronizedItemStreamReader<RdfLineWithNumber> rdfLineItemReader,
125126
Rdf2LdProcessor rdf2LdProcessor,
126127
LdKafkaSender ldKafkaSender,
127128
@Value("${mod-linked-data-import.chunk-size}") int chunkSize,
128129
TaskExecutor processFileTaskExecutor) {
129130
return new StepBuilder("processFileStep", jobRepository)
130-
.<String, Set<Resource>>chunk(chunkSize, transactionManager)
131+
.<RdfLineWithNumber, Set<ResourceWithLineNumber>>chunk(chunkSize, transactionManager)
131132
.reader(rdfLineItemReader)
132133
.processor(rdf2LdProcessor)
133134
.writer(ldKafkaSender)
@@ -137,13 +138,18 @@ public Step processFileStep(JobRepository jobRepository,
137138

138139
@Bean
139140
@StepScope
140-
public FlatFileItemReader<String> rdfLineItemReader(@Value("#{jobParameters['" + FILE_URL + "']}") String fileUrl) {
141+
public SynchronizedItemStreamReader<RdfLineWithNumber> rdfLineItemReader(
142+
@Value("#{jobParameters['" + FILE_URL + "']}") String fileUrl
143+
) {
141144
var file = new File(TMP_DIR, extractFileName(fileUrl));
142-
return new FlatFileItemReaderBuilder<String>()
145+
var flatFileReader = new FlatFileItemReaderBuilder<RdfLineWithNumber>()
143146
.name("lineItemReader")
144147
.resource(new org.springframework.core.io.FileSystemResource(file))
145-
.lineMapper(new PassThroughLineMapper())
148+
.lineMapper(new LineNumberCapturingMapper())
146149
.build();
150+
var synchronizedReader = new SynchronizedItemStreamReader<RdfLineWithNumber>();
151+
synchronizedReader.setDelegate(flatFileReader);
152+
return synchronizedReader;
147153
}
148154

149155
@Bean

src/main/java/org/folio/linked/data/imprt/model/FailedRdfLine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class FailedRdfLine {
2424
@GeneratedValue(strategy = SEQUENCE, generator = FAILED_RDF_LINE_SEQ_GEN)
2525
private Long id;
2626
private Long jobInstanceId;
27+
private Long lineNumber;
2728
private String failedRdfLine;
2829
private String description;
2930

src/main/java/org/folio/linked/data/imprt/model/RdfLineWithNumber.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.folio.linked.data.imprt.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
6+
@Data
7+
@AllArgsConstructor
8+
public class RdfLineWithNumber {
9+
10+
private long lineNumber;
11+
private String content;
12+
13+
}

src/main/java/org/folio/linked/data/imprt/util/CollectionUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.folio.linked.data.imprt.util;
22

3-
import static java.util.HashSet.newHashSet;
3+
import static java.util.LinkedHashSet.newLinkedHashSet;
44

55
import java.util.Iterator;
66
import java.util.Set;
@@ -21,7 +21,7 @@ public boolean hasNext() {
2121

2222
@Override
2323
public Set<T> next() {
24-
Set<T> chunk = newHashSet(chunkSize);
24+
Set<T> chunk = newLinkedHashSet(chunkSize);
2525
int count = 0;
2626
while (count < chunkSize && sourceIterator.hasNext()) {
2727
chunk.add(sourceIterator.next());

src/main/resources/changelog/scripts/v-1.0.0/metadata/tables/2_failed_rdf_line.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<constraints nullable="false" primaryKey="true" primaryKeyName="pk_failed_rdf_line"/>
3232
</column>
3333
<column name="job_instance_id" type="BIGINT"/>
34+
<column name="line_number" type="BIGINT"/>
3435
<column name="failed_rdf_line" type="TEXT"/>
3536
<column name="description" type="TEXT"/>
3637
</createTable>

src/main/resources/swagger.api/kafka/importOutput.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
"$schema": "http://json-schema.org/draft-04/schema#",
44
"description": "Linked Data Import result DTO",
55
"properties": {
6-
"resources": {
7-
"description": "Array of imported resources",
6+
"resourcesWithLineNumbers": {
7+
"description": "Array of imported resources with their line numbers",
88
"type": "array",
99
"uniqueItems": true,
1010
"items": {
1111
"type": "object",
12-
"$ref": "resourcePlaceholder.json"
12+
"$ref": "resourceWithLineNumber.json"
1313
}
1414
},
1515
"jobInstanceId": {
@@ -27,5 +27,5 @@
2727
}
2828
},
2929
"x-implements": "org.folio.spring.tools.kafka.BaseKafkaMessage",
30-
"required": ["resources"]
30+
"required": ["resourcesWithLineNumbers"]
3131
}
src/main/resources/swagger.api/kafka/resourceWithLineNumber.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"type": "object",
3+
"$schema": "http://json-schema.org/draft-04/schema#",
4+
"description": "Resource with its line number from the RDF file",
5+
"properties": {
6+
"lineNumber": {
7+
"description": "Line number from the RDF file",
8+
"type": "integer",
9+
"format": "int64"
10+
},
11+
"resource": {
12+
"description": "Mapped resource",
13+
"type": "object",
14+
"$ref": "resourcePlaceholder.json"
15+
}
16+
},
17+
"required": ["lineNumber", "resource"]
18+
}
19+

0 commit comments

Comments
 (0)