# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# Like most Perl modules in public-inbox, this is internal and
# NOT subject to any stability guarantees!  It is only documented
# for other hackers.
#
# This is used to limit the number of processes spawned by the
# PSGI server, so it acts like a semaphore and queues up extra
# commands to be run if currently at the limit.  Multiple "limiters"
# may be configured which give inboxes different channels to
# operate in.  This can be useful to ensure smaller inboxes can
# be cloned while cloning of large inboxes is maxed out.
#
# This does not depend on PublicInbox::DS::event_loop or any
# other external scheduling mechanism; you just need to call
# start() and finish() appropriately.  However, public-inbox-httpd
# (which uses PublicInbox::DS) will be able to schedule this
# based on readability of stdout from the spawned process.
# See GitHTTPBackend.pm and SolverGit.pm for usage examples.
# It does not depend on any form of threading.
#
# This is useful for scheduling CGI execution of both long-lived
# git-http-backend(1) processes (for "git clone") as well as short-lived
# processes such as git-apply(1).

package PublicInbox::Qspawn;
use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
use PublicInbox::Syscall qw(EPOLLIN);
use PublicInbox::InputPipe;
use Carp qw(carp confess);

# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);

# Shared fallback limiter, lazily created as PublicInbox::Limiter->new(32)
# by psgi_qx and psgi_yield when the caller passes no limiter of its own.
my $def_limiter;

# declares a command to spawn (but does not spawn it).
# $cmd is the command to spawn
# $cmd_env is the environ for the child process (not PSGI env)
# $opt can include redirects and perhaps other process spawning options
# {qsp_err} is an optional error buffer callers may access themselves
sub new {
	my ($class, $cmd, $cmd_env, $opt) = @_;
	# $cmd is an arrayref (see the "E: @{...}" warnings).  $opt is
	# shallow-copied so keys added later by _do_spawn (rlimits) never
	# leak back into the caller's hash.
	my %own_opt = $opt ? %$opt : ();
	my $self = { args => [ $cmd, $cmd_env, \%own_opt ] };
	bless $self, $class;
}

# Spawns the command declared by ->new and occupies a slot in $limiter.
# Called by start() when $limiter has room, or by finalize() when a
# slot frees up for a queued [ $self, $start_cb ] entry.
# $start_cb: if true, the command is started via popen_rd, its read
#   pipe is stored in {rpipe}, and $start_cb->($self) is called.
#   If false (psgi_qx), the command is run via run_await with
#   wait_await as its completion callback.
# Any exception is handed to finish(), so the limiter slot is released.
sub _do_spawn {
	my ($self, $start_cb, $limiter) = @_;
	my ($cmd, $cmd_env, $opt) = @{$self->{args}};
	# only {quiet} is needed, so there is no reason to copy all of %$opt
	$self->{-quiet} = 1 if $opt && $opt->{quiet};
	$self->{limiter} = $limiter;
	# copy every @PublicInbox::Spawn::RLIMITS key the limiter defines
	# into the spawn options (presumably per-child resource limits)
	for my $k (@PublicInbox::Spawn::RLIMITS) {
		$opt->{$k} = $limiter->{$k} // next;
	}
	$limiter->{running}++;
	if ($start_cb) {
		eval { # popen_rd may die on EMFILE, ENFILE
			$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
							\&waitpid_err, $self);
			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
		};
	} else {
		eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
		warn "E: $@" if $@;
	}
	finish($self, $@) if $@;
}

sub psgi_status_err { # Qspawn itself is useful w/o PSGI
	my ($code) = @_;
	# loaded lazily since non-PSGI users never need it
	require PublicInbox::WwwStatic;
	PublicInbox::WwwStatic::r(defined($code) ? $code : 500);
}

# Wraps up after the process (or a spawn failure):
# releases this object's limiter slot and spawns the next queued entry
# if that leaves room, reports any {_err}, then delivers the result:
# the psgi_qx callback if one was given, otherwise a fallback error
# response via $env->{'qspawn.wcb'} (unless the response was chained
# to another command via {passed}).
# Since {limiter} is deleted on first use, later calls return early;
# this is what allows it to double as DESTROY at the bottom of the file.
sub finalize ($) {
	my ($self) = @_;

	# process is done, spawn whatever's in the queue
	my $limiter = delete $self->{limiter} or return;
	my $running = --$limiter->{running};

	if ($running < $limiter->{max}) {
		if (my $next = shift(@{$limiter->{run_queue}})) {
			_do_spawn(@$next, $limiter); # $next: [ $qsp, $start_cb ]
		}
	}
	if (my $err = $self->{_err}) { # set by finish or waitpid_err
		utf8::decode($err);
		# optional caller-provided scalar ref: an empty buffer gets a
		# "; " prefix, a non-empty one a " " separator
		if (my $dst = $self->{qsp_err}) {
			$$dst .= $$dst ? " $err" : "; $err";
		}
		warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
	}

	my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
	if ($qx_cb_arg) { # set by psgi_qx: [ $cb, @args ]
		my $cb = shift @$qx_cb_arg;
		# first arg is the {1} option given to ->new (presumably the
		# captured stdout buffer -- NOTE(review): confirm in Aspawn)
		eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
		return unless $@;
		warn "E: $@"; # hope qspawn.wcb can handle it
	}
	return if $self->{passed}; # another command chained it
	if (my $wcb = delete $env->{'qspawn.wcb'}) {
		# have we started writing, yet?
		$wcb->(psgi_status_err($env->{'qspawn.fallback'}));
	}
}

