@@ -4,11 +4,12 @@ import (
44 "context"
55 "fmt"
66 "maps"
7- "sync "
7+ "slices "
88 "testing"
99 "time"
1010
1111 "github.com/NethermindEth/juno/consensus/p2p/buffered"
12+ "github.com/NethermindEth/juno/consensus/p2p/config"
1213 "github.com/NethermindEth/juno/p2p/pubsub/testutils"
1314 "github.com/NethermindEth/juno/utils"
1415 pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -22,17 +23,22 @@ import (
2223)
2324
2425const (
25- chainID = "1"
26- protocolID = "test-buffered-topic-subscription-protocol"
27- topicName = "test-buffered-topic-subscription-topic"
28- nodeCount = 20
29- messageCount = 100
30- logLevel = zapcore .ErrorLevel
31- retryInterval = 1 * time .Second
26+ chainID = "1"
27+ protocolID = "test-buffered-topic-subscription-protocol"
28+ topicName = "test-buffered-topic-subscription-topic"
29+ nodeCount = 20
30+ messageCount = 100
31+ logLevel = zapcore .InfoLevel
32+ maxWait = 5 * time .Second
3233)
3334
3435type TestMessage = consensus.ConsensusStreamId
3536
// origin identifies where a test message came from: Source is the index of
// the publishing node and Index is the message's position in that node's
// send sequence. It is stored per marshaled payload so that, on timeout,
// the subscriber can report exactly which messages never arrived.
type origin struct {
	Source int
	Index  int
}
3642 func TestBufferedTopicSubscriptionAndProtoBroadcaster (t * testing.T ) {
// NOTE(review): this span is a diff rendering with context elided between
// hunks ("@@" lines below); comments describe only the visible lines.
// End-to-end test: nodeCount nodes join one topic, each broadcasts
// messageCount proto messages, and each subscriber must observe every
// message sent by the other nodes.
3743 	t .Run (fmt .Sprintf ("%d nodes, each sending %d messages" , nodeCount , messageCount ), func (t * testing.T ) {
3844 		logger , err := utils .NewZapLogger (utils .NewLogLevel (logLevel ), true )
@@ -42,7 +48,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
4248 		topics := nodes .JoinTopic (t , chainID , protocolID , topicName )
4349
4450 		messages := make ([][]* TestMessage , nodeCount )
// allMessages maps each marshaled payload to its origin (source node +
// sequence index) so missing messages can be identified on failure.
45- 		allMessages := make (map [string ]struct {} )
51+ 		allMessages := make (map [string ]origin )
4652
4753 		for i := range messages {
4854 			messages [i ] = make ([]* TestMessage , messageCount )
@@ -53,46 +59,65 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
5359 				msgBytes , err := proto .Marshal (msg )
5460 				require .NoError (t , err )
5561
56- 				allMessages [string (msgBytes )] = struct {}{ }
62+ 				allMessages [string (msgBytes )] = origin { Source : i , Index : j }
5763 			}
5864 		}
5965
60- 		iterator := iter.Iterator [* pubsub.Topic ]{MaxGoroutines : len ( topics ) }
61- 		wg := sync. WaitGroup {}
62- 		wg . Add ( len ( messages ) )
// finished gets one signal per subscriber that has received everything;
// liveness (capacity 1) is pulsed on any progress to reset the deadline
// used by wait().
66+ 		iterator := iter.Iterator [* pubsub.Topic ]{MaxGoroutines : nodeCount }
67+ 		finished := make ( chan struct {}, nodeCount )
68+ 		liveness := make ( chan struct {}, 1 )
6369
// Subscriber side: one buffered topic subscription per node, each tracking
// its own copy of the pending-message set.
6470 		go func () {
6571 			iterator .ForEachIdx (topics , func (i int , destination * * pubsub.Topic ) {
6672 				logger := & utils.ZapLogger {SugaredLogger : logger .Named (fmt .Sprintf ("destination-%d" , i ))}
6773 				pending := maps .Clone (allMessages )
74+
75+ 				// Ignore the messages we are broadcasting
76+ 				for _ , message := range messages [i ] {
77+ 					msgBytes , err := proto .Marshal (message )
78+ 					require .NoError (t , err )
79+ 					delete (pending , string (msgBytes ))
80+ 				}
81+
// Handler runs per received pubsub message; duplicates and own messages
// (already removed from pending) are ignored.
6882 				subscription := buffered .NewTopicSubscription (logger , nodeCount * messageCount , func (ctx context.Context , msg * pubsub.Message ) {
69- 					if len (pending ) == 0 {
83+ 					msgStr := string (msg .Message .Data )
84+ 					if _ , ok := pending [msgStr ]; ! ok {
7085 						return
7186 					}
7287
73- 					delete (pending , string (msg .Message .Data ))
// Non-blocking send: a full liveness buffer already signals progress.
88+ 					select {
89+ 					case liveness <- struct {}{}:
90+ 					default :
91+ 					}
92+
93+ 					delete (pending , msgStr )
94+
7495 					if len (pending ) == 0 {
75- 						wg . Done ()
96+ 						finished <- struct {}{}
7697 						logger .Info ("all messages received" )
7798 					}
7899 					logger .Debugw ("received" , "message" , string (msg .Message .Data ), "pending" , len (pending ))
79100 				})
80101
// Loop blocks until the test context is canceled; afterwards report any
// messages this subscriber never received, by origin.
81102 				subscription .Loop (t .Context (), * destination )
103+ 				if len (pending ) > 0 {
104+ 					logger .Infow ("missing messages" , "pending" , slices .Collect (maps .Values (pending )))
105+ 				}
82106 			})
83107 		}()
84108
// Publisher side: every node broadcasts its own message set; even-indexed
// nodes additionally use a rebroadcast strategy keyed by block number.
85109 		go func () {
86- 			time .Sleep (1 * time .Second )
87110 			iterator .ForEachIdx (topics , func (i int , source * * pubsub.Topic ) {
88111 				logger := & utils.ZapLogger {SugaredLogger : logger .Named (fmt .Sprintf ("source-%d" , i ))}
112+ 				rebroadcastInterval := config .DefaultBufferSizes .RebroadcastInterval
113+
89114 				var rebroadcastStrategy buffered.RebroadcastStrategy [* TestMessage ]
90115 				if i % 2 == 0 {
91- 					rebroadcastStrategy = buffered .NewRebroadcastStrategy (retryInterval , func (msg * TestMessage ) uint64 {
116+ 					rebroadcastStrategy = buffered .NewRebroadcastStrategy (rebroadcastInterval , func (msg * TestMessage ) uint64 {
92117 						return msg .BlockNumber
93118 					})
94119 				}
95- 				broadcaster := buffered .NewProtoBroadcaster (logger , messageCount , retryInterval , rebroadcastStrategy )
120+ 				broadcaster := buffered .NewProtoBroadcaster (logger , messageCount , rebroadcastInterval , rebroadcastStrategy )
96121 				go broadcaster .Loop (t .Context (), * source )
97122 				for _ , message := range messages [i ] {
98123 					logger .Debugw ("publishing" , "message" , message )
@@ -101,7 +126,9 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
101126 			})
102127 		}()
103128
// Wait for all nodeCount subscribers to report completion; wait() fails the
// test if no progress is observed for maxWait.
104- 		wg .Wait ()
129+ 		for range nodeCount {
130+ 			wait (t , liveness , finished )
131+ 		}
105132 	})
106133
// Second subtest: body elided between the hunks; only the closing lines of
// the "canceled context" case are visible here.
107134 	t .Run ("canceled context" , func (t * testing.T ) {
@@ -128,6 +155,21 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
129156 	})
130157}
130157
158+ func wait (t * testing.T , liveness , finished chan struct {}) {
159+ t .Helper ()
160+ for {
161+ select {
162+ case <- finished :
163+ return
164+ case <- liveness :
165+ continue
166+ case <- time .After (maxWait ):
167+ require .FailNow (t , "liveness check failed" )
168+ return
169+ }
170+ }
171+ }
172+
131173func getTestMessage (node , messageIndex int ) * TestMessage {
132174 return & TestMessage {
133175 Nonce : uint64 (node * messageCount + messageIndex ),
0 commit comments