Skip to content

Commit

Permalink
Add Prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Feb 23, 2018
1 parent e43b4d9 commit f408ecc
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.github.bsideup.liiklus.positions;

import lombok.Value;
import org.reactivestreams.Publisher;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
Expand All @@ -9,4 +12,15 @@ public interface PositionsStorage {
CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions);

CompletionStage<Void> update(String topic, String groupId, int partition, long position);

Publisher<Positions> findAll();

@Value
class Positions {
String topic;

String groupId;

Map<Integer, Long> values;
}
}
3 changes: 3 additions & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ dependencies {
compile 'org.springframework.boot:spring-boot-starter-actuator'
compile 'org.springframework.boot:spring-boot-starter-web'

compile 'io.micrometer:micrometer-spring-legacy:latest.release'
compile 'io.micrometer:micrometer-registry-prometheus:latest.release'

testCompileOnly 'org.projectlombok:lombok'
testCompile 'org.springframework.boot:spring-boot-starter-test'
testCompile 'org.testcontainers:kafka'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.bsideup.liiklus.monitoring;

import com.github.bsideup.liiklus.positions.PositionsStorage;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.GaugeMetricFamily;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

@Component
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true)
@Slf4j
public class MetricsCollector extends Collector {

CollectorRegistry collectorRegistry;

PositionsStorage positionsStorage;

@PostConstruct
public void init() {
// TODO use MicroMeter abstractions
collectorRegistry.register(this);
}

@Override
public List<MetricFamilySamples> collect() {
return Flux.from(positionsStorage.findAll())
.<MetricFamilySamples>map(positions -> {
val gauge = new GaugeMetricFamily("liiklus_topic_position", "", Arrays.asList("topic", "groupId", "partition"));

for (val entry : positions.getValues().entrySet()) {
gauge.addMetric(Arrays.asList(positions.getTopic(), positions.getGroupId(), entry.getKey().toString()), entry.getValue().doubleValue());
}

return gauge;
})
.collectList()
.block();
}
}
6 changes: 5 additions & 1 deletion app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ spring:
enabled: false

cache:
type: NONE
type: NONE

endpoints:
prometheus:
sensitive: false
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import com.github.bsideup.liiklus.dynamodb.DynamoDBPositionsStorage;
import com.github.bsideup.liiklus.dynamodb.config.DynamoDBConfiguration.DynamoDBProperties;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Profile;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;

import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Profile("test")
@org.springframework.boot.test.context.TestConfiguration
@Order(Ordered.HIGHEST_PRECEDENCE)
public class TestConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.val;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.stream.Collectors;

Expand All @@ -34,6 +34,48 @@ public class DynamoDBPositionsStorage implements PositionsStorage {

String tableName;

@Override
public Publisher<Positions> findAll() {
val request = new ScanRequest(tableName);

AtomicBoolean done = new AtomicBoolean(false);

return Mono
.<List<Map<String, AttributeValue>>>create(sink -> dynamoDB.scanAsync(request, new AsyncHandler<ScanRequest, ScanResult>() {
@Override
public void onError(Exception exception) {
sink.error(exception);
}

@Override
public void onSuccess(ScanRequest request, ScanResult scanResult) {
try {
if (scanResult.getCount() <= 0) {
done.set(true);
sink.success();
} else {
if (scanResult.getLastEvaluatedKey() == null || scanResult.getLastEvaluatedKey().isEmpty()) {
done.set(true);
}
sink.success(scanResult.getItems());
}
} catch (Exception e) {
sink.error(e);
}
}
}))
.repeat(() -> !done.get())
.flatMapIterable(it -> it)
.map(item -> new Positions(
item.get("topic").getS(),
item.get("groupId").getS(),
item.get("positions").getM().entrySet().stream().collect(Collectors.toMap(
it -> Integer.parseInt(it.getKey()),
it -> Long.parseLong(it.getValue().getN())
))
));
}

@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions) {
val request = new GetItemRequest()
Expand Down

0 comments on commit f408ecc

Please sign in to comment.