-
etcd is a distributed reliable key-value store for the most critical data of a distributed system…
-
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
// KV is the key-value gRPC service. Each RPC shown here is additionally
// mapped to an HTTP POST endpoint through the google.api.http option, with
// the whole request message taken from the HTTP body ("*").
service KV {
// Range takes a RangeRequest and returns a RangeResponse.
rpc Range(RangeRequest) returns (RangeResponse) {
option (google.api.http) = {
post: "/v3beta/kv/range"
body: "*"
};
}
// Put takes a PutRequest and returns a PutResponse.
rpc Put(PutRequest) returns (PutResponse) {
option (google.api.http) = {
post: "/v3beta/kv/put"
body: "*"
};
}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L190-225
// StartNode builds a raft state machine from c, makes it a follower at term 1
// with no known leader, registers every entry of peers as a member, and then
// launches the node's run loop in its own goroutine before returning the node.
func StartNode(c *Config, peers []Peer) Node {
	rf := newRaft(c)
	rf.becomeFollower(1, None)

	// Treat everything already in the log as committed.
	rf.raftLog.committed = rf.raftLog.lastIndex()

	for i := range peers {
		rf.addNode(peers[i].ID)
	}

	nd := newNode()
	go nd.run(rf)
	return &nd
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L291-423
// run is the node's event loop. It blocks until one of three things happens:
// a message arrives on n.recvc (stepped into the raft state machine), a tick
// arrives on n.tickc (advances the state machine's logical clock), or n.stop
// fires (n.done is closed and the loop exits).
func (n *node) run(r *raft) {
	lead := None
	for {
		// Remember the leader most recently recorded on the state machine.
		if lead != r.lead {
			lead = r.lead
		}

		select {
		case m := <-n.recvc:
			r.Step(m)
		case <-n.tickc:
			r.tick()
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
// tickElection advances the election clock by one tick. When the node is
// promotable and the election timeout has passed, the clock is reset and a
// local MsgHup is stepped into the state machine.
func (r *raft) tickElection() {
	r.electionElapsed++

	if !r.promotable() || !r.pastElectionTimeout() {
		return
	}

	r.electionElapsed = 0
	hup := pb.Message{From: r.id, Type: pb.MsgHup}
	r.Step(hup)
}
// tickHeartbeat advances both the heartbeat and election clocks by one tick.
// Once the heartbeat clock reaches heartbeatTimeout it is reset and a local
// MsgBeat is stepped into the state machine.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.heartbeatElapsed < r.heartbeatTimeout {
		return
	}

	r.heartbeatElapsed = 0
	beat := pb.Message{From: r.id, Type: pb.MsgBeat}
	r.Step(beat)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L931-1142
// stepLeader handles a message while r is in the leader state. Only the
// MsgBeat case is shown in this excerpt: it broadcasts a heartbeat and
// returns nil. The remaining cases and the final return are elided.
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
// ...
}
//...
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L518-534
// sendHeartbeat sends a MsgHeartbeat to peer `to`, carrying ctx as its
// context. The advertised commit index is capped at the smaller of the
// recipient's matched index and the local committed index.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	hb := pb.Message{To: to, Type: pb.MsgHeartbeat, Context: ctx}
	hb.Commit = min(r.getProgress(to).Match, r.raftLog.committed)
	r.send(hb)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1191-1247
// stepFollower handles a message while r is in the follower state. Only the
// MsgHeartbeat case is shown in this excerpt; other cases are elided.
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgHeartbeat:
// A heartbeat resets the election clock and records the sender as leader
// before the heartbeat itself is processed.
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
// ...
}
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
// stepCandidate handles a message while r is in the candidate state. Only
// the MsgHeartbeat case is shown in this excerpt: the node steps down to
// follower of the sender and then processes the heartbeat.
func stepCandidate(r *raft, m pb.Message) error {
// ...
switch m.Type {
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
}
// ...
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
// tickElection moves the election clock forward one tick and, if the node is
// promotable and past its election timeout, zeroes the clock and steps a
// MsgHup addressed from itself.
func (r *raft) tickElection() {
	r.electionElapsed++

	timedOut := r.promotable() && r.pastElectionTimeout()
	if !timedOut {
		return
	}

	r.electionElapsed = 0
	r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
// Step feeds message m into the state machine. Only the MsgHup case is shown
// in this excerpt; the surrounding logic is elided.
func (r *raft) Step(m pb.Message) error {
// ...
switch m.Type {
case pb.MsgHup:
// A non-leader starts a campaign; the campaign type depends on r.preVote.
if r.state != StateLeader {
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
}
// ...
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L730-766
// campaign turns r into a candidate and asks for votes. If the node's own
// vote already reaches quorum it becomes leader immediately; otherwise a
// MsgVote carrying the last log index and term is sent to every other peer
// in r.prs.
//
// NOTE(review): this is an abridged excerpt. `voteMsg` and `ctx` are not
// defined anywhere in the block and the parameter t is never consulted;
// presumably the elided code derives them from t — confirm against the full
// source before reusing this snippet.
func (r *raft) campaign(t CampaignType) {
r.becomeCandidate()
// Record our own vote; a one-vote quorum wins on the spot.
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
r.becomeLeader()
return
}
for id := range r.prs {
// Do not send a vote request to ourselves.
if id == r.id {
continue
}
r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
// Step feeds message m into the state machine. Only the vote-request case
// (MsgVote / MsgPreVote) is shown in this excerpt.
func (r *raft) Step(m pb.Message) error {
// ...
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
// We may vote if we already voted for this sender, or if we have neither
// voted nor recorded a leader.
canVote := r.Vote == m.From || (r.Vote == None && r.lead == None)
// The vote is granted only when the candidate's log is at least as
// up to date as ours according to raftLog.isUpToDate.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp})
r.electionElapsed = 0
r.Vote = m.From
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true})
}
}
// ...
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
// stepCandidate handles a message while r is in the candidate state. Only
// the MsgVoteResp case is shown in this excerpt.
func stepCandidate(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgVoteResp:
// Record the response; gr is the value poll returns after recording it.
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr:
// poll's result equals quorum: take leadership and broadcast appends.
r.becomeLeader()
r.bcastAppend()
// ...
}
}
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L671-678
// becomeFollower switches r into the follower role for the given term with
// lead recorded as the leader: messages go to stepFollower and ticks go to
// tickElection.
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
// Assigned after reset — presumably reset clears r.lead; verify.
r.lead = lead
r.state = StateFollower
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
// tickElection counts one more election tick. If the node is promotable and
// its election timeout has passed, the counter is reset and the node steps a
// MsgHup from itself.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() {
		if r.pastElectionTimeout() {
			r.electionElapsed = 0
			msg := pb.Message{From: r.id, Type: pb.MsgHup}
			r.Step(msg)
		}
	}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L680-691
// becomeCandidate switches r into the candidate role: the term is advanced
// by one, the node votes for itself, messages go to stepCandidate and ticks
// go to tickElection.
func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
// Assigned after reset — presumably reset clears r.Vote; verify.
r.Vote = r.id
r.state = StateCandidate
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L708-728
// becomeLeader switches r into the leader role for the current term:
// messages go to stepLeader, ticks go to tickHeartbeat, and the node records
// itself as leader.
func (r *raft) becomeLeader() {
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Remember the log position at the moment leadership was taken.
r.pendingConfIndex = r.raftLog.lastIndex()
// Append an entry with no data as the first entry of the new leadership.
r.appendEntry(pb.Entry{Data: nil})
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L646-669
// tickHeartbeat advances the heartbeat and election clocks by one tick.
// When the election clock reaches electionTimeout it is reset and, if
// checkQuorum is enabled, a local MsgCheckQuorum is stepped. When the
// heartbeat clock reaches heartbeatTimeout it is reset and a local MsgBeat
// is stepped.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if electionDue := r.electionElapsed >= r.electionTimeout; electionDue {
		r.electionElapsed = 0
		if r.checkQuorum {
			quorumCheck := pb.Message{From: r.id, Type: pb.MsgCheckQuorum}
			r.Step(quorumCheck)
		}
	}

	// Evaluated after the quorum check above has been stepped.
	if r.heartbeatElapsed < r.heartbeatTimeout {
		return
	}
	r.heartbeatElapsed = 0
	r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L51-69
// Backend is the storage abstraction used by the layers above it. It hands
// out read and batch transactions and exposes maintenance operations.
type Backend interface {
// ReadTx returns the backend's read transaction.
ReadTx() ReadTx
// BatchTx returns the backend's batch (write) transaction.
BatchTx() BatchTx
// Snapshot returns a Snapshot of the backend.
Snapshot() Snapshot
// Hash computes a uint32 hash; ignores names keys to leave out
// (presumably excluded from the hash — confirm against the implementation).
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size reports a size figure for the backend as an int64.
Size() int64
// SizeInUse reports an in-use size figure for the backend as an int64.
SizeInUse() int64
// Defrag runs a defragmentation pass and reports any error.
Defrag() error
// ForceCommit requests a commit (presumably of the current batch
// transaction — confirm against the implementation).
ForceCommit()
// Close closes the backend and reports any error.
Close() error
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104
// backend is the bolt-backed implementation state: the open database, the
// shared batch and read transactions, the batching parameters, and the
// channels used to stop the background run loop and wait for its exit.
type backend struct {
// size, sizeInUse and commits are int64 counters; commits is updated with
// sync/atomic by batchTx.commit.
size int64
sizeInUse int64
commits int64
mu sync.RWMutex
db *bolt.DB
// batchInterval is the period of the background commit timer.
batchInterval time.Duration
batchLimit int
batchTx *batchTxBuffered
readTx *readTx
// stopc asks the run loop to stop; donec is closed when it has exited.
stopc chan struct{}
donec chan struct{}
lg *zap.Logger
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36
// ReadTx is a lockable read transaction. The Unsafe* methods carry no
// locking of their own in their names' sense — presumably callers must hold
// the lock via Lock/Unlock; confirm against the implementation.
type ReadTx interface {
Lock()
Unlock()
// UnsafeRange returns the keys and values in bucketName selected by key,
// endKey and limit.
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
// UnsafeForEach calls visitor for key/value pairs of bucketName and
// returns an error.
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38
// BatchTx extends ReadTx with mutating operations and explicit commit
// control.
type BatchTx interface {
ReadTx
// UnsafeCreateBucket creates the bucket called name.
UnsafeCreateBucket(name []byte)
// UnsafePut stores value under key in bucketName.
UnsafePut(bucketName []byte, key []byte, value []byte)
// UnsafeSeqPut is the put variant for sequentially written keys
// (presumably tuned for append-style writes — confirm).
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
// UnsafeDelete removes key from bucketName.
UnsafeDelete(bucketName []byte, key []byte)
// Commit commits the pending work.
Commit()
// CommitAndStop commits the pending work and stops the transaction.
CommitAndStop()
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176
// newBackend opens the bolt database at bcfg.Path, wires up the buffered
// read and batch transactions, and starts the background commit loop in its
// own goroutine. A database that cannot be opened is fatal: continuing with
// a nil *bolt.DB would only defer the failure to the first transaction.
func newBackend(bcfg BackendConfig) *backend {
	bopts := &bolt.Options{}
	bopts.InitialMmapSize = bcfg.mmapSize()

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		plog.Fatalf("cannot open database at %s (%v)", bcfg.Path, err)
	}

	b := &backend{
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,

		readTx: &readTx{
			buf: txReadBuffer{
				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			},
			buckets: make(map[string]*bolt.Bucket),
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),
	}
	// The batch transaction needs the fully built backend.
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305
// run is the backend's background commit loop. Every batchInterval it
// commits the batch transaction if it has pending work. When stopc fires it
// performs a final CommitAndStop and exits; donec is closed on the way out.
func (b *backend) run() {
	defer close(b.donec)

	t := time.NewTimer(b.batchInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		// The timer has fired and been drained above, so Reset is safe.
		t.Reset(b.batchInterval)
	}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L38-47
// readTx is the concrete read transaction: an in-memory read buffer in
// front of a bolt transaction, plus a cache of bucket handles keyed by
// bucket name.
type readTx struct {
// mu and buf: presumably mu guards buf — confirm against callers.
mu sync.RWMutex
buf txReadBuffer
// txmu, tx and buckets: presumably txmu guards tx and the bucket cache —
// confirm against callers.
txmu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L52-90
// UnsafeRange returns the keys and values of bucketName in [key, endKey),
// or just key when endKey is nil. The in-memory buffer is consulted first;
// if it does not satisfy limit, the remainder is read from bolt and the
// bolt results are placed ahead of the buffered ones.
//
// A limit <= 0 means "no limit". Without normalising it, a buffer lookup
// with no matches compared equal to a zero limit and returned early without
// ever reading bolt.
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		// A single-key lookup can return at most one entry.
		limit = 1
	}
	if limit <= 0 {
		limit = math.MaxInt64
	}

	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
	if int64(len(keys)) == limit {
		return keys, vals
	}

	// Look up the bucket handle, caching it (even when nil) for next time.
	bn := string(bucketName)
	bucket, ok := rt.buckets[bn]
	if !ok {
		bucket = rt.tx.Bucket(bucketName)
		rt.buckets[bn] = bucket
	}
	if bucket == nil {
		// The bucket does not exist; only the buffered results apply.
		return keys, vals
	}

	c := bucket.Cursor()
	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L121-141
// unsafeRange walks cursor c starting at key and collects matching entries.
// With a non-empty endKey it collects keys strictly below endKey, stopping
// once limit entries are gathered; with an empty endKey it collects only an
// exact match of key and forces limit to 1.
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	inRange := func(b []byte) bool { return bytes.Equal(b, key) }
	if len(endKey) > 0 {
		inRange = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		limit = 1
	}

	ck, cv := c.Seek(key)
	for ck != nil && inRange(ck) {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if int64(len(keys)) == limit {
			break
		}
		ck, cv = c.Next()
	}
	return keys, vs
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L40-46
// batchTx is a lockable write transaction on top of a bolt transaction.
type batchTx struct {
sync.Mutex
tx *bolt.Tx
backend *backend
// pending counts puts and deletes since the last commit.
pending int
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L243-246
// batchTxBuffered is a batchTx paired with a write buffer.
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L65-67
// UnsafePut stores value under key in bucketName by delegating to unsafePut
// with seq set to false.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L74-103
// unsafePut stores value under key in bucketName inside the current bolt
// transaction and bumps the pending counter. A missing bucket or a failed
// put is fatal. The seq flag is accepted for the callers' benefit but is
// not consulted in this block.
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		// Tx.Bucket returns nil for an unknown bucket; fail with a clear
		// message instead of a nil-pointer panic on Put.
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	if err := bucket.Put(key, value); err != nil {
		plog.Fatalf("cannot put key into bucket (%v)", err)
	}
	t.pending++
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L144-169
// UnsafeDelete removes key from bucketName inside the current bolt
// transaction and bumps the pending counter. A missing bucket or a failed
// delete is fatal.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		// Tx.Bucket returns nil for an unknown bucket; fail with a clear
		// message instead of a nil-pointer panic on Delete.
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	if err := bucket.Delete(key); err != nil {
		plog.Fatalf("cannot delete key from bucket (%v)", err)
	}
	t.pending++
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L184-188
// Commit takes the transaction lock and commits pending work without
// stopping the transaction (commit is called with stop set to false).
func (t *batchTx) Commit() {
t.Lock()
t.commit(false)
t.Unlock()
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L210-241
// commit flushes the current bolt transaction and, unless stop is set,
// begins a fresh writable one. With nothing pending and stop unset it is a
// no-op. Commit metrics are recorded before the commit error is examined;
// a failed commit is fatal.
func (t *batchTx) commit(stop bool) {
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()
		err := t.tx.Commit()

		stats := t.tx.Stats()
		rebalanceSec.Observe(stats.RebalanceTime.Seconds())
		spillSec.Observe(stats.SpillTime.Seconds())
		writeSec.Observe(stats.WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			plog.Fatalf("cannot commit tx (%s)", err)
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L68-76
// Get looks up key in the tree index and returns what its keyIndex reports
// for revision atRev. It returns ErrRevisionNotFound when the key is not in
// the index.
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
	found := ti.keyIndex(&keyIndex{key: key})
	if found == nil {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}
	return found.get(ti.lg, atRev)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89
// keyIndex returns the stored *keyIndex that the tree yields for keyi, or
// nil when the tree has no such item.
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
	item := ti.tree.Get(keyi)
	if item == nil {
		return nil
	}
	return item.(*keyIndex)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L149-171
// get finds the generation covering atRev and, within it, the revision
// selected by walking with the predicate rev.main > atRev. It returns that
// revision, the generation's creation revision, and the version adjusted
// for the revisions that follow it. ErrRevisionNotFound is returned when
// the generation is empty or the walk yields -1.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	gen := ki.findGeneration(atRev)
	if gen.isEmpty() {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	idx := gen.walk(func(rev revision) bool { return rev.main > atRev })
	if idx == -1 {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	// Revisions after idx do not count towards the version at atRev.
	later := int64(len(gen.revs) - idx - 1)
	return gen.revs[idx], gen.created, gen.ver - later, nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145
// tombstone records revision (main, sub) as the last revision of the
// current generation and then opens a new, empty generation. It returns
// ErrRevisionNotFound when the current generation is already empty.
//
// NOTE(review): indexing generations[len-1] assumes ki.generations is
// non-empty — confirm callers guarantee that.
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(lg, main, sub)
// Close the generation by starting a fresh one after the final put.
ki.generations = append(ki.generations, generation{})
return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165
// rangeKeys returns the key-values in [key, end) (or just key when end is
// nil) as of revision ro.Rev. At most ro.Limit entries are materialised; a
// limit <= 0, or one larger than the number of matches, means "all
// matches". Count always reports the total number of matching revisions
// and Rev echoes curRev.
//
// An error is returned when a revision listed by the index has no record in
// the backend, or when a stored record cannot be unmarshalled.
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
	rev := ro.Rev
	revpairs := tr.s.kvindex.Revisions(key, end, rev)
	if len(revpairs) == 0 {
		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
	}

	// Clamp the limit so the slice below can neither be empty by accident
	// (limit 0) nor run past the available revisions.
	limit := int(ro.Limit)
	if limit <= 0 || limit > len(revpairs) {
		limit = len(revpairs)
	}

	kvs := make([]mvccpb.KeyValue, limit)
	revBytes := newRevBytes()
	for i, revpair := range revpairs[:limit] {
		revToBytes(revpair, revBytes)
		_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
		if len(vs) == 0 {
			// The index knows this revision but the backend does not.
			return nil, ErrRevisionNotFound
		}
		if err := kvs[i].Unmarshal(vs[0]); err != nil {
			return nil, err
		}
	}
	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120
// Revisions returns the revisions, as of atRev, of the keys selected by key
// and end. With a nil end only key itself is looked up and a lookup error
// yields nil. Otherwise every keyIndex visited for (key, end) contributes
// its revision, and keys whose lookup fails are skipped.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
	if end != nil {
		ti.visit(key, end, func(ki *keyIndex) {
			rev, _, _, err := ki.get(ti.lg, atRev)
			if err != nil {
				return
			}
			revs = append(revs, rev)
		})
		return revs
	}

	rev, _, _, err := ti.Get(key, atRev)
	if err != nil {
		return nil
	}
	return []revision{rev}
}
// get resolves the state of the key at atRev: it picks the generation that
// findGeneration returns, walks it with the predicate rev.main > atRev, and
// returns the selected revision, the generation's creation revision, and
// the version reduced by the number of revisions after the selected one.
// ErrRevisionNotFound is returned for an empty generation or a walk result
// of -1.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		err = ErrRevisionNotFound
		return revision{}, revision{}, 0, err
	}

	pos := g.walk(func(rev revision) bool { return rev.main > atRev })
	if pos == -1 {
		err = ErrRevisionNotFound
		return revision{}, revision{}, 0, err
	}

	trailing := len(g.revs) - pos - 1
	return g.revs[pos], g.created, g.ver - int64(trailing), nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66
// Put records revision rev for key. If the tree already holds a keyIndex
// for key, the revision is added to it; otherwise a new keyIndex is
// created, given the revision, and inserted into the tree.
func (ti *treeIndex) Put(key []byte, rev revision) {
	probe := &keyIndex{key: key}

	if item := ti.tree.Get(probe); item != nil {
		existing := item.(*keyIndex)
		existing.put(ti.lg, rev.main, rev.sub)
		return
	}

	probe.put(ti.lg, rev.main, rev.sub)
	ti.tree.ReplaceOrInsert(probe)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104
// put appends revision (main, sub) to the key's latest generation, creating
// the first generation if none exists. The first revision of a generation
// also becomes its creation revision. The generation's version counter and
// the key's modified revision are updated.
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
	if len(ki.generations) == 0 {
		ki.generations = append(ki.generations, generation{})
	}

	newRev := revision{main: main, sub: sub}
	latest := &ki.generations[len(ki.generations)-1]
	if len(latest.revs) == 0 {
		latest.created = newRev
	}
	latest.revs = append(latest.revs, newRev)
	latest.ver++

	ki.modified = newRev
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309
// delete writes a tombstone for key at revision (beginRev+1, number of
// changes so far): a record holding only the key is stored in the backend
// under the tombstone-marked revision bytes, the index is told about the
// tombstone, and the change is appended to tw.changes.
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
// The sub revision is the position of this change within the transaction.
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
kv := mvccpb.KeyValue{Key: key}
// NOTE(review): the Marshal error is discarded here — confirm that is
// acceptable or handle it.
d, _ := kv.Marshal()
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Tombstone(key, idxRev)
tw.changes = append(tw.changes, kv)
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433
// restore rebuilds the in-memory index from the backend. It scans the key
// bucket from revision 1 upwards in chunks of restoreChunkKeys, hands each
// chunk to restoreChunk (which streams it to the indexing goroutine started
// by restoreIntoIndex), and finally adopts the revision that goroutine
// reports as s.currentRev.
func (s *store) restore() error {
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, minBytes)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, maxBytes)

	// Lease attachments discovered while scanning, keyed by key string.
	keyToLease := make(map[string]lease.LeaseID)

	tx := s.b.BatchTx()
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)

		// Resume just after the last revision of this chunk.
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, minBytes)
	}

	// Closing the channel lets the indexing goroutine finish and publish
	// the final revision on revc.
	close(rkvc)
	s.currentRev = <-revc
	return nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L486-506
func restoreChunk(lg *zap.Logger, kvc chan revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
rkv := r evKeyValue{key: key}
_ := rkv.kv.Unmarshal(vals[i])
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
delete(keyToLease, rkv.kstr)
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
keyToLease[rkv.kstr] = lid
} else {
delete(keyToLease, rkv.kstr)
}
kvc rkv
}
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L441-484
func restoreIntoIndex(lg *zap.Logger, idx index) (chan revKeyValue, chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
go func() {
currentRev := int64(1)
defer func() { revc currentRev }()
for rkv := range rkvc {
ki = &keyIndex{key: rkv.kv.Key}
ki := idx.KeyIndex(ki)
rev := bytesToRev(rkv.key)
currentRev = rev.main
if ok {
if isTombstone(rkv.key) {
ki.tombstone(lg, rev.main, rev.sub)
continue
}
ki.put(lg, rev.main, rev.sub)
} else if !isTombstone(rkv.key) {
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
idx.Insert(ki)
}
}
}()
return rkvc, revc
}
恢復記憶體索引的相關程式碼在實現上非常值得學習,兩個不同的函式透過 Channel 進行通訊並使用 goroutine 處理任務,能夠很好地將訊息的『生產者』和『消費者』進行分離。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kv.go#L100-125
// KV is the multi-version key-value store interface: it embeds the read and
// write views and adds transactions, hashing, compaction and lifecycle
// operations.
type KV interface {
ReadView
WriteView
// Read returns a read transaction.
Read() TxnRead
// Write returns a write transaction.
Write() TxnWrite
// Hash returns a hash together with a revision.
Hash() (hash uint32, revision int64, err error)
// HashByRev returns a hash for rev together with a revision and a
// compaction revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact compacts at rev and returns a channel (presumably closed once
// compaction finishes — confirm against the implementation).
Compact(rev int64) (chan struct{}, error)
// Commit commits outstanding work.
Commit()
// Restore rebuilds the store on top of backend b.
Restore(b backend.Backend) error
// Close closes the store.
Close() error
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L32-40
// Read opens a read transaction on the store, capturing the compaction and
// current revisions at the moment it is created.
//
// s.mu (read lock) and the backend read transaction's lock are acquired here
// and not released in this function — presumably the returned TxnRead
// releases them when it ends; confirm against its implementation.
func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
// revMu is held only long enough to take the tx lock and snapshot the two
// revisions.
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L45-65
// watchableStore is a store extended with watcher bookkeeping: watchers are
// kept in either the unsynced or the synced group.
type watchableStore struct {
*store
mu sync.RWMutex
// unsynced holds watchers that still have to be brought up to date by
// syncWatchers.
unsynced watcherGroup
// synced holds watchers that are up to date.
synced watcherGroup
stopc chan struct{}
wg sync.WaitGroup
}
// syncWatchers catches up a batch of unsynced watchers. It chooses up to
// maxWatchersPerSync watchers, reads every backend record from the chosen
// minimum revision through the current revision, converts the records to
// events, and for each chosen watcher sends its batch of events (if it has
// one) before moving it from the unsynced to the synced group. It returns
// the number of watchers still unsynced.
func (s *watchableStore) syncWatchers() int {
	curRev := s.store.currentRev
	compactionRev := s.store.compactMainRev
	group, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)

	// Backend range [minRev, curRev+1).
	lo, hi := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, lo)
	revToBytes(revision{main: curRev + 1}, hi)

	tx := s.store.b.ReadTx()
	revs, vals := tx.UnsafeRange(keyBucketName, lo, hi, 0)
	batches := newWatcherBatch(group, kvsToEvents(nil, group, revs, vals))

	for w := range group.watchers {
		w.minRev = curRev + 1
		if eb, found := batches[w]; found {
			w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})
		}
		s.synced.add(w)
		s.unsynced.delete(w)
	}
	return s.unsynced.size()
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watcher.go#L108-135
// Watch registers a watcher on [key, end) starting at startRev with the
// given filters and returns its ID. Passing AutoWatchID makes the stream
// pick the next ID not already used by one of its watchers. The watcher and
// its cancel function are recorded under the ID.
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
	if id == AutoWatchID {
		// Skip over IDs that are already taken.
		next := ws.nextID
		for ws.watchers[next] != nil {
			next++
		}
		id = next
		ws.nextID = next + 1
	}

	watcher, cancel := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
	ws.cancels[id] = cancel
	ws.watchers[id] = watcher
	return id, nil
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142
// watch creates a watcher for [key, end) and files it in the synced group
// when startRev is 0 or ahead of the store's current revision, and in the
// unsynced group otherwise. It returns the watcher and a function that
// cancels it.
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	wa := &watcher{key: key, end: end, minRev: startRev, id: id, ch: ch, fcs: fcs}

	if startRev == 0 || startRev > s.store.currentRev {
		s.synced.add(wa)
	} else {
		s.unsynced.add(wa)
	}

	cancel := func() { s.cancelWatcher(wa) }
	return wa, cancel
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
// Watch serves one gRPC watch stream. It builds a serverWatchStream around
// the stream, runs sendLoop and recvLoop in separate goroutines, and blocks
// until sendLoop has finished.
//
// NOTE(review): in this excerpt only sendLoop is tracked by sws.wg, the
// value returned by recvLoop is dropped, and the named result err is never
// assigned — the elided parts presumably cover this; confirm.
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
// ...
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
}
sws.wg.Add(1)
go func() {
sws.sendLoop()
sws.wg.Done()
}()
go func() {
sws.recvLoop()
}()
sws.wg.Wait()
return err
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334
func (sws *serverWatchStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
creq := uv.CreateRequest
filters := FiltersFromRequest(creq)
wsrev := sws.watchStream.Rev()
rev := creq.StartRevision
id, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: err != nil,
}
select {
case sws.ctrlStream wr:
case sws.closec:
return nil
}
case *pb.WatchRequest_CancelRequest: // ...
case *pb.WatchRequest_ProgressRequest: // ...
default:
continue
}
}
}
// sendLoop forwards watch responses to the gRPC stream until closec fires,
// the watch stream's channel is closed, or a send fails. Each response's
// events are converted to pointers into the response's event slice, and a
// response with a non-zero CompactRevision is marked as canceled.
//
// NOTE(review): the control-stream and progress-ticker cases are elided in
// this excerpt, and progressTicker is not defined in the block.
func (sws *serverWatchStream) sendLoop() {
	for {
		select {
		case wresp, ok := <-sws.watchStream.Chan():
			if !ok {
				// The watch stream is gone; nothing more to forward.
				return
			}

			evs := wresp.Events
			events := make([]*mvccpb.Event, len(evs))
			for i := range evs {
				events[i] = &evs[i]
			}

			canceled := wresp.CompactRevision != 0
			wr := &pb.WatchResponse{
				Header:          sws.newResponseHeader(wresp.Revision),
				WatchId:         int64(wresp.WatchID),
				Events:          events,
				CompactRevision: wresp.CompactRevision,
				Canceled:        canceled,
			}
			if err := sws.gRPCStream.Send(wr); err != nil {
				// The client stream is broken; stop instead of spinning.
				return
			}
		case c, ok := <-sws.ctrlStream: // ...
		case <-progressTicker.C: // ...
		case <-sws.closec:
			return
		}
	}
}