package Mojo::Webqq::Run;
use List::Util qw(first);
use Mojo::Webqq::Base -base;
use bytes;
use Carp;
use Errno;
use Socket;
use Time::HiRes qw(time gettimeofday);
use Scalar::Util qw(blessed);
use Storable qw(thaw nfreeze);
use POSIX ":sys_wait_h";
use Mojo::Webqq::Log;
use Mojo::IOLoop;
use Mojo::Reactor;
has 'num_forks'  => sub { 0 };
has 'max_forks'  => sub { 0 };
has 'log'        => sub { Mojo::Webqq::Log->new };
has 'ioloop'     => sub { Mojo::IOLoop->singleton };
has [qw/reactor error is_child/];
 
our $VERSION = '0.3';
 
my $_obj  = undef;
 
BEGIN {
        *portable_pipe = sub () { my ($r, $w);
                pipe $r, $w or return;
                 
                ($r, $w);
        };
        *portable_socketpair = sub () {
                socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
                        or return;
                $fh1->autoflush(1);
                $fh2->autoflush(1);
                 
                ($fh1, $fh2)
        };      
}
 
sub new {my $class = shift; __PACKAGE__->singleton(@_) }
 
sub singleton {
        return $_obj if defined $_obj;
        my $class = shift;
        return $_obj = __PACKAGE__->_constructor(@_);
}
 
sub _constructor {
        my $proto = shift;
        my $class = ref($proto) || $proto;
        my $self  = $class->SUPER::new(@_);
 
        bless $self => $class;
         
        # install SIGCHLD handler
        $SIG{'CHLD'} = sub { _sig_chld($self, @_) };
         
        return $self;
}
 
sub log_level {
        my ($self, $level) = @_;
         
        $self->log->level($level) if defined $level;
         
        return $self->log->level;
}
 
sub spawn {
        my ($self, %opt) = @_;
         
        unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
                my $obj = __PACKAGE__->new;
                return $obj->spawn(%opt);
        }
         
        $self->error('');
         
        if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
                $self->error("Unable to spawn another subprocess: "
                        ."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
                );
                return 0;
        }
         
        # normalize and validate run parameters...
        my $proc = $self->_getRunStruct(\%opt);
        return 0 unless $self->_validateRunStruct($proc);
         
        $self->log->debug("Spawning command "
                ."timeout: "
                .($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
                ." : [$proc->{cmd}]"
        );
        my ($stdout_p, $stdout_c) = portable_socketpair;
        my ($stderr_p, $stderr_c) = portable_socketpair;
        my ($stdres_p, $stdres_c) = portable_socketpair;
         
        $proc->{time_started} = time;
        $proc->{running     } = 1;
        $proc->{hdr_stdout  } = $stdout_c;
        $proc->{hdr_stderr  } = $stderr_c;
        $proc->{hdr_stdres  } = $stdres_c;
         
        my $pid = fork;
         
        if ($pid) {
                # parent
                $self->num_forks($self->num_forks + 1);
                 
                $self->log->debug("Subprocess spawned as pid $pid.");
                 
                $proc->{pid} = $pid;
                 
                # exec timeout
                if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
                        $self->log->debug(
                                "[process $pid]: Setting execution timeout to " .
                                sprintf("%-.3f seconds.", $proc->{exec_timeout})
                        );
                        my $timer = $self->ioloop->timer(
                                $proc->{exec_timeout},
                                sub { _timeout_cb($self, $pid) }
                        );
         
                        # save timer
                        $proc->{id_timeout} = $timer;
                }
                 
                $self->{_data}->{$pid} = $proc;
                 
                close $stdout_p;
                close $stderr_p;
                close $stdres_p;
                 
                $self->watch('stdout', $pid);
                $self->watch('stderr', $pid);
                $self->watch('stdres', $pid);
        } else {
                # child
                 
                $self->is_child(1);
                 
                close $stdout_c;
                close $stderr_c;
                close $stdres_c;
                 
                # Stdio should not be tied.
                if (tied *STDOUT) {
                        carp "Cannot redirect into tied STDOUT.  Untying it";
                        untie *STDOUT;
                }
                if (tied *STDERR) {
                        carp "Cannot redirect into tied STDERR.  Untying it";
                        untie *STDERR;
                }
                 
                # Redirect STDOUT
                open STDOUT, ">&" . fileno($stdout_p)
                        or croak "can't redirect stdout in child pid $$: $!";
                # Redirect STDERR
                open STDERR, ">&" . fileno($stderr_p)
                        or croak "can't redirect stderr in child pid $$: $!";
 
                select STDERR; $| = 1;
                select STDOUT; $| = 1;
                 
                if (ref $proc->{cmd} eq 'CODE') {
                        my @rv = eval { $proc->{cmd}->($$, $proc->{param}); };
                         
                        if ($@) {
                                carp "exec of coderef failed: $@\n";
                                exit 255;
                        }
                         
                        print $stdres_p nfreeze(\ @rv);
                         
                } else {
                        exec(ref $proc->{cmd} eq 'ARRAY' ? @{ $proc->{cmd} } : $proc->{cmd}) or do {
                                carp "exec failed";
                                exit 255;
                        };
                }
                 
                close $stdout_p;
                close $stderr_p;
                close $stdres_p;
                 
                exit 1;
        }
         
        return $pid;
}
 
