Also allow passing buffer instead of path for retrieve_file and store_file methods in SFTPHook #44247

Draft
wants to merge 11 commits into
base: main

Conversation

dabla
Contributor

@dabla dabla commented Nov 21, 2024

In the FTPSHook it's already possible to pass a buffer (such as StringIO/BytesIO) instead of a path, so I've refactored the retrieve_file and store_file methods in SFTPHook to support those types as well. This removes the need to create a local file just to transfer data: you can download a remote file directly into a buffer, and you can upload content to an SFTP server directly from a buffer. I simply delegate to the underlying paramiko getfo and putfo methods; nothing fancy was needed, as paramiko already supports this. This shortcoming was described as a pitfall for the SFTPHook.

Also, using our in-house StreamOperator, which executes an operator in a multi-threaded way within a single worker (similar to expand, but distributing the work over multiple threads in the same worker instead of over multiple workers), we discovered that when the SFTPHook is used in a PythonOperator to retrieve a file, the connections don't get closed. That is plausible, because each thread has to instantiate its own SFTPHook, as the underlying paramiko library isn't thread-safe. If you look at the source code, you'll notice that each operation within the SFTPHook gets a connection but never closes it; you have to call the close_conn method manually.

    def retrieve_file(self, remote_full_path: str, local_full_path: str | BytesIO, prefetch: bool = True) -> None:
        """
        Transfer the remote file to a local location.

        If local_full_path is a string path, the file will be put
        at that location. If it is a BytesIO buffer, the file content
        is written into that buffer instead.

        :param remote_full_path: full path to the remote file
        :param local_full_path: full path to the local file or a file-like buffer
        :param prefetch: controls whether prefetch is performed (default: True)
        """
        conn = self.get_conn()
        if isinstance(local_full_path, BytesIO):
            conn.getfo(remote_full_path, local_full_path, prefetch=prefetch)
        else:
            conn.get(remote_full_path, local_full_path, prefetch=prefetch)
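The upload direction could follow the same dispatch. The following is only a minimal sketch, not the code in this PR: `FakeSFTPClient` is a hypothetical stand-in for paramiko's `SFTPClient` so the example runs without a server, and `store_file` is written as a free function for illustration.

```python
from io import BytesIO


class FakeSFTPClient:
    """Hypothetical stand-in for paramiko's SFTPClient; stores uploads in a dict."""

    def __init__(self):
        self.files = {}

    def put(self, localpath, remotepath, confirm=True):
        # Path-based upload: read the local file from disk.
        with open(localpath, "rb") as fh:
            self.files[remotepath] = fh.read()

    def putfo(self, fl, remotepath, confirm=True):
        # Buffer-based upload: read from the file-like object.
        self.files[remotepath] = fl.read()


def store_file(conn, remote_full_path, local_full_path, confirm=True):
    """Upload from a path or from a BytesIO buffer, mirroring retrieve_file."""
    if isinstance(local_full_path, BytesIO):
        conn.putfo(local_full_path, remote_full_path, confirm=confirm)
    else:
        conn.put(local_full_path, remote_full_path, confirm=confirm)
```

With a real connection, `conn` would be the client returned by `get_conn()`; the buffer must be positioned at the start of the data before it is passed in.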

In the past we didn't use the SFTPHook; we used the paramiko library directly, like this, and didn't have any issues:

    def get_file(remote_full_path: str) -> str:
        buffer = BytesIO()
        conn = get_connection(conn_id=sftp_conn)
        with Transport((conn.host, conn.port)) as transport:
            transport.connect(username=conn.login, password=conn.password)
            with SFTPClient.from_transport(transport) as sftp:
                sftp.getfo(remote_full_path, buffer)  # Download file to buffer
        return buffer.getvalue().decode("utf-8")

So I think we should refactor the SFTPHook so it can be used as a context manager, which makes sure the SFTP connection is closed after each call. We could then invoke it like this:

    def get_file(remote_full_path: str) -> str:
        with SFTPHook.get_hook(conn_id=sftp_conn) as hook:
            buffer = BytesIO()
            hook.retrieve_file(remote_full_path, buffer)  # Download file to buffer
            return buffer.getvalue().decode("utf-8")

Otherwise we would need to call it like this and close the connection ourselves, whereas with the context manager version you don't need to worry about the connection:

    def get_file(remote_full_path: str) -> str:
        hook = SFTPHook.get_hook(conn_id=sftp_conn)
        try:
            buffer = BytesIO()
            hook.retrieve_file(remote_full_path, buffer)  # Download file to buffer
            return buffer.getvalue().decode("utf-8")
        finally:
            hook.close_conn()
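To illustrate the idea, here is a minimal sketch of how context manager support could wrap close_conn. `ClosingHookSketch` is a hypothetical class, not the real SFTPHook, and its connection is a placeholder object; the real change in this PR may look different.

```python
class ClosingHookSketch:
    """Hypothetical hook-like class showing the context manager protocol."""

    def __init__(self):
        self.conn = None
        self.closed_count = 0

    def get_conn(self):
        # Lazily create the connection; a real hook would open an SFTP client here.
        if self.conn is None:
            self.conn = object()
        return self.conn

    def close_conn(self):
        # Close the connection only if one was actually opened.
        if self.conn is not None:
            self.conn = None
            self.closed_count += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the connection, even when the body raised.
        self.close_conn()
        return False  # do not swallow exceptions
```

Because `__exit__` runs on both normal and exceptional exit, this is equivalent to the try/finally form but cannot be forgotten by the caller.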

I've added a test case for both changes.



… and store_file methods in SFTPHook like it's the case in FTPSHook
@dabla dabla changed the title from "Also allow passing buffer instead of path for retrieve_file and store_file methods in FTPSHook" to "Also allow passing buffer instead of path for retrieve_file and store_file methods in SFTPHook" Nov 21, 2024
@dabla dabla marked this pull request as draft November 22, 2024 13:50
@dabla dabla marked this pull request as ready for review November 22, 2024 14:11
@dabla dabla marked this pull request as draft November 23, 2024 07:28