В зависимости от характера этого средства записи и того, как вы его используете, хранить все в памяти (чтобы иметь возможность воспроизвести все для читателей, присоединившихся позже) очень рискованно и может потребовать много памяти или привести к сбою вашего приложения из-за недостаточно памяти.
Использование его для регистратора с низким трафиком, хранящего все в памяти, вероятно, нормально, но, например, для потоковой передачи аудио или видео, скорее всего, нет.
Если приведенные ниже реализации считывателя прочитают все данные, которые были записаны в буфер, их метод Read()
сообщит io.EOF
правильно. Следует соблюдать осторожность, так как некоторые конструкции (например, bufio.Scanner
) могут не считывать больше данных после io.EOF
встречается (но это не недостаток нашей реализации).
Если вы хотите, чтобы считыватели нашего буфера ждали, если в буфере больше нет доступных данных, чтобы дождаться записи новых данных вместо возврата io.EOF
, вы можете обернуть возвращенные считыватели в «хвостовой считыватель», представленный здесь: Go: tail -f-подобный генератор.
Реализация файлов с "безопасной памятью"
Вот очень простое и элегантное решение. Он использует файл для записи, а также использует файлы для чтения. Синхронизация в основном обеспечивается операционной системой. Это не приводит к ошибке нехватки памяти, поскольку данные хранятся исключительно на диске. В зависимости от характера вашего писателя этого может быть или не быть достаточно.
Я скорее буду использовать следующий интерфейс, потому что Close()
важен в случае файлов.
type MyBuf interface {
io.WriteCloser
NewReader() (io.ReadCloser, error)
}
А реализация предельно проста:
type mybuf struct {
*os.File
}
func (mb *mybuf) NewReader() (io.ReadCloser, error) {
f, err := os.Open(mb.Name())
if err != nil {
return nil, err
}
return f, nil
}
func NewMyBuf(name string) (MyBuf, error) {
f, err := os.Create(name)
if err != nil {
return nil, err
}
return &mybuf{File: f}, nil
}
Наш тип mybuf
встраивает *os.File
, поэтому мы получаем методы Write()
и Close()
для "бесплатно".
NewReader()
просто открывает существующий резервный файл для чтения (в режиме только для чтения) и возвращает его, снова пользуясь преимуществами реализации io.ReadCloser
.
Создание нового значения MyBuf
реализовано в функции NewMyBuf()
, которая также может возвращать error
в случае сбоя создания файла.
Примечания:
Обратите внимание, что поскольку mybuf
включает *os.File
, с помощью утверждения типа можно "достучаться" до других экспортируемых методы os.File
, даже если они не являются частью интерфейса MyBuf
. Я не считаю это недостатком, но если вы хотите запретить это, вам нужно изменить реализацию mybuf
, чтобы не встраивать os.File
, а иметь его как именованное поле (но тогда вы должны сами добавить методы Write()
и Close()
, правильно перенаправить в поле os.File
).
Реализация в памяти
Если файловой реализации недостаточно, здесь используется in-memory реализация.
Поскольку теперь мы работаем только в памяти, мы будем использовать следующий интерфейс:
type MyBuf interface {
io.Writer
NewReader() io.Reader
}
Идея состоит в том, чтобы хранить все байтовые срезы, которые когда-либо передавались в наш буфер. Читатели предоставят сохраненные слайсы при вызове Read()
, каждый читатель будет отслеживать, сколько сохраненных слайсов было обслужено его методом Read()
. С синхронизацией нужно разобраться, мы будем использовать простой sync.RWMutex
.
Без лишних слов, вот реализация:
type mybuf struct {
data [][]byte
sync.RWMutex
}
func (mb *mybuf) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Cannot retain p, so we must copy it:
p2 := make([]byte, len(p))
copy(p2, p)
mb.Lock()
mb.data = append(mb.data, p2)
mb.Unlock()
return len(p), nil
}
type mybufReader struct {
mb *mybuf // buffer we read from
i int // next slice index
data []byte // current data slice to serve
}
func (mbr *mybufReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Do we have data to send?
if len(mbr.data) == 0 {
mb := mbr.mb
mb.RLock()
if mbr.i < len(mb.data) {
mbr.data = mb.data[mbr.i]
mbr.i++
}
mb.RUnlock()
}
if len(mbr.data) == 0 {
return 0, io.EOF
}
n = copy(p, mbr.data)
mbr.data = mbr.data[n:]
return n, nil
}
func (mb *mybuf) NewReader() io.Reader {
return &mybufReader{mb: mb}
}
func NewMyBuf() MyBuf {
return &mybuf{}
}
Обратите внимание, что общий контракт Writer.Write()
включает в себя то, что реализация не должна сохранять переданный фрагмент, поэтому мы должны сделать его копию, прежде чем «сохранить».
Также обратите внимание, что Read()
считывателей пытаются заблокироваться на минимальное время. То есть он блокируется только в том случае, если нам нужен новый фрагмент данных из буфера, и блокирует только чтение, то есть, если у читателя есть частичный фрагмент данных, он отправит его в Read()
без блокировки и касания буфера.
person
icza
schedule
02.06.2017