sub start { shift->ioloop->start }
 
sub watch {
        my $self = shift;
        my $io   = lc(shift || '');
        my $pid  = shift;
         
        my $proc = $self->get_proc($pid);
         
        $self->log->error('Cant start IO watcher off NULL process'           ) and return unless $proc;
        $self->log->error("[process $proc->{pid}]: IO ($io) is unsupported"  ) and return unless first {$io eq $_} qw/stdout stderr stdres/;
        $self->log->error("[process $proc->{pid}]: IO handler ($io) is EMPTY") and return unless $proc->{"hdr_$io"};
         
        my $id = fileno $proc->{"hdr_$io"};
         
        $self->ioloop->reactor->io($proc->{"hdr_$io"}, sub {
                my $chunk = undef;
                my $len   = sysread $proc->{"hdr_$io"}, $chunk, 65536;
                 
                return unless defined $len or $! != Errno::EINTR;
                 
                if (!$len) {
                        $self->drop_handle($pid, $io);
                        return;
                }
                 
                if (defined $proc->{"$io\_cb"}) {
                        $self->log->debug("[process $proc->{pid}]: (handle: $id) Invoking ".uc($io)." callback.");
                         
                        eval { $proc->{"$io\_cb"}->($proc->{pid}, $chunk) };
                         
                        if ($@) {
                                $self->log->error("[process $proc->{pid}]: (handle: $id) Exception in $io\_cb: $@");
                        }
                }   
                #else {
                {
                        # append to buffer
                        $self->log->debug("[process $proc->{pid}]: (handle: $id) Appending $len bytes to ".uc($io)." buffer.");
                        $proc->{"buf_$io"} .= $chunk;
                }
        })->watch($proc->{"hdr_$io"}, 1, 0);
}
 
sub drop_handle {
        my $self = shift;
        my $pid  = shift;
        my $io   = lc(shift || '');
         
        my $proc = $self->get_proc($pid);
        return unless $proc;
         
        $self->log->debug("[process $pid]: Got HUP for unmanaged handle ".$proc->{"hdr_$io"}."; ignoring.") and return
                unless $proc->{"hdr_$io"};
         
         
        $self->ioloop->remove( $proc->{"hdr_$io"} );
        undef $proc->{"hdr_$io"};
         
        $self->log->debug("[process $pid]: ".uc($io)." closed.");
         
        $self->complete($pid);
}
 
sub get_proc {
        my ($self, $pid) = @_;
         
        no warnings;
        my $err = "[process $pid]: Unable to get process data structure: ";
         
        unless (defined $pid) {
                $self->error($err . "Undefined pid.");
                return undef;
        }
         
        unless (
                exists $self->{_data}->{$pid}
                && defined $self->{_data}->{$pid}
        ) {
                $self->error($err . "Non-managed process pid: $pid");
                return undef;
        }
 
        return $self->{_data}->{$pid};
}
 
