Skip to content

Commit df59f7b

Browse files
committed
feat: Implement consensus data source
1 parent 3ecf606 commit df59f7b

15 files changed

Lines changed: 566 additions & 327 deletions

consensus/consensus.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package consensus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strings"
8+
9+
"github.com/NethermindEth/juno/blockchain"
10+
"github.com/NethermindEth/juno/builder"
11+
consensusDB "github.com/NethermindEth/juno/consensus/db"
12+
"github.com/NethermindEth/juno/consensus/driver"
13+
"github.com/NethermindEth/juno/consensus/p2p"
14+
"github.com/NethermindEth/juno/consensus/p2p/config"
15+
"github.com/NethermindEth/juno/consensus/proposal"
16+
"github.com/NethermindEth/juno/consensus/proposer"
17+
"github.com/NethermindEth/juno/consensus/starknet"
18+
"github.com/NethermindEth/juno/consensus/tendermint"
19+
"github.com/NethermindEth/juno/consensus/types"
20+
"github.com/NethermindEth/juno/consensus/votecounter"
21+
"github.com/NethermindEth/juno/core/felt"
22+
"github.com/NethermindEth/juno/db"
23+
"github.com/NethermindEth/juno/utils"
24+
"github.com/NethermindEth/juno/vm"
25+
"github.com/libp2p/go-libp2p"
26+
"github.com/libp2p/go-libp2p/core/crypto"
27+
"github.com/libp2p/go-libp2p/core/host"
28+
"github.com/libp2p/go-libp2p/core/peer"
29+
"github.com/sourcegraph/conc/pool"
30+
)
31+
32+
type ConsensusServices struct {
33+
Host host.Host
34+
Proposer proposer.Proposer[starknet.Value, starknet.Hash]
35+
P2P p2p.P2P[starknet.Value, starknet.Hash, starknet.Address]
36+
Driver *driver.Driver[starknet.Value, starknet.Hash, starknet.Address]
37+
CommitListener driver.CommitListener[starknet.Value, starknet.Hash, starknet.Address]
38+
}
39+
40+
func Init(
41+
logger *utils.ZapLogger,
42+
database db.KeyValueStore,
43+
blockchain *blockchain.Blockchain,
44+
vm vm.VM,
45+
nodeAddress *starknet.Address,
46+
validators votecounter.Validators[starknet.Address],
47+
timeoutFn driver.TimeoutFn,
48+
hostAddress string,
49+
hostPrivateKey crypto.PrivKey,
50+
) (ConsensusServices, error) {
51+
chainHeight, err := blockchain.Height()
52+
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
53+
return ConsensusServices{}, err
54+
}
55+
currentHeight := types.Height(chainHeight + 1)
56+
57+
tendermintDB := consensusDB.NewTendermintDB[starknet.Value, starknet.Hash, starknet.Address](database, currentHeight)
58+
59+
executor := builder.NewExecutor(blockchain, vm, logger, false, true) // TODO: We're currently skipping signature validation
60+
builder := builder.New(blockchain, executor)
61+
62+
proposalStore := proposal.ProposalStore[starknet.Hash]{}
63+
proposer := proposer.New(logger, &builder, &proposalStore, *nodeAddress, toValue)
64+
stateMachine := tendermint.New(tendermintDB, logger, *nodeAddress, proposer, validators, currentHeight)
65+
66+
host, err := libp2p.New(
67+
libp2p.ListenAddrStrings(hostAddress),
68+
libp2p.Identity(hostPrivateKey),
69+
// libp2p.UserAgent(makeAgentName(version)),
70+
// // Use address factory to add the public address to the list of
71+
// // addresses that the node will advertise.
72+
// libp2p.AddrsFactory(addressFactory),
73+
// If we know the public ip, enable the relay service.
74+
libp2p.EnableRelayService(),
75+
// When listening behind NAT, enable peers to try to poke thought the
76+
// NAT in order to reach the node.
77+
libp2p.EnableHolePunching(),
78+
// Try to open a port in the NAT router to accept incoming connections.
79+
libp2p.NATPortMap(),
80+
)
81+
if err != nil {
82+
return ConsensusServices{}, err
83+
}
84+
85+
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes)
86+
87+
commitListener := driver.NewCommitListener(logger, &proposalStore, proposer, p2p)
88+
driver := driver.New(logger, tendermintDB, stateMachine, commitListener, p2p, timeoutFn)
89+
90+
return ConsensusServices{
91+
Host: host,
92+
Proposer: proposer,
93+
P2P: p2p,
94+
Driver: &driver,
95+
CommitListener: commitListener,
96+
}, nil
97+
}
98+
99+
func Connect(ctx context.Context, host host.Host, peers string) error {
100+
if peers == "" {
101+
return nil
102+
}
103+
104+
pool := pool.New().WithErrors().WithFirstError()
105+
106+
for peerStr := range strings.SplitSeq(peers, ",") {
107+
pool.Go(func() error {
108+
peerAddr, err := peer.AddrInfoFromString(peerStr)
109+
if err != nil {
110+
return fmt.Errorf("unable to parse peer address %q: %w", peerStr, err)
111+
}
112+
113+
if err := host.Connect(ctx, *peerAddr); err != nil {
114+
return fmt.Errorf("unable to connect to %q: %w", peerStr, err)
115+
}
116+
117+
return nil
118+
})
119+
}
120+
121+
return pool.Wait()
122+
}
123+
124+
func toValue(value *felt.Felt) starknet.Value {
125+
return starknet.Value(*value)
126+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package datasource
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
syncmap "sync"
8+
"sync/atomic"
9+
10+
"github.com/NethermindEth/juno/consensus/driver"
11+
"github.com/NethermindEth/juno/consensus/p2p"
12+
"github.com/NethermindEth/juno/consensus/proposer"
13+
"github.com/NethermindEth/juno/consensus/types"
14+
"github.com/NethermindEth/juno/core"
15+
"github.com/NethermindEth/juno/sync"
16+
)
17+
18+
const maxCommitHistory = 1024 // TODO: make this configurable
19+
20+
type consensusDataSource[V types.Hashable[H], H types.Hash, A types.Addr] struct {
21+
commitListener driver.CommitListener[V, H, A]
22+
proposer proposer.Proposer[V, H]
23+
cache syncmap.Map
24+
latest atomic.Uint64
25+
}
26+
27+
func New[V types.Hashable[H], H types.Hash, A types.Addr](
28+
commitListener driver.CommitListener[V, H, A],
29+
proposer proposer.Proposer[V, H],
30+
p2p p2p.P2P[V, H, A],
31+
) *consensusDataSource[V, H, A] {
32+
return &consensusDataSource[V, H, A]{
33+
commitListener: commitListener,
34+
proposer: proposer,
35+
cache: syncmap.Map{},
36+
latest: atomic.Uint64{},
37+
}
38+
}
39+
40+
func (c *consensusDataSource[V, H, A]) Run(ctx context.Context) error {
41+
for {
42+
select {
43+
case <-ctx.Done():
44+
return nil
45+
case committedBlock := <-c.commitListener.Listen():
46+
blockNumber := committedBlock.Block.Number
47+
48+
c.cache.Store(blockNumber, &committedBlock)
49+
c.latest.Store(blockNumber)
50+
c.cache.Delete(blockNumber - maxCommitHistory)
51+
}
52+
}
53+
}
54+
55+
func (c *consensusDataSource[V, H, A]) BlockByNumber(ctx context.Context, blockNumber uint64) (sync.CommittedBlock, error) {
56+
committedBlock, ok := c.cache.Load(blockNumber)
57+
if !ok {
58+
return sync.CommittedBlock{}, errors.New("block not found in cache")
59+
}
60+
61+
return *committedBlock.(*sync.CommittedBlock), nil
62+
}
63+
64+
func (c *consensusDataSource[V, H, A]) BlockLatest(ctx context.Context) (*core.Block, error) {
65+
committedBlock, err := c.BlockByNumber(ctx, c.latest.Load())
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
return committedBlock.Block, nil
71+
}
72+
73+
func (c *consensusDataSource[V, H, A]) BlockPending(ctx context.Context) (sync.Pending, error) {
74+
return sync.Pending{}, errors.New("not implemented") // TODO: Revise this
75+
}
76+
77+
func (c *consensusDataSource[V, H, A]) PreConfirmedBlockByNumber(ctx context.Context, blockNumber uint64) (core.PreConfirmed, error) {
78+
preconfirmed := c.proposer.Preconfirmed()
79+
if preconfirmed.Block.Number != blockNumber {
80+
return core.PreConfirmed{}, fmt.Errorf("block %d is not preconfirmed", blockNumber)
81+
}
82+
return *c.proposer.Preconfirmed(), nil
83+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package driver
2+
3+
import (
4+
"context"
5+
6+
"github.com/NethermindEth/juno/consensus/p2p"
7+
"github.com/NethermindEth/juno/consensus/proposal"
8+
"github.com/NethermindEth/juno/consensus/proposer"
9+
"github.com/NethermindEth/juno/consensus/types"
10+
"github.com/NethermindEth/juno/sync"
11+
"github.com/NethermindEth/juno/utils"
12+
)
13+
14+
// CommitListener is a component that is used to notify different components that a new committed block is available.
15+
type CommitListener[V types.Hashable[H], H types.Hash, A types.Addr] interface {
16+
// Commit is called by Tendermint when a block has been decided on and can be committed to the DB.
17+
Commit(context.Context, types.Height, V)
18+
// Listen returns a channel that will receive committed blocks.
19+
// This is supposed to be used by the component that writes the committed blocks to the database.
20+
Listen() <-chan sync.CommittedBlock
21+
}
22+
23+
type commitListener[V types.Hashable[H], H types.Hash, A types.Addr] struct {
24+
log utils.Logger
25+
proposalStore *proposal.ProposalStore[H]
26+
proposer proposer.Proposer[V, H]
27+
p2p p2p.P2P[V, H, A]
28+
commits chan sync.CommittedBlock
29+
}
30+
31+
func NewCommitListener[V types.Hashable[H], H types.Hash, A types.Addr](
32+
log utils.Logger,
33+
proposalStore *proposal.ProposalStore[H],
34+
proposer proposer.Proposer[V, H],
35+
p2p p2p.P2P[V, H, A],
36+
) CommitListener[V, H, A] {
37+
commits := make(chan sync.CommittedBlock)
38+
return &commitListener[V, H, A]{
39+
log: log,
40+
proposalStore: proposalStore,
41+
proposer: proposer,
42+
p2p: p2p,
43+
commits: commits,
44+
}
45+
}
46+
47+
func (b *commitListener[V, H, A]) Commit(ctx context.Context, height types.Height, value V) {
48+
buildResult := b.proposalStore.Get(value.Hash())
49+
if buildResult == nil {
50+
b.log.Errorw("failed to get build result", "hash", value.Hash())
51+
return
52+
}
53+
54+
committedBlock := sync.CommittedBlock{
55+
Block: buildResult.Preconfirmed.Block,
56+
StateUpdate: buildResult.Preconfirmed.StateUpdate,
57+
NewClasses: buildResult.Preconfirmed.NewClasses,
58+
Persisted: make(chan struct{}),
59+
}
60+
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case b.commits <- committedBlock:
65+
}
66+
67+
select {
68+
case <-ctx.Done():
69+
return
70+
case <-committedBlock.Persisted:
71+
}
72+
73+
b.proposer.OnCommit(ctx, height, value)
74+
b.p2p.OnCommit(ctx, height, value)
75+
}
76+
77+
func (b *commitListener[V, H, A]) Listen() <-chan sync.CommittedBlock {
78+
return b.commits
79+
}