sub waitpid_err { # callback for awaitpid
	my (undef, $self) = @_; # $_[0]: pid
	my $wstatus = $?;
	# always defined (even if empty) so ->finish knows we were reaped
	my $msg = '';
	if ($wstatus) { # XXX this may be redundant
		$msg = 'exit status=' . ($wstatus >> 8);
		my $signo = $wstatus & 127;
		$msg .= " signal=$signo" if $signo;
	}
	$self->{_err} = $msg;
	# if the pipe is still open, ->finish will finalize once it's closed
	finalize($self) unless $self->{rpipe};
}

sub wait_await { # run_await cb
	# run_await passes ($pid, $cmd, $cmd_env, $opt, $self);
	# only the pid and the Qspawn object matter here
	my ($pid, $self) = @_[0, 4];
	waitpid_err($pid, $self);
}

sub yield_chunk { # $_[-1] is sysread buffer (or undef)
	my ($self, $ipipe) = @_;
	my $rbuf = \$_[-1]; # alias, avoids copying the chunk
	if (defined($$rbuf)) {
		if ($$rbuf eq '') { # normal EOF
			$self->finish;
			$self->{qfh}->close;
		} elsif (defined($self->{qfh}->write($$rbuf))) {
			return; # keep going while the HTTP client reads
		}
		# else: write failed, HTTP client disconnected
	} else {
		warn "error reading body: $!";
	}
	# EOF, read error, or client gone: stop consuming the pipe
	delete $self->{rpipe};
	$ipipe->close;
}

sub finish ($;$) {
	my ($self, $err) = @_;
	$self->{_err} //= $err; # only for $@

	# Dropping {rpipe} triggers PublicInbox::IO::DESTROY -> waitpid_err,
	# though possibly later when inside the event loop.  It is safe to
	# finalize now if {_err} is defined (by $@ or by waitpid_err having
	# run) or if there was no pipe left to drop.  The deleted value is
	# deliberately not kept, so it is freed at the end of the statement.
	my $pipe_gone = !delete($self->{rpipe});
	finalize($self) if defined($self->{_err}) || $pipe_gone;
}

sub start ($$$) {
	my ($self, $limiter, $start_cb) = @_;
	# spawn right away when under the limit; otherwise wait in line
	# until finalize() of a running process dequeues us
	my $has_slot = $limiter->{running} < $limiter->{max};
	$has_slot ? _do_spawn($self, $start_cb, $limiter)
		: push(@{$limiter->{run_queue}}, [ $self, $start_cb ]);
}

# Similar to `backtick` or "qx" ("perldoc -f qx"): once the command is
# done, it calls the first element of @qx_cb_arg with the stdout buffer
# (the {1} option) followed by the remaining elements of @qx_cb_arg,
# while respecting the given limiter.  $env is the PSGI env.  As with
# ``/qx, only use this when output is small and safe to slurp.
sub psgi_qx {
	my ($self, $env, $limiter, @qx_cb_arg) = @_;
	# finalize() invokes $qx_cb_arg[0] once the command is done
	@$self{qw(psgi_env qx_cb_arg)} = ($env, \@qx_cb_arg);
	unless ($limiter) { # fall back to the shared default limiter
		$def_limiter ||= PublicInbox::Limiter->new(32);
		$limiter = $def_limiter;
	}
	start($self, $limiter, undef); # no $start_cb: _do_spawn uses run_await
}