sub cleanup {
        my ($self, $pid, $exit_val, $signum, $core) = @_;
         
        my $proc = $self->get_proc($pid);
        unless (defined $proc) {
                no warnings;
                $self->log->warn("Untracked process pid $pid exited with exit status $exit_val by signal $signum, core: $core.");
                return 0;
        }
        return 0 if $proc->{cleanup};
         
        $proc->{cleanup} = 1;
 
        $self->log->debug("[process $pid]: Got SIGCHLD, "
                . "exited with exit status: $exit_val by signal $signum"
                . (($core) ? "with core dump." : ".")
        );
 
        if (defined $proc->{id_timeout}) {
                $self->ioloop->remove($proc->{id_timeout});
                $proc->{id_timeout} = undef;
        }
        if ($proc->{hard_kill}) {
                for (qw/stderr stdout stdres/) {
                        $self->drop_handle($pid, $_) if $proc->{"hdr_$_"};
                }
        }
        $proc->{exit_status} = $exit_val;
        $proc->{exit_core  } = $core;
        $proc->{exit_signal} = $signum;
 
        # command timings...
        my $te = time;
        $proc->{time_stopped      } = $te;
        $proc->{time_duration_exec} = $te - $proc->{time_started};
 
        # this process is no longer running
        $proc->{running} = 0;
 
        $self->complete($pid);
}
 
sub complete {
        my ($self, $pid, $force) = @_;
         
        my $proc = $self->get_proc($pid);
         
        return 0 if !$force
                && (
                        $proc->{running}
                        || defined $proc->{hdr_stdout}
                        || defined $proc->{hdr_stdres}
                        || defined $proc->{hdr_stderr}
                );
         
 
        if ($proc && %$proc) {
                $self->log->debug("[process $pid]: All streams closed, process execution complete.");
                 
                $proc->{time_duration_total} = time - $proc->{time_started};
 
                # fire exit callback!
                if (defined $proc->{exit_cb} && ref $proc->{exit_cb} eq 'CODE') {
                        my $result = eval { $proc->{buf_stdres} ? thaw($proc->{buf_stdres}) : undef};
                         
                        if ($@) {
                                croak "Error de-serializing subprocess data: $@";
                        }
                         
                        # prepare callback structure
                        my $cb_d = {
                                cmd => ref $proc->{cmd} eq 'CODE'  ? 'CODE'                     :
                                           ref $proc->{cmd} eq 'ARRAY' ? join(' ', @{$proc->{cmd}}) :
                                           $proc->{cmd}
                                ,
                                param               => $proc->{param},
                                is_timeout          => $proc->{is_timeout},
                                exit_status         => $proc->{exit_status},
                                exit_signal         => $proc->{exit_signal},
                                exit_core           => $proc->{exit_core},
                                stdout              => $proc->{buf_stdout},
                                stderr              => ($proc->{buf_stderr} ? $proc->{buf_stderr} : '').($proc->{stderr} ? $proc->{stderr} : ''),
                                result              => $result,
                                time_started        => $proc->{time_started},
                                time_stopped        => $proc->{time_stopped},
                                time_duration_exec  => $proc->{time_duration_exec},
                                time_duration_total => $proc->{time_duration_total},
                        };
 
                        # safely invoke callback
                        $self->log->debug("[process $pid]: invoking exit_cb callback.");
                        eval { $proc->{exit_cb}->($pid, $cb_d); };
                         
                        $self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
                } else {
                        $self->log->error("[process $pid]: No exit_cb callback!");
                }
        }
 
        delete $self->{_data}->{$pid};
        $self->num_forks($self->num_forks - 1);
}
 
sub _sig_chld {
        my ($self) = @_;
 
        no strict 'subs';
         
        my $i = 0;
        while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
                $i++;
                my $exit_val = $? >> 8;
                my $signum   = $? & 127;
                my $core     = $? & 128;
 
                # do process cleanup
                $self->cleanup($pid, $exit_val, $signum, $core);
        }
         
        $self->log->debug("SIGCHLD handler cleaned up after $i process(es).")
          if $i > 0;
}
 
