null

Сказ о perlthr и watchdog или куда в perl потоки пропадают

Пришлось мне в последнее время прибегнуть к одной, довольно редко используемой фиче perl -- многопоточности. Дело в том, что потребовалось написать приложение, которое одновременно выполняет работу в нескольких местах, периодически синхронизируясь. А также, за этими потоками наблюдает ещё один мониторный поток. Но так уж случилось, что code review выполнял человек, который умеет программировать лишь на sh и sed, поэтому баги полезли только в продакшене. В данной статье хотелось бы как раз затронуть этот момент подробнее.
Проблема проектирования/понимания возникает тогда, когда программист сталкивается с функцией join().
Сразу рассмотрим вопрос на примере простенького кода: 
#!/usr/bin/perl
# made by: KorG

use strict;
use v5.18;
use warnings;
no warnings 'experimental';
use utf8;
binmode STDOUT, ':utf8';
$| = 1;

use threads;
use threads::shared;

my @arr :shared;
my $lock :shared = 0;
my $start_time = time;

printf "Program started with %d threads\n", 0+threads->list();

# CREATE THREADS
my @T = 1..4;
my %T; $T{$_} = threads->create(
   sub {
      lock $lock;
      printf(
         "[%d,%d,%d,%d]",
         $_,
         0+threads->list(),
         0+threads->list(threads::running),
         0+threads->list(threads::joinable),
      ) while sleep $_, $a++ != 2;
      threads->exit();
   }
) for @T;

# JOIN THREADS
$T{$_}->join() for @T;

printf "\nProgram stopped in %d seconds\n", time - $start_time;

Сначала мы создаем массив с именами функций, которые будут запускаться в потоках. Затем, в хеш по данным именам помещаем ссылки на объекты потоков. Такой код достаточно изящен, поскольку позволяет обращаться к потокам по именам, а также уменьшить количество китайского кода, если требуются разные тела функций потоков:

my @T = qw;t1 t2 t3 t4;;
# ...
sub t1 { print "[t1]" while(sleep 1); }
sub t2 { print "[t2]" while(sleep 1); }

Проблема крайне проста: видно, что в секции "JOIN THREADS" для всех потоков вызывается функция join(). И человек, программирующий на Си ожидает, что код будет работать следующим образом: последовательно вызовет блокирующий join() для каждого потока и продолжит выполнение. При этом, количество потоков, возвращаемое threads->list(), ожидаемо, никак не изменится -- потоки-то работают. Но не тут то было. В perl функция join() работает "несколько иначе": сначала уменьшает количество потоков во внутреннем списке интерпретатора. Поэтому, если один из потоков -- это импровизированный watchdog, программа будет завершаться:

sub watchdog {
   sleep(60); # initial sleep
   print "Started watchdog thread";
   for(;;sleep(15)){
      exit(0x13) if 5 != threads->list(threads::running);
   }
}
В этом примере, на случай вопросов, "5" -- это количество вызовов функции create().
Если же немного переделать политику вызова join(), то ситуация изменится в лучшую сторону:
while(sleep 1, threads->list() > 0){
   $_->join() for threads->list(threads::joinable);
}

Теперь наш watchdog будет работать корректно.

Вообще, мониторить состояние потоков, на мой взгляд, разумнее как-нибудь так:

my $count = eval join "+", map {$T{$_}->is_running()} @T;

А на самом деле, как и сообщает официальная документация, потоками в perl лучше вообще не пользоваться, поскольку они в некоторых местах демонстриуют неожиданное поведение. *(но хотя бы параллельные, в отличие от, например, потоков в python).

korg

 

Коротко о себе

Работаю в компании Tune-IT, администрирую инфраструктуру компании и вычислительную сеть кафедры Вычислительной ТехникиСПбНИУ ИТМО.

Интересы: администрирование UNIX и UNIX-like систем и активного сетевого оборудования, написание shell- и perl-скриптов, изучение технологий глобальных сетей.
Люблю собирать GNU/Linux и FreeBSD, использовать тайлинговые оконные менеджеры и писать системный софт.