[FLINK-18307][scripts] Rename 'slaves' file to 'workers'
StephanEwen committed Jun 16, 2020
1 parent 4517dec commit c47fb47
Showing 8 changed files with 33 additions and 33 deletions.
6 changes: 3 additions & 3 deletions docs/ops/deployment/cluster_setup.md
@@ -72,7 +72,7 @@ Set the `jobmanager.rpc.address` key to point to your master node. You should al

These values are given in MB. If some worker nodes have more main memory which you want to allocate to the Flink system you can overwrite the default value by setting `taskmanager.memory.process.size` or `taskmanager.memory.flink.size` in *conf/flink-conf.yaml* on those specific nodes.

-Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file *conf/slaves* and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.
+Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file *conf/workers* and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.

The following example illustrates the setup with three nodes (with IP addresses from _10.0.0.1_
to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the
@@ -91,7 +91,7 @@ configuration files (which need to be accessible at the same path on all machine
</div>
<div class="row" style="margin-top: 1em;">
<p class="lead text-center">
-/path/to/<strong>flink/<br>conf/slaves</strong>
+/path/to/<strong>flink/<br>conf/workers</strong>
<pre>
10.0.0.2
10.0.0.3</pre>
@@ -118,7 +118,7 @@ are very important configuration values.

### Starting Flink

-The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the *slaves* file to start the TaskManager on each node. Now your Flink system is up and running. The JobManager running on the local node will now accept jobs at the configured RPC port.
+The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the *workers* file to start the TaskManager on each node. Now your Flink system is up and running. The JobManager running on the local node will now accept jobs at the configured RPC port.

Assuming that you are on the master node and inside the Flink directory:

6 changes: 3 additions & 3 deletions docs/ops/deployment/cluster_setup.zh.md
@@ -72,7 +72,7 @@ Set the `jobmanager.rpc.address` key to point to your master node. You should al

These values are given in MB. If some worker nodes have more main memory which you want to allocate to the Flink system you can overwrite the default value by setting `taskmanager.memory.process.size` or `taskmanager.memory.flink.size` in *conf/flink-conf.yaml* on those specific nodes.

-Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file *conf/slaves* and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.
+Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file *conf/workers* and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.

The following example illustrates the setup with three nodes (with IP addresses from _10.0.0.1_
to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the
@@ -91,7 +91,7 @@ configuration files (which need to be accessible at the same path on all machine
</div>
<div class="row" style="margin-top: 1em;">
<p class="lead text-center">
-/path/to/<strong>flink/<br>conf/slaves</strong>
+/path/to/<strong>flink/<br>conf/workers</strong>
<pre>
10.0.0.2
10.0.0.3</pre>
@@ -118,7 +118,7 @@ are very important configuration values.

### Starting Flink

-The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the *slaves* file to start the TaskManager on each node. Now your Flink system is up and running. The JobManager running on the local node will now accept jobs at the configured RPC port.
+The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the *workers* file to start the TaskManager on each node. Now your Flink system is up and running. The JobManager running on the local node will now accept jobs at the configured RPC port.

Assuming that you are on the master node and inside the Flink directory:

46 changes: 23 additions & 23 deletions flink-dist/src/main/flink-bin/bin/config.sh
@@ -378,14 +378,14 @@ fi
# also potentially includes topology information and the taskManager type
extractHostName() {
# handle comments: extract first part of string (before first # character)
-SLAVE=`echo $1 | cut -d'#' -f 1`
+WORKER=`echo $1 | cut -d'#' -f 1`

# Extract the hostname from the network hierarchy
-if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
-SLAVE=${BASH_REMATCH[1]}
+if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+WORKER=${BASH_REMATCH[1]}
fi

-echo $SLAVE
+echo $WORKER
}
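
To illustrate what the renamed function does with typical entries, here is a small stand-alone sketch. The function body is copied from the new side of the hunk above; the sample entries (`worker1`, `/dc1/rack2/worker2`) are made up for illustration and do not come from the commit.

```shell
#!/usr/bin/env bash
# Copy of extractHostName as it reads after this commit, for illustration only.
extractHostName() {
    # handle comments: keep only the part before the first '#'
    WORKER=`echo $1 | cut -d'#' -f 1`

    # Extract the hostname from the network hierarchy (text after the last '/')
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        WORKER=${BASH_REMATCH[1]}
    fi

    echo $WORKER
}

# Hypothetical sample entries
extractHostName "10.0.0.2"                   # plain address
extractHostName "worker1#decommission soon"  # trailing comment
extractHostName "/dc1/rack2/worker2"         # topology prefix
```

The three calls print `10.0.0.2`, `worker1` and `worker2`. A line that contains only a comment yields an empty string, which is why the caller checks the result with `-n` before using it.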

# Auxiliary functions for log file rotation
@@ -446,52 +446,52 @@ readMasters() {
done < "$MASTERS_FILE"
}

-readSlaves() {
-SLAVES_FILE="${FLINK_CONF_DIR}/slaves"
+readWorkers() {
+WORKERS_FILE="${FLINK_CONF_DIR}/workers"

-if [[ ! -f "$SLAVES_FILE" ]]; then
-echo "No slaves file. Please specify slaves in 'conf/slaves'."
+if [[ ! -f "$WORKERS_FILE" ]]; then
+echo "No workers file. Please specify workers in 'conf/workers'."
exit 1
fi

-SLAVES=()
+WORKERS=()

-SLAVES_ALL_LOCALHOST=true
+WORKERS_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOST=$( extractHostName $line)
if [ -n "$HOST" ] ; then
-SLAVES+=(${HOST})
+WORKERS+=(${HOST})
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
-SLAVES_ALL_LOCALHOST=false
+WORKERS_ALL_LOCALHOST=false
fi
fi
-done < "$SLAVES_FILE"
+done < "$WORKERS_FILE"
}
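
The `read line || GOON=false` loop in `readWorkers` is worth a closer look: `read` returns non-zero at end of file, but it still fills the variable when the last line has no trailing newline, so that final entry is not lost. The following is a simplified sketch of the same pattern, not the Flink code itself. `collect_hosts`, `HOSTS` and `ALL_LOCALHOST` are hypothetical names, and comment stripping is done with parameter expansion instead of `extractHostName`.

```shell
#!/usr/bin/env bash
# Hypothetical stand-alone sketch; collect_hosts is not a Flink function.
collect_hosts() {
    local file="$1" line host more=true
    HOSTS=()
    ALL_LOCALHOST=true
    while $more; do
        # read fails at EOF, yet still delivers a final line without a newline
        read -r line || more=false
        host="${line%%#*}"            # drop everything from the first '#'
        host="${host//[[:space:]]/}"  # drop any whitespace
        if [ -n "$host" ]; then
            HOSTS+=("$host")
            if [ "$host" != "localhost" ] && [ "$host" != "127.0.0.1" ]; then
                ALL_LOCALHOST=false
            fi
        fi
    done < "$file"
}
```

Called as `collect_hosts "${FLINK_CONF_DIR}/workers"`, this would leave the host names in `HOSTS` and set `ALL_LOCALHOST` to `false` as soon as one entry is neither `localhost` nor `127.0.0.1`.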

-# starts or stops TMs on all slaves
-# TMSlaves start|stop
-TMSlaves() {
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
CMD=$1

-readSlaves
+readWorkers

-if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
+if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
# all-local setup
-for slave in ${SLAVES[@]}; do
+for worker in ${WORKERS[@]}; do
"${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
done
else
# non-local setup
-# Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+# start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
command -v pdsh >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
-for slave in ${SLAVES[@]}; do
-ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+for worker in ${WORKERS[@]}; do
+ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
done
else
-PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
+PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
fi
fi
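
The pdsh branch above builds its `-w` argument with `$(IFS=, ; echo "${WORKERS[*]}")`. A minimal sketch of that join in isolation, using the two worker addresses from the docs example as sample data (`HOST_LIST` is a name introduced here for illustration):

```shell
#!/usr/bin/env bash
# Sample host list; the addresses mirror the docs example in this commit.
WORKERS=(10.0.0.2 10.0.0.3)

# "${WORKERS[*]}" joins the elements with the first character of IFS.
# The command substitution runs in a subshell, so IFS is unchanged afterwards.
HOST_LIST=$(IFS=, ; echo "${WORKERS[*]}")

echo "$HOST_LIST"
```

This prints `10.0.0.2,10.0.0.3`, the comma-separated form that `pdsh -w` accepts as its target list.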
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -50,4 +50,4 @@ fi
shopt -u nocasematch

# Start TaskManager instance(s)
-TMSlaves start
+TMWorkers start
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -23,7 +23,7 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh

# Stop TaskManager instance(s)
-TMSlaves stop
+TMWorkers stop

# Stop JobManager instance(s)
shopt -s nocasematch
File renamed without changes.
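
Because `readWorkers` in this commit looks only for `conf/workers` and exits when that file is missing, an existing installation that still carries a `conf/slaves` file would presumably need the same rename. A minimal sketch, assuming the configuration directory is passed as the first argument. `migrate_workers_file` is a hypothetical helper and not part of Flink.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink): rename a legacy 'slaves' file to 'workers'.
migrate_workers_file() {
    local conf_dir="$1"
    if [[ -f "${conf_dir}/workers" ]]; then
        # Never overwrite an existing workers file
        echo "workers file already present, nothing to do"
    elif [[ -f "${conf_dir}/slaves" ]]; then
        mv "${conf_dir}/slaves" "${conf_dir}/workers"
        echo "renamed slaves to workers"
    else
        echo "no slaves or workers file found in ${conf_dir}" >&2
        return 1
    fi
}
```

A typical call would be `migrate_workers_file "${FLINK_CONF_DIR}"`. The helper leaves an existing `workers` file untouched, so running it twice is harmless.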
@@ -268,7 +268,7 @@ public void appendConfiguration(Configuration config) throws IOException {
}

public void setTaskExecutorHosts(Collection<String> taskExecutorHosts) throws IOException {
-Files.write(conf.resolve("slaves"), taskExecutorHosts);
+Files.write(conf.resolve("workers"), taskExecutorHosts);
}

public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor) throws IOException {
2 changes: 1 addition & 1 deletion pom.xml
@@ -1476,7 +1476,7 @@ under the License.
<!-- netty test file, still Apache License 2.0 but with a different header -->
<exclude>flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java</exclude>
<!-- Configuration Files. -->
-<exclude>**/flink-bin/conf/slaves</exclude>
+<exclude>**/flink-bin/conf/workers</exclude>
<exclude>**/flink-bin/conf/masters</exclude>
<!-- Administrative files in the main trunk. -->
<exclude>**/README.md</exclude>
