RealtimeTrigger RealtimeTrigger

yaml
type: "io.kestra.plugin.pulsar.RealtimeTrigger"

Consume a message in real-time from Pulsar topics and create one execution per message.

If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.pulsar.Trigger instead.

Examples

Consume a message from a Pulsar topic in real-time.

yaml
id: pulsar
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.pulsar.RealtimeTrigger
    topic: kestra_trigger
    uri: pulsar:https://localhost:26650
    deserializer: JSON
    subscriptionName: kestra_trigger_sub

Properties

deserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

initialPosition

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: Earliest
  • Possible Values:
    • Latest
    • Earliest

The position of a subscription to the topic.

subscriptionName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The subscription name.

Using subscription name, we will fetch only records that haven't been consumed yet.

subscriptionType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: Exclusive
  • Possible Values:
    • Exclusive
    • Shared
    • Failover
    • Key_Shared

The subscription type.

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic(s) where to consume messages from.

Can be a string or a list of strings to consume from multiple topics.

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar:https://localhost:6650
  • If you have multiple brokers: pulsar:https://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl:https://pulsar.us-west.example.com:6651

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

consumerName

  • Type: string
  • Dynamic: ✔️
  • Required:

The consumer name.

consumerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the consumer.

encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Add a public encryption key to the producer/consumer.

schemaString

  • Type: string
  • Dynamic: ✔️
  • Required:

JSON string of the topic's schema

Required for connecting with topics with a defined schema and strict schema checking

schemaType

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: NONE
  • Possible Values:
    • NONE
    • AVRO
    • JSON

The schema type of the topic

Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

tlsOptions

TLS authentication options.

You need to use "pulsar+ssl:https://" in serviceUrl to enable TLS support.

Outputs

eventTime

  • Type: string
  • Required:
  • Format: date-time

key

  • Type: string
  • Required:

messageId

  • Type: string
  • Required:

properties

  • Type: object
  • SubType: string
  • Required:

topic

  • Type: string
  • Required:

value

  • Type: object
  • Required:

Definitions

io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions

Properties

ca
  • Type: string
  • Dynamic:
  • Required:

The ca certificate.

Must be a base64-encoded pem file.

cert
  • Type: string
  • Dynamic:
  • Required:

The client certificate.

Must be a base64-encoded pem file.

key
  • Type: string
  • Dynamic:
  • Required:

The key certificate.

Must be a base64-encoded pem file.

Was this page helpful?