fix: channel reader blocks until EOF or an error was received

This commit is contained in:
Kyren223
2024-10-15 18:17:46 +03:00
parent 77dca0e1df
commit c5dba4360a
2 changed files with 30 additions and 3 deletions

View File

@@ -5,6 +5,8 @@ import (
"io"
)
const bufferSize = 512
type ChannelReader struct {
Out <-chan []byte
Err <-chan error
@@ -17,6 +19,7 @@ func NewChannelReader(ctx context.Context, reader io.Reader) ChannelReader {
go func(in chan<- []byte, inErr chan<- error) {
defer close(in)
defer close(inErr)
data := make([]byte, bufferSize)
outer:
for {
select {
@@ -24,12 +27,12 @@ func NewChannelReader(ctx context.Context, reader io.Reader) ChannelReader {
inErr <- ctx.Err()
break outer
default:
data, err := io.ReadAll(reader)
if err != nil {
n, err := reader.Read(data)
if err != nil && err != io.EOF {
inErr <- err
break outer
}
in <- data
in <- data[:n]
}
}
}(outCh, errCh)

24
pkg/util/io_test.go Normal file
View File

@@ -0,0 +1,24 @@
package util
import (
"bytes"
"context"
"testing"
"time"
)
func TestChannelReader(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b := []byte{1, 2, 3, 4, 5, 6, 7, 8}
reader := NewChannelReader(ctx, bytes.NewReader(b))
select {
case data := <-reader.Out:
if !bytes.Equal(b, data) {
t.Errorf("TestChannelReader() %v != %v", b, data)
}
case err := <-reader.Err:
t.Errorf("TestChannelReader() err = %v", err)
}
}