Skip to content

Commit

Permalink
Use ReadyBlockChannels from Decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene Kim committed Dec 18, 2018
1 parent ce34a60 commit 3b010da
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions pkg/impl/libraptorq/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libraptorq
import (
"errors"
"github.com/harmony-one/go-raptorq/pkg/impl/libraptorq/swig"
"github.com/harmony-one/go-raptorq/pkg/internal/readyblockchan"
"github.com/harmony-one/go-raptorq/pkg/raptorq"
"runtime"
)
Expand All @@ -19,6 +20,8 @@ func (*DecoderFactory) New(commonOTI uint64, schemeSpecificOTI uint32) (
dec.wrapped = wrapped
dec.commonOTI = commonOTI
dec.schemeSpecificOTI = schemeSpecificOTI
dec.rbcs.Reset(dec.NumSourceBlocks())
go dec.readyBlocksLoop()
decoder = dec
runtime.SetFinalizer(decoder, finalizeDecoder)
} else {
Expand All @@ -36,6 +39,38 @@ type Decoder struct {
wrapped swig.BytesDecoder
commonOTI uint64
schemeSpecificOTI uint32
rbcs readyblockchan.ReadyBlockChannels
}

// Decoder destroy sequence:
//
// 1. Decoder loses all references
// 2. GC kicks in
// 3. finalizeDecoder() gets called
// 4. Wrapped decoder is deleted (via Close())
// 5. Decoder dtor waits for thread pool to drain.
// 6. Pending wait_threads() calls see that the object is being deleted and
// return Error::EXITING via their promise/future pairs.
// 7. Decoder dtor now returns. Close() and finalizeDecoder() return in turn.
// 8. The future in a WaitForBlock() call (made from readyBlocksLoop()) returns
// Error::EXITING.
// 9. readyBlocksLoop() sees Error::EXITING and breaks out of loop.
//
// Note that by the time readyBlocksLoop() sees Error::EXITING,
// the “wrapped” field has already been reset as nil.

func (dec *Decoder) readyBlocksLoop() {
for {
var sbn uint8
var e swig.RaptorQ__v1Error
swig.WaitForBlock(dec.wrapped, &sbn, &e)
switch e {
case swig.Error_NONE:
dec.rbcs.AddBlock(sbn)
case swig.Error_EXITING:
break
}
}
}

func (dec *Decoder) CommonOTI() uint64 {
Expand Down Expand Up @@ -106,9 +141,18 @@ func (dec *Decoder) FreeSourceBlock(sbn uint8) {
dec.wrapped.Free(sbn)
}

func (dec *Decoder) AddReadyBlockChan(ch chan<- uint8) (err error) {
return dec.rbcs.AddChannel(ch)
}

func (dec *Decoder) RemoveReadyBlockChan(ch chan<- uint8) (err error) {
return dec.rbcs.RemoveChannel(ch)
}

func (dec *Decoder) Close() (err error) {
switch wrapped := dec.wrapped.(type) {
case swig.BytesDecoder:
dec.rbcs.Reset(dec.NumSourceBlocks())
dec.wrapped = nil
swig.DeleteBytesDecoder(wrapped)
default:
Expand Down

0 comments on commit 3b010da

Please sign in to comment.