Skip to content

Commit

Permalink
ProtocolServer: Stream the downloaded data if possible
Browse files Browse the repository at this point in the history
This patchset makes ProtocolServer stream the downloads to its client
(LibProtocol), and as such changes the download API; a possible
download lifecycle could be as such:
notation = client->server:'>', server->client:'<', pipe activity:'*'
```
> StartDownload(GET, url, headers, {})
< Response(0, fd 8)
* {data, 1024b}
< HeadersBecameAvailable(0, response_headers, 200)
< DownloadProgress(0, 4K, 1024)
* {data, 1024b}
* {data, 1024b}
< DownloadProgress(0, 4K, 2048)
* {data, 1024b}
< DownloadProgress(0, 4K, 1024)
< DownloadFinished(0, true, 4K)
```

Since managing the received file descriptor is a pain, LibProtocol
implements `Download::stream_into(OutputStream)`, which can be used to
stream the download into any given output stream (be it a file, or
memory, or writing stuff with a delay, etc.).
Also, as some of the users of this API require all the downloaded data
upfront, LibProtocol also implements `set_should_buffer_all_input()`,
which causes the download instance to buffer all the data until the
download is complete, and to call the `on_buffered_download_finish`
hook.
  • Loading branch information
alimpfard authored and awesomekling committed Dec 30, 2020
1 parent 36d642e commit 4a2da10
Show file tree
Hide file tree
Showing 55 changed files with 524 additions and 231 deletions.
30 changes: 15 additions & 15 deletions Applications/Browser/DownloadWidget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <AK/SharedBuffer.h>
#include <AK/StringBuilder.h>
#include <LibCore/File.h>
#include <LibCore/FileStream.h>
#include <LibCore/StandardPaths.h>
#include <LibDesktop/Launcher.h>
#include <LibGUI/BoxLayout.h>
Expand Down Expand Up @@ -61,9 +62,19 @@ DownloadWidget::DownloadWidget(const URL& url)
m_download->on_progress = [this](Optional<u32> total_size, u32 downloaded_size) {
did_progress(total_size.value(), downloaded_size);
};
m_download->on_finish = [this](bool success, auto payload, auto payload_storage, auto& response_headers, auto) {
did_finish(success, payload, payload_storage, response_headers);
};

{
auto file_or_error = Core::File::open(m_destination_path, Core::IODevice::WriteOnly);
if (file_or_error.is_error()) {
GUI::MessageBox::show(window(), String::formatted("Cannot open {} for writing", m_destination_path), "Download failed", GUI::MessageBox::Type::Error);
window()->close();
return;
}
m_output_file_stream = make<Core::OutputFileStream>(*file_or_error.value());
}

m_download->on_finish = [this](bool success, auto) { did_finish(success); };
m_download->stream_into(*m_output_file_stream);

set_fill_with_background_color(true);
auto& layout = set_layout<GUI::VerticalBoxLayout>();
Expand Down Expand Up @@ -149,7 +160,7 @@ void DownloadWidget::did_progress(Optional<u32> total_size, u32 downloaded_size)
}
}