sub _getRunStruct {
        my ($self, $opt) = @_;
         
        my $s = {
                pid          => 0,
                cmd          => undef,
                param        => undef,
                error        => undef,
                stdout_cb    => undef,
                stderr_cb    => undef,
                exit_cb      => undef,
                is_timeout   => undef,
                exec_timeout => 0,
                buf_stdout   => '',
                buf_stderr   => '',
                buf_stdres   => '',
                hdr_stdout   => undef,
                hdr_stderr   => undef,
                hdr_stdres   => undef,
        };
 
        # apply user defined vars...
        $s->{$_} = $opt->{$_}
                for grep { exists $s->{$_} } keys %$opt;
 
        return $s;
}
 
sub _validateRunStruct {
        my ($self, $s) = @_;
 
        # command?
        $self->error('Undefined command.') and return
                unless defined $s->{cmd};
         
        # check command...
        my $cmd_ref = ref $s->{cmd};
        $self->error('Zero-length command.') and return
                if $cmd_ref eq '' && length $s->{cmd} == 0;
         
        $self->error('Command can be pure scalar, arrayref or coderef.') and return
                if $cmd_ref ne '' && not defined first {$cmd_ref eq $_} ('CODE', 'ARRAY');
 
        # callbacks...
        $self->error("STDOUT callback defined, but is not code reference.") and return
                if defined $s->{stdout_cb} && ref $s->{stdout_cb} ne 'CODE';
         
        $self->error("STDERR callback defined, but is not code reference.") and return
                if defined $s->{stderr_cb} && ref $s->{stderr_cb} ne 'CODE';
         
        $self->error("Process exit_cb callback defined, but is not code reference.") and return
                if defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE';
 
        # exec timeout
        { no warnings; $s->{exec_timeout} += 0; }
 
        return 1;
}
 
sub _timeout_cb {
        my ($self, $pid) = @_;
         
        my $proc = $self->get_proc($pid);
        return 0 unless $proc;
         
        # drop timer (can't hurt...)
        if (defined $proc->{id_timeout}) {
                $self->ioloop->remove($proc->{id_timeout});
                $proc->{id_timeout} = undef;
        }
 
        # is process still alive?
        return 0 unless kill 0, $pid;
 
        $self->log->debug("[process $pid]: Execution timeout ("
                .sprintf("%-.3f seconds).", $proc->{exec_timeout})
                ." Killing process.");
 
        $proc->{stderr} .= ";Execution timeout.";
        $proc->{is_timeout} = 1;
         
        # kill the motherfucker!
 
        unless (CORE::kill(9, $pid)) {
                $self->log->warn("[process $pid]: Unable to kill process: $!");
        }
        $proc->{hard_kill} = 1;
        $self->cleanup($pid, 0, 9, 0);
 
        return 1;
}
 
sub kill {
        my ($self, $pid, $signal) = @_;
        $signal = 15 unless defined $signal;
         
        my $proc = $self->get_proc($pid);
        return 0 unless $proc;
 
        # kill the process...
        unless (kill($signal, $pid)) {
                $self->error("Unable to send signal $signal to process $pid: $!");
                return 0;
        }
         
        return 1;
}
 
sub DESTROY {
        my ($self) = @_;
         
        # perform cleanup...
        unless ($self->is_child) {
                foreach my $pid (keys %{$self->{_data}}) {
                        my $proc = $self->{_data}->{$pid};
                         
                        $self->log->debug("Killing subprocess $pid with SIGKILL") if $self->log;
                        # kill process (HARD!)
                        $self->kill($pid, 9);
         
                        next unless defined $self->ioloop;
         
                        # drop fds
                        $self->drop_handle($pid, $_) for grep {$proc->{"hdr_$_"}} qw/stdout stderr stdres/;
         
                        # fire exit callbacks (if any)
                        $self->complete($pid, 1);
                }
        }
 
        # disable sigchld hander
        $SIG{'CHLD'} = 'IGNORE';
}
 
1;