123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575 |
- package reedsolomon
- import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "sync"
- )
// StreamEncoder is the streaming flavour of the encoder: every shard is
// represented by an io.Reader (input) or io.Writer (output) instead of a byte
// slice, so shards do not have to fit in memory as a whole.
//
// The notes on the individual methods describe the behaviour of the rsStream
// implementation returned by NewStream and NewStreamC.
type StreamEncoder interface {
	// Encode reads the data shards from data and writes the matching parity
	// shards to parity.
	//
	// len(data) must equal the number of data shards and len(parity) the
	// number of parity shards, otherwise ErrTooFewShards is returned.
	// The input is consumed block by block until the readers are exhausted.
	// ErrShardNoData is returned when the readers deliver no data at all.
	// I/O failures are reported as StreamReadError or StreamWriteError.
	Encode(data []io.Reader, parity []io.Writer) error

	// Verify reads every shard (one reader per data and parity shard) and
	// reports whether the parity is consistent with the data.
	//
	// len(shards) must equal the total number of shards, otherwise
	// ErrTooFewShards is returned. ErrShardNoData is returned when the
	// readers deliver no data at all.
	Verify(shards []io.Reader) (bool, error)

	// Reconstruct recreates missing shards.
	//
	// valid and fill must both have one entry per shard (data and parity),
	// otherwise ErrTooFewShards is returned. A shard that is available is
	// passed as a non-nil reader in valid; a shard that should be recreated
	// is passed as a non-nil writer in fill at the same index. Setting both
	// for one index yields ErrReconstructMismatch. Indexes that are nil in
	// fill are not written.
	//
	// ErrShardNoData is returned when the readers deliver no data at all.
	Reconstruct(valid []io.Reader, fill []io.Writer) error

	// Split distributes size bytes read from data over the writers in dst,
	// one writer per data shard, each receiving the same number of bytes.
	//
	// len(dst) must equal the number of data shards (ErrInvShardNum
	// otherwise) and no writer may be nil. ErrShortData is returned when
	// size is zero or the requested number of bytes cannot be delivered.
	Split(data io.Reader, dst []io.Writer, size int64) (err error)

	// Join concatenates the data shards and writes the first outSize bytes
	// of the result to dst.
	//
	// Only the leading data-shard readers of shards are used; none of them
	// may be nil. ErrTooFewShards is returned when fewer readers than data
	// shards are supplied and ErrShortData when the shards hold fewer than
	// outSize bytes.
	Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
// StreamReadError is returned when reading from one of the shard streams
// fails. It records which stream failed and why.
type StreamReadError struct {
	Err    error // The underlying error.
	Stream int   // Index of the stream that failed.
}

// Error returns the error as a string, including the stream index.
func (s StreamReadError) Error() string {
	return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
}

// String returns the same text as Error.
func (s StreamReadError) String() string {
	return s.Error()
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause of the read failure.
func (s StreamReadError) Unwrap() error {
	return s.Err
}
// StreamWriteError is returned when writing to one of the shard streams
// fails. It records which stream failed and why.
type StreamWriteError struct {
	Err    error // The underlying error.
	Stream int   // Index of the stream that failed.
}

// Error returns the error as a string, including the stream index.
func (s StreamWriteError) Error() string {
	return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
}

// String returns the same text as Error.
func (s StreamWriteError) String() string {
	return s.Error()
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause of the write failure.
func (s StreamWriteError) Unwrap() error {
	return s.Err
}
- type rsStream struct {
- r *reedSolomon
- bs int
-
- readShards func(dst [][]byte, in []io.Reader) error
-
- writeShards func(out []io.Writer, in [][]byte) error
- creads bool
- cwrites bool
- }
- func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
- enc, err := New(dataShards, parityShards, o...)
- if err != nil {
- return nil, err
- }
- rs := enc.(*reedSolomon)
- r := rsStream{r: rs, bs: 4 << 20}
- r.readShards = readShards
- r.writeShards = writeShards
- return &r, err
- }
- func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
- enc, err := New(dataShards, parityShards, o...)
- if err != nil {
- return nil, err
- }
- rs := enc.(*reedSolomon)
- r := rsStream{r: rs, bs: 4 << 20}
- r.readShards = readShards
- r.writeShards = writeShards
- if conReads {
- r.readShards = cReadShards
- }
- if conWrites {
- r.writeShards = cWriteShards
- }
- return &r, err
- }
// createSlice returns n independently allocated, zeroed byte slices, each
// length bytes long.
func createSlice(n, length int) [][]byte {
	shards := make([][]byte, 0, n)
	for len(shards) < n {
		shards = append(shards, make([]byte, length))
	}
	return shards
}
- func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
- if len(data) != r.r.DataShards {
- return ErrTooFewShards
- }
- if len(parity) != r.r.ParityShards {
- return ErrTooFewShards
- }
- all := createSlice(r.r.Shards, r.bs)
- in := all[:r.r.DataShards]
- out := all[r.r.DataShards:]
- read := 0
- for {
- err := r.readShards(in, data)
- switch err {
- case nil:
- case io.EOF:
- if read == 0 {
- return ErrShardNoData
- }
- return nil
- default:
- return err
- }
- out = trimShards(out, shardSize(in))
- read += shardSize(in)
- err = r.r.Encode(all)
- if err != nil {
- return err
- }
- err = r.writeShards(parity, out)
- if err != nil {
- return err
- }
- }
- }
// trimShards re-slices every non-nil shard of in to exactly size bytes and
// returns in. The shards are modified in place.
//
// A shard whose capacity is smaller than size cannot be re-sliced and is set
// to nil instead, marking it as missing. Nil shards are left untouched.
func trimShards(in [][]byte, size int) [][]byte {
	for i, shard := range in {
		if shard == nil {
			continue
		}
		if cap(shard) < size {
			// Re-slicing beyond the capacity would panic.
			in[i] = nil
			continue
		}
		in[i] = shard[:size]
	}
	return in
}
- func readShards(dst [][]byte, in []io.Reader) error {
- if len(in) != len(dst) {
- panic("internal error: in and dst size does not match")
- }
- size := -1
- for i := range in {
- if in[i] == nil {
- dst[i] = nil
- continue
- }
- n, err := io.ReadFull(in[i], dst[i])
-
-
-
- switch err {
- case io.ErrUnexpectedEOF, io.EOF:
- if size < 0 {
- size = n
- } else if n != size {
-
- return ErrShardSize
- }
- dst[i] = dst[i][0:n]
- case nil:
- continue
- default:
- return StreamReadError{Err: err, Stream: i}
- }
- }
- if size == 0 {
- return io.EOF
- }
- return nil
- }
- func writeShards(out []io.Writer, in [][]byte) error {
- if len(out) != len(in) {
- panic("internal error: in and out size does not match")
- }
- for i := range in {
- if out[i] == nil {
- continue
- }
- n, err := out[i].Write(in[i])
- if err != nil {
- return StreamWriteError{Err: err, Stream: i}
- }
-
- if n != len(in[i]) {
- return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
- }
- }
- return nil
- }
- type readResult struct {
- n int
- size int
- err error
- }
- func cReadShards(dst [][]byte, in []io.Reader) error {
- if len(in) != len(dst) {
- panic("internal error: in and dst size does not match")
- }
- var wg sync.WaitGroup
- wg.Add(len(in))
- res := make(chan readResult, len(in))
- for i := range in {
- if in[i] == nil {
- dst[i] = nil
- wg.Done()
- continue
- }
- go func(i int) {
- defer wg.Done()
- n, err := io.ReadFull(in[i], dst[i])
-
-
-
- res <- readResult{size: n, err: err, n: i}
- }(i)
- }
- wg.Wait()
- close(res)
- size := -1
- for r := range res {
- switch r.err {
- case io.ErrUnexpectedEOF, io.EOF:
- if size < 0 {
- size = r.size
- } else if r.size != size {
-
- return ErrShardSize
- }
- dst[r.n] = dst[r.n][0:r.size]
- case nil:
- default:
- return StreamReadError{Err: r.err, Stream: r.n}
- }
- }
- if size == 0 {
- return io.EOF
- }
- return nil
- }
- func cWriteShards(out []io.Writer, in [][]byte) error {
- if len(out) != len(in) {
- panic("internal error: in and out size does not match")
- }
- var errs = make(chan error, len(out))
- var wg sync.WaitGroup
- wg.Add(len(out))
- for i := range in {
- go func(i int) {
- defer wg.Done()
- if out[i] == nil {
- errs <- nil
- return
- }
- n, err := out[i].Write(in[i])
- if err != nil {
- errs <- StreamWriteError{Err: err, Stream: i}
- return
- }
- if n != len(in[i]) {
- errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
- }
- }(i)
- }
- wg.Wait()
- close(errs)
- for err := range errs {
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (r rsStream) Verify(shards []io.Reader) (bool, error) {
- if len(shards) != r.r.Shards {
- return false, ErrTooFewShards
- }
- read := 0
- all := createSlice(r.r.Shards, r.bs)
- for {
- err := r.readShards(all, shards)
- if err == io.EOF {
- if read == 0 {
- return false, ErrShardNoData
- }
- return true, nil
- }
- if err != nil {
- return false, err
- }
- read += shardSize(all)
- ok, err := r.r.Verify(all)
- if !ok || err != nil {
- return ok, err
- }
- }
- }
- var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
- func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
- if len(valid) != r.r.Shards {
- return ErrTooFewShards
- }
- if len(fill) != r.r.Shards {
- return ErrTooFewShards
- }
- all := createSlice(r.r.Shards, r.bs)
- for i := range valid {
- if valid[i] != nil && fill[i] != nil {
- return ErrReconstructMismatch
- }
- }
- read := 0
- for {
- err := r.readShards(all, valid)
- if err == io.EOF {
- if read == 0 {
- return ErrShardNoData
- }
- return nil
- }
- if err != nil {
- return err
- }
- read += shardSize(all)
- all = trimShards(all, shardSize(all))
- err = r.r.Reconstruct(all)
- if err != nil {
- return err
- }
- err = r.writeShards(fill, all)
- if err != nil {
- return err
- }
- }
- }
- func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
-
- if len(shards) < r.r.DataShards {
- return ErrTooFewShards
- }
-
- shards = shards[:r.r.DataShards]
- for i := range shards {
- if shards[i] == nil {
- return StreamReadError{Err: ErrShardNoData, Stream: i}
- }
- }
-
- src := io.MultiReader(shards...)
-
- n, err := io.CopyN(dst, src, outSize)
- if err == io.EOF {
- return ErrShortData
- }
- if err != nil {
- return err
- }
- if n != outSize {
- return ErrShortData
- }
- return nil
- }
- func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
- if size == 0 {
- return ErrShortData
- }
- if len(dst) != r.r.DataShards {
- return ErrInvShardNum
- }
- for i := range dst {
- if dst[i] == nil {
- return StreamWriteError{Err: ErrShardNoData, Stream: i}
- }
- }
-
- perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
-
- padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
- data = io.MultiReader(data, bytes.NewBuffer(padding))
-
- for i := range dst {
- n, err := io.CopyN(dst[i], data, perShard)
- if err != io.EOF && err != nil {
- return err
- }
- if n != perShard {
- return ErrShortData
- }
- }
- return nil
- }
|