Skip to content

Commit 22ab07d

Browse files
committed
fix #3231 prevent snapshot corruption
Signed-off-by: Sylvere Richard <sylvere.richard@gmail.com>
1 parent 17b8d22 commit 22ab07d

2 files changed

Lines changed: 85 additions & 2 deletions

File tree

manager/state/raft/transport/peer.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,15 @@ func raftMessageStructSize(m *raftpb.Message) int {
145145

146146
// Returns the max allowable payload based on GRPCMaxMsgSize and
147147
// the struct size for the given raftpb.Message.
148+
// If the struct overhead leaves less than 64 KiB of headroom under
149+
// GRPCMaxMsgSize (e.g. very large snapshot metadata), the payload size
150+
// is clamped to 64 KiB to guarantee forward progress when splitting.
148151
func raftMessagePayloadSize(m *raftpb.Message) int {
149-
return GRPCMaxMsgSize - raftMessageStructSize(m)
152+
s := GRPCMaxMsgSize - raftMessageStructSize(m)
153+
if s < 64<<10 {
154+
s = 64 << 10
155+
}
156+
return s
150157
}
151158

152159
// Split a large raft message into smaller messages.
@@ -164,6 +171,11 @@ func splitSnapshotData(_ context.Context, m *raftpb.Message) []api.StreamRaftMes
164171
// Get the max payload size.
165172
payloadSize := raftMessagePayloadSize(m)
166173

174+
// Capture the full snapshot data before the loop, because
175+
// we need to copy the Snapshot struct for each chunk to avoid
176+
// mutating m.Snapshot.Data through the shared pointer.
177+
fullData := m.Snapshot.Data
178+
167179
// split the snapshot into smaller messages.
168180
for snapDataIndex := 0; snapDataIndex < size; {
169181
chunkSize := size - snapDataIndex
@@ -172,9 +184,13 @@ func splitSnapshotData(_ context.Context, m *raftpb.Message) []api.StreamRaftMes
172184
}
173185

174186
raftMsg := *m
187+
// Copy the Snapshot to avoid mutating the original message's
188+
// Snapshot.Data through the shared pointer.
189+
snap := *m.Snapshot
190+
raftMsg.Snapshot = &snap
175191

176192
// sub-slice for this snapshot chunk.
177-
raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex : snapDataIndex+chunkSize]
193+
raftMsg.Snapshot.Data = fullData[snapDataIndex : snapDataIndex+chunkSize]
178194

179195
snapDataIndex += chunkSize
180196

manager/state/raft/transport/transport_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,73 @@ func testSend(ctx context.Context, c *mockCluster, from uint64, to []uint64, msg
105105
}
106106
}
107107

108+
func TestSplitSnapshotData(t *testing.T) {
109+
ctx := context.Background()
110+
111+
t.Run("NormalSnapshot", func(t *testing.T) {
112+
m := newSnapshotMessage(1, 2)
113+
msgs := splitSnapshotData(ctx, &m)
114+
assert.NotEmpty(t, msgs)
115+
// reassemble and verify
116+
var assembled []byte
117+
for _, msg := range msgs {
118+
assembled = append(assembled, msg.Message.Snapshot.Data...)
119+
}
120+
assert.Equal(t, m.Snapshot.Data, assembled)
121+
})
122+
123+
t.Run("LargeMetadataDoesNotPanic", func(t *testing.T) {
124+
// Simulate a snapshot where struct overhead exceeds GRPCMaxMsgSize.
125+
// This is the scenario from the bug: many cluster objects cause
126+
// large raft message metadata, making payloadSize negative without
127+
// the fix.
128+
data := make([]byte, 5<<20) // 5 MiB snapshot data
129+
for i := range data {
130+
data[i] = byte(i % 256)
131+
}
132+
m := raftpb.Message{
133+
Type: raftpb.MsgSnap,
134+
From: 1,
135+
To: 2,
136+
Snapshot: &raftpb.Snapshot{
137+
Data: data,
138+
Metadata: raftpb.SnapshotMetadata{
139+
Index: uint64(len(data)),
140+
// Large ConfState to push struct size above GRPCMaxMsgSize
141+
ConfState: raftpb.ConfState{
142+
Voters: make([]uint64, 1<<20),
143+
},
144+
},
145+
},
146+
}
147+
// Must not panic
148+
msgs := splitSnapshotData(ctx, &m)
149+
assert.NotEmpty(t, msgs)
150+
// reassemble and verify
151+
var assembled []byte
152+
for _, msg := range msgs {
153+
assembled = append(assembled, msg.Message.Snapshot.Data...)
154+
}
155+
assert.Equal(t, data, assembled)
156+
})
157+
}
158+
159+
func TestRaftMessagePayloadSizeMinimum(t *testing.T) {
160+
// When struct overhead exceeds GRPCMaxMsgSize, payloadSize must
161+
// be clamped to the minimum (64 KiB) instead of going negative.
162+
// Use Entries to inflate the struct size well beyond GRPCMaxMsgSize.
163+
bigEntry := raftpb.Entry{Data: make([]byte, GRPCMaxMsgSize)}
164+
m := &raftpb.Message{
165+
Type: raftpb.MsgSnap,
166+
Entries: []raftpb.Entry{bigEntry},
167+
Snapshot: &raftpb.Snapshot{
168+
Data: nil,
169+
},
170+
}
171+
ps := raftMessagePayloadSize(m)
172+
assert.Equal(t, 64<<10, ps, "payloadSize should be clamped to minimum 64 KiB")
173+
}
174+
108175
func TestSend(t *testing.T) {
109176
ctx, cancel := context.WithCancel(context.Background())
110177
c := newCluster()

0 commit comments

Comments
 (0)