[hotfix][docs] Some fixes to FileSystem Documentation
This closes apache#8326
sjwiesman authored and StephanEwen committed May 10, 2019
1 parent 7d45b75 commit 4c0bbc4
Showing 10 changed files with 131 additions and 1,228 deletions.
317 changes: 0 additions & 317 deletions docs/ops/deployment/aws.md

Large diffs are not rendered by default.

387 changes: 0 additions & 387 deletions docs/ops/deployment/aws.zh.md

Large diffs are not rendered by default.

27 changes: 14 additions & 13 deletions docs/ops/filesystems/common.md
@@ -22,14 +22,14 @@ specific language governing permissions and limitations
under the License.
-->

Apache Flink provides a number of common configuration settings that work across all file system implementations.
Apache Flink provides several standard configuration settings that work across all file system implementations.

* This will be replaced by the TOC
{:toc}

## Default File System

If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used.
A default scheme (and authority) is used if paths to files do not explicitly specify a file system scheme (and authority).

{% highlight yaml %}
fs.default-scheme: <default-fs>
@@ -40,10 +40,10 @@ For example, if the default file system is configured as `fs.default-scheme: hdfs:/

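For illustration, a hypothetical setup (the namenode address below is an assumption, not part of this commit) could set the default scheme to an HDFS cluster:

{% highlight yaml %}
# Hypothetical example: unqualified paths are resolved against this HDFS namenode
fs.default-scheme: hdfs://localhost:9000/
{% endhighlight %}

With this setting, a path such as `/user/flink/data` would be interpreted as `hdfs://localhost:9000/user/flink/data`.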
## Connection limiting

You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number
of concurrent reads / writes or open connections at the same time.
You can limit the total number of connections that a file system can concurrently open, which is useful when the file system cannot handle a large number
of concurrent reads/writes or open connections at the same time.

For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
For example, small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.

To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by
its scheme.
@@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
{% endhighlight %}

You can limit the number if input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed.
If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail.
You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation blocks until some streams close.
If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening fails.

To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams:
`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed.
To prevent inactive streams from taking up the full pool (blocking new connections from being opened), you can add an inactivity timeout, `fs.<scheme>.limit.stream-timeout`, which forcibly closes a stream if it does not read/write any bytes for at least that amount of time.

These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections.
In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different
authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
Limits are enforced per TaskManager and per file system instance.
Because file systems are created per scheme and authority, different
authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
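As an illustrative sketch (the scheme and numbers below are assumptions, not values from this commit), a small HDFS cluster could be protected with a configuration along these lines:

{% highlight yaml %}
# Hypothetical limits for the "hdfs" scheme (values are assumptions)
fs.hdfs.limit.total: 32
fs.hdfs.limit.input: 16
fs.hdfs.limit.output: 16
# fail if a stream cannot be opened within 10 seconds
fs.hdfs.limit.timeout: 10000
# forcibly close streams that are idle for 60 seconds
fs.hdfs.limit.stream-timeout: 60000
{% endhighlight %}

With such a configuration, each TaskManager would open at most 32 concurrent streams to that HDFS authority, and idle streams would be reclaimed after a minute.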

{% top %}
27 changes: 14 additions & 13 deletions docs/ops/filesystems/common.zh.md
Original file line number Diff line number Diff line change
@@ -22,14 +22,14 @@ specific language governing permissions and limitations
under the License.
-->

Apache Flink provides a number of common configuration settings that work across all file system implementations.
Apache Flink provides several standard configuration settings that work across all file system implementations.

* This will be replaced by the TOC
{:toc}

## Default File System

If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used.
A default scheme (and authority) is used if paths to files do not explicitly specify a file system scheme (and authority).

{% highlight yaml %}
fs.default-scheme: <default-fs>
@@ -40,10 +40,10 @@ For example, if the default file system is configured as `fs.default-scheme: hdfs:/

## Connection limiting

You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number
of concurrent reads / writes or open connections at the same time.
You can limit the total number of connections that a file system can concurrently open, which is useful when the file system cannot handle a large number
of concurrent reads/writes or open connections at the same time.

For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
For example, small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.

To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by
its scheme.
@@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
{% endhighlight %}

You can limit the number if input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed.
If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail.
You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation blocks until some streams close.
If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening fails.

To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams:
`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed.
To prevent inactive streams from taking up the full pool (blocking new connections from being opened), you can add an inactivity timeout, `fs.<scheme>.limit.stream-timeout`, which forcibly closes a stream if it does not read/write any bytes for at least that amount of time.

These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections.
In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different
authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
Limits are enforced per TaskManager and per file system instance.
Because file systems are created per scheme and authority, different
authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.

{% top %}
36 changes: 23 additions & 13 deletions docs/ops/filesystems/index.md
@@ -24,40 +24,37 @@ specific language governing permissions and limitations
under the License.
-->

Apache Flink uses file system for both ingest and output of data for streaming and batch applications as well as targets for checkpoint storage.
These file systems can be local such as *Unix*, distributed like *HDFS*, or even object stores such as *S3*.
Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
These include most of the popular file systems: *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, and *Aliyun OSS*.

The file system used for a particular file is determined by its URI scheme.
For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.

FileSystem instances are instantiated once per process and then cached / pooled, to
avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits.
File system instances are instantiated once per process and then cached/pooled, to avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits.

* This will be replaced by the TOC
{:toc}

### Built-in File Systems

Flink ships with support for most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS* and *OpenStack Swift FS*.
Each is identified by the scheme included in the URI of the provide file path.

Flink ships with implementations for the following file systems:

- **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, including any NFS or SAN that is mounted into that local file system.
- **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, including any NFS or SAN drives mounted into that local file system.

- **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.

- **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.

- **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*.
The implementation `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
To use it when running Flink as a library, add the respective Maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`).
When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.

#### HDFS and Hadoop File System support

For all schemes where Flink cannot find a directly supported file system, it will fall back to Hadoop.
All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are in classpath.
For all schemes where Flink cannot find a directly supported file system, it falls back to Hadoop.
All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are on the classpath.


This way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-compatible file systems (HCFS).

@@ -67,12 +64,25 @@ This way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-compatible file systems (HCFS).
- **har**
- ...

##### Hadoop Configuration

We recommend using Flink's built-in file systems unless required otherwise, e.g., when using that file system as YARN's resource storage directory via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`.

If using a Hadoop file system, you can specify the [configuration](../config.html#hdfs) by setting the environment variable `HADOOP_CONF_DIR`, or by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`.

{% highlight yaml %}
fs.hdfs.hadoopconf: /path/to/etc/hadoop
{% endhighlight %}

This registers `/path/to/etc/hadoop` as Hadoop's configuration directory and is where Flink will look for the `core-site.xml` and `hdfs-site.xml` files.


## Adding new File System Implementations

File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify files and objects in that file system.
Implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations.
Implementations are discovered by Flink through Java's service abstraction, making it easy to add new file system implementations.

In order to add a new File System, the following steps are needed:
To add a new file system:

- Add the File System implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`.
- Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory`.