void DownloadWidget::did_finish(bool success, [[maybe_unused]] ReadonlyBytes payload, [[maybe_unused]] RefPtr<SharedBuffer> payload_storage, [[maybe_unused]] const HashMap<String, String, CaseInsensitiveStringTraits>& response_headers)
void DownloadWidget::did_finish(bool success)
{
dbg() << "did_finish, success=" << success;

Expand All @@ -166,17 +177,6 @@ void DownloadWidget::did_finish(bool success, [[maybe_unused]] ReadonlyBytes pay
window()->close();
return;
}

auto file_or_error = Core::File::open(m_destination_path, Core::IODevice::WriteOnly);
if (file_or_error.is_error()) {
GUI::MessageBox::show(window(), String::formatted("Cannot open {} for writing", m_destination_path), "Download failed", GUI::MessageBox::Type::Error);
window()->close();
return;
}

auto& file = *file_or_error.value();
bool write_success = file.write(payload.data(), payload.size());
ASSERT(write_success);
}

}
4 changes: 3 additions & 1 deletion Applications/Browser/DownloadWidget.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <AK/URL.h>
#include <LibCore/ElapsedTimer.h>
#include <LibCore/FileStream.h>
#include <LibGUI/ProgressBar.h>
#include <LibGUI/Widget.h>
#include <LibProtocol/Download.h>
Expand All @@ -44,7 +45,7 @@ class DownloadWidget final : public GUI::Widget {
explicit DownloadWidget(const URL&);

void did_progress(Optional<u32> total_size, u32 downloaded_size);
void did_finish(bool success, ReadonlyBytes payload, RefPtr<SharedBuffer> payload_storage, const HashMap<String, String, CaseInsensitiveStringTraits>& response_headers);
void did_finish(bool success);

URL m_url;
String m_destination_path;
Expand All @@ -53,6 +54,7 @@ class DownloadWidget final : public GUI::Widget {
RefPtr<GUI::Label> m_progress_label;
RefPtr<GUI::Button> m_cancel_button;
RefPtr<GUI::Button> m_close_button;
OwnPtr<Core::OutputFileStream> m_output_file_stream;
Core::ElapsedTimer m_elapsed_timer;
};

Expand Down
4 changes: 2 additions & 2 deletions Applications/Browser/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ int main(int argc, char** argv)
return 1;
}

