broadcast.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package broadcast
  2. type Broadcast struct {
  3. listeners []chan interface{}
  4. reg chan (chan interface{})
  5. unreg chan (chan interface{})
  6. in chan interface{}
  7. stop chan int64
  8. stopStatus bool
  9. }
  10. func NewBroadcast() *Broadcast {
  11. b := &Broadcast{
  12. listeners: make([]chan interface{}, 0),
  13. reg: make(chan (chan interface{})),
  14. unreg: make(chan (chan interface{})),
  15. in: make(chan interface{}),
  16. stop: make(chan int64),
  17. stopStatus: false,
  18. }
  19. go func() {
  20. for {
  21. select {
  22. case l := <-b.unreg:
  23. // remove L from b.listeners
  24. // this operation is slow: O(n) but not used frequently
  25. // unlike iterating over listeners
  26. oldListeners := b.listeners
  27. b.listeners = make([]chan interface{}, 0, len(oldListeners))
  28. for _, oldL := range oldListeners {
  29. if l != oldL {
  30. b.listeners = append(b.listeners, oldL)
  31. }
  32. }
  33. case l := <-b.reg:
  34. b.listeners = append(b.listeners, l)
  35. case item := <-b.in:
  36. for _, l := range b.listeners {
  37. l <- item
  38. }
  39. case _ = <-b.stop:
  40. b.stopStatus = true
  41. break
  42. }
  43. }
  44. }()
  45. return b
  46. }
  47. func (b *Broadcast) In() chan interface{} {
  48. return b.in
  49. }
  50. func (b *Broadcast) Reg() chan interface{} {
  51. listener := make(chan interface{})
  52. b.reg <- listener
  53. return listener
  54. }
  55. func (b *Broadcast) UnReg(listener chan interface{}) {
  56. b.unreg <- listener
  57. }
  58. func (b *Broadcast) Close() {
  59. if b.stopStatus == false {
  60. b.stop <- 1
  61. }
  62. }