| # Copyright (C) all contributors <meta@public-inbox.org> |
| # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> |
| |
| # Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable. |
| package PublicInbox::XapHelper; |
| use v5.12; |
| use Getopt::Long (); # good API even if we only use short options |
| our $GLP = Getopt::Long::Parser->new; |
| $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev)); |
| use PublicInbox::Search qw(xap_terms); |
| use PublicInbox::CodeSearch; |
| use PublicInbox::IPC; |
| use PublicInbox::IO qw(read_all); |
| use Socket qw(SOL_SOCKET SO_TYPE SOCK_SEQPACKET AF_UNIX); |
| use PublicInbox::DS qw(awaitpid); |
| use autodie qw(open getsockopt); |
| use POSIX qw(:signal_h); |
| use Fcntl qw(LOCK_UN LOCK_EX); |
| use Carp qw(croak); |
# Class lookup table owned by PublicInbox::Search; dispatch() uses
# $X->{Database} to open shards.
my $X = \%PublicInbox::Search::X;

our %SRCH; # cache key (-d/-Q switches) => search object, see dispatch()
our %WORKERS; # worker number => PID, see start_worker()
our $nworker; # desired number of workers (-j), see start()
our $workerset; # POSIX::SigSet applied by recv_loop() before receiving
our $in; # request socket (stdin); undef means we are shutting down
our $SHARD_NFD; # running FD cost of everything cached in %SRCH
our $MY_FD_MAX; # FD budget computed in start()
our $stderr = \*STDERR; # destination for warnings inside recv_loop()
| |
# Test-only command: writes the PID of this process and whether the
# search object reports thread ID support (as 1 or 0) to FD 0 of the
# request.
sub cmd_test_inspect {
	my ($req) = @_;
	my $has_tid = $req->{srch}->has_threadid ? 1 : 0;
	print { $req->{0} } "pid=$$ has_threadid=", $has_tid
}
| |
# Test-only command: never returns on its own, it naps 10ms at a time
# until something external (e.g. a signal) ends it.
sub cmd_test_sleep {
	for (;;) {
		select(undef, undef, undef, 0.01);
	}
}
| |
# Decide what to do with the exception currently held in $@ after a
# per-document eval failed.  $_[0] is the request.
#   DatabaseModifiedError => reopen the search object, return undef
#                            (caller retries the same document)
#   DocNotFoundError      => warn, return 0 (caller moves on)
#   anything else         => rethrown via a bare die
sub iter_retry_check ($) {
	my $exc_class = ref($@);
	if ($exc_class =~ /\bDatabaseModifiedError\b/) {
		$_[0]->{srch}->reopen;
		return undef; # retries
	}
	die unless $exc_class =~ /\bDocNotFoundError\b/;
	warn "doc not found: $@";
	0; # continue to next doc
}
| |
# Split every -A value into a prefix and an optional required term
# length given as trailing decimal digits (e.g. "Q40" => ["Q", 40]).
# The digits are stripped from $req->{A} in place and the resulting
# [ prefix, length-or-undef ] pairs are stored in $req->{A_len}.
sub term_length_extract ($) {
	my ($req) = @_;
	my @pairs = map {
		my $has_len = s/([0-9]+)\z//; # trims the aliased -A value
		[ $_, $has_len ? $1 + 0 : undef ];
	} @{$req->{A}};
	@{$req->{A_len}} = @pairs;
}
| |
# Emit "TERM IBX_ID\n" to FD 0 of the request for every term of the
# document behind $it which carries one of the requested prefixes
# (and has the requested length, when one was given).  Bumps
# $req->{nr_out} per line written.
# Returns 0 when done with this document or whatever
# iter_retry_check() decides if an exception was thrown.
sub dump_ibx_iter ($$$) {
	my ($req, $ibx_id, $it) = @_;
	my $out = $req->{0};
	eval {
		my $doc = $it->get_document;
		for my $pair (@{$req->{A_len}}) {
			my ($pfx, $len) = @$pair;
			for my $term (xap_terms($pfx, $doc)) {
				next if defined($len) && length($term) != $len;
				print $out "$term $ibx_id\n" or
					die "print: $!";
				++$req->{nr_out};
			}
		}
	};
	return 0 unless $@;
	iter_retry_check($req);
}
| |
# Write a one-line summary ("mset.size=N nr_out=M") to FD 1 of the
# request.  Croaks if the caller did not pass a second FD.
sub emit_mset_stats ($$) {
	my ($req, $mset) = @_;
	my $err = $req->{1};
	$err or croak "BUG: caller only passed 1 FD";
	my $size = $mset->size;
	say $err "mset.size=$size nr_out=$req->{nr_out}"
}
| |
# dump_ibx [OPTIONS] IBX_ID QRY_STR
# Runs QRY_STR and, for every match, dumps the terms selected via -A
# together with IBX_ID to FD 0; a summary goes to FD 1 at the end.
# -m limits the number of results (defaults to the document count),
# -o is the offset and -O is passed along as eidx_key.
sub cmd_dump_ibx {
	my ($req, $ibx_id, $qry_str) = @_;
	defined($qry_str) or die 'usage: dump_ibx [OPTIONS] IBX_ID QRY_STR';
	$req->{A} or die 'dump_ibx requires -A PREFIX';
	term_length_extract($req);
	my $srch = $req->{srch};
	my $max = $req->{'m'} // $srch->{xdb}->get_doccount;
	my %opt = (relevance => -1, limit => $max, offset => $req->{o} // 0);
	$opt{eidx_key} = $req->{O} if defined $req->{O};
	my $mset = $srch->mset($qry_str, \%opt);
	$req->{0}->autoflush(1);
	for my $it ($mset->items) {
		# undef from the iterator keeps the countdown going (retry),
		# any defined value replaces it
		my $t = 10;
		while ($t > 0) {
			$t = dump_ibx_iter($req, $ibx_id, $it) // $t;
			--$t;
		}
	}
	emit_mset_stats($req, $mset);
}
| |
# Append "TERM OFFSETS\n" to $req->{wbuf} for every selected term of
# the document behind $it.  OFFSETS is the space-separated list of
# $root2off values for the document's "G" terms.  Bumps
# $req->{nr_out} per line buffered.
# Returns 0 when done with this document or whatever
# iter_retry_check() decides if an exception was thrown.
sub dump_roots_iter ($$$) {
	my ($req, $root2off, $it) = @_;
	eval {
		my $doc = $it->get_document;
		my @offs = map { $root2off->{$_} } xap_terms('G', $doc);
		my $G = join(' ', @offs);
		for my $pair (@{$req->{A_len}}) {
			my ($pfx, $len) = @$pair;
			for my $term (xap_terms($pfx, $doc)) {
				next if defined($len) && length($term) != $len;
				$req->{wbuf} .= "$term $G\n";
				++$req->{nr_out};
			}
		}
	};
	return 0 unless $@;
	iter_retry_check($req);
}
| |
# Write the buffered output in $req->{wbuf} to FD 0 of the request
# while holding an exclusive flock on $fh, then empty the buffer.
# Does nothing when the buffer is empty.  flock is retried on EINTR;
# any other flock or print failure is fatal.
sub dump_roots_flush ($$) {
	my ($req, $fh) = @_;
	return if $req->{wbuf} eq '';
	while (!flock($fh, LOCK_EX)) {
		$!{EINTR} or die "LOCK_EX: $!";
	}
	print { $req->{0} } $req->{wbuf} or die "print: $!";
	while (!flock($fh, LOCK_UN)) {
		$!{EINTR} or die "LOCK_UN: $!";
	}
	$req->{wbuf} = '';
}
| |
# dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR
# Loads the NUL-delimited key/value pairs of ROOT2ID_FILE, runs
# QRY_STR and dumps the terms selected via -A together with the
# mapped values of each match's "G" terms.  Output is buffered and
# written to FD 0 under a flock on ROOT2ID_FILE; a summary goes to
# FD 1 at the end.  -m is the result limit, -o the offset.
sub cmd_dump_roots {
	my ($req, $root2off_file, $qry_str) = @_;
	defined($qry_str) or
		die 'usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR';
	$req->{A} or die 'dump_roots requires -A PREFIX';
	term_length_extract($req);
	open my $fh, '<', $root2off_file;
	my $root2off; # record format: $OIDHEX "\0" uint32_t
	my @rec = split(/\0/, read_all($fh));
	while (@rec) {
		my ($oidhex, $off) = splice(@rec, 0, 2);
		$root2off->{$oidhex} = $off;
	}
	my %opt = (relevance => -1, limit => $req->{'m'},
			offset => $req->{o} // 0);
	my $mset = $req->{srch}->mset($qry_str, \%opt);
	$req->{0}->autoflush(1);
	$req->{wbuf} = '';
	for my $it ($mset->items) {
		# undef from the iterator keeps the countdown going (retry),
		# any defined value replaces it
		my $t = 10;
		while ($t > 0) {
			$t = dump_roots_iter($req, $root2off, $it) // $t;
			--$t;
		}
		# flush whenever the line counter sits on a 16384 boundary
		dump_roots_flush($req, $fh) unless $req->{nr_out} & 0x3fff;
	}
	dump_roots_flush($req, $fh);
	emit_mset_stats($req, $mset);
}
| |
# Write one result line, "DOCID\0PERCENT\0RANK\n", to FD 0 of the
# request for the match behind $it.
# Returns true on success.  A failed write is fatal, same as in the
# dump_* writers: an undef return would otherwise be mistaken for a
# "retry" request by the countdown loop in cmd_mset() and the result
# line would silently be lost.
sub mset_iter ($$) {
	my ($req, $it) = @_;
	say { $req->{0} } $it->get_docid, "\0",
			$it->get_percent, "\0", $it->get_rank
		or die "say: $!";
}
| |
# mset [OPTIONS] QRY_STR -- to be used by WWW + IMAP
# Runs QRY_STR and writes a header line with the mset size and the
# estimated number of matches followed by one line per match to FD 0.
# Switches: -m limit, -o offset, -r relevance, -t threads,
# -g git_dir, -O eidx_key, -T threadid.
sub cmd_mset {
	my ($req, $qry_str) = @_;
	defined($qry_str) or die 'usage: mset [OPTIONS] QRY_STR';
	my %opt = (limit => $req->{'m'}, offset => $req->{o} // 0);
	$opt{relevance} = 1 if $req->{r};
	$opt{threads} = 1 if defined $req->{t};
	# these are handed to ->mset verbatim whenever they were given
	my %passthru = (g => 'git_dir', O => 'eidx_key', T => 'threadid');
	for my $sw (qw(g O T)) {
		$opt{$passthru{$sw}} = $req->{$sw} if defined $req->{$sw};
	}
	my $mset = $req->{srch}->mset($qry_str, \%opt);
	my $out = $req->{0};
	say $out 'mset.size=', $mset->size,
		' .get_matches_estimated=', $mset->get_matches_estimated;
	for my $it ($mset->items) {
		# undef from the iterator keeps the countdown going (retry),
		# any defined value replaces it
		my $t = 10;
		while ($t > 0) {
			$t = mset_iter($req, $it) // $t;
			--$t;
		}
	}
}
| |
# Register the extra query parser prefixes given via -Q on the
# query parser of $req->{srch}.  Each -Q value is either
# "USER_PREFIX:XAPIAN_PREFIX" (add_prefix) or
# "USER_PREFIX=XAPIAN_PREFIX" (add_boolean_prefix); anything without
# a separator is fatal.  Marks the search object so this is only done
# once per object.
sub srch_init_extra ($) {
	my ($req) = @_;
	my $srch = $req->{srch};
	my $qp = $srch->{qp};
	my %method_for = (
		'=' => 'add_boolean_prefix',
		':' => 'add_prefix',
	);
	for my $spec (@{$req->{Q}}) {
		my ($upfx, $sep, $xpfx) = split(/([:=])/, $spec);
		defined($xpfx) or die "E: bad -Q $spec";
		my $m = $method_for{$sep};
		$qp->$m($upfx, $xpfx);
	}
	$srch->{qp_extra_done} = 1;
}
| |
# Run one request: resolve "cmd_$cmd", parse its switches out of
# @argv into %$req, attach a cached or freshly opened search object
# as $req->{srch} and invoke the command with what is left of @argv.
# Returns early (without a value) for unknown commands and for
# switches which fail to parse.  Dies when no -d was given or when
# the Xapian DB(s) cannot be opened even after dropping the %SRCH
# cache and retrying once.
sub dispatch {
	my ($req, $cmd, @argv) = @_;
	my $fn = $req->can("cmd_$cmd") or return;
	$GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
		or return;
	my $dirs = delete $req->{d} or die 'no -d args';
	# the cache key covers the shard list and any extra -Q prefixes
	my $key = "-d\0".join("\0-d\0", @$dirs);
	$key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q};
	my $new;
	$req->{srch} = $SRCH{$key} // do {
		$new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
		my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST;
		$SHARD_NFD += $nfd;
		if ($SHARD_NFD > $MY_FD_MAX) { # over budget, drop the cache
			$SHARD_NFD = $nfd;
			%SRCH = ();
		}
		my $first = shift @$dirs;
		my $slow_phrase;
		for my $retried (0, 1) {
			# start over on every attempt, a failed attempt may
			# not have looked at every shard
			$slow_phrase = -f "$first/iamchert";
			eval {
				$new->{xdb} = $X->{Database}->new($first);
				for (@$dirs) {
					$slow_phrase ||= -f "$_/iamchert";
					$new->{xdb}->add_database(
							$X->{Database}->new($_))
				}
			};
			last unless $@;
			if ($retried) {
				die "E: $@\n";
			} else { # may be EMFILE/ENFILE/ENOMEM....
				warn "W: $@, retrying...\n";
				%SRCH = ();
				$SHARD_NFD = $nfd;
			}
		}
		# Only reached once every shard is open.  This must happen
		# outside of the retry loop: `last' leaves the loop on
		# success, so a check at the bottom of the loop body never
		# ran for a successful open and phrase search stayed off.
		$slow_phrase or $new->{qp_flags}
				|= PublicInbox::Search::FLAG_PHRASE();
		bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
					'PublicInbox::Search';
		$new->qparse_new;
		$SRCH{$key} = $new;
	};
	# cached objects may be stale, freshly opened ones are not
	$req->{srch}->{xdb}->reopen unless $new;
	$req->{Q} && !$req->{srch}->{qp_extra_done} and
		srch_init_extra($req);
	my $timeo = $req->{K}; # -K: alarm(2) armed while the command runs
	alarm($timeo) if $timeo;
	$fn->($req, @argv);
	alarm(0) if $timeo;
}
| |
# Request loop, runs until $in becomes undef (SIGTERM) or the
# process exits.  Every iteration receives one command along with
# its file descriptors from $in, wraps the FDs in a fresh request
# object (keys 0, 1, ...) and hands the NUL-delimited argument
# vector to dispatch().  Exits with 66 (EX_NOINPUT) when nothing was
# received.  Warnings go to FD 1 of the current request when there
# is one.
sub recv_loop {
	local $SIG{__WARN__} = sub { print $stderr @_ };
	local $SIG{TERM} = sub { undef $in };
	local $SIG{USR1} = \&reopen_logs;
	my $rbuf;
	until (!defined($in)) {
		PublicInbox::DS::sig_setmask($workerset);
		my @fds = eval { # the SIGTERM handler may undef $in under us
			$PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33)
		};
		if ($@) {
			$in or exit; # hit by SIGTERM
			die;
		}
		exit(66) unless @fds; # EX_NOINPUT
		defined($fds[0]) or die "recvmsg: $!";
		PublicInbox::DS::block_signals(POSIX::SIGALRM);
		my $req = bless {}, __PACKAGE__;
		for my $i (0..$#fds) {
			open($req->{$i}, '+<&=', $fds[$i]);
		}
		local $stderr = $req->{1} // \*STDERR;
		chop($rbuf) eq "\0" or die "not NUL-terminated";
		my @argv = split(/\0/, $rbuf);
		$req->{nr_out} = 0;
		@argv and $req->dispatch(@argv);
	}
}
| |
# awaitpid callback: forget worker $nr.  Exit code 66 (EX_NOINPUT)
# starts the shutdown by clearing $in; any other non-zero status is
# reported.  Unless shutting down, a replacement worker is scheduled.
sub reap_worker { # awaitpid CB
	my ($pid, $nr) = @_;
	delete $WORKERS{$nr};
	my $exit_code = $? >> 8;
	if ($exit_code == 66) { # EX_NOINPUT
		undef $in;
	} elsif ($?) {
		warn "worker[$nr] died \$?=$?\n";
	}
	$in and PublicInbox::DS::requeue(\&start_workers);
}
| |
# Fork worker number $nr.  The parent records the PID in %WORKERS
# and registers reap_worker(); the child runs recv_loop() and exits.
# A failed fork is only warned about.
sub start_worker ($) {
	my ($nr) = @_;
	my $pid = eval { PublicInbox::DS::fork_persist };
	defined($pid) or return(warn($@));
	if ($pid) { # parent
		$WORKERS{$nr} = $pid;
		return awaitpid($pid, \&reap_worker, $nr);
	}
	# child
	undef %WORKERS;
	$SIG{TTIN} = $SIG{TTOU} = 'IGNORE';
	$SIG{CHLD} = 'DEFAULT'; # Xapian may use this
	recv_loop();
	exit(0);
}
| |
# Start a worker for every slot below $nworker which has no PID in
# %WORKERS.  Stops starting new ones once $in is gone.
sub start_workers {
	my @vacant = grep { !defined($WORKERS{$_}) } (0..($nworker - 1));
	for my $nr (@vacant) {
		$in or next;
		start_worker($nr);
	}
}
| |
# SIGTTOU handler: lower the worker count by one (never below one)
# and SIGTERM every worker whose number is now out of range.
sub do_sigttou {
	return unless $in && $nworker > 1;
	--$nworker;
	my @pids = map { $WORKERS{$_} } grep { $_ >= $nworker } keys %WORKERS;
	kill('TERM', @pids);
}
| |
# Reopen STDOUT and STDERR in append mode on the paths given by the
# STDOUT_PATH and STDERR_PATH environment variables and enable
# autoflush on them.  A stream whose variable is unset is left alone.
sub reopen_logs {
	my ($out, $err) = @ENV{qw(STDOUT_PATH STDERR_PATH)};
	if (defined $out) {
		open(STDOUT, '>>', $out) and STDOUT->autoflush(1);
	}
	if (defined $err) {
		open(STDERR, '>>', $err) and STDERR->autoflush(1);
	}
}
| |
# SIGUSR1 handler of the parent: reopen our own logs, then pass the
# signal on to every worker so they do the same.
sub parent_reopen_logs {
	reopen_logs();
	my @pids = values %WORKERS;
	kill('USR1', @pids);
}
| |
# True while there is still an input socket or workers left to reap;
# installed as the event loop's post_loop_do check by start().
sub xh_alive {
	return $in if $in;
	scalar(keys %WORKERS);
}
| |
# Entry point.  Requires stdin to be a SOCK_SEQPACKET socket.
# @argv may contain -j WORKERS (default: 1).  With -j0 requests are
# served by this process itself via recv_loop(); otherwise workers
# are forked and this process only supervises them from the event
# loop:
#   SIGTTIN  adds a worker
#   SIGTTOU  removes a worker
#   SIGUSR1  reopens logs here and in all workers
#   SIGCHLD  reaps workers
sub start (@) {
	my (@argv) = @_;
	local $in = \*STDIN;
	my $so_type = getsockopt($in, SOL_SOCKET, SO_TYPE);
	die 'stdin is not SOCK_SEQPACKET'
		if unpack('i', $so_type) != SOCK_SEQPACKET;

	local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX);
	PublicInbox::Search::load_xapian();
	my $opt = { j => 1 };
	$GLP->getoptionsfromarray(\@argv, $opt, 'j=i') or die 'bad args';
	local $workerset = POSIX::SigSet->new;
	$workerset->fillset or die "fillset: $!";
	for my $signo (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
		$workerset->delset($signo) or die "delset($signo): $!";
	}
	$MY_FD_MAX = PublicInbox::Search::ulimit_n() //
		die "E: unable to get RLIMIT_NOFILE: $!";
	$MY_FD_MAX < 72 and warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n";
	$MY_FD_MAX -= 64; # leave 64 descriptors out of the shard budget

	local $nworker = $opt->{j};
	return recv_loop() if $nworker == 0;
	die '-j must be >= 0' if $nworker < 0;
	for my $signo (POSIX::SIGTERM, POSIX::SIGCHLD) {
		$workerset->delset($signo) or die "delset($signo): $!";
	}
	my %sig = (
		TTIN => sub {
			return unless $in;
			++$nworker;
			PublicInbox::DS::requeue(\&start_workers)
		},
		TTOU => \&do_sigttou,
		CHLD => \&PublicInbox::DS::enqueue_reap,
		USR1 => \&parent_reopen_logs,
	);
	PublicInbox::DS::block_signals();
	start_workers();
	@PublicInbox::DS::post_loop_do = \&xh_alive;
	PublicInbox::DS::event_loop(\%sig);
}
| |
| 1; |