[FLINK-7841] [docs] Update AWS docs with respect to S3 file system changes

This closes apache#5029
Nico Kruber authored and StephanEwen committed Nov 22, 2017
1 parent 98241d5 commit 7c07d6d
126 changes: 91 additions & 35 deletions docs/ops/deployment/aws.md
2. Extract the Flink distribution and you are ready to deploy [Flink jobs via YARN](yarn_setup.html) after **setting the Hadoop config directory**:

```bash
HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar
```

{% top %}

## S3: Simple Storage Service

[Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for **reading** and **writing data**, in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html), or even as YARN's object storage.

You can use S3 objects like regular files by specifying paths in the following format:
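
```
s3://<your-bucket>/<endpoint>
```

For illustration, a brief sketch of how such paths can be used with the DataStream API (a minimal example; the bucket and endpoint are placeholders, and the class name exists only for this sketch):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep checkpoint data in S3 via the FsStateBackend
        env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

        // Read text lines from an S3 object
        DataStream<String> lines = env.readTextFile("s3://<your-bucket>/<endpoint>");

        // Write results back to S3
        lines.writeAsText("s3://<your-bucket>/<endpoint>");

        env.execute("S3 read/write example");
    }
}
```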


Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI.

For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3
file system wrappers, which are fairly easy to set up. For some cases, however, e.g. for using S3 as
YARN's resource storage directory, it may be necessary to set up a specific Hadoop S3 FileSystem
implementation. Both ways are described below.

### Shaded Hadoop/Presto S3 file systems (recommended)

{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %}

To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the
`opt` directory to the `lib` directory of your Flink distribution before starting Flink, for example:
```bash
cp ./opt/flink-s3-fs-presto-1.4-SNAPSHOT.jar ./lib/
```

#### Configure Access Credentials

After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.

##### Identity and Access Management (IAM) (Recommended)

The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](https://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).

If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink.

##### Access Keys (Discouraged)

Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).

You need to configure both `s3.access-key` and `s3.secret-key` in Flink's `flink-conf.yaml`:

```
s3.access-key: your-access-key
s3.secret-key: your-secret-key
```

{% top %}

### Hadoop-provided S3 file systems - manual setup

{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %}

S3 is treated by Flink as a regular FileSystem. Interaction with S3 happens via a Hadoop [S3 FileSystem client](https://wiki.apache.org/hadoop/AmazonS3).
This setup is a bit more complex and we recommend using our shaded Hadoop/Presto file systems
instead (see above) unless you specifically need the Hadoop-provided implementation, e.g. for using
S3 as YARN's resource storage directory via the `fs.defaultFS` configuration property in Hadoop's
`core-site.xml`.
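
For illustration, a minimal sketch of such a `core-site.xml` entry (the bucket name is a placeholder, and the `s3a://` scheme assumes the `S3AFileSystem` described below):

```xml
<!-- Use an S3 bucket as YARN's resource storage directory (sketch; adjust bucket and scheme). -->
<property>
  <name>fs.defaultFS</name>
  <value>s3a://<your-bucket></value>
</property>
```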

#### Set S3 FileSystem

Interaction with S3 happens via one of [Hadoop's S3 FileSystem clients](https://wiki.apache.org/hadoop/AmazonS3):

1. `S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for reading and writing regular files using Amazon's SDK internally. No maximum file size and works with IAM roles.
2. `NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading and writing regular files. Maximum object size is 5GB and does not work with IAM roles.

##### `S3AFileSystem` (Recommended)

This is the recommended S3 FileSystem implementation to use. It uses Amazon's SDK internally and works with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)).

You need to point Flink to a valid Hadoop configuration, which contains the following properties in `core-site.xml`:
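
A minimal sketch of these properties (property names come from Hadoop's S3A client; the buffer directory is an assumption and should point at a suitable local path):

```xml
<!-- Register the S3A client for the s3a:// scheme. -->
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma-separated list of local directories used to buffer large results before uploading them to S3. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>
```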


This registers `S3AFileSystem` as the default FileSystem for URIs with the `s3a://` scheme.

##### `NativeS3FileSystem`

This file system is limited to files up to 5GB in size and it does not work with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)), meaning that you have to manually configure your AWS credentials in the Hadoop config file.

You need to point Flink to a valid Hadoop configuration, which contains the following property in `core-site.xml`:
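
A minimal sketch of this property (property name and implementation class from Hadoop's native S3 client):

```xml
<!-- Register the native S3 client for the s3:// scheme. -->
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
```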


This registers `NativeS3FileSystem` as the default FileSystem for URIs with the `s3://` scheme.

{% top %}

#### Hadoop Configuration

You can specify the [Hadoop configuration](../config.html#hdfs) in various ways by pointing Flink to
the path of the Hadoop configuration directory, for example:
- by setting the environment variable `HADOOP_CONF_DIR`, or
- by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`:
```
fs.hdfs.hadoopconf: /path/to/etc/hadoop
```
This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink.
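
Alternatively, a sketch of the environment-variable approach (the path is an assumption; point it at your Hadoop configuration directory):

```bash
export HADOOP_CONF_DIR=/path/to/etc/hadoop
```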

{% top %}

#### Configure Access Credentials

{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %}

After setting up the S3 FileSystem, you need to make sure that Flink is allowed to access your S3 buckets.

##### Identity and Access Management (IAM) (Recommended)

When using `S3AFileSystem`, the recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](https://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).

If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink.

Note that this only works with `S3AFileSystem` and not `NativeS3FileSystem`.

{% top %}

##### Access Keys with `S3AFileSystem` (Discouraged)

Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).

For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.secret.key` in Hadoop's `core-site.xml`:
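
A minimal `core-site.xml` sketch (the values are placeholders for your own keys):

```xml
<property>
  <name>fs.s3a.access.key</name>
  <value>your-access-key</value>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value>your-secret-key</value>
</property>
```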

{% top %}

##### Access Keys with `NativeS3FileSystem` (Discouraged)

Access to S3 can be granted via your **access and secret key pair**. But this is discouraged and you should use `S3AFileSystem` [with the required IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).

For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` in Hadoop's `core-site.xml`:
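
A minimal `core-site.xml` sketch (the values are placeholders for your own keys):

```xml
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>your-access-key</value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>your-secret-key</value>
</property>
```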

{% top %}

#### Provide S3 FileSystem Dependency

{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %}

Hadoop's S3 FileSystem clients are packaged in the `hadoop-aws` artifact (Hadoop 2.6 and later). This JAR and all its dependencies need to be added to Flink's classpath, i.e. the classpath of both the JobManager and the TaskManagers. Depending on which FileSystem implementation and which Flink and Hadoop version you use, you need to provide different dependencies (see below).

There are multiple ways of adding JARs to Flink's classpath, the easiest being to simply drop the JARs into Flink's `lib` folder. You need to copy the `hadoop-aws` JAR with all its dependencies. You can also export the directory containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on all machines.
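
For example, a sketch of the `HADOOP_CLASSPATH` approach (the Hadoop installation path is an assumption; adjust it to your setup):

```bash
export HADOOP_CLASSPATH=/path/to/hadoop-2.7/share/hadoop/tools/lib/*:$HADOOP_CLASSPATH
```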

##### Flink for Hadoop 2.7

Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.7/share/hadoop/tools/lib`:

- `S3AFileSystem`:
  - `hadoop-aws-2.7.3.jar`
  - `aws-java-sdk-s3-1.11.183.jar` and its dependencies:
    - `aws-java-sdk-core-1.11.183.jar`
    - `aws-java-sdk-kms-1.11.183.jar`
    - `jackson-annotations-2.6.7.jar`
    - `jackson-core-2.6.7.jar`
    - `jackson-databind-2.6.7.jar`
    - `joda-time-2.8.1.jar`
  - `httpcore-4.4.4.jar`
  - `httpclient-4.5.3.jar`

- `NativeS3FileSystem`:
  - `hadoop-aws-2.7.3.jar`
  - `guava-11.0.2.jar`

Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink.

##### Flink for Hadoop 2.6

Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.6/share/hadoop/tools/lib`:

- `S3AFileSystem`:
  - `hadoop-aws-2.6.4.jar`
  - `aws-java-sdk-1.7.4.jar` and its dependencies:
    - `jackson-annotations-2.1.1.jar`
    - `jackson-core-2.1.1.jar`
    - `jackson-databind-2.1.1.jar`
    - `joda-time-2.2.jar`
  - `httpcore-4.2.5.jar`
  - `httpclient-4.2.5.jar`

- `NativeS3FileSystem`:
  - `hadoop-aws-2.6.4.jar`
  - `guava-11.0.2.jar`

Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink.

##### Flink for Hadoop 2.4 and earlier

These Hadoop versions only have support for `NativeS3FileSystem`. This comes pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don't need to add anything to the classpath.

## Common Issues

The following sections list common issues when working with Flink on AWS.

### Missing S3 FileSystem Configuration

If your job submission fails with an Exception message noting that `No file system found with scheme s3`, this means that no FileSystem has been configured for S3. Please check out the configuration sections for our [shaded Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic Hadoop](#set-s3-filesystem) file systems for details on how to configure this properly.

```
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Caused by: java.io.IOException: No file system found with scheme s3,
[...]
```

### AWS Access Key ID and Secret Access Key Not Specified

If you see your job failing with an Exception noting that the `AWS Access Key ID and Secret Access Key must be specified as the username or password`, your access credentials have not been set up properly. Please refer to the access credential section for our [shaded Hadoop/Presto](#configure-access-credentials) or [generic Hadoop](#configure-access-credentials-1) file systems for details on how to configure this.

```
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
[...]
```

### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator

This Exception is usually caused by skipping the local buffer directory configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the [S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to configure the `S3AFileSystem` properly.

