Skip to content

Commit b8118a4

Browse files
committed
feat: Integrate Kademlia DHT protocol to pubsub
1 parent df59f7b commit b8118a4

5 files changed

Lines changed: 86 additions & 104 deletions

File tree

consensus/consensus.go

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package consensus
22

33
import (
4-
"context"
54
"errors"
6-
"fmt"
7-
"strings"
85

96
"github.com/NethermindEth/juno/blockchain"
107
"github.com/NethermindEth/juno/builder"
@@ -26,7 +23,6 @@ import (
2623
"github.com/libp2p/go-libp2p/core/crypto"
2724
"github.com/libp2p/go-libp2p/core/host"
2825
"github.com/libp2p/go-libp2p/core/peer"
29-
"github.com/sourcegraph/conc/pool"
3026
)
3127

3228
type ConsensusServices struct {
@@ -47,6 +43,7 @@ func Init(
4743
timeoutFn driver.TimeoutFn,
4844
hostAddress string,
4945
hostPrivateKey crypto.PrivKey,
46+
bootstrapPeersFn func() []peer.AddrInfo,
5047
) (ConsensusServices, error) {
5148
chainHeight, err := blockchain.Height()
5249
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
@@ -82,7 +79,7 @@ func Init(
8279
return ConsensusServices{}, err
8380
}
8481

85-
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes)
82+
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes, bootstrapPeersFn)
8683

8784
commitListener := driver.NewCommitListener(logger, &proposalStore, proposer, p2p)
8885
driver := driver.New(logger, tendermintDB, stateMachine, commitListener, p2p, timeoutFn)
@@ -96,31 +93,6 @@ func Init(
9693
}, nil
9794
}
9895

99-
func Connect(ctx context.Context, host host.Host, peers string) error {
100-
if peers == "" {
101-
return nil
102-
}
103-
104-
pool := pool.New().WithErrors().WithFirstError()
105-
106-
for peerStr := range strings.SplitSeq(peers, ",") {
107-
pool.Go(func() error {
108-
peerAddr, err := peer.AddrInfoFromString(peerStr)
109-
if err != nil {
110-
return fmt.Errorf("unable to parse peer address %q: %w", peerStr, err)
111-
}
112-
113-
if err := host.Connect(ctx, *peerAddr); err != nil {
114-
return fmt.Errorf("unable to connect to %q: %w", peerStr, err)
115-
}
116-
117-
return nil
118-
})
119-
}
120-
121-
return pool.Wait()
122-
}
123-
12496
func toValue(value *felt.Felt) starknet.Value {
12597
return starknet.Value(*value)
12698
}

consensus/integtest/integ_test.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package integtest
22

