etcd API使用

連接客戶端

訪問etcd首先要創建client,它需要傳入一個Config配置.

Endpoints:etcd的多個節點服務地址。 DialTimeout:創建client的首次連接超時時間,這裡傳了5秒,如果5秒都沒有連接成功就會返回err;一旦client創建成功,不用再關心後續底層連接的狀態了,client內部會重連。

 cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})

返回的 client,它的類型具體如下:

type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
Username string
Password string
}

類型中的成員是etcd客戶端幾何核心功能模塊的具體實現:

Cluster:向集群裡增加刪除etcd節點,集群管理

KV:K-V鍵值庫

Lease:租約相關操作,租約過期會刪除授權的key

Watcher:觀察訂閱,從而監聽最新的數據變化

Auth:管理etcd的用戶和權限,屬於管理員操作

Maintenance:維護etcd,比如主動遷移etcd的leader節點

KV增刪改查

kv對象

type KV interface {
// Put puts a key-value pair into etcd.
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}

KV存儲讀取刪除

  1. kv對象實例獲取 kv := clientev3.NewKV(client)
  2. 存儲Put
putResp, err := kv.Put(context.TODO(),"/key/example", "hello")
  1. Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

參數介紹:

ctx: Context包對象,是用來跟蹤上下文的,列如超時控制

key: 存儲對象的key

val: 存儲對象的value

opts: 可變參數,額外選項

  1. 查詢Get getResp, err := kv.Get(context.TODO(), "/key/example")
  2. 刪除Delete delREsp, err := kv.Delete(context.TODO(), "/key/example")
  3. WithPrefix
rangeResp, err := kv.Get(context.TODO(), "/key/example", clientv3.WithPrefix())
  1. WithPrefix()是指查找以/key/example為前綴的所有key

租約lease

租約對象

type Lease interface {
// Grant creates a new lease. 分配租約
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease. 撤銷租約
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID. 獲取ttl剩餘時間
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases. 列舉所有租約
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever
// 租約自動定時續期
KeepAlive(ctx context.Context, id LeaseID) ( // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
//續期一次租約
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}

租約使用

  1. 獲取租約
  2. 設置租約ttl
  3. 對keyPut時使用租約,租約到期key自動刪除
  4. 主動給Lease進行續約
lease := clientv3.NewLease(client)
grantResp, err := lease.Grant(context.TODO(), 10)
kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
keepResp, err := lease.KeepAlive(context.TODO(), grantResp.ID)
// sleep一會...

事務Tnx

etcd中事務是原子執行的,針對kv存儲操作的if … then … else結構.將請求歸併到一起放在原子塊中.事務可以保護key不受其他併發更新操作的影響.

etcd的實現方式有些不同,它的事務是基於cas方式實現的。在事務執行過程中,client和etcd之間沒有維護事務會話,在commit事務時,它的“衝突判斷(If)和執行過程Then/Else”一次性提交給etcd,etcd來作為一個原子過程來執行“If-Then-Else”。所以,etcd事務不會發生阻塞,無論成功,還是失敗,都會立即返回,需要應用進行失敗(發生衝突)重試。因此,這也就要求業務代碼是可重試的。

type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,

// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
  1. 通過kv對象開啟一個事務
  2. if then else 書寫事務
  3. 最後Commit提交整個Txn事務
txn := kv.Txn(context.TODO())
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=","hello")).
Then(clientv3.OpGet("/hi")).
Else(clientv3.OpGet("/test/",clientv3.WithPrefix())).
Commit()
if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
}

這個Value(“/hi”)返回的Cmp表達了:“/hi這個key對應的value”

利用Compare函數來繼續為”主語”增加描述, 形成完整的條件語句:/hi這個key對應的value必須等於"hello"

監聽watch

watch接口

type Watcher interface { 

// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
}

Watch 方法返回一個WatchChan 類似的變量,該通道傳遞WatchResponse類型

//創建監聽
wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
//range 監聽事件
for v := range wc {
for _, e := range v.Events {
log.Printf("type:%v kv:%v val:%v prevKey:%v \\n ", e.Type, string(e.Kv.Key),e.Kv.Value, e.PrevKv)
}
}
etcd API使用


分享到:


相關文章: