Interactive Queries service #141

Closed
loicgreffier opened this issue Dec 1, 2023 · 0 comments · Fixed by #204
Labels: enhancement (This issue or pull request improves a feature), feature (This issue adds a new feature)

@loicgreffier
Collaborator

Kstreamplify provides the parameters needed to configure the `application.server` property, which is required to implement interactive queries.
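
Kafka Streams reads `application.server` as a `host:port` pair that each instance advertises to the other instances of the same application; that endpoint is what `streamsMetadataForStore` and `queryMetadataForKey` later report as a `HostInfo`. A minimal sketch of setting it by hand, assuming placeholder values for the application id, bootstrap servers and address (it does not reproduce how Kstreamplify derives the value):

```java
import java.util.Properties;

// Sketch: each Kafka Streams instance advertises its own RPC endpoint through the
// "application.server" property (the key behind StreamsConfig.APPLICATION_SERVER_CONFIG).
class ApplicationServerConfig {

    // host and port are the address other instances will use to reach this one;
    // "my-streams-app" and "localhost:9092" are placeholder values.
    static Properties streamsProperties(String host, int port) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.server", host + ":" + port); // expected format: host:port
        return props;
    }
}
```

Passing `streamsProperties("10.0.0.12", 8080)` to the `KafkaStreams` constructor, together with a topology, would make that instance discoverable at `10.0.0.12:8080` by the other instances.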

Beyond that, Kstreamplify does not provide anything to simplify the implementation of interactive queries themselves.

The purpose of this improvement proposal is to simplify the implementation of interactive queries, including the RPC (Remote Procedure Call) needed to query stores hosted on other instances.

Option #1

Provide an `InteractiveQueriesService` that can query a store and handle the RPC to other instances. Here is an example implementation using Spring Boot and the `KafkaPerson` Avro record:

```java
// Project-specific imports (KafkaStreamsInitializer, KafkaPerson and the
// PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE constant) are omitted.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

@Slf4j
@Service
public class InteractiveQueriesService {
    @Autowired
    private KafkaStreamsInitializer kafkaStreamsInitializer;

    private final RestTemplate restTemplate = new RestTemplate();

    public KafkaPerson getByKey(String key) {
        final var host = getHostByStoreAndKey(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                key, new StringSerializer());

        // No metadata: the store is unknown or Kafka Streams is not ready yet
        if (host == null) {
            log.info("No host found for the key {}", key);
            return null;
        }

        if (isNotCurrentHost(host)) {
            log.info("The key {} has been located on another instance ({}:{})", key,
                    host.host(), host.port());

            // No leading slash: the URL format in getRecordOnOtherInstance already adds one
            return getRecordOnOtherInstance(host, "api/v1/rpc/persons/" + key);
        }

        log.info("The key {} has been located on this instance ({}:{})", key,
                host.host(), host.port());

        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
                StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                        QueryableStoreTypes.keyValueStore()));

        KafkaPerson person = store.get(key);
        if (person == null) {
            log.info("No person found for the key {}", key);
        }

        return person;
    }

    public List<KafkaPerson> getAll() {
        final List<HostInfo> hosts = getHostsByStore(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString());

        if (hosts.isEmpty()) {
            log.info("No host found for the given state store {}", PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE);
            return Collections.emptyList();
        }

        List<KafkaPerson> persons = new ArrayList<>();
        hosts.forEach(host -> {
            if (isNotCurrentHost(host)) {
                log.info("Fetching data on other instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnOtherInstance(host, "api/v1/rpc/persons/instance"));
            } else {
                log.info("Fetching data on this instance ({}:{})", host.host(), host.port());
                persons.addAll(getAllOnCurrentInstance()
                        .stream()
                        .map(entry -> entry.value)
                        .toList());
            }
        });

        return persons;
    }

    // Reads every record of the store partitions hosted by this instance
    public List<KeyValue<String, KafkaPerson>> getAllOnCurrentInstance() {
        final ReadOnlyKeyValueStore<String, KafkaPerson> store = kafkaStreamsInitializer.getKafkaStreams().store(
                StoreQueryParameters.fromNameAndType(PERSON_INTERACTIVE_QUERIES_RPC_STATE_STORE.toString(),
                        QueryableStoreTypes.keyValueStore()));

        List<KeyValue<String, KafkaPerson>> results = new ArrayList<>();
        // The iterator must be closed to release store resources
        try (KeyValueIterator<String, KafkaPerson> iterator = store.all()) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }

        return results;
    }

    private KafkaPerson getRecordOnOtherInstance(HostInfo host, String endpointPath) {
        try {
            return restTemplate
                    .getForEntity(String.format("https://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson.class)
                    .getBody();
        } catch (RestClientException e) {
            log.warn("The other instance ({}:{}) threw an error. Cannot retrieve the entity", host.host(), host.port(), e);
            return null;
        }
    }

    private List<KafkaPerson> getAllOnOtherInstance(HostInfo host, String endpointPath) {
        try {
            KafkaPerson[] persons = restTemplate
                    .getForEntity(String.format("https://%s:%d/%s", host.host(), host.port(), endpointPath), KafkaPerson[].class)
                    .getBody();

            if (persons == null) {
                return Collections.emptyList();
            }

            return Arrays.asList(persons);
        } catch (RestClientException e) {
            // Same error handling as getRecordOnOtherInstance
            log.warn("The other instance ({}:{}) threw an error. Cannot retrieve the entities", host.host(), host.port(), e);
            return Collections.emptyList();
        }
    }

    public List<HostInfo> getHostsByStore(final String store) {
        final Collection<StreamsMetadata> metadata = kafkaStreamsInitializer
                .getKafkaStreams()
                .streamsMetadataForStore(store);

        if (metadata == null || metadata.isEmpty()) {
            return Collections.emptyList();
        }

        return metadata
                .stream()
                .map(StreamsMetadata::hostInfo)
                .toList();
    }

    public <K> HostInfo getHostByStoreAndKey(final String store, final K key, final Serializer<K> serializer) {
        final KeyQueryMetadata metadata = kafkaStreamsInitializer
                .getKafkaStreams()
                .queryMetadataForKey(store, key, serializer);

        // null: no matching metadata; NOT_AVAILABLE: Kafka Streams is (re-)initializing
        if (metadata == null || KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            return null;
        }

        return metadata.activeHost();
    }

    /**
     * Compare the given host to the host of the current KStream instance
     * @param compareHostInfo The host information to compare
     * @return True if the hosts are different, false if they are equal
     */
    private boolean isNotCurrentHost(HostInfo compareHostInfo) {
        return !kafkaStreamsInitializer.getHostInfo().host().equals(compareHostInfo.host())
                || kafkaStreamsInitializer.getHostInfo().port() != compareHostInfo.port();
    }
}
```
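
The core decision in `getByKey` and `getAll` is whether to read the local store or forward the request to the instance that hosts the data. The JDK-only sketch below isolates that routing so it could be reused with any record type and store. `Host` is a hypothetical stand-in for Kafka's `HostInfo`, the local and remote reads are passed in as functions, and the URL builder strips a leading slash so that both path styles yield a single slash. Records require Java 16+, which the example above already needs for `Stream.toList()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// JDK-only sketch of the local-vs-remote routing used by getByKey and getAll.
// Host is a hypothetical stand-in for org.apache.kafka.streams.state.HostInfo.
class QueryRouting {

    record Host(String host, int port) {}

    // Builds the RPC URL; strips a leading slash so "/api/..." and "api/..." both work
    static String rpcUrl(Host host, String endpointPath) {
        String path = endpointPath.startsWith("/") ? endpointPath.substring(1) : endpointPath;
        return String.format("https://%s:%d/%s", host.host(), host.port(), path);
    }

    // Single key: read locally if this instance is the active host for the key,
    // otherwise forward the request to the active host.
    static <V> V getByKey(Host current, Host active, String key, String endpointPrefix,
                          Function<String, V> localLookup, Function<String, V> remoteCall) {
        if (active == null) {
            return null; // no metadata for this key (store unknown or not ready)
        }
        if (current.equals(active)) {
            return localLookup.apply(key);
        }
        return remoteCall.apply(rpcUrl(active, endpointPrefix + key));
    }

    // All records: scan locally on this instance, call every other instance
    // hosting the store, then concatenate the partial results.
    static <V> List<V> getAll(Host current, List<Host> hosts, String endpointPath,
                              Supplier<List<V>> localScan, Function<String, List<V>> remoteCall) {
        List<V> results = new ArrayList<>();
        for (Host host : hosts) {
            if (current.equals(host)) {
                results.addAll(localScan.get());
            } else {
                results.addAll(remoteCall.apply(rpcUrl(host, endpointPath)));
            }
        }
        return results;
    }
}
```

In the example above, `current` would correspond to `kafkaStreamsInitializer.getHostInfo()`, `active` to `queryMetadataForKey(...).activeHost()`, and `hosts` to the host infos from `streamsMetadataForStore(...)`.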

➡ The implementation needs to be split between kstreamplify-core and kstreamplify-spring-boot, with the Spring Boot module reusing the core implementation as much as possible.
➡ `KafkaPerson` is only an example; the record type needs to be made generic.
➡ The store name is hardcoded here. It needs to be passed as a parameter, as does the endpoint path to call on the other instances.
➡ `RestTemplate` comes from Spring Boot and will probably need to be replaced, since the service will be implemented in the core module.
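
One possible core-module replacement for `RestTemplate` is the JDK's own `java.net.http.HttpClient` (Java 11+), which keeps kstreamplify-core free of Spring. The sketch below is generic in the record type `V` and takes the endpoint path as a parameter. The class name `CoreRpcClient`, the `bodyDeserializer` function (which could wrap Jackson or an Avro JSON decoder) and the choice to return `Optional.empty()` on errors are assumptions, not Kstreamplify's API; the `https` scheme mirrors the example above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical framework-agnostic RPC client for another Kafka Streams instance.
class CoreRpcClient<V> {
    private final HttpClient httpClient = HttpClient.newHttpClient();
    // Converts the raw response body into V (e.g. via Jackson or an Avro JSON decoder)
    private final Function<String, V> bodyDeserializer;

    CoreRpcClient(Function<String, V> bodyDeserializer) {
        this.bodyDeserializer = bodyDeserializer;
    }

    // Builds the GET request; the caller supplies the endpoint path
    HttpRequest buildRequest(String host, int port, String endpointPath) {
        String path = endpointPath.startsWith("/") ? endpointPath.substring(1) : endpointPath;
        return HttpRequest.newBuilder(URI.create(String.format("https://%s:%d/%s", host, port, path)))
                .GET()
                .build();
    }

    // Calls the other instance; returns empty on non-2xx responses or I/O errors
    Optional<V> fetch(String host, int port, String endpointPath) {
        try {
            HttpResponse<String> response = httpClient.send(
                    buildRequest(host, port, endpointPath), HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() / 100 != 2) {
                return Optional.empty();
            }
            return Optional.ofNullable(bodyDeserializer.apply(response.body()));
        } catch (IOException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.empty();
        }
    }
}
```

The Spring Boot module could then wrap this client in a bean, so both modules share the same RPC logic.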

💡 Expected result

As a result, I want to be able to inject the service into my project and call it as follows:

```java
@Autowired
private InteractiveQueriesService interactiveQueriesService;

...

interactiveQueriesService.getByKey(store, key, path);
interactiveQueriesService.getAll(store, path);
interactiveQueriesService.getAllOnCurrentInstance(store);
```
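
The three calls listed above could be pinned down as a generic interface. The sketch below is hypothetical: it names the interface after the `InteractiveQueriesService` from Option #1, uses class-level type parameters `K` and `V`, and adds an in-memory, single-instance implementation (no Kafka, no RPC, so `path` is ignored) that could serve as a test double.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Hypothetical shape of the requested API: the record type is generic and the
// store name / RPC path are passed as parameters.
interface InteractiveQueriesService<K, V> {
    V getByKey(String store, K key, String path);

    List<V> getAll(String store, String path);

    List<Entry<K, V>> getAllOnCurrentInstance(String store);
}

// Single-instance, in-memory stand-in: every key is local, so the RPC path is unused.
class InMemoryInteractiveQueriesService<K, V> implements InteractiveQueriesService<K, V> {
    private final Map<String, Map<K, V>> stores;

    InMemoryInteractiveQueriesService(Map<String, Map<K, V>> stores) {
        this.stores = stores;
    }

    @Override
    public V getByKey(String store, K key, String path) {
        return stores.getOrDefault(store, Map.of()).get(key);
    }

    @Override
    public List<V> getAll(String store, String path) {
        return new ArrayList<>(stores.getOrDefault(store, Map.of()).values());
    }

    @Override
    public List<Entry<K, V>> getAllOnCurrentInstance(String store) {
        return new ArrayList<>(stores.getOrDefault(store, Map.of()).entrySet());
    }
}
```

With class-level generics, a Spring bean could be injected as `InteractiveQueriesService<String, KafkaPerson>`. A real Kafka Streams implementation would also need a key `Serializer<K>` to call `queryMetadataForKey`, as `getHostByStoreAndKey` does in Option #1.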

loicgreffier added the enhancement and feature labels on Dec 1, 2023
loicgreffier self-assigned this on Dec 21, 2023
loicgreffier linked a pull request on May 14, 2024 that will close this issue