Bug 2 — Checkpoint never written after snapshot (input_mysql_stream.go:762)
Snapshot rows emit Position: nil, so the ack handler never writes a checkpoint. A checkpoint only gets written on the first CDC event. Any restart before that → full re-snapshot from scratch. We saw the snapshot run 1,785 times before any checkpoint was written. Fix: write the binlog position to checkpoint immediately after snapshot completes.
Bug 2 — Checkpoint never written after snapshot input_mysql_stream.go
Confirmed. Snapshot rows are emitted with Position: nil:
// readSnapshot
case i.rawMessageEvents <- MessageEvent{
    ...
    Position: nil, // ← always nil for snapshot rows
}:
In flushBatch, the ackFn silently skips checkpoint writes for nil positions:
ackFn: func(ctx context.Context, _ error) error {
    ...
    offset := *maxOffset
    if offset == nil {
        return nil // ← no-op, binlog position never persisted
    }
    return i.setCachedBinlogPosition(ctx, *offset)
},
After startMySQLSync completes the snapshot, startPos is captured and used to start canal (RunFrom(*pos)), but it is never written to the checkpoint cache. The first checkpoint write happens only when the first real CDC event is acked. Any restart before that → cache miss → streamSnapshot && pos == nil → full re-snapshot from row 0.
Your proposed fix is correct: after snapshot.releaseSnapshot() succeeds in startMySQLSync, write startPos to the checkpoint cache immediately before calling canal.RunFrom.