consensus/driver/driver.go

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,21 @@ import (
66

77
"github.com/NethermindEth/juno/consensus/db"
88
"github.com/NethermindEth/juno/consensus/p2p"
9-
"github.com/NethermindEth/juno/consensus/proposer"
109
"github.com/NethermindEth/juno/consensus/tendermint"
1110
"github.com/NethermindEth/juno/consensus/types"
1211
"github.com/NethermindEth/juno/utils"
1312
)
1413

15-
type timeoutFn func(step types.Step, round types.Round) time.Duration
16-
17-
type Blockchain[V types.Hashable[H], H types.Hash] interface {
18-
// Commit is called by Tendermint when a block has been decided on and can be committed to the DB.
19-
Commit(types.Height, V)
20-
}
14+
type TimeoutFn func(step types.Step, round types.Round) time.Duration
2115

2216
type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct {
23-
log utils.Logger
24-
db db.TendermintDB[V, H, A]
25-
stateMachine tendermint.StateMachine[V, H, A]
26-
blockchain Blockchain[V, H]
27-
p2p p2p.P2P[V, H, A]
28-
proposer proposer.Proposer[V, H]
17+
log utils.Logger
18+
db db.TendermintDB[V, H, A]
19+
stateMachine tendermint.StateMachine[V, H, A]
20+
commitListener CommitListener[V, H, A]
21+
p2p p2p.P2P[V, H, A]
2922

30-
getTimeout timeoutFn
23+
getTimeout TimeoutFn
3124

3225
scheduledTms map[types.Timeout]*time.Timer
3326
timeoutsCh chan types.Timeout
@@ -37,21 +30,19 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
3730
log utils.Logger,
3831
db db.TendermintDB[V, H, A],
3932
stateMachine tendermint.StateMachine[V, H, A],
40-
blockchain Blockchain[V, H],
33+
commitListener CommitListener[V, H, A],
4134
p2p p2p.P2P[V, H, A],
42-
proposer proposer.Proposer[V, H],
43-
getTimeout timeoutFn,
35+
getTimeout TimeoutFn,
4436
) Driver[V, H, A] {
4537
return Driver[V, H, A]{
46-
log: log,
47-
db: db,
48-
stateMachine: stateMachine,
49-
blockchain: blockchain,
50-
p2p: p2p,
51-
proposer: proposer,
52-
getTimeout: getTimeout,
53-
scheduledTms: make(map[types.Timeout]*time.Timer),
54-
timeoutsCh: make(chan types.Timeout),
38+
log: log,
39+
db: db,
40+
stateMachine: stateMachine,
41+
commitListener: commitListener,
42+
p2p: p2p,
43+
getTimeout: getTimeout,
44+
scheduledTms: make(map[types.Timeout]*time.Timer),
45+
timeoutsCh: make(chan types.Timeout),
5546
}
5647
}
5748

