Буфер Golang с параллельными читателями

Я хочу создать буфер в Go, который поддерживает несколько одновременных операций чтения и одну запись. Все, что записывается в буфер, должно быть прочитано всеми читателями. Новые считыватели могут подключаться в любое время, а это означает, что уже записанные данные должны быть воспроизведены для поздних считывателей.

Буфер должен удовлетворять следующему интерфейсу:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}

Есть ли у вас какие-либо предложения по такой реализации, предпочтительно с использованием встроенных типов?


person Tympanix    schedule 01.06.2017    source источник
comment
Engineering.linkedin.com/distributed-systems/ kafka.apache.org/intro   -  person dm03514    schedule 01.06.2017
comment
Ничто в стандартной библиотеке этого не сделает. Однако вы можете использовать пользовательскую структуру, построенную вокруг каналов, где каждый читатель повторяет то, что он читает, обратно в канал, чтобы другие читатели могли это прочитать. Проблема заключается в том, чтобы определить, где находится предел. Вы хотите иметь возможность воспроизводить старые данные для поздних считывателей, но это означает сохранение всех этих данных навсегда (ну, на весь срок службы программы), поскольку вы никогда не знаете, когда присоединится новый считыватель. Это большой риск утечки памяти.   -  person Kaedys    schedule 01.06.2017
comment
Если сохранение и воспроизведение старых данных менее важно, это обеспечивает надежную реализацию системы «один вещатель — несколько получателей»: rogpeppe.wordpress.com/2009/12/01/   -  person Kaedys    schedule 01.06.2017
comment
github.com/djherbis/bufit   -  person wim    schedule 09.10.2020


Ответы (3)


В зависимости от характера этого средства записи и того, как вы его используете, хранить все в памяти (чтобы иметь возможность воспроизвести все для читателей, присоединившихся позже) очень рискованно и может потребовать много памяти или привести к сбою вашего приложения из-за недостаточно памяти.

Использование его для регистратора с низким трафиком, хранящего все в памяти, вероятно, нормально, но, например, для потоковой передачи аудио или видео, скорее всего, нет.

Если приведенные ниже реализации считывателя прочитают все данные, которые были записаны в буфер, их метод Read() сообщит io.EOF правильно. Следует соблюдать осторожность, так как некоторые конструкции (например, bufio.Scanner) могут не считывать больше данных после io.EOF встречается (но это не недостаток нашей реализации).

Если вы хотите, чтобы считыватели нашего буфера ждали, если в буфере больше нет доступных данных, чтобы дождаться записи новых данных вместо возврата io.EOF, вы можете обернуть возвращенные считыватели в «хвостовой считыватель», представленный здесь: Go: tail -f-подобный генератор.

Реализация файлов с "безопасной памятью"

Вот очень простое и элегантное решение. Он использует файл для записи, а также использует файлы для чтения. Синхронизация в основном обеспечивается операционной системой. Это не приводит к ошибке нехватки памяти, поскольку данные хранятся исключительно на диске. В зависимости от характера вашего писателя этого может быть или не быть достаточно.

Я скорее буду использовать следующий интерфейс, потому что Close() важен в случае файлов.

type MyBuf interface {
    io.WriteCloser
    NewReader() (io.ReadCloser, error)
}

А реализация предельно проста:

type mybuf struct {
    *os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
    f, err := os.Open(mb.Name())
    if err != nil {
        return nil, err
    }
    return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
    f, err := os.Create(name)
    if err != nil {
        return nil, err
    }
    return &mybuf{File: f}, nil
}

Наш тип mybuf встраивает *os.File, поэтому мы получаем методы Write() и Close() для "бесплатно".

NewReader() просто открывает существующий резервный файл для чтения (в режиме только для чтения) и возвращает его, снова пользуясь преимуществами реализации io.ReadCloser.

Создание нового значения MyBuf реализовано в функции NewMyBuf(), которая также может возвращать error в случае сбоя создания файла.

Примечания:

Обратите внимание, что поскольку mybuf включает *os.File, с помощью утверждения типа можно "достучаться" до других экспортируемых методы os.File, даже если они не являются частью интерфейса MyBuf. Я не считаю это недостатком, но если вы хотите запретить это, вам нужно изменить реализацию mybuf, чтобы не встраивать os.File, а иметь его как именованное поле (но тогда вы должны сами добавить методы Write() и Close(), правильно перенаправить в поле os.File).

Реализация в памяти

Если файловой реализации недостаточно, здесь используется in-memory реализация.

Поскольку теперь мы работаем только в памяти, мы будем использовать следующий интерфейс:

type MyBuf interface {
    io.Writer
    NewReader() io.Reader
}

