44 "context"
55 "fmt"
66 "maps"
7- "sync"
87 "testing"
98 "time"
109
@@ -26,9 +25,10 @@ const (
2625 protocolID = "test-buffered-topic-subscription-protocol"
2726 topicName = "test-buffered-topic-subscription-topic"
2827 nodeCount = 20
29- messageCount = 100
30- logLevel = zapcore .ErrorLevel
28+ messageCount = 50
29+ logLevel = zapcore .InfoLevel
3130 retryInterval = 1 * time .Second
31+ maxWait = 5 * time .Second
3232)
3333
3434type TestMessage = consensus.ConsensusStreamId
@@ -57,22 +57,29 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
5757 }
5858 }
5959
60- iterator := iter.Iterator [* pubsub.Topic ]{MaxGoroutines : len ( topics ) }
61- wg := sync. WaitGroup {}
62- wg . Add ( len ( messages ) )
60+ iterator := iter.Iterator [* pubsub.Topic ]{MaxGoroutines : nodeCount }
61+ finished := make ( chan struct {}, nodeCount )
62+ liveness := make ( chan struct {}, 1 )
6363
6464 go func () {
6565 iterator .ForEachIdx (topics , func (i int , destination * * pubsub.Topic ) {
6666 logger := & utils.ZapLogger {SugaredLogger : logger .Named (fmt .Sprintf ("destination-%d" , i ))}
6767 pending := maps .Clone (allMessages )
6868 subscription := buffered .NewTopicSubscription (logger , nodeCount * messageCount , func (ctx context.Context , msg * pubsub.Message ) {
69- if len (pending ) == 0 {
69+ msgStr := string (msg .Message .Data )
70+ if _ , ok := pending [msgStr ]; ! ok {
7071 return
7172 }
7273
73- delete (pending , string (msg .Message .Data ))
74+ select {
75+ case liveness <- struct {}{}:
76+ default :
77+ }
78+
79+ delete (pending , msgStr )
80+
7481 if len (pending ) == 0 {
75- wg . Done ()
82+ finished <- struct {}{}
7683 logger .Info ("all messages received" )
7784 }
7885 logger .Debugw ("received" , "message" , string (msg .Message .Data ), "pending" , len (pending ))
@@ -83,7 +90,6 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
8390 }()
8491
8592 go func () {
86- time .Sleep (1 * time .Second )
8793 iterator .ForEachIdx (topics , func (i int , source * * pubsub.Topic ) {
8894 logger := & utils.ZapLogger {SugaredLogger : logger .Named (fmt .Sprintf ("source-%d" , i ))}
8995 var rebroadcastStrategy buffered.RebroadcastStrategy [* TestMessage ]
@@ -101,7 +107,9 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
101107 })
102108 }()
103109
104- wg .Wait ()
110+ for range nodeCount {
111+ wait (t , liveness , finished )
112+ }
105113 })
106114
107115 t .Run ("canceled context" , func (t * testing.T ) {
@@ -128,6 +136,21 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
128136 })
129137}
130138
139+ func wait (t * testing.T , liveness , finished chan struct {}) {
140+ t .Helper ()
141+ for {
142+ select {
143+ case <- finished :
144+ return
145+ case <- liveness :
146+ continue
147+ case <- time .After (maxWait ):
148+ require .FailNow (t , "liveness check failed" )
149+ return
150+ }
151+ }
152+ }
153+
131154func getTestMessage (node , messageIndex int ) * TestMessage {
132155 return & TestMessage {
133156 Nonce : uint64 (node * messageCount + messageIndex ),
0 commit comments