Родитель не ждет завершения дочерних процессов, несмотря на сбор урожая

Я полностью осознаю, что есть тонны статей, объясняющих внутреннюю работу динамики процессов родитель-потомок. Я прошел через них и заставил свои вещи работать почти так, как я хочу. Но есть одна вещь, которая меня беспокоит, и я не могу ее понять, несмотря на многочисленные попытки.

Проблема: несмотря на сбор дочерних элементов, main не ждет, пока закончатся все дочерние элементы, и преждевременно завершает работу. Я считаю, что сделал правильный выход из дочернего процесса, и я установил REAPER в дочерний процесс - так как же основной выход до завершения дочернего процесса?

Не ищу здесь решения, но мне нужно новое направление, в котором я мог бы биться головой всю следующую неделю. На данный момент я чувствую, что исчерпал свои возможности и много чего перепробовал, но безрезультатно.

Некоторые сведения о том, чего я пытаюсь достичь:

В общем - хочу, чтобы все дети кончили, и только потом хочу приступить к чему-то дальше. Каждый дочерний процесс порождает группу потоков, и эти потоки должным образом соединяются указанным дочерним процессом, который затем переходит к выходу с exit(0).

Дополнительная шумиха, которую вы можете наблюдать в программе, — это не что иное, как наше требование, согласно которому мы должны использовать 5 API (движков), но только с фиксированным размером партии, скажем, 10 для каждого за раз. Я запускаю дочерний процесс для каждого движка и запускаю поток для каждого запроса, а затем жду завершения всех потоков, присоединяюсь к ним, и только после этого дочерний процесс завершается. Только теперь я могу отправить следующую партию запросов на тот же движок, и я делаю это для всех движков, пока не исчерпаю все свои запросы, скажем, 10000.

Каждый запрос может занять от 1 секунды до 2 часов — в основном это отчеты в формате CSV, получаемые из HTTP API.

Моя проблема заключается в том, что когда я исчерпал свой общий набор запросов, я не могу ждать, чтобы ГЛАВНЫЙ дождался завершения всех дочерних процессов. Это странно, и это проблема, которую я пытаюсь решить.

Любые идеи?

Вывод моей программы:

[compuser@lenovoe470:little-stuff]$  perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$  1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)

Вывод выше:

  • Запуск batch_engine означает, что я запускаю пакет пронумерованных задач.
  • BUSY_ENGINE означает, что конечная точка/движок занята, так как она уже занята обработкой максимального размера пакета запросов. Мне нужно подождать.
  • Finished batch_engine означает, что дочерний процесс завершил обработку данного пакета запросов для определенного механизма/конечной точки. Он завершает работу, и main обнаруживает, что текущий движок теперь свободен и можно поставить в очередь следующую партию.
  • если мы видим последние 2 строки, очевидно, что выходные данные дочерних процессов вышли за пределы, и основной завершился преждевременно, не дожидаясь работающих дочерних процессов. Почему? любая помощь?

Моя программа:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);


STDOUT->autoflush(1);


# doesn't work
  sub reaper {
    my $reaped;
    while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
      print "$$: reaped: $reaped\n";
      sleep(1);
    }
    $SIG{CHLD} = \&reaper;
  }
# doesn't work


my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };

my $proc_hash;
my $global_string = "ENGINE";

# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
  sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
      if ( WIFEXITED($?) ) 
      {
        # my
        my $ret_code = WEXITSTATUS($?);
        print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
        my $engine_name = $proc_hash->{$reaped_pid};
        delete ($proc_hash->{$reaped_pid});
        delete ($proc_hash->{$engine_name});
        # my

        # original
        #my $ret_code = WEXITSTATUS($?);
        #print "child process:$pid exit with code:$ret_code\n";
        # original
      }
    }
  }
#

$SIG{CHLD} = \&REAPER;

sub random_sleep_time {
  return (int(rand(5)+1))
  #return (sprintf "%.2f",(rand(1)+1))
}

sub task_runner {
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  STDOUT->autoflush(1);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  threads->exit(0);
  #print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
  #return;
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my $engine = (split (/_/,$batch_engine))[1];
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) { 
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    my $ppid = getppid();
    foreach my $tid (@tids) {$tid->join()}
    print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
    exit (0);
  }
}

