pipeline() cannot read stdout into IOBuffer #14437

Closed
c42f opened this issue Dec 18, 2015 · 8 comments
Labels
domain:io (Involving the I/O subsystem: libuv, read, write, etc.)
needs docs (Documentation for this change is required)

Comments

@c42f
Member

c42f commented Dec 18, 2015

Reproduce with

julia> run(pipeline(`ls`, stdout=IOBuffer()))
ERROR: MethodError: `uvhandle` has no method matching uvhandle(::Base.AbstractIOBuffer{Array{UInt8,1}})
 in _jl_spawn at process.jl:253
 in anonymous at process.jl:415
 in setup_stdio at process.jl:403
 in spawn at process.jl:414
 in spawn at process.jl:293
 in run at process.jl:530
@tkelman tkelman added the domain:io Involving the I/O subsystem: libuv, read, write, etc. label Dec 18, 2015
@vtjnash vtjnash added the status:won't change Indicates that work won't continue on an issue or pull request label Jul 27, 2016
@vtjnash
Sponsor Member

vtjnash commented Jul 27, 2016

We can't readily support arbitrary Julia IO objects by converting them to kernel objects. The right object to pass as stdout here is Pipe().

@tkelman tkelman added the needs docs Documentation for this change is required label Jul 27, 2016
@c42f
Member Author

c42f commented Jul 27, 2016

Right, it sounds like I was doing something fundamentally misguided here.

To give a bit more context about where I went wrong: I was trying to read data from both stdout and stderr, and I wanted to read them into memory rather than going through temporary files. If these are both Pipe objects, making sure things don't block becomes a bit more interesting, since I can't just call readstring(). After some fiddling I still don't seem to have the right combination of spawn and @async.

out = Pipe()
err = Pipe()
spawn(pipeline(`ls`, stdout=out, stderr=err)) # avoiding run, as that'll potentially block when the pipe gets full
# what now!

@vtjnash
Sponsor Member

vtjnash commented Jul 27, 2016

out = Pipe()
err = Pipe()
p = spawn(pipeline(`ls`, stdout=out, stderr=err))
close(out.in); close(err.in)
err_t = @async readstring(err)
out_s = readstring(out)
err_s = wait(err_t)
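
For readers on Julia 1.x: `spawn` and `readstring`, used in the snippet above, were later removed from Base. A rough equivalent might look like the sketch below. It is not part of the original reply, and it assumes `run(...; wait=false)` in place of `spawn`, `read(io, String)` in place of `readstring`, and `fetch` to obtain the task's result.

```julia
out = Pipe()
err = Pipe()

# Start the process without blocking; the pipes are hooked up during spawn.
p = run(pipeline(`ls`, stdout=out, stderr=err), wait=false)

# Close the parent's copies of the write ends so the readers can see EOF.
close(out.in); close(err.in)

# Drain stderr in a background task while stdout is read in the foreground,
# so that neither pipe can fill up and stall the child.
err_t = @async read(err, String)
out_s = read(out, String)
err_s = fetch(err_t)

wait(p)
```

Reading one stream in a task and the other directly is the same design as in the original snippet: both pipes are drained concurrently.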

@c42f
Member Author

c42f commented Jul 28, 2016

Thanks a lot for the snippet!

A question about the won't fix label - it seems like it'd be possible to support an arbitrary IO by delegating to Pipe() as the underlying object passed to libuv, and reading bytes asynchronously from that pipe into any arbitrary user-defined IO object. Is there a reason why this fundamentally won't work or is a bad idea?

@c42f
Member Author

c42f commented Jul 28, 2016

Proof of concept, which does appear to work:

import Base: AbstractCmd, StdIOSet, Callback, ProcessChain, STDOUT_NO, STDERR_NO, STDIN_NO

# Copy stream src to dest in preferred chunks of size `bufsize`.  Perhaps there's a builtin function for this.
function copystream(src::IO, dest::IO, bufsize=10*1024)
    buf = Vector{UInt8}(bufsize)
    while isopen(src)
        nread = readbytes!(src, buf)
        write(dest, buf[1:nread]) # fixme - should avoid the slice, but unsure how to do so.
        #println("wrote $nread bytes")
    end
end

# Special duplicate of Base.CmdRedirect for redirecting to IOBuffer - would
# need generalization.
immutable CmdRedirect2 <: AbstractCmd
    cmd::AbstractCmd
    handle::IOBuffer
    stream_no::Int
    pipe::Pipe
end

CmdRedirect2(cmd, handle, stream_no) = CmdRedirect2(cmd, handle, stream_no, Pipe())


Base.redir_out(src::AbstractCmd, dest::IOBuffer) = CmdRedirect2(src, dest, STDOUT_NO)
Base.redir_err(src::AbstractCmd, dest::IOBuffer) = CmdRedirect2(src, dest, STDERR_NO)


function Base.spawn(redirect::CmdRedirect2, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}())
    p = spawn(redirect.cmd,
          (redirect.stream_no == STDIN_NO  ? redirect.pipe : stdios[1],
           redirect.stream_no == STDOUT_NO ? redirect.pipe : stdios[2],
           redirect.stream_no == STDERR_NO ? redirect.pipe : stdios[3]),
          exitcb, closecb, chain=chain)
    close(redirect.pipe.in)
    # Um.  Is using @async acceptable here?
    @async copystream(redirect.pipe.out, redirect.handle)
    p
end


#-------------------------------------------------------------------------------
# Usage
out = IOBuffer()
err = IOBuffer()
run(pipeline(`ls -R /home/cfoster/tmp`, stdout=out, stderr=err))

@c42f
Member Author

c42f commented Jul 28, 2016

There are lots of (I think fixable) things wrong with the above PoC of course, e.g. using @async for the stream copy means that run() will probably return before the contents of the pipe buffer have been fully copied across.

@vtjnash
Sponsor Member

vtjnash commented Jul 28, 2016

That looks about right to me (the @async call looks correct as well). I don't see how you can signal EOF through the downstream IO objects. Maybe all that needs is an API for signaling it, but it also would seem to imply every IO object needs to contain a Condition object for EOF notification.

As a package for the above code, I could see this EOF information simply being expected to be handled via a side-channel. For example, you could subtype AbstractPipe to quickly make a new abstraction. Maybe something like:

import Base: Pipe, AbstractPipe, pipe_reader, pipe_writer, eof
immutable PipeFitting <: AbstractPipe
    eof::Condition
    out::IO
    in::Pipe
    function PipeFitting(out)
        p = new(Condition(), out, Pipe())
        # copystream(src, dest) from the PoC above: drain the pipe into `out`
        @async copystream(p.in, p.out)
        return p
    end
end
pipe_reader(p::PipeFitting) = p.out
pipe_writer(p::PipeFitting) = p.in

seteof(p::PipeFitting) = (close(p.in); notify(p.eof))
# nb_available returns an Int, so compare against zero explicitly
eof(p::PipeFitting) = nb_available(p.out) > 0 ? false : (isopen(p.in) ? wait(p.eof) : true)

@JeffBezanson JeffBezanson removed the status:won't change Indicates that work won't continue on an issue or pull request label Aug 15, 2016
@c42f
Member Author

c42f commented Feb 5, 2019

I think this was fixed in #30278. Nice!
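
As an illustration of what that fix enables, the original reproduction can be written with plain `IOBuffer`s. This is a minimal sketch, assuming a Julia release that includes the change referenced above; the `String(take!(...))` step for extracting the captured text is an addition, not something from the thread.

```julia
out = IOBuffer()
err = IOBuffer()

# With arbitrary IO objects accepted by pipeline(), run() can write the
# child's output streams straight into in-memory buffers.
run(pipeline(`ls`, stdout=out, stderr=err))

# Take the captured bytes out of each buffer as a String.
out_s = String(take!(out))
err_s = String(take!(err))
```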

@c42f c42f closed this as completed Feb 5, 2019