33
import (
44
"fmt"
5+
"maps"
6+
"slices"
7+
gosync "sync"
58
"testing"
6-
"time"
79

810
"github.com/NethermindEth/juno/blockchain"
911
"github.com/NethermindEth/juno/consensus"
@@ -16,7 +18,7 @@ import (
1618
"github.com/NethermindEth/juno/sync"
1719
"github.com/NethermindEth/juno/utils"
1820
"github.com/NethermindEth/juno/vm"
19-
"github.com/libp2p/go-libp2p/core/host"
21+
"github.com/libp2p/go-libp2p/core/peer"
2022
"github.com/sourcegraph/conc"
2123
"github.com/sourcegraph/conc/iter"
2224
"github.com/stretchr/testify/assert"
@@ -27,7 +29,6 @@ import (
2729
const (
2830
commitBufferSize = 1024
2931
maxLag = 10
30-
gst = 2 * time.Second // 2 Heartbeats should be enough for the nodes to connect to its peers
3132
hostAddress = "/ip4/0.0.0.0/tcp/0"
3233
)
3334

@@ -78,18 +79,15 @@ func loadGenesis(t *testing.T, log *utils.ZapLogger) (core.StateDiff, map[felt.F
7879

7980
func initNode(
8081
t *testing.T,
81-
gstChan chan struct{},
8282
index int,
8383
logger *utils.ZapLogger,
8484
commits chan commit,
8585
cfg *testConfig,
8686
genesisDiff core.StateDiff,
8787
genesisClasses map[felt.Felt]core.Class,
88-
) host.Host {
88+
ready chan struct{},
89+
) networkNodeConfig {
8990
t.Helper()
90-
defer func() {
91-
logger.Debug("finished initNode")
92-
}()
9391

9492
mockServices := consensus.InitMockServices(0, 0, index, cfg.nodeCount)
9593

@@ -98,6 +96,12 @@ func initNode(
9896
bc := getBlockchain(t, genesisDiff, genesisClasses)
9997
vm := vm.New(false, logger)
10098

99+
adjacentNodes := make(map[int]peer.AddrInfo)
100+
bootstrapPeers := gosync.OnceValue(func() []peer.AddrInfo {
101+
<-ready
102+
return slices.Collect(maps.Values(adjacentNodes))
103+
})
104+
101105
services, err := consensus.Init(
102106
logger,
103107
consensusDB,
@@ -108,6 +112,7 @@ func initNode(
108112
mockServices.TimeoutFn,
109113
hostAddress,
110114
mockServices.PrivateKey,
115+
bootstrapPeers,
111116
)
112117
require.NoError(t, err)
113118

@@ -119,15 +124,17 @@ func initNode(
119124
require.NoError(t, services.P2P.Run(t.Context()))
120125
})
121126
wg.Go(func() {
122-
<-gstChan
123127
require.NoError(t, services.Driver.Run(t.Context()))
124128
})
125129
wg.Go(func() {
126130
writeBlock(t, index, bc, services.CommitListener, commits)
127131
})
128132
t.Cleanup(wg.Wait)
129133

130-
return services.Host
134+
return networkNodeConfig{
135+
host: services.Host,
136+
adjacentNodes: adjacentNodes,
137+
}
131138
}
132139

133140
func writeBlock(
@@ -233,20 +240,16 @@ func runTest(t *testing.T, cfg testConfig) {
233240

234241
commits := make(chan commit, commitBufferSize)
235242

236-
gstChan := make(chan struct{})
243+
ready := make(chan struct{})
237244

238245
hosts := make(networkConfig, honestNodeCount)
239246
iterator := iter.Iterator[networkNodeConfig]{MaxGoroutines: honestNodeCount}
240247
iterator.ForEachIdx(hosts, func(i int, nodeConfig *networkNodeConfig) {
241-
*nodeConfig = networkNodeConfig{
242-
host: initNode(t, gstChan, i, logger, commits, &cfg, genesisDiff, genesisClasses),
243-
adjacentNodes: make(map[int]struct{}),
244-
}
248+
*nodeConfig = initNode(t, i, logger, commits, &cfg, genesisDiff, genesisClasses, ready)
245249
})
246-
hosts.setup(t, cfg.networkSetup)
247250

248-
time.Sleep(gst)
249-
close(gstChan)
251+
cfg.networkSetup(t, hosts)
252+
close(ready)
250253

251254
assertCommits(t, commits, cfg, logger)
252255
}

consensus/integtest/p2p.go

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
11
package integtest
22

33
import (
4-
"fmt"
54
"math/rand"
6-
"slices"
7-
"strings"
85
"testing"
96

10-
"github.com/NethermindEth/juno/consensus"
117
"github.com/libp2p/go-libp2p/core/host"
12-
"github.com/multiformats/go-multiaddr"
13-
"github.com/sourcegraph/conc/iter"
8+
"github.com/libp2p/go-libp2p/core/peer"
149
"github.com/stretchr/testify/require"
1510
)
1611

1712
type networkNodeConfig struct {
1813
host host.Host
19-
adjacentNodes map[int]struct{}
14+
adjacentNodes map[int]peer.AddrInfo
2015
}
2116

2217
func (c networkNodeConfig) isConnected(to int) bool {
@@ -26,35 +21,8 @@ func (c networkNodeConfig) isConnected(to int) bool {
2621

2722
type networkConfig []networkNodeConfig
2823

29-
func (c networkConfig) setup(t *testing.T, configFn networkConfigFn) {
30-
t.Helper()
31-
32-
configFn(t, c)
33-
34-
iter.Iterator[networkNodeConfig]{MaxGoroutines: len(c)}.ForEach(c, func(thisNodeConfig *networkNodeConfig) {
35-
peers := make([]string, 0, len(thisNodeConfig.adjacentNodes))
36-
37-
for otherNode := range thisNodeConfig.adjacentNodes {
38-
otherNodeHost := c[otherNode].host
39-
40-
localAddrIndex := slices.IndexFunc(otherNodeHost.Addrs(), func(addr multiaddr.Multiaddr) bool {
41-
return addr[0].Value() == "127.0.0.1"
42-
})
43-
require.GreaterOrEqual(t, localAddrIndex, 0)
44-
localAddr := otherNodeHost.Addrs()[localAddrIndex]
45-
46-
peerAddr := fmt.Sprintf("%s/p2p/%s", localAddr, otherNodeHost.ID())
47-
peers = append(peers, peerAddr)
48-
}
49-
50-
if len(peers) > 0 {
51-
require.NoError(t, consensus.Connect(t.Context(), thisNodeConfig.host, strings.Join(peers, ",")))
52-
}
53-
})
54-
}
55-
5624
func (c networkConfig) add(from, to int) {
57-
c[from].adjacentNodes[to] = struct{}{}
25+
c[from].adjacentNodes[to] = peer.AddrInfo{ID: c[to].host.ID(), Addrs: c[to].host.Addrs()}
5826
}
5927

6028
//nolint:gosec // The whole package is for testing purpose only, so it's safe to use weak random.

consensus/p2p/config/config.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@ package config
22

33
import "time"
44

5+
type TopicBufferSizes struct {
6+
// Inbound
7+
Subscription int
8+
Output int
9+
// Outbound
10+
ProtoBroadcaster int
11+
}
12+
513
type BufferSizes struct {
6-
ProposalSubscription int
714
VoteSubscription int
815
ProposalDemux int
916
ProposalCommitNotifier int
@@ -13,19 +20,20 @@ type BufferSizes struct {
1320
PrecommitOutput int
1421
ProposalProtoBroadcaster int
1522
VoteProtoBroadcaster int
23+
PubSubQueueSize int
1624
RetryInterval time.Duration
1725
}
1826

1927
var DefaultBufferSizes = BufferSizes{
20-
ProposalSubscription: 1024,
2128
VoteSubscription: 1024,
2229
ProposalDemux: 1024,
23-
ProposalCommitNotifier: 32,
24-
ProposalSingleStreamInput: 32,
25-
ProposalOutputs: 32,
30+
ProposalCommitNotifier: 1024,
31+
ProposalSingleStreamInput: 1024,
32+
ProposalOutputs: 1024,
2633
PrevoteOutput: 1024,
2734
PrecommitOutput: 1024,
2835
ProposalProtoBroadcaster: 1024,
2936
VoteProtoBroadcaster: 1024,
37+
PubSubQueueSize: 1024,
3038
RetryInterval: 1 * time.Second,
3139
}

consensus/p2p/p2p.go

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,23 @@ import (
1515
"github.com/NethermindEth/juno/core/address"
1616
"github.com/NethermindEth/juno/service"
1717
"github.com/NethermindEth/juno/utils"
18+
dht "github.com/libp2p/go-libp2p-kad-dht"
1819
pubsub "github.com/libp2p/go-libp2p-pubsub"
1920
"github.com/libp2p/go-libp2p/core/host"
21+
"github.com/libp2p/go-libp2p/core/peer"
22+
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
2023
"github.com/sourcegraph/conc"
2124
)
2225

2326
type topicName string
2427

2528
const (
26-
proposalTopicName topicName = "consensus_proposals"
27-
voteTopicName topicName = "consensus_votes"
28-
gossipSubHistory = 60
29+
protocolPrefix = "starknet"
30+
chainID = "1" // TODO: Make this configurable
31+
consensusProtocolID = "consensus"
32+
proposalTopicName topicName = "consensus_proposals"
33+
voteTopicName topicName = "consensus_votes"
34+
gossipSubHistory = 60
2935
)
3036

3137
type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
@@ -36,12 +42,14 @@ type P2P[V types.Hashable[H], H types.Hash, A types.Addr] interface {
3642
}
3743

3844
type p2p[V types.Hashable[H], H types.Hash, A types.Addr] struct {
39-
host host.Host
40-
log utils.Logger
41-
commitNotifier chan types.Height
42-
broadcasters Broadcasters[V, H, A]
43-
listeners Listeners[V, H, A]
44-
topicAttachment map[topicName][]attachedToTopic
45+
host host.Host
46+
log utils.Logger
47+
commitNotifier chan types.Height
48+
broadcasters Broadcasters[V, H, A]
49+
listeners Listeners[V, H, A]
50+
topicAttachment map[topicName][]attachedToTopic
51+
pubSubQueueSize int
52+
bootstrapPeersFn func() []peer.AddrInfo
4553
}
4654

4755
type attachedToTopic interface {
@@ -55,6 +63,7 @@ func New(
5563
proposalStore *proposal.ProposalStore[starknet.Hash],
5664
currentHeight types.Height,
5765
bufferSizeConfig *config.BufferSizes,
66+
bootstrapPeersFn func() []peer.AddrInfo,
5867
) P2P[starknet.Value, starknet.Hash, address.Address] {
5968
commitNotifier := make(chan types.Height, bufferSizeConfig.ProposalCommitNotifier)
6069

@@ -104,12 +113,14 @@ func New(
104113
}
105114

106115
return &p2p[starknet.Value, starknet.Hash, address.Address]{
107-
host: host,
108-
log: log,
109-
commitNotifier: commitNotifier,
110-
broadcasters: broadcasters,
111-
listeners: listeners,
112-
topicAttachment: topicAttachment,
116+
host: host,
117+
log: log,
118+
commitNotifier: commitNotifier,
119+
broadcasters: broadcasters,
120+
listeners: listeners,
121+
topicAttachment: topicAttachment,
122+
pubSubQueueSize: bufferSizeConfig.PubSubQueueSize,
123+
bootstrapPeersFn: bootstrapPeersFn,
113124
}
114125
}
115126

@@ -122,7 +133,27 @@ func (p *p2p[V, H, A]) getGossipSubOptions() pubsub.Option {
122133
}
123134

124135
func (p *p2p[V, H, A]) Run(ctx context.Context) error {
125-
gossipSub, err := pubsub.NewGossipSub(ctx, p.host, p.getGossipSubOptions())
136+
dht, err := dht.New(
137+
ctx,
138+
p.host,
139+
dht.ProtocolPrefix("/"+protocolPrefix),
140+
dht.ProtocolExtension("/"+chainID),
141+
dht.ProtocolExtension("/"+consensusProtocolID),
142+
dht.BootstrapPeersFunc(p.bootstrapPeersFn),
143+
dht.Mode(dht.ModeServer),
144+
)
145+
if err != nil {
146+
return fmt.Errorf("unable to create dht with error: %w", err)
147+
}
148+
149+
gossipSub, err := pubsub.NewGossipSub(
150+
ctx,
151+
p.host,
152+
p.getGossipSubOptions(),
153+
pubsub.WithPeerOutboundQueueSize(p.pubSubQueueSize),
154+
pubsub.WithValidateQueueSize(p.pubSubQueueSize),
155+
pubsub.WithDiscovery(routing.NewRoutingDiscovery(dht)),
156+
)
126157
if err != nil {
127158
return fmt.Errorf("unable to create gossipsub with error: %w", err)
128159
}

0 commit comments

Comments
 (0)