sub update_proc_hash {
  my $finished_pid = waitpid (-1, POSIX->WNOHANG);
  if ($finished_pid > 0) {
    print "$$: PID ($finished_pid) has finished. updating proc hash\n";
    my $engine_name = $proc_hash->{$finished_pid};
    delete ($proc_hash->{$finished_pid});
    delete ($proc_hash->{$engine_name});
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
  foreach my $engine (@engines) {
    update_proc_hash();
    if (exists $proc_hash->{$engine}) {
      print "$$: BUSY_ENGINE: $engine.\n";
      sleep (1);
      next;
    }
    else {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0) {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks) {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
        task_orchestrator ("$batch_engine",@engine_tasks);
        $batch++;
      }
    }
  }
}

REAPER();

print "$$: FINISHED TASKS IN BATCHES\n";

__END__

Обновление через 3 дня: Спасибо сообществу SO. Еще раз, я благодарен всем вам, кто потратил свое время, чтобы разобраться в этом и помог обнаружить и исправить проблему. Большое спасибо.

Позвольте мне поделиться новым выводом с окончательной программой для всеобщего ознакомления.

ВЫВОД после использования исправления:

User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES

ЗАКЛЮЧИТЕЛЬНАЯ ПРОГРАММА РАБОТЫ:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;

STDOUT->autoflush(1);

my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
  e1 => 2,
  e2 => 3,
  e3 => 2,
};

my $proc_hash;
my $global_string = "engine";

sub REAPER {
  local ($!, $?);
  while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
    if ( WIFEXITED($?) ) {
      my $ret_code = WEXITSTATUS($?);
      print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
      my $engine_name = $proc_hash->{$reaped_pid};
      delete ($proc_hash->{$reaped_pid});
      delete ($proc_hash->{$engine_name});
    }
  }
}

$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }

sub task_runner {
  STDOUT->autoflush(1);
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
  threads->exit(0);
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    my $start_time = time;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) {
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    foreach my $tid (@tids) {$tid->join()}
    my $end_time = time;
    my $total_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
    exit (0);
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
  foreach my $engine (@engines)
  {
    if (exists $proc_hash->{$engine})
    {
      sleep (1);
      next;
    }
    else
    {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0)
      {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks)
      {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
        task_orchestrator ($batch_engine,@engine_tasks);
        $batch++;
      }
    }
  }
}

# All 3 below work properly
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);

print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__

person User9102d82    schedule 27.05.2019    source источник


Ответы (2)


waitpid

может вернуть 0, если есть дочерние процессы, соответствующие PID, но ни один из них еще не завершился

С -1 это относится к любому дочернему процессу, поэтому ваш код с несколькими дочерними процессами наверняка столкнется с нулевым возвратом из неблокирующего waitpid в REAPER; именно так мы можем ждать, пока есть незавершенные дочерние процессы. Но ваш цикл while выходит сначала таким нулем.

Один из способов сделать это — провести опрос на предмет неотрицательных результатов.

use warnings;
use strict;
use feature 'say';

use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep) ;

for (1..4) { 
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { 
        sleep rand 4;  
        say "\tkid $$ exiting"; 
        exit;
    };  
}; 

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) { 
    say "got $kid" if $kid > 0;
    sleep 0.2;
}

Отпечатки

        kid 12687 exiting
got 12687
        kid 12690 exiting
got 12690
        kid 12689 exiting
got 12689
        kid 12688 exiting
got 12688

Пожалуйста, скорректируйте период опроса по своему усмотрению. Обратите внимание, что поскольку это перехватывает любой дочерний процесс, он может помешать другим форкам, если к этому моменту были какие-то нежданные.

Или вы можете заблокировать с ожиданием

while ( (my $kid = waitpid -1, 0) > -1 ) { 
    say "got $kid";
}

где теперь вы также можете сделать > 0, так как здесь не будет возврата 0, поскольку вызов блокируется. В то время как нам нужно, чтобы цикл завершился только после того, как -1 вернется (нет больше процессов), как и раньше.