if (pledge("stdio shared_buffer accept unix cpath rpath wpath fattr", nullptr) < 0) {
if (pledge("stdio shared_buffer accept unix cpath rpath wpath fattr sendfd recvfd", nullptr) < 0) {
perror("pledge");
return 1;
}
Expand All @@ -86,7 +86,7 @@ int main(int argc, char** argv)
Web::ResourceLoader::the();

// FIXME: Once there is a standalone Download Manager, we can drop the "unix" pledge.
if (pledge("stdio shared_buffer accept unix cpath rpath wpath", nullptr) < 0) {
if (pledge("stdio shared_buffer accept unix cpath rpath wpath sendfd recvfd", nullptr) < 0) {
perror("pledge");
return 1;
}
Expand Down
3 changes: 2 additions & 1 deletion Libraries/LibCore/NetworkJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

namespace Core {

NetworkJob::NetworkJob()
NetworkJob::NetworkJob(OutputStream& output_stream)
: m_output_stream(output_stream)
{
}

Expand Down
8 changes: 7 additions & 1 deletion Libraries/LibCore/NetworkJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#pragma once

#include <AK/Function.h>
#include <AK/Stream.h>
#include <LibCore/Object.h>

namespace Core {
Expand All @@ -43,6 +44,8 @@ class NetworkJob : public Object {
};
virtual ~NetworkJob() override;

// Could fire twice, after Headers and after Trailers!
Function<void(const HashMap<String, String, CaseInsensitiveStringTraits>& response_headers, Optional<u32> response_code)> on_headers_received;
Function<void(bool success)> on_finish;
Function<void(Optional<u32>, u32)> on_progress;

Expand All @@ -62,13 +65,16 @@ class NetworkJob : public Object {
}

protected:
NetworkJob();
NetworkJob(OutputStream&);
void did_finish(NonnullRefPtr<NetworkResponse>&&);
void did_fail(Error);
void did_progress(Optional<u32> total_size, u32 downloaded);

size_t do_write(ReadonlyBytes bytes) { return m_output_stream.write(bytes); }

private:
RefPtr<NetworkResponse> m_response;
OutputStream& m_output_stream;
Error m_error { Error::None };
};

Expand Down
3 changes: 1 addition & 2 deletions Libraries/LibCore/NetworkResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@

namespace Core {

NetworkResponse::NetworkResponse(ByteBuffer&& payload)
: m_payload(payload)
NetworkResponse::NetworkResponse()
{
}

Expand Down
4 changes: 1 addition & 3 deletions Libraries/LibCore/NetworkResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ class NetworkResponse : public RefCounted<NetworkResponse> {
virtual ~NetworkResponse();

bool is_error() const { return m_error; }
const ByteBuffer& payload() const { return m_payload; }

protected:
explicit NetworkResponse(ByteBuffer&&);
explicit NetworkResponse();

bool m_error { false };
ByteBuffer m_payload;
};

}
4 changes: 2 additions & 2 deletions Libraries/LibGemini/GeminiJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ bool GeminiJob::eof() const
return m_socket->eof();
}

bool GeminiJob::write(const ByteBuffer& data)
bool GeminiJob::write(ReadonlyBytes bytes)
{
return m_socket->write(data);
return m_socket->write(bytes);
}

}
6 changes: 3 additions & 3 deletions Libraries/LibGemini/GeminiJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ namespace Gemini {
class GeminiJob final : public Job {
C_OBJECT(GeminiJob)
public:
explicit GeminiJob(const GeminiRequest& request, const Vector<Certificate>* override_certificates = nullptr)
: Job(request)
explicit GeminiJob(const GeminiRequest& request, OutputStream& output_stream, const Vector<Certificate>* override_certificates = nullptr)
: Job(request, output_stream)
, m_override_ca_certificates(override_certificates)
{
}
Expand All @@ -61,7 +61,7 @@ class GeminiJob final : public Job {
virtual bool can_read() const override;
virtual ByteBuffer receive(size_t) override;
virtual bool eof() const override;
virtual bool write(const ByteBuffer&) override;
virtual bool write(ReadonlyBytes) override;
virtual bool is_established() const override { return m_socket->is_established(); }
virtual bool should_fail_on_empty_payload() const override { return false; }
virtual void read_while_data_available(Function<IterationDecision()>) override;
Expand Down
5 changes: 2 additions & 3 deletions Libraries/LibGemini/GeminiResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@

namespace Gemini {

GeminiResponse::GeminiResponse(int status, String meta, ByteBuffer&& payload)
: Core::NetworkResponse(move(payload))
, m_status(status)
GeminiResponse::GeminiResponse(int status, String meta)
: m_status(status)
, m_meta(meta)
{
}
Expand Down
6 changes: 3 additions & 3 deletions Libraries/LibGemini/GeminiResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ namespace Gemini {
class GeminiResponse : public Core::NetworkResponse {
public:
virtual ~GeminiResponse() override;
static NonnullRefPtr<GeminiResponse> create(int status, String meta, ByteBuffer&& payload)
static NonnullRefPtr<GeminiResponse> create(int status, String meta)
{
return adopt(*new GeminiResponse(status, meta, move(payload)));
return adopt(*new GeminiResponse(status, meta));
}

int status() const { return m_status; }
String meta() const { return m_meta; }

private:
GeminiResponse(int status, String, ByteBuffer&&);
GeminiResponse(int status, String);

int m_status { 0 };
String m_meta;
Expand Down
39 changes: 30 additions & 9 deletions Libraries/LibGemini/Job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,33 @@

namespace Gemini {

Job::Job(const GeminiRequest& request)
: m_request(request)
Job::Job(const GeminiRequest& request, OutputStream& output_stream)
: Core::NetworkJob(output_stream)
, m_request(request)
{
}

Job::~Job()
{
}

void Job::flush_received_buffers()
{
for (size_t i = 0; i < m_received_buffers.size(); ++i) {
auto& payload = m_received_buffers[i];
auto written = do_write(payload);
m_received_size -= written;
if (written == payload.size()) {
// FIXME: Make this a take-first-friendly object?
m_received_buffers.take_first();
continue;
}
ASSERT(written < payload.size());
payload = payload.slice(written, payload.size() - written);
return;
}
}

void Job::on_socket_connected()
{
register_on_ready_to_write([this] {
Expand Down Expand Up @@ -126,6 +144,7 @@ void Job::on_socket_connected()

m_received_buffers.append(payload);
m_received_size += payload.size();
flush_received_buffers();

deferred_invoke([this](auto&) { did_progress({}, m_received_size); });

Expand All @@ -144,15 +163,17 @@ void Job::on_socket_connected()
void Job::finish_up()
{
m_state = State::Finished;
auto flattened_buffer = ByteBuffer::create_uninitialized(m_received_size);
u8* flat_ptr = flattened_buffer.data();
for (auto& received_buffer : m_received_buffers) {
memcpy(flat_ptr, received_buffer.data(), received_buffer.size());
flat_ptr += received_buffer.size();
flush_received_buffers();
if (m_received_size != 0) {
// FIXME: What do we do? ignore it?
// "Transmission failed" is not strictly correct, but let's roll with it for now.
deferred_invoke([this](auto&) {
did_fail(Error::TransmissionFailed);
});
return;
}
m_received_buffers.clear();

auto response = GeminiResponse::create(m_status, m_meta, move(flattened_buffer));
auto response = GeminiResponse::create(m_status, m_meta);
deferred_invoke([this, response](auto&) {
did_finish(move(response));
});
Expand Down
7 changes: 4 additions & 3 deletions Libraries/LibGemini/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace Gemini {

class Job : public Core::NetworkJob {
public:
explicit Job(const GeminiRequest&);
explicit Job(const GeminiRequest&, OutputStream&);
virtual ~Job() override;

virtual void start() override = 0;
Expand All @@ -48,14 +48,15 @@ class Job : public Core::NetworkJob {
protected:
void finish_up();
void on_socket_connected();
void flush_received_buffers();
virtual void register_on_ready_to_read(Function<void()>) = 0;
virtual void register_on_ready_to_write(Function<void()>) = 0;
virtual bool can_read_line() const = 0;
virtual String read_line(size_t) = 0;
virtual bool can_read() const = 0;
virtual ByteBuffer receive(size_t) = 0;
virtual bool eof() const = 0;
virtual bool write(const ByteBuffer&) = 0;
virtual bool write(ReadonlyBytes) = 0;
virtual bool is_established() const = 0;
virtual bool should_fail_on_empty_payload() const { return false; }
virtual void read_while_data_available(Function<IterationDecision()> read) { read(); };
Expand All @@ -70,7 +71,7 @@ class Job : public Core::NetworkJob {
State m_state { State::InStatus };
int m_status { -1 };
String m_meta;
Vector<ByteBuffer> m_received_buffers;
Vector<ByteBuffer, 2> m_received_buffers;
size_t m_received_size { 0 };
bool m_sent_data { false };
bool m_should_have_payload { false };
Expand Down
4 changes: 2 additions & 2 deletions Libraries/LibHTTP/HttpJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ bool HttpJob::eof() const
return m_socket->eof();
}

bool HttpJob::write(const ByteBuffer& data)
bool HttpJob::write(ReadonlyBytes bytes)
{
return m_socket->write(data);
return m_socket->write(bytes);
}

}
6 changes: 3 additions & 3 deletions Libraries/LibHTTP/HttpJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace HTTP {
class HttpJob final : public Job {
C_OBJECT(HttpJob)
public:
explicit HttpJob(const HttpRequest& request)
: Job(request)
explicit HttpJob(const HttpRequest& request, OutputStream& output_stream)
: Job(request, output_stream)
{
}

Expand All @@ -59,7 +59,7 @@ class HttpJob final : public Job {
virtual bool can_read() const override;
virtual ByteBuffer receive(size_t) override;
virtual bool eof() const override;
virtual bool write(const ByteBuffer&) override;
virtual bool write(ReadonlyBytes) override;
virtual bool is_established() const override { return true; }

private:
Expand Down
5 changes: 3 additions & 2 deletions Libraries/LibHTTP/HttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ ByteBuffer HttpRequest::to_raw_request() const
builder.append(header.value);
builder.append("\r\n");
}
builder.append("Connection: close\r\n\r\n");
builder.append("Connection: close\r\n");
if (!m_body.is_empty()) {
builder.appendff("Content-Length: {}\r\n\r\n", m_body.size());
builder.append((const char*)m_body.data(), m_body.size());
builder.append("\r\n");
}
builder.append("\r\n");
return builder.to_byte_buffer();
}

Expand Down
3 changes: 2 additions & 1 deletion Libraries/LibHTTP/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class HttpRequest {
void set_method(Method method) { m_method = method; }

const ByteBuffer& body() const { return m_body; }
void set_body(const ByteBuffer& body) { m_body = body; }
void set_body(ReadonlyBytes body) { m_body = ByteBuffer::copy(body); }
void set_body(ByteBuffer&& body) { m_body = move(body); }

String method_name() const;
ByteBuffer to_raw_request() const;
Expand Down
Loading

0 comments on commit 4a2da10

Please sign in to comment.