Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/metrics.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Metrics

`{prefix}` can be `read` or `write` but doesn't automatically mean that metric will appear in both.
`{prefix}` can be:
- `read` or `write` for the `api.cache-path` endpoints (it doesn't automatically mean that metric will appear in both)
- `module_storage.read` or `module_storage.write` for the `api.storage-path` endpoints (it doesn't automatically mean that metric will appear in both)

## The List of Metrics
- `pbc.{prefix}.request.duration` - the time it took to store or retrieve creative in ns.
- `pbc.{prefix}.request` - the count of incoming requests.
- `pbc.{prefix}.trustedRequest` - the count of incoming requests carrying a matching API key.
Expand All @@ -14,8 +17,10 @@
- `pbc.{prefix}.err.db` - the count of storage backend errors.
- `pbc.{prefix}.json` - the count of JSON (banner) creatives.
- `pbc.{prefix}.xml` - the count of XML (video) creatives.
- `pbc.{prefix}.text` - the count of TEXT (module storage entries) creatives.
- `pbc.err.secondaryWrite` - the count of secondary write errors.
- `pbc.err.existingId` - the count of errors due to existing UUID key in the storage backend.
- `pbc.err.existingId` - the count of errors due to existing UUID key in the storage backend
- `pbc.${prefix}.err.duplicateId` - the count of errors due to existing UUID key in the storage backend (applicable only for `write` or `module_storage.write`)
- `pbc.err.rejectedExternalId` - the count of rejected writes due to specifying external UUID not being allowed.
- `pbc.proxy.success` - the count of successful proxying requests.
- `pbc.proxy.failure` - the count of failed proxying requests.
4 changes: 2 additions & 2 deletions src/main/java/org/prebid/cache/config/RepositoryConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.prebid.cache.repository.CircuitBreakerSecuredReactiveRepository;
import org.prebid.cache.repository.ReactiveRepository;
import org.prebid.cache.repository.TimeOutCapableReactiveRepository;
import org.prebid.cache.repository.aerospike.AerospikePropertyConfiguration;
import org.prebid.cache.repository.aerospike.AerospikeConfigurationProperties;
import org.prebid.cache.repository.aerospike.AerospikeRepositoryImpl;
import org.prebid.cache.repository.ignite.IgniteRepositoryImpl;
import org.prebid.cache.repository.redis.RedisRepositoryImpl;
Expand All @@ -35,7 +35,7 @@ ReactiveRepository<PayloadWrapper, String> redisRepository(

@Bean
@ConditionalOnProperty(prefix = "spring.aerospike", name = {"host"})
ReactiveRepository<PayloadWrapper, String> aerospikeRepository(AerospikePropertyConfiguration configuration,
ReactiveRepository<PayloadWrapper, String> aerospikeRepository(AerospikeConfigurationProperties configuration,
AerospikeClient client,
EventLoops eventLoops,
Policy policy) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package org.prebid.cache.handlers.storage;

import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.exceptions.BadRequestException;
import org.prebid.cache.exceptions.ResourceNotFoundException;
import org.prebid.cache.repository.redis.module.storage.ModuleCompositeRepository;
import org.prebid.cache.exceptions.UnsupportedMediaTypeException;
import org.prebid.cache.handlers.PayloadType;
import org.prebid.cache.metrics.MeasurementTag;
import org.prebid.cache.metrics.MetricsRecorder;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.module.storage.ModuleCompositeRepository;
import org.prebid.cache.routers.ApiConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
Expand All @@ -16,37 +21,71 @@
import static org.springframework.web.reactive.function.BodyInserters.fromValue;

@Component
@RequiredArgsConstructor
public class GetStorageHandler {

private static final String API_KEY_HEADER = "x-pbc-api-key";

private static final String KEY = "k";
private static final String APPLICATION = "a";
private static final String MODULE_STORAGE_READ_METRIC_PREFIX = "module_storage.read";

private final ModuleCompositeRepository moduleRepository;
private final PrebidServerResponseBuilder responseBuilder;
private final ApiConfig apiConfig;
private final StorageMetricsRecorder metricsRecorder;

@Autowired
public GetStorageHandler(ModuleCompositeRepository moduleRepository,
PrebidServerResponseBuilder responseBuilder,
ApiConfig apiConfig,
MetricsRecorder metricsRecorder) {

this.moduleRepository = moduleRepository;
this.apiConfig = apiConfig;
this.metricsRecorder = new StorageMetricsRecorder(
responseBuilder, metricsRecorder, MODULE_STORAGE_READ_METRIC_PREFIX);
}

public Mono<ServerResponse> fetch(final ServerRequest request) {
if (!isApiKeyValid(request)) {
metricsRecorder.recordMetric(MeasurementTag.ERROR_UNAUTHORIZED);
return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
}

final String key = request.queryParam(KEY).orElse(null);
final String application = request.queryParam(APPLICATION).orElse(null);

if (key == null || application == null) {
metricsRecorder.recordMetric(MeasurementTag.ERROR_BAD_REQUEST);
return Mono.error(new BadRequestException("Invalid parameters: key and application are required"));
}

metricsRecorder.recordMetric(MeasurementTag.REQUEST);
final var timerContext = metricsRecorder.createRequestTimer();
return moduleRepository.findById(application, key)
.flatMap(value -> ServerResponse.ok().body(fromValue(value.getPayload())))
.flatMap(this::createServerResponse)
.switchIfEmpty(Mono.error(new ResourceNotFoundException("Invalid application or key")))
.onErrorResume(error -> responseBuilder.error(Mono.just(error), request));
.onErrorResume(error -> metricsRecorder.handleErrorMetrics(error, request))
.doOnEach(signal -> {
if (timerContext != null)
timerContext.stop();
});
}

private boolean isApiKeyValid(final ServerRequest request) {
return StringUtils.equals(request.headers().firstHeader(API_KEY_HEADER), apiConfig.getApiKey());
}

private Mono<ServerResponse> createServerResponse(final PayloadWrapper wrapper) {
if (wrapper.getPayload().getType().equals(PayloadType.JSON.toString())) {
metricsRecorder.recordMetric(MeasurementTag.JSON);
return ServerResponse.ok().body(fromValue(wrapper.getPayload()));
} else if (wrapper.getPayload().getType().equals(PayloadType.XML.toString())) {
metricsRecorder.recordMetric(MeasurementTag.XML);
return ServerResponse.ok().body(fromValue(wrapper.getPayload()));
} else if (wrapper.getPayload().getType().equals(PayloadType.TEXT.toString())) {
metricsRecorder.recordMetric(MeasurementTag.TEXT);
return ServerResponse.ok().body(fromValue(wrapper.getPayload()));
}

return Mono.error(new UnsupportedMediaTypeException("Unsupported Media Type."));
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package org.prebid.cache.handlers.storage;

import jakarta.validation.Validator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.config.StorageConfig;
import org.prebid.cache.exceptions.BadRequestException;
import org.prebid.cache.model.StoragePayload;
import org.prebid.cache.metrics.MeasurementTag;
import org.prebid.cache.metrics.MetricsRecorder;
import org.prebid.cache.model.Payload;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.redis.module.storage.ModuleCompositeRepository;
import org.prebid.cache.model.StoragePayload;
import org.prebid.cache.repository.module.storage.ModuleCompositeRepository;
import org.prebid.cache.routers.ApiConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
Expand All @@ -26,22 +28,42 @@

@Slf4j
@Component
@RequiredArgsConstructor
public class PostStorageHandler {

private static final String API_KEY_HEADER = "x-pbc-api-key";
private static final String MODULE_STORAGE_WRITE_METRIC_PREFIX = "module_storage.write";

private final Validator validator;
private final ModuleCompositeRepository moduleRepository;
private final PrebidServerResponseBuilder responseBuilder;
private final ApiConfig apiConfig;
private final StorageConfig storageConfig;
private final StorageMetricsRecorder metricsRecorder;

@Autowired
public PostStorageHandler(Validator validator,
ModuleCompositeRepository moduleRepository,
PrebidServerResponseBuilder responseBuilder,
ApiConfig apiConfig,
StorageConfig storageConfig,
MetricsRecorder metricsRecorder) {

this.validator = validator;
this.moduleRepository = moduleRepository;
this.apiConfig = apiConfig;
this.storageConfig = storageConfig;
this.metricsRecorder = new StorageMetricsRecorder(
responseBuilder, metricsRecorder, MODULE_STORAGE_WRITE_METRIC_PREFIX);
}

public Mono<ServerResponse> save(final ServerRequest request) {
if (!isApiKeyValid(request)) {
metricsRecorder.recordMetric(MeasurementTag.ERROR_UNAUTHORIZED);
return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
}

metricsRecorder.recordMetric(MeasurementTag.REQUEST);
final var timerContext = metricsRecorder.createRequestTimer();

return request.body(BodyExtractors.toMono(StoragePayload.class))
.switchIfEmpty(Mono.error(new BadRequestException("Empty body")))
.handle(this::validateModulePayload)
Expand All @@ -50,7 +72,11 @@ public Mono<ServerResponse> save(final ServerRequest request) {
mapToPayloadWrapper(storagePayload)))
.subscribeOn(Schedulers.parallel())
.flatMap(ignored -> ServerResponse.noContent().build())
.onErrorResume(error -> responseBuilder.error(Mono.just(error), request));
.onErrorResume(error -> metricsRecorder.handleErrorMetrics(error, request))
.doOnEach(signal -> {
if (timerContext != null)
timerContext.stop();
});
}

private boolean isApiKeyValid(final ServerRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.prebid.cache.handlers.storage;

import lombok.AllArgsConstructor;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.exceptions.BadRequestException;
import org.prebid.cache.exceptions.DuplicateKeyException;
import org.prebid.cache.exceptions.RepositoryException;
import org.prebid.cache.exceptions.ResourceNotFoundException;
import org.prebid.cache.metrics.MeasurementTag;
import org.prebid.cache.metrics.MetricsRecorder;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeoutException;

@AllArgsConstructor
public class StorageMetricsRecorder {

private final PrebidServerResponseBuilder responseBuilder;
private final MetricsRecorder metricsRecorder;
private final String prefix;

public Mono<ServerResponse> handleErrorMetrics(final Throwable error, final ServerRequest request) {
if (error instanceof DuplicateKeyException) {
recordMetric(MeasurementTag.ERROR_DUPLICATE_KEY);
} else if (error instanceof RepositoryException) {
recordMetric(MeasurementTag.ERROR_DB);
} else if (error instanceof ResourceNotFoundException || error instanceof BadRequestException) {
recordMetric(MeasurementTag.ERROR_BAD_REQUEST);
} else if (error instanceof TimeoutException) {
recordMetric(MeasurementTag.ERROR_TIMED_OUT);
} else {
recordMetric(MeasurementTag.ERROR_UNKNOWN);
}

return responseBuilder.error(Mono.just(error), request)
.doOnNext(response -> handleErrorStatusCodes(request, response));
}

private void handleErrorStatusCodes(ServerRequest request, ServerResponse response) {
final HttpMethod method = request.method();
if (method == null || response == null) {
recordMetric(MeasurementTag.ERROR_UNKNOWN);
} else if (response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
recordMetric(MeasurementTag.ERROR_UNKNOWN);
} else if (response.statusCode() == HttpStatus.BAD_REQUEST) {
recordMetric(MeasurementTag.ERROR_BAD_REQUEST);
} else if (response.statusCode() == HttpStatus.NOT_FOUND) {
recordMetric(MeasurementTag.ERROR_MISSING_ID);
}
}

public void recordMetric(MeasurementTag tag) {
metricsRecorder.markMeterForTag(prefix, tag);
}

public MetricsRecorder.MetricsRecorderTimer createRequestTimer() {
return metricsRecorder.createRequestTimerForTag(prefix);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/prebid/cache/metrics/MeasurementTag.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ public enum MeasurementTag {
ERROR_UNAUTHORIZED("pbc.${prefix}.err.unauthorized"),
ERROR_DB("pbc.${prefix}.err.db"),
JSON("pbc.${prefix}.json"),
TEXT("pbc.${prefix}.text"),
XML("pbc.${prefix}.xml"),
ERROR_SECONDARY_WRITE("pbc.err.secondaryWrite"),
ERROR_EXISTING_ID("pbc.err.existingId"),
ERROR_DUPLICATE_KEY("pbc.${prefix}.err.duplicateId"),
ERROR_REJECTED_EXTERNAL_ID("pbc.err.rejectedExternalId"),
PROXY_SUCCESS("pbc.proxy.success"),
PROXY_FAILURE("pbc.proxy.failure"),
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/prebid/cache/metrics/MetricsRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void markMeterForTag(final String prefix, final MeasurementTag measuremen
meterForTag(prefix, measurementTag).increment();
}

public MetricsRecorderTimer createRequestTimerForTag(final String prefix) {
return new MetricsRecorderTimer(
MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, prefix));
}

public MetricsRecorderTimer createRequestTimerForServiceType(final ServiceType serviceType) {
if (serviceType.equals(ServiceType.FETCH)) {
return new MetricsRecorderTimer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.prebid.cache.repository.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(prefix = "spring.aerospike", name = {"host"})
public class AerospikeConfiguration {

@Bean
@ConfigurationProperties(prefix = "spring.aerospike")
public AerospikeConfigurationProperties aerospikeConfigurationProperties() {
return new AerospikeConfigurationProperties();
}

@Bean
Policy readPolicy(AerospikeConfigurationProperties properties) {
final Policy policy = new Policy();
policy.setConnectTimeout(properties.getConnectTimeout());
policy.setTimeouts(properties.getSocketTimeout(), properties.getTotalTimeout());
policy.setReplica(properties.getReadPolicy());
return policy;
}

@Bean
EventPolicy eventPolicy() {
return new EventPolicy();
}

@Bean
EventLoopGroup eventGroup(AerospikeConfigurationProperties properties) {
return new NioEventLoopGroup(properties.getCores());
}

@Bean
EventLoops eventLoops(EventLoopGroup eventGroup) {
return new NettyEventLoops(eventPolicy(), eventGroup);
}

@Bean(destroyMethod = "close")
AerospikeClient client(AerospikeConfigurationProperties properties, EventLoops eventLoops) {
final ClientPolicy clientPolicy = clientPolicy(properties, eventLoops);
final String host = properties.getHost();
return AerospikeConfigurationProperties.isAerospikeCluster(host)
? new AerospikeClient(clientPolicy, AerospikeConfigurationProperties.extractHosts(host))
: new AerospikeClient(clientPolicy, host, properties.getPort());
}

private ClientPolicy clientPolicy(AerospikeConfigurationProperties properties, EventLoops eventLoops) {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.setEventLoops(eventLoops);
clientPolicy.setMinConnsPerNode(properties.getMinConnsPerNode());
clientPolicy.setMaxConnsPerNode(properties.getMaxConnsPerNode());
return clientPolicy;
}
}
Loading