Skip to content

Commit

Permalink
Shell: Actually process for loop entries as a stream
Browse files Browse the repository at this point in the history
This actually does what d4bcc68 meant to do.
  • Loading branch information
alimpfard authored and awesomekling committed Aug 22, 2020
1 parent 2c14abe commit 103f659
Showing 1 changed file with 45 additions and 33 deletions.
78 changes: 45 additions & 33 deletions Shell/AST.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@

#include "AST.h"
#include "Shell.h"
#include <AK/ScopeGuard.h>
#include <AK/String.h>
#include <AK/StringBuilder.h>
#include <AK/URL.h>
#include <LibCore/EventLoop.h>
#include <LibCore/File.h>
#include <signal.h>

Expand Down Expand Up @@ -793,37 +795,36 @@ RefPtr<Value> ForLoop::run(RefPtr<Shell> shell)

size_t consecutive_interruptions = 0;

NonnullRefPtrVector<Value> values;
auto resolved = m_iterated_expression->run(shell)->resolve_without_cast(shell);
if (resolved->is_list_without_resolution())
values = static_cast<ListValue*>(resolved.ptr())->values();
else
values = create<ListValue>(resolved->resolve_as_list(shell))->values();

for (auto& value : values) {
m_iterated_expression->for_each_entry(shell, [&](auto value) {
if (consecutive_interruptions == 2)
break;
return IterationDecision::Break;

RefPtr<Value> block_value;

{
auto frame = shell->push_frame();
shell->set_local_variable(m_variable_name, value);

auto frame = shell->push_frame();
shell->set_local_variable(m_variable_name, value);
block_value = m_block->run(shell);
}

auto block_value = m_block->run(shell)->resolve_without_cast(shell);
if (block_value->is_job()) {
auto job = static_cast<JobValue*>(block_value.ptr())->job();
if (!job || job->is_running_in_background())
continue;
return IterationDecision::Continue;
shell->block_on_job(job);

if (job->signaled()) {
if (job->termination_signal() == SIGINT)
++consecutive_interruptions;
else
break;
return IterationDecision::Break;
} else {
consecutive_interruptions = 0;
}
}
}
return IterationDecision::Continue;
});

return create<ListValue>({});
}
Expand Down Expand Up @@ -923,11 +924,13 @@ void Execute::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(Ref
}
auto& last_in_commands = commands.last();

last_in_commands.redirections.prepend(FdRedirection::create(STDOUT_FILENO, pipefd[1], Rewiring::Close::Destination));
last_in_commands.should_wait = true;
last_in_commands.redirections.append(FdRedirection::create(STDOUT_FILENO, pipefd[1], Rewiring::Close::Destination));
last_in_commands.should_wait = false;
last_in_commands.should_notify_if_in_background = false;
last_in_commands.is_pipe_source = false;

Core::EventLoop loop;

auto notifier = Core::Notifier::construct(pipefd[0], Core::Notifier::Read);
DuplexMemoryStream stream;

Expand All @@ -947,8 +950,8 @@ void Execute::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(Ref

if (shell->options.inline_exec_keep_empty_segments)
if (callback(create<StringValue>("")) == IterationDecision::Break) {
loop.quit(Break);
notifier->set_enabled(false);
// FIXME: Kill all the jobs here.
return Break;
}
} else {
Expand All @@ -958,8 +961,8 @@ void Execute::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(Ref

auto str = StringView(entry.data(), entry.size() - ifs.length());
if (callback(create<StringValue>(str)) == IterationDecision::Break) {
loop.quit(Break);
notifier->set_enabled(false);
// FIXME: Kill all the jobs here.
return Break;
}
}
Expand All @@ -969,42 +972,51 @@ void Execute::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(Ref

return NothingLeft;
};
auto try_read = [&] {
constexpr static auto buffer_size = 4096;

notifier->on_ready_to_read = [&] {
constexpr static auto buffer_size = 16;
u8 buffer[buffer_size];
size_t remaining_size = buffer_size;

for (;;) {
notifier->set_event_mask(Core::Notifier::None);
bool should_enable_notifier = false;

ScopeGuard notifier_enabler { [&] {
if (should_enable_notifier)
notifier->set_event_mask(Core::Notifier::Read);
} };

if (check_and_call() == Break)
return;

auto read_size = read(pipefd[0], buffer, remaining_size);
if (read_size < 0) {
if (errno == EINTR)
int saved_errno = errno;
if (saved_errno == EINTR) {
should_enable_notifier = true;
continue;
}
if (saved_errno == 0)
continue;
if (errno == 0)
break;
dbg() << "read() failed: " << strerror(errno);
dbg() << "read() failed: " << strerror(saved_errno);
break;
}
if (read_size == 0)
break;

should_enable_notifier = true;
stream.write({ buffer, (size_t)read_size });
}
};

notifier->on_ready_to_read = [&] {
try_read();
loop.quit(Break);
};

for (auto& job : shell->run_commands(commands)) {
shell->block_on_job(job);
}
shell->run_commands(commands);

notifier->on_ready_to_read = nullptr;
loop.exec();

try_read();
notifier->on_ready_to_read = nullptr;

if (close(pipefd[0]) < 0) {
dbg() << "close() failed: " << strerror(errno);
Expand Down

0 comments on commit 103f659

Please sign in to comment.