@@ -132,9 +123,7 @@ func (d *Driver[V, H, A]) execute(
132123
}
133124

134125
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
135-
d.blockchain.Commit(action.Height, *action.Value)
136-
d.proposer.OnCommit(ctx, action.Height, *action.Value)
137-
d.p2p.OnCommit(ctx, action.Height, *action.Value)
126+
d.commitListener.Commit(ctx, action.Height, *action.Value)
138127

139128
if err := d.db.DeleteWALEntries(action.Height); err != nil {
140129
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)

consensus/driver/driver_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
listeners = p2p.Listeners[starknet.Value, starknet.Hash, starknet.Address]
2828
broadcasters = p2p.Broadcasters[starknet.Value, starknet.Hash, starknet.Address]
2929
tendermintDB = db.TendermintDB[starknet.Value, starknet.Hash, starknet.Address]
30-
blockchain = driver.Blockchain[starknet.Value, starknet.Hash]
30+
commitListener = driver.CommitListener[starknet.Value, starknet.Hash, starknet.Address]
3131
)
3232

3333
const (
@@ -165,9 +165,8 @@ func TestDriver(t *testing.T) {
165165
utils.NewNopZapLogger(),
166166
newTendermintDB(t),
167167
stateMachine,
168-
newMockBlockchain(t, &commitAction),
168+
newMockCommitListener(t, &commitAction),
169169
p2p,
170-
newMockProposer(),
171170
mockTimeoutFn,
172171
)
173172

0 commit comments

Comments
 (0)