Kstreamplify provides the necessary parameters to configure the `application.server` property required to implement interactive queries. Unfortunately, it does not provide anything to simplify the implementation of the interactive queries themselves.

The purpose of this improvement proposal is to simplify the implementation of interactive queries, including the RPC (Remote Procedure Call).

Option #1

Provide an `InteractiveQueriesService` that can request a store and handle the RPC. Here is an implementation example using Spring Boot and the `KafkaPerson` Avro record:
```java
@Slf4j
@Service
public class InteractiveQueriesService {
    @Autowired
    private KafkaStreamsInitializer kafkaStreamsInitializer;

    private final RestTemplate restTemplate = new RestTemplate();

    public KafkaPerson getByKey(String key) {
        final var host = getHostByStoreAndKey(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
            key, new StringSerializer());

        if (isNotCurrentHost(host)) {
            log.info("The key {} has been located on another instance ({}:{})", key,
                host.host(), host.port());

            return getRecordOnOtherInstance(host, "/api/v1/rpc/persons/" + key);
        }

        log.info("The key {} has been located on this instance ({}:{})", key,
            host.host(), host.port());

        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
            StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                QueryableStoreTypes.keyValueStore()));

        KafkaPerson person = store.get(key);
        if (person == null) {
            log.info("No person found for the key {}", key);
            return null;
        }

        return person;
    }

    public List<KafkaPerson> getAll() {
        final List<HostInfo> hosts = getHostsByStore(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString());
        if (hosts.isEmpty()) {
            log.info("No host found for the given state store {}", PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE);
            return Collections.emptyList();
        }

        List<KafkaPerson> persons = new ArrayList<>();
        hosts.forEach(host -> {
            if (isNotCurrentHost(host)) {
                log.info("Fetching data on other instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnOtherInstance(host, "/api/v1/rpc/persons/instance"));
            } else {
                log.info("Fetching data on this instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnCurrentInstance()
                    .stream()
                    .map(entry -> entry.value)
                    .toList());
            }
        });

        return persons;
    }

    public List<KeyValue<String, KafkaPerson>> getAllOnCurrentInstance() {
        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
            StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                QueryableStoreTypes.keyValueStore()));

        List<KeyValue<String, KafkaPerson>> results = new ArrayList<>();
        try (KeyValueIterator<String, KafkaPerson> iterator = store.all()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        return results;
    }

    private KafkaPerson getRecordOnOtherInstance(HostInfo host, String endpointPath) {
        try {
            return restTemplate
                .getForEntity(String.format("https://%s:%d%s", host.host(), host.port(), endpointPath), KafkaPerson.class)
                .getBody();
        } catch (RestClientException e) {
            log.info("The other instance ({}:{}) threw an error. Cannot retrieve the entity", host.host(), host.port(), e);
            return null;
        }
    }

    private List<KafkaPerson> getAllOnOtherInstance(HostInfo host, String endpointPath) {
        KafkaPerson[] persons = restTemplate
            .getForEntity(String.format("https://%s:%d%s", host.host(), host.port(), endpointPath), KafkaPerson[].class)
            .getBody();

        if (persons == null) {
            return Collections.emptyList();
        }

        return Arrays.asList(persons);
    }

    public List<HostInfo> getHostsByStore(final String store) {
        final Collection<StreamsMetadata> metadata = kafkaStreamsInitializer
            .getKafkaStreams()
            .streamsMetadataForStore(store);

        if (metadata == null || metadata.isEmpty()) {
            return Collections.emptyList();
        }

        return metadata
            .stream()
            .map(StreamsMetadata::hostInfo)
            .toList();
    }

    public <K> HostInfo getHostByStoreAndKey(final String store, final K key, final Serializer<K> serializer) {
        final KeyQueryMetadata metadata = kafkaStreamsInitializer
            .getKafkaStreams()
            .queryMetadataForKey(store, key, serializer);

        if (metadata == null) {
            return null;
        }

        return metadata.activeHost();
    }

    /**
     * Compare the given host to the host of the current Kafka Streams instance.
     *
     * @param compareHostInfo The host information to compare
     * @return True if the hosts are different, false if equal
     */
    private boolean isNotCurrentHost(HostInfo compareHostInfo) {
        return !kafkaStreamsInitializer.getHostInfo().host().equals(compareHostInfo.host())
            || kafkaStreamsInitializer.getHostInfo().port() != compareHostInfo.port();
    }
}
```
➡ The implementation needs to be split between `kstreamplify-core` and `kstreamplify-spring-boot`. The Spring Boot module should reuse the core implementation as much as possible.
➡ `KafkaPerson` is only an example and needs to be templatized.
➡ The store name is hardcoded here and needs to be passed as a parameter, as does the endpoint path to call on the other instance.
➡ `restTemplate` comes from Spring Boot and probably needs to be replaced, since the service will be implemented in the core module.
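To illustrate the templatization points above, here is a minimal, self-contained sketch of what a generic lookup API could look like. All names (`GenericQueriesService`, `registerStore`) are hypothetical, and a plain `Map` stands in for a real state store; the point is only that the store name and value type become parameters instead of being hardcoded to `KafkaPerson`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: store name and value type are parameters,
// not hardcoded. A plain Map stands in for a ReadOnlyKeyValueStore.
class GenericQueriesService<K, V> {
    private final Map<String, Map<K, V>> storesByName = new HashMap<>();

    public void registerStore(String storeName, Map<K, V> store) {
        storesByName.put(storeName, store);
    }

    // The store name is passed by the caller, as proposed in the notes above
    public Optional<V> getByKey(String storeName, K key) {
        Map<K, V> store = storesByName.get(storeName);
        return store == null ? Optional.empty() : Optional.ofNullable(store.get(key));
    }
}
```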
💡 Expected result
As a result, I want to be able to inject the service in my project:
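As a rough sketch only (hypothetical names, and assuming the service ends up templatized over the record type), the injection could look like:

```java
@RestController
public class PersonController {

    // Hypothetical: the service provided by Kstreamplify, injected by Spring
    @Autowired
    private InteractiveQueriesService<KafkaPerson> interactiveQueriesService;

    @GetMapping("/persons/{key}")
    public KafkaPerson getByKey(@PathVariable("key") String key) {
        return interactiveQueriesService.getByKey(key);
    }
}
```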