This repository contains a set of Numaflow plugins, including custom sources, sinks, and map and reduce UDFs.
- Map
Clone the repository and build the Docker image with:
docker build . -t leoflow
Each plugin defines its own configuration options, which are specified using environment variables, as outlined in the Numaflow documentation.
The plugin allows you to run jq expressions on the input JSON data.
jq is like sed for JSON data - you can use it to slice and filter and map and transform JSON with the same ease that sed, awk, grep and friends let you play with text.
This UDF applies a specified jq expression to the input data and sends the result in an output message.
If the expression produces multiple results, each result is published as a separate message.
Command for the UDF container:
/leoflow map jq
Example pipeline config:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  vertices:
    # ...
    - name: jq
      udf:
        container:
          image: leoflow:latest
          command: [ '/leoflow' ]
          args: [ 'map', 'jq' ]
          env:
            - name: JQ_DEBUG
              value: 'true' # booleans must be strings in env var declarations
            - name: JQ_MODE
              value: 'map' # can be "map" or "tag", see below
            - name: JQ_EXPRESSION
              value: |
                {
                  "external_id": (.Data.id | tostring),
                  "new_field": "This is a new field"
                }
    # ...
| Name | Type | Values | Description |
|---|---|---|---|
| JQ_DEBUG | string | true, false | Debug mode |
| JQ_MODE | string | map, tag | Operation mode (see description below). Default is map |
| JQ_EXPRESSION | string | | The jq expression to execute |
| JQ_TIMEOUT | string | "5s", "1m" | Maximum execution time. Default is 1s |
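As a rough illustration of how these variables and their defaults fit together, here is a minimal Python sketch of a configuration loader. It is not the plugin's actual code: the names `JqConfig`, `parse_duration` and `load_config` are hypothetical, `JQ_EXPRESSION` is assumed to be required, an unset `JQ_DEBUG` is assumed to mean false, and only single-unit durations such as `5s`, `1m` or `100ms` are parsed.

```python
import os
import re
from dataclasses import dataclass
from typing import Mapping, Optional

# Seconds per unit for simple, single-unit Go-style durations.
# Assumption: compound values such as "1m30s" are out of scope for this sketch.
_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}


@dataclass
class JqConfig:
    debug: bool
    mode: str
    expression: str
    timeout_seconds: float


def parse_duration(value: str) -> float:
    """Convert a duration such as "5s", "1m" or "100ms" into seconds."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)(ms|s|m|h)", value.strip())
    if match is None:
        raise ValueError(f"invalid duration: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNITS[unit]


def load_config(env: Optional[Mapping[str, str]] = None) -> JqConfig:
    """Read the JQ_* variables, applying the defaults from the table above."""
    if env is None:
        env = os.environ
    mode = env.get("JQ_MODE", "map")
    if mode not in ("map", "tag"):
        raise ValueError(f"JQ_MODE must be 'map' or 'tag', got {mode!r}")
    return JqConfig(
        # Env vars are always strings, hence the comparison with "true".
        debug=env.get("JQ_DEBUG", "false").strip().lower() == "true",
        mode=mode,
        expression=env["JQ_EXPRESSION"],  # hypothetical: treated as required
        timeout_seconds=parse_duration(env.get("JQ_TIMEOUT", "1s")),
    )


cfg = load_config({"JQ_DEBUG": "true", "JQ_EXPRESSION": ".", "JQ_TIMEOUT": "5s"})
print(cfg)
```

Passing a plain dict instead of `os.environ` keeps the loader easy to test; inside a container it would read the variables set in the `env` section of the vertex spec.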
This UDF supports two modes:
- map: in this mode, the UDF executes the expression and sends its result to the output, just like regular jq.
- tag: in this mode, the expression must return a string or an array of strings, which are then used to tag the input message before it is sent to the output.
Input message:
{
  "field1": "value1",
  "field2": "value2"
}
Expression to update a field value:
.field1 = "new_value"
Output:
{
  "field1": "new_value",
  "field2": "value2"
}
Input message:
[
  {
    "field": "value1"
  },
  {
    "field": "value2"
  }
]
Expression to split the input array into separate elements:
.[]
Output:
Message 1:
{
  "field": "value1"
}
Message 2:
{
  "field": "value2"
}
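The fan-out behavior of map mode can be sketched in Python as follows. Since the standard library has no jq engine, the hypothetical `iterate_values` helper stands in for the `.[]` expression, and `to_messages` (also hypothetical) turns every result into its own JSON-encoded message. This illustrates the semantics only and is not how the plugin is implemented.

```python
import json
from typing import Any, Iterable, List


def iterate_values(value: Any) -> Iterable[Any]:
    """Plain-Python stand-in for jq's `.[]` (arrays and objects only)."""
    if isinstance(value, list):
        return iter(value)
    if isinstance(value, dict):
        return iter(value.values())
    raise TypeError(f"cannot iterate over {type(value).__name__}")


def to_messages(results: Iterable[Any]) -> List[bytes]:
    """Map mode: every result produced by the expression becomes one message."""
    return [json.dumps(result).encode("utf-8") for result in results]


payload = b'[{"field": "value1"}, {"field": "value2"}]'
messages = to_messages(iterate_values(json.loads(payload)))
for i, message in enumerate(messages, start=1):
    print(f"Message {i}: {message.decode()}")
```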
Input message:
{
  "data": 1
}
Expression returning a tag or set of tags:
if .data % 2 == 0 then "even" else "odd" end
Output:
The output message will be tagged with "odd".
{
  "data": 1
}
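A minimal Python sketch of tag mode, assuming the tag expression's result is validated before use: a string becomes a single tag, an array of strings becomes several tags, and the payload itself is forwarded unchanged. `TaggedMessage`, `normalize_tags` and `parity_expression` are hypothetical names, and `parity_expression` is a hand-written Python equivalent of the jq expression above.

```python
import json
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class TaggedMessage:
    value: bytes  # the original payload, forwarded unchanged
    tags: List[str] = field(default_factory=list)


def normalize_tags(result: Any) -> List[str]:
    """Accept a string or an array of strings, as tag mode requires."""
    if isinstance(result, str):
        return [result]
    if isinstance(result, list) and all(isinstance(t, str) for t in result):
        return list(result)
    raise TypeError(f"tag expression must return a string or array of strings, got {result!r}")


def parity_expression(data: Any) -> Any:
    """Python equivalent of: if .data % 2 == 0 then "even" else "odd" end"""
    return "even" if data["data"] % 2 == 0 else "odd"


def tag_message(payload: bytes) -> TaggedMessage:
    result = parity_expression(json.loads(payload))
    return TaggedMessage(value=payload, tags=normalize_tags(result))


msg = tag_message(b'{"data": 1}')
print(msg.tags)  # ['odd']
```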
The plugin extends jq with several handy functions.
Important: function parameters in jq are separated by semicolons, not commas.
The fetch(url; opts) function performs an HTTP(S) request to the specified URL and makes the result available in jq expressions.
It is useful when you need to enrich the message with data from an API, make a decision based on an API response, and in many other cases.
Parameters:

| Name | Description |
|---|---|
| url | The full request URL |
| opts | Request options, see below. |
The opts parameter is a JSON object with the following fields:

| Name | Type | Description |
|---|---|---|
| method | string | HTTP verb. Examples: GET, POST, DELETE, etc. Default is GET. |
| headers | JSON object | Request headers. Keys are header names; values can be a string or an array of strings. If the value is an array of strings, multiple headers with the same name are added, one per value. |
| body | string | Request body. |
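The headers rule above (a string yields one header, an array yields one header per element) can be illustrated with the following Python sketch. `expand_headers` is a hypothetical helper, the token is a placeholder, and GET is applied as the default method per the table.

```python
from typing import Dict, List, Tuple, Union

HeaderValue = Union[str, List[str]]


def expand_headers(headers: Dict[str, HeaderValue]) -> List[Tuple[str, str]]:
    """Turn the `headers` option into (name, value) pairs.

    A string value yields one header; an array yields one header per element,
    all sharing the same name.
    """
    pairs: List[Tuple[str, str]] = []
    for name, value in headers.items():
        values = [value] if isinstance(value, str) else value
        for item in values:
            if not isinstance(item, str):
                raise TypeError(f"header {name!r} has a non-string value: {item!r}")
            pairs.append((name, item))
    return pairs


opts = {
    "method": "POST",
    "headers": {
        "Authorization": "Bearer <token>",
        "X-Custom-Header": ["header value 1", "header value 2"],
    },
}
method = opts.get("method", "GET")  # GET is the documented default
for name, value in expand_headers(opts.get("headers", {})):
    print(f"{method} header -> {name}: {value}")
```

Keeping the result as a list of pairs rather than a dict is what allows the same header name to appear more than once; with Python's `http.client`, for example, each pair could be sent with a separate `HTTPConnection.putheader` call.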
Note: to send the piped input as the request body, serialize it with tostring:
{
  "method": "POST",
  "body": (. | tostring)
}
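Because body is declared as a string, an object must be serialized before it is sent. jq's tostring leaves strings unchanged and JSON-encodes every other value compactly. The Python sketch below mirrors that behavior with a hypothetical `jq_tostring` helper and builds, without sending, an equivalent standard-library request to the URL used in the example that follows.

```python
import json
import urllib.request
from typing import Any


def jq_tostring(value: Any) -> str:
    """Mimic jq's tostring: strings pass through, other values become compact JSON."""
    if isinstance(value, str):
        return value
    return json.dumps(value, separators=(",", ":"), ensure_ascii=False)


input_message = {"field1": "value1", "field2": "value2"}
opts = {"method": "POST", "body": jq_tostring(input_message)}

# Build, but do not send, an equivalent request with the standard library.
request = urllib.request.Request(
    "https://mydomain.example/internal-api",
    data=opts["body"].encode("utf-8"),
    method=opts["method"],
)
print(request.get_method(), request.data)
```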
Example: add a new field called "api_response" to the input, containing the data returned by an API call:
.api_response = fetch(
  "https://mydomain.example/internal-api";
  {
    "method": "POST",
    "headers": {
      "Authorization": "Bearer Njg1M2ViMGQtN2NlNC00MmIwLWExMzEtN2RhMDQ1NTQ3YjE2",
      "X-Custom-Header": [
        "header value 1",
        "header value 2"
      ]
    },
    "body": (. | tostring)
  }
)
The function returns a JSON object of the following form:
{
  "status_code": 200,
  "headers": {
    "X-Response-Header": [
      "header value 1",
      "header value 2"
    ]
  },
  "body": "response body"
}
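To make the shape of this object concrete, the Python sketch below builds it from a raw status code, a list of (name, value) header pairs and a body, grouping repeated header names into arrays. `to_fetch_result` is a hypothetical helper, and decoding the body as UTF-8 text is an assumption. Since body appears to be a plain string, a JSON response would presumably need to be decoded inside the expression, for example with jq's fromjson (`.api_response.body | fromjson`).

```python
import json
from typing import Dict, List, Tuple


def to_fetch_result(status: int, header_pairs: List[Tuple[str, str]], body: bytes) -> dict:
    """Shape a raw HTTP response like the object `fetch` returns."""
    headers: Dict[str, List[str]] = {}
    for name, value in header_pairs:
        headers.setdefault(name, []).append(value)  # repeated names -> one array
    return {
        "status_code": status,
        "headers": headers,
        "body": body.decode("utf-8", errors="replace"),  # assumption: text body
    }


result = to_fetch_result(
    200,
    [("X-Response-Header", "header value 1"), ("X-Response-Header", "header value 2")],
    b"response body",
)
print(json.dumps(result, indent=2))
```

With `urllib.request.urlopen`, the three arguments would come from the response's `status`, `getheaders()` and `read()`.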
In map mode, an expression such as the following splits the input JSON array into separate elements:
.[]
Each element returned by the expression is then passed to the output as a separate message.