Основное отличие состоит в том, что блок выполняется только после фактического выхода дочернего процесса, поэтому, если вам нужно следить за тем, что делают некоторые долго работающие дочерние процессы (и, возможно, ограничивать время их выполнения или защищать от зависших заданий), это не так просто. в этой форме; вам нужна неблокирующая операция для этого.

Обратите внимание, что некоторые детали, в частности касающиеся возврата, могут различаться в разных системах.


Наивная версия этого состоит в том, чтобы ждать только этих конкретных PID, собранных по мере того, как вы fork

foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0;  # -1 if reaped already
}

который блокируется с waitpid для каждого процесса. Проблема в том, что если один процесс работает намного дольше других (или зависает), этот цикл застрянет в ожидании. И вообще, мы бы предпочли, чтобы дочерние процессы пожинались по мере их выхода, а не в том порядке, в котором они были запущены.

person zdim    schedule 27.05.2019
comment
@ User9102d82 Отредактировал объяснение для ясности (надеюсь!). Добавлен более простой способ сделать это. - person zdim; 28.05.2019
comment
Почему бы просто не использовать wait() или waitpid(-1) без опции WNOHANG? Весь смысл неблокирующих ожиданий заключается в том, чтобы иметь дело с ситуациями, когда вы не хотите делать паузу и ждать, пока что-то завершится, вы просто хотите очистить те, которые уже сделали это. - person TFBW; 28.05.2019
comment
@TFBW Конечно, это тоже можно сделать. Неблокирующий позволяет делать другие вещи между проверками (возможно, проверять, как/что делают эти процессы), что может быть полезно при длительном ожидании. (Я мог бы добавить больше обсуждения) - person zdim; 28.05.2019
comment
@ User9102d82 Добавлен еще один способ - person zdim; 29.05.2019
comment
Спасибо @zdim, это сработало. Я пропустил непроверку больше -1, и это было ключевой проблемой. Я выбираю ваше объяснение в качестве ответа, поскольку оно прямо попадает в точку и является более подробным и проработанным. - person User9102d82; 30.05.2019

При выходе из основного цикла вы вызываете REAPER(), который выполняет неблокирующую функцию waitpid(). Неблокирующий. Нет. И это не блокировка. Так что выход.

Пока я здесь, я отмечаю, что ваша функция update_proc_hash() не зацикливается, как другие вещи, которые делают waitpid(), поэтому она не перехватывает все, что могла. Сделайте себе одолжение и аккуратно разложите все эти вещи.

person TFBW    schedule 27.05.2019
comment
Но разве REAPER не выполняет цикл while? Разве это не должно задерживать главное в момент исполнения? Цикл пытается выполнить пока собранные PID больше 0 - заснуть на 1 секунду? Что вы думаете. - person User9102d82; 28.05.2019
comment
@User9102d82 User9102d82 Да, цикл while будет делать это - пока он продолжает работать, то есть. Но он завершается, когда waitpid возвращает 0, и вы получаете, когда там есть дочерние процессы (что на самом деле именно то, что вы хотите). Так что проблема не в неблокирующей операции, а в условии > 0 в цикле while. - person zdim; 28.05.2019
comment
@ User9102d82 REAPER выполняет цикл while для waitpid() с параметром WNOHANG. Эта опция предотвращает фактическое ожидание функции waitpid(): она собирает дочерние процессы, которые уже завершены, возвращает pid умершего процесса, а затем возвращает ноль, если есть еще процессы, которые еще не завершились, или -1, если их нет. больше дочерних процессов. В конце вы хотите выполнить блокирующее ожидание (а не неблокирующее), чтобы не выйти, пока все дочерние элементы не будут выполнены. - person TFBW; 28.05.2019
comment
@TFBW Все правильно, но, как говорится, цикл while продолжает крутиться. Он продолжает возвращаться, чтобы проверить то, что вы описываете, снова и снова. (Там должен быть sleep!) Неблокирующий характер не является проблемой - до тех пор, пока он не завершается при возврате 0 (см. мой ответ). - person zdim; 28.05.2019
comment
@TFBW: Спасибо за ответ на мой вопрос, и я ценю, что вы нашли время, чтобы ответить на него. - person User9102d82; 30.05.2019