# Delivers response $res (from the parse_hdr callback) through the PSGI
# write callback $env->{'qspawn.wcb'}, which is removed from $env here.
# $ipipe is the InputPipe in the pi-httpd.async case, false otherwise.
# - CODE ref: chain another command by handing it the write callback.
#   Our pipe is dropped, and {passed} stops finalize from writing a
#   fallback response.
# - ARRAY ref: if its 3rd element ->can('attach'), it is popped off and
#   used as the filter.  Otherwise the filter comes from
#   $env->{'qspawn.filter'} or GzipFilter::qsp_maybe.
#   3 elements left: the response is complete; drop our pipe and send it.
#   2 elements left: without $ipipe, return ($wcb, $filter) to the
#   caller (generic PSGI).  With $ipipe, start streaming: get the
#   writer from $wcb, attach the filter, flush bytes already buffered
#   while parsing headers, and keep the writer in {qfh} for yield_chunk.
# Anything else is a BUG and confess()es.
sub yield_pass {
	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
	my $env = $self->{psgi_env};
	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
	if (ref($res) eq 'CODE') { # chain another command
		delete $self->{rpipe};
		$ipipe->close if $ipipe;
		$res->($wcb);
		$self->{passed} = 1;
		return; # all done
	}
	confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';

	# an attachable 3rd element is a filter, not a body
	my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
			pop(@$res) : delete($env->{'qspawn.filter'});
	$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);

	if (scalar(@$res) == 3) { # done early (likely error or static file)
		delete $self->{rpipe};
		$ipipe->close if $ipipe;
		$wcb->($res); # all done
		return;
	}
	scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
	return ($wcb, $filter) if !$ipipe; # generic PSGI
	# streaming response
	my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
	$qfh = $filter->attach($qfh) if $filter;
	# flush what parse_hdr_done accumulated beyond the headers
	my ($bref) = @{delete $self->{yield_parse_hdr}};
	$qfh->write($$bref) if $$bref ne '';
	$self->{qfh} = $qfh; # keep $ipipe open
}

sub parse_hdr_done ($$) {
	my ($self) = @_; # $_[-1] is the latest read buffer (or undef)
	my ($res, $why);
	if (!defined($_[-1])) { # read error: capture $! before anything else
		$why = "$!";
		$res = psgi_status_err();
	} else {
		my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
		$$bref .= $_[-1];
		$res = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
		if ($@) { # header parser died
			$why = $@;
			$res = psgi_status_err();
		} elsif (!$res && $_[-1] eq '') { # EOF before headers finished
			$why = 'EOF';
			$res = psgi_status_err();
		}
	}
	carp <<EOM if $why;
E: $why @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
EOM
	$res; # undef if headers incomplete
}

sub ipipe_cb { # InputPipe callback
	my ($ipipe, $self) = @_; # $_[-1] rbuf
	# headers already sent: just relay body chunks
	return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh};
	my $res = parse_hdr_done($self, $_[-1]) or return; # need more header bytes
	yield_pass($self, $ipipe, $res);
}

sub _yield_start { # may run later, much later...
	my ($self) = @_;
	if (!$self->{psgi_env}->{'pi-httpd.async'}) {
		# generic PSGI server: pull-based via ->getline
		require PublicInbox::GetlineResponse;
		return PublicInbox::GetlineResponse::response($self);
	}
	# public-inbox-httpd: push-based, driven by readability of {rpipe}
	PublicInbox::InputPipe::consume($self->{rpipe}, \&ipipe_cb, $self);
}

# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
# optional keys in $env:
#   $env->{'qspawn.wcb'} - the write callback from the PSGI server
#                          optional, use this if you've already
#                          captured it elsewhere.  If not given,
#                          psgi_yield will return an anonymous
#                          sub for the PSGI server to call
#
#   $env->{'qspawn.filter'} - filter object, responds to ->attach for
#                             pi-httpd.async and ->translate for generic
#                             PSGI servers
#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
#              The cb is called with the number of bytes just read from
#              the pipe (0 at EOF), a string ref of the buffer accumulated
#              so far, and the remaining args.  It returns undef while
#              headers are incomplete, otherwise an arrayref for a PSGI
#              response (or a CODE ref to chain another command).
#              2-element arrays in PSGI mean the body will be streamed,
#              later, via writes (push-based) to psgix.io.  3-element
#              arrays mean the body is available immediately (or streamed
#              via ->getline (pull-based)).

sub psgi_yield {
	my ($self, $env, $limiter, @parse_hdr_arg)= @_;
	my $hdr_buf = ''; # accumulates output until headers are parsed
	@$self{qw(psgi_env yield_parse_hdr)} =
			($env, [ \$hdr_buf, @parse_hdr_arg ]);
	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);

	# write callback already captured by the caller: start right now
	return start($self, $limiter, \&_yield_start) if $env->{'qspawn.wcb'};

	# Otherwise the caller returns this sub to the PSGI server, which
	# calls it with the response callback (for PublicInbox::HTTP, the
	# chunked_wcb or identity_wcb callback; other HTTP servers work, too).
	return sub {
		$env->{'qspawn.wcb'} = $_[0];
		start($self, $limiter, \&_yield_start);
	};
}

# An object dropped without an explicit finish still goes through
# finalize: the limiter slot is released and errors are reported.
# 'once' is silenced because *DESTROY is only mentioned here.
no warnings 'once';
*DESTROY = \&finalize; # ->finalize is idempotent

1;
