Skip to content

Commit

Permalink
[FLINK-21382][docs] Update documentation for standalone Flink on Kube…
Browse files Browse the repository at this point in the history
…rnetes with standby JobManagers

This closes apache#15248.
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Mar 23, 2021
1 parent 71ea41d commit d67c4d9
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ data:
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps.
See [how to configure service accounts for pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/) for more information.

When High-Availability is enabled, Flink will use its own HA-services for service discovery.
Therefore, JobManager pods should be started with their IP address instead of a Kubernetes service as its `jobmanager.rpc.address`.
Refer to the [appendix](#appendix) for full configuration.

#### Standby JobManagers

Usually, it is enough to only start a single JobManager pod, because Kubernetes will restart it once the pod crashes.
If you want to achieve faster recovery, configure the `replicas` in `jobmanager-session-deployment-ha.yaml` or `parallelism` in `jobmanager-application-ha.yaml` to a value greater than `1` to start standby JobManagers.

### Enabling Queryable State

You can access the queryable state of TaskManager if you create a `NodePort` service for it:
Expand Down Expand Up @@ -296,7 +305,7 @@ data:
logger.netty.level = OFF
```

`jobmanager-service.yaml`
`jobmanager-service.yaml` Optional service, which is only necessary for non-HA mode.
```yaml
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -354,7 +363,7 @@ spec:

### Session cluster resource definitions

`jobmanager-session-deployment.yaml`
`jobmanager-session-deployment-non-ha.yaml`
```yaml
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -404,6 +413,64 @@ spec:
path: log4j-console.properties
```

`jobmanager-session-deployment-ha.yaml`
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1 # Set the value to greater than 1 to start standby JobManagers
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["jobmanager", "$(POD_IP)"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
```

`taskmanager-session-deployment.yaml`
```yaml
apiVersion: apps/v1
Expand Down Expand Up @@ -454,7 +521,7 @@ spec:

### Application cluster resource definitions

`jobmanager-application.yaml`
`jobmanager-application-non-ha.yaml`
```yaml
apiVersion: batch/v1
kind: Job
Expand Down Expand Up @@ -506,6 +573,66 @@ spec:
path: /host/path/to/job/artifacts
```

`jobmanager-application-ha.yaml`
```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
```

`taskmanager-job-deployment.yaml`
```yaml
apiVersion: apps/v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ For high availability on Kubernetes, you can use the [existing high availability

#### Kubernetes High-Availability Services

Session Mode and Application Mode clusters support using the [Kubernetes high availability service]({{< ref "docs/deployment/ha/kubernetes_ha" >}}).
Session Mode and Application Mode clusters support using the [Kubernetes high availability service]({{< ref "docs/deployment/ha/kubernetes_ha" >}}).
You need to add the following Flink config options to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions).

<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#advanced-customization) and [enable plugins]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#using-filesystem-plugins) for more information.
Expand All @@ -218,6 +218,15 @@ data:
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps.
See [how to configure service accounts for pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/) for more information.

When High-Availability is enabled, Flink will use its own HA-services for service discovery.
Therefore, JobManager pods should be started with their IP address instead of a Kubernetes service as its `jobmanager.rpc.address`.
Refer to the [appendix](#appendix) for full configuration.

#### Standby JobManagers

Usually, it is enough to only start a single JobManager pod, because Kubernetes will restart it once the pod crashes.
If you want to achieve faster recovery, configure the `replicas` in `jobmanager-session-deployment-ha.yaml` or `parallelism` in `jobmanager-application-ha.yaml` to a value greater than `1` to start standby JobManagers.

### Enabling Queryable State

You can access the queryable state of TaskManager if you create a `NodePort` service for it:
Expand Down Expand Up @@ -296,7 +305,7 @@ data:
logger.netty.level = OFF
```

`jobmanager-service.yaml`
`jobmanager-service.yaml` Optional service, which is only necessary for non-HA mode.
```yaml
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -354,7 +363,7 @@ spec:

### Session cluster resource definitions

`jobmanager-session-deployment.yaml`
`jobmanager-session-deployment-non-ha.yaml`
```yaml
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -404,6 +413,64 @@ spec:
path: log4j-console.properties
```

`jobmanager-session-deployment-ha.yaml`
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1 # Set the value to greater than 1 to start standby JobManagers
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["jobmanager", "$(POD_IP)"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
```

`taskmanager-session-deployment.yaml`
```yaml
apiVersion: apps/v1
Expand Down Expand Up @@ -454,7 +521,7 @@ spec:

### Application cluster resource definitions

`jobmanager-application.yaml`
`jobmanager-application-non-ha.yaml`
```yaml
apiVersion: batch/v1
kind: Job
Expand Down Expand Up @@ -506,6 +573,66 @@ spec:
path: /host/path/to/job/artifacts
```

`jobmanager-application-ha.yaml`
```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
```

`taskmanager-job-deployment.yaml`
```yaml
apiVersion: apps/v1
Expand Down

0 comments on commit d67c4d9

Please sign in to comment.