inotifywait с очередью FIFO в Bash

Я написал небольшой скрипт Bash, который использует инструменты inotify и интерфейс inotify. Моя проблема в том, что одна из команд в этой функции может заблокировать выполнение до его завершения. Таким образом, функция застревает.

Чтобы решить эту проблему, я хотел бы поставить в очередь обнаруженные файлы (по событию закрытия) и прочитать очередь из другой функции. Кто-нибудь знает, как это сделать в Bash?

Следующие переменные представляют собой простые строки для поиска каталогов или присвоения имен файлам.

inotifywait -mrq -e close --format %w%f /some/dir/ | while read FILE
do
    NAME=$(echo $CAP)_"`date +"%F-%H-%M-%S"`.pcap"
    logger -i "$FILE was just closed"
    # cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
    rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
    RESULT=$?
    if [ $RESULT -eq 0 ] ; then
        logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
    else
        logger -i "Fail:    $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
    fi

    rm "$FILE"
    RESULT=$?
    if [ $RESULT -eq 0 ] ; then
        logger -i "Success: deletion successfull for $FILE, code $RESULT"
    else
        logger -i "Fail:    deletion failed for $FILE on SSD, code $RESULT"
    fi

    do_something()
    logger -i "$NAME was handled"
    # for stdout
    echo "`date`: Moved file" 
done

Я копирую файлы на том SAN, который иногда имеет разное время ответа. Вот почему эта функция может зависнуть на некоторое время. Я заменил cp на Rsync, потому что мне нужна статистика пропускной способности. Cp (из coreutils), по-видимому, этого не делает.


person wishi    schedule 30.03.2015    source источник


Ответы (2)


Пара идей:

1) Вы можете использовать именованный канал в качестве очереди ограниченного размера:

mkfifo pipe

your_message_source | while read MSG
do
  #collect files in a pipe 
  echo "$MSG" >> pipe
done &

while read MSG 
do
 #Do your blocking work here
done < pipe

Это будет блокироваться на echo "$MSG" >> pipe, когда буфер канала заполнится (вы можете получить размер этого буфера с помощью ulimit -p (умножить на 512). В некоторых случаях этого может быть достаточно.

2) Вы можете использовать файл в качестве очереди сообщений и блокировать его при каждой операции:

 #Feeder part
    your_message_source | while read MSG     
       do
            (
            flock 9
            echo "$MSG" >> file_based_queue 
            ) 9> file_based_queue 
       done &

   # Worker part
   while :
   do 
    #Lock shared queue and cut'n'paste it's content to the worker's private queue
    (
      flock 9
      cp file_based_queue workers_queue
      truncate -s0 file_based_queue   
    ) 9> file_based_queue

    #process private queue
    while read MSG 
    do
     #Do your blocking work here   
    done < workers_queue 
   done

Вы блокируете inotifywait только в том случае, если вы находитесь в рабочем цикле в подоболочке (flock ...) 9>file_based_queue и после команды flock одновременно. У вас могут быть очереди на RAMdisk (/dev/shm), чтобы свести к минимуму время, которое вы там проводите, чтобы не пропустить события FS.

3) Или вы можете использовать некоторый интерфейс bash (или выполнять сценарии на языках, которые имеют интерфейс) для очередей сообщений, поддерживаемых базой данных, или для очереди сообщений SysV.

person PSkocik    schedule 30.03.2015

Это пример использования файла в качестве очереди FIFO, имеет неограниченный размер, сохраняется при перезагрузке системы и допускает несколько операций чтения и записи.

#!/bin/bash

# manages a FIFO queue on a system file.
#  every message is a line of text.
#  supports multiple writers and multiple readers.
#
# Requires: bash, inotify-tools: /usr/bin/inotifywait,
# ed, util-linux: /usr/bin/flock

set -e

# Retrieves one element
# param:
#  pipe_name
# writes to stdout:
#  element_string
# returns:
#  true on succes, false on error or end of data
_pipe_pull() {
    local pipe="${1}"
    local msg pid

_pipe_pop() {
    local fd1
    ( if ! flock --timeout 1 --exclusive ${fd1}; then
        echo "Error: _pipe_pop can't get a lock." >&2
        return 1
    fi
        [ ! -s "${pipe}" ] || \
            ed -s "${pipe}" <<< $'1p\n1d\nw'
    ) {fd1}< "${pipe}"
    :
}

    msg=""
    while [ -z "${msg}" ]; do
        if [ ! -s "${pipe}" ]; then
            inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
            pid="${!}"
            wait "${pid}" || return 1
        fi

        msg="$(_pipe_pop)" || \
            return 1

        if [ "${msg}" = $'\x04' ]; then
            echo "_pipe_pull: end of data." >&2
            return 1
        fi
    done
    printf '%s\n' "${msg}"
    :
}

# Adds multiple elements at once
# param:
#  pipe_name elem_1 ... elem_N
# returns:
#  true on succes, false on error
_pipe_push() {
    local pipe="${1}"
    local fd1
    shift

    ( if ! flock --timeout 10 --exclusive ${fd1}; then
        echo "Error: _pipe_push can't get a lock." >&2
        return 1
    fi
        printf '%s\n' "${@}" >> "${pipe}"
    ) {fd1}< "${pipe}"
}

pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"

# submit first reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
    printf 'Reader 1:%s\n' "${msg}"
done &

# submit second reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
    printf 'Reader 2:%s\n' "${msg}"
done &

# submit first writer process
for i in {1..10}; do
    _pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
done &
pid1="${!}"

# submit second writer process
for i in {11..20}; do
    _pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
done &
pid2="${!}"

# submit third writer process
for i in {21..30}; do
    _pipe_push "${pipe_file_1}" "${i}"
done &
pid3="${!}"

# waiting for the end of writer processes
wait ${pid1} ${pid2} ${pid3}

# signal end of data to two readers
_pipe_push "${pipe_file_1}" $'\x04' $'\x04'

# waiting for the end of reader processes
wait

# cleaning
rm -vf "${pipe_file_1}"
:
person Quatro por Quatro    schedule 07.06.2016