RealtimeTrigger
RealtimeTrigger
type: "io.kestra.plugin.pulsar.RealtimeTrigger"
Consume a message in real-time from Pulsar topics and create one execution per message.
If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.pulsar.Trigger instead.
Examples
Consume a message from a Pulsar topic in real-time.
id: pulsar
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
triggers:
- id: realtime_trigger
type: io.kestra.plugin.pulsar.RealtimeTrigger
topic: kestra_trigger
uri: pulsar:https://localhost:26650
deserializer: JSON
subscriptionName: kestra_trigger_sub
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
initialPosition
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Earliest
- Possible Values:
Latest
Earliest
The position of a subscription to the topic.
subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The subscription name.
Using subscription name, we will fetch only records that haven't been consumed yet.
subscriptionType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Exclusive
- Possible Values:
Exclusive
Shared
Failover
Key_Shared
The subscription type.
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) where to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar:https://localhost:6650
- If you have multiple brokers:
pulsar:https://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl:https://pulsar.us-west.example.com:6651
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions in order to limit the flow trigger.
consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer name.
consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key to the producer/consumer.
schemaString
- Type: string
- Dynamic: ✔️
- Required: ❌
JSON string of the topic's schema
Required for connecting with topics with a defined schema and strict schema checking
schemaType
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
NONE
- Possible Values:
NONE
AVRO
JSON
The schema type of the topic
Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
tlsOptions
- Type: AbstractPulsarConnection-TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication options.
You need to use "pulsar+ssl:https://" in serviceUrl to enable TLS support.
Outputs
eventTime
- Type: string
- Required: ❌
- Format:
date-time
key
- Type: string
- Required: ❌
messageId
- Type: string
- Required: ❌
properties
- Type: object
- SubType: string
- Required: ❌
topic
- Type: string
- Required: ❌
value
- Type: object
- Required: ❌
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
Properties
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The ca certificate.
Must be a base64-encoded pem file.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate.
Must be a base64-encoded pem file.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate.
Must be a base64-encoded pem file.
Was this page helpful?