Идея состоит в том, чтобы хранить все байтовые срезы, которые когда-либо передавались в наш буфер. Читатели предоставят сохраненные слайсы при вызове Read(), каждый читатель будет отслеживать, сколько сохраненных слайсов было обслужено его методом Read(). С синхронизацией нужно разобраться, мы будем использовать простой sync.RWMutex.

Без лишних слов, вот реализация:

type mybuf struct {
    data [][]byte
    sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Cannot retain p, so we must copy it:
    p2 := make([]byte, len(p))
    copy(p2, p)
    mb.Lock()
    mb.data = append(mb.data, p2)
    mb.Unlock()
    return len(p), nil
}

type mybufReader struct {
    mb   *mybuf // buffer we read from
    i    int    // next slice index
    data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Do we have data to send?
    if len(mbr.data) == 0 {
        mb := mbr.mb
        mb.RLock()
        if mbr.i < len(mb.data) {
            mbr.data = mb.data[mbr.i]
            mbr.i++
        }
        mb.RUnlock()
    }
    if len(mbr.data) == 0 {
        return 0, io.EOF
    }

    n = copy(p, mbr.data)
    mbr.data = mbr.data[n:]
    return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
    return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
    return &mybuf{}
}

Обратите внимание, что общий контракт Writer.Write() включает в себя то, что реализация не должна сохранять переданный фрагмент, поэтому мы должны сделать его копию, прежде чем «сохранить».

Также обратите внимание, что Read() считывателей пытаются заблокироваться на минимальное время. То есть он блокируется только в том случае, если нам нужен новый фрагмент данных из буфера, и блокирует только чтение, то есть, если у читателя есть частичный фрагмент данных, он отправит его в Read() без блокировки и касания буфера.

person icza    schedule 02.06.2017
comment
Большое тебе спасибо. Элегантное решение. Ваше предложение с io.WriteCloser — хорошая идея. В идеале метод чтения Read() будет ждать дополнительных данных, пока средство записи не будет закрыто, после чего они получат io.EOF. Что касается потребления памяти, можно запустить буфер в памяти и сбросить данные на диск, как только будет превышен определенный размер. Я воспользуюсь вашим предложением, чтобы добиться этого. Спасибо еще раз. - person Tympanix; 03.06.2017

Я связался с журналом фиксации только добавления, потому что он очень похож на ваши требования. Я довольно новичок в распределенных системах и журнале коммитов, поэтому, возможно, я разделываю пару концепций, но введение в кафку ясно объясняет все с помощью хороших диаграмм.

Go также довольно новичок для меня, поэтому я уверен, что есть лучший способ сделать это:

Но, возможно, вы могли бы смоделировать свой буфер как срез, я думаю, в нескольких случаях:

  • у буфера нет читателей, в буфер записываются новые данные, длина буфера увеличивается
  • буфер имеет один/много читателей:

    • reader subscribes to buffer
    • буфер создает и возвращает канал этому клиенту
    • буфер поддерживает список клиентских каналов
    • происходит запись -> перебирает все клиентские каналы и публикует в них (pub sub)

Это относится к потребительскому потоку pubsub в реальном времени, в котором сообщения разветвляются, но не затрагивает обратную засыпку.

Kafka позволяет выполнять обратную засыпку, и их введение иллюстрирует, как это можно сделать :)

Это смещение контролируется потребителем: обычно потребитель линейно увеличивает свое смещение по мере чтения записей, но на самом деле, поскольку позиция контролируется потребителем, он может потреблять записи в любом порядке, который ему нравится. Например, потребитель может вернуться к более старому смещению, чтобы повторно обработать данные из прошлого, или перейти к самой последней записи и начать потребление «сейчас».

Такое сочетание функций означает, что потребители Kafka очень дешевы — они могут приходить и уходить без особого влияния на кластер или других потребителей. Например, вы можете использовать наши инструменты командной строки, чтобы отслеживать содержимое любой темы, не меняя того, что потребляется любыми существующими потребителями.

person dm03514    schedule 01.06.2017

Мне пришлось сделать что-то подобное в рамках эксперимента, поэтому делюсь:

type MultiReaderBuffer struct {
    mu  sync.RWMutex
    buf []byte
}

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    b.mu.Lock()
    b.buf = append(b.buf, p...)
    b.mu.Unlock()
    return len(p), nil
}

func (b *MultiReaderBuffer) NewReader() io.Reader {
    return &mrbReader{mrb: b}
}

type mrbReader struct {
    mrb *MultiReaderBuffer
    off int
}

func (r *mrbReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    r.mrb.mu.RLock()
    n = copy(p, r.mrb.buf[r.off:])
    r.mrb.mu.RUnlock()
    if n == 0 {
        return 0, io.EOF
    }
    r.off += n
    return n, nil
}
person Rodolfo Carvalho    schedule 28.02.2018