Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type FailOverConfig struct {
// propagates the updated topology. Requires kvrocks to support node status
// modification (new versions only). Defaults to false for backward compatibility.
EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"`
WaitForSync bool `yaml:"wait_for_sync"`
Comment thread
Paragrf marked this conversation as resolved.
}

type ControllerConfig struct {
Expand Down
1 change: 1 addition & 0 deletions consts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ var (
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrCannotOfflineMaster = errors.New("cannot take master node offline, failover first")
ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
ErrSyncTimeout = errors.New("replication sync timeout")
)
22 changes: 14 additions & 8 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ClusterCheckOptions struct {
pingInterval time.Duration
maxFailureCount int64
enableSlaveHAUpdate bool
failoverOpts store.FailoverOptions
}

type ClusterChecker struct {
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
options: ClusterCheckOptions{
pingInterval: time.Second * 3,
maxFailureCount: 5,
failoverOpts: store.DefaultFailoverOptions(),
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -110,6 +112,11 @@ func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker {
return c
}

// WithFailoverOptions overrides the failover options used when the checker
// promotes a new master, returning the checker so calls can be chained.
func (c *ClusterChecker) WithFailoverOptions(failoverOpts store.FailoverOptions) *ClusterChecker {
	c.options.failoverOpts = failoverOpts
	return c
}

func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) {
clusterInfo, err := node.GetClusterInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -174,20 +181,19 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i
log.Error("Failed to get the cluster info", zap.Error(err))
return count
}
newMasterID, err := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "")
if err != nil {
log.Error("Failed to promote the new master", zap.Error(err))
_, newMaster, promoteErr := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "", c.options.failoverOpts)
if promoteErr != nil {
log.Error("Failed to promote the new master", zap.Error(promoteErr))
return count
}
err = c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster)
if err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
if updateErr := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); updateErr != nil {
log.Error("Failed to persist cluster after promoting new master", zap.Error(updateErr))
return count
}
// the node is normal if it can be elected as the new master,
// because it requires the node is healthy.
c.resetFailureCount(newMasterID)
log.With(zap.String("new_master_id", newMasterID)).Info("Promote the new master")
c.resetFailureCount(newMaster.ID())
log.With(zap.String("new_master_id", newMaster.ID())).Info("Promote the new master")
}
return count
}
Expand Down
4 changes: 2 additions & 2 deletions server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type Handler struct {
Raft *RaftHandler
}

func NewHandler(s *store.ClusterStore) *Handler {
func NewHandler(s *store.ClusterStore, waitForSync bool) *Handler {
return &Handler{
Namespace: &NamespaceHandler{s: s},
Cluster: &ClusterHandler{s: s},
Shard: &ShardHandler{s: s},
Shard: &ShardHandler{s: s, configWaitForSync: waitForSync},
Node: &NodeHandler{s: s},
Raft: &RaftHandler{},
}
Expand Down
75 changes: 69 additions & 6 deletions server/api/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/gin-gonic/gin"
"go.uber.org/zap"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/apache/kvrocks-controller/store"
)

// ShardHandler serves the HTTP endpoints that operate on cluster shards.
type ShardHandler struct {
	s store.Store // backing cluster metadata store
	// configWaitForSync forces WaitForSync on manual failover when enabled
	// in the controller configuration, overriding the per-request option.
	configWaitForSync bool
}

type SlotsRequest struct {
Expand Down Expand Up @@ -114,12 +119,21 @@ func (handler *ShardHandler) Remove(c *gin.Context) {
helper.ResponseNoContent(c)
}

// FailoverOpts holds optional parameters for manual failover, decoded from
// the request body's "options" field.
type FailoverOpts struct {
	// WaitForSync pauses the old master and waits for replica sync before
	// completing the failover (the handler unpauses it afterwards).
	WaitForSync bool `json:"wait_for_sync"`
	// ForceOnTimeout proceeds with the promotion even if the sync wait
	// times out — NOTE(review): semantics assumed from the name; confirm
	// against store.FailoverOptions.
	ForceOnTimeout bool `json:"force_on_timeout"`
	SyncTimeoutMs  int  `json:"sync_timeout_ms"`  // 0 means use default
	PauseTimeoutMs int  `json:"pause_timeout_ms"` // 0 means use default
}

func (handler *ShardHandler) Failover(c *gin.Context) {
ns := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)

var req struct {
PreferredNodeID string `json:"preferred_node_id"`
PreferredNodeID string `json:"preferred_node_id"`
Options *FailoverOpts `json:"options"`
}
if c.Request.Body != nil {
if err := c.ShouldBindJSON(&req); err != nil {
Expand All @@ -131,16 +145,65 @@ func (handler *ShardHandler) Failover(c *gin.Context) {
helper.ResponseBadRequest(c, fmt.Errorf("invalid node id: %s", req.PreferredNodeID))
return
}
// We have checked this if statement in middleware.RequiredClusterShard
shardIndex, _ := strconv.Atoi(c.Param("shard"))
newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "", req.PreferredNodeID)

opts := store.DefaultFailoverOptions()
if handler.configWaitForSync {
opts.WaitForSync = true
} else if req.Options != nil {
opts.WaitForSync = req.Options.WaitForSync
}
if req.Options != nil {
if req.Options.SyncTimeoutMs > 0 {
opts.SyncTimeout = time.Duration(req.Options.SyncTimeoutMs) * time.Millisecond
}
if req.Options.PauseTimeoutMs > 0 {
opts.PauseDuration = time.Duration(req.Options.PauseTimeoutMs) * time.Millisecond
}
opts.ForceOnTimeout = req.Options.ForceOnTimeout
}

shardIndex, err := strconv.Atoi(c.Param("shard"))
if err != nil {
helper.ResponseBadRequest(c, err)
return
}
oldMaster, newMaster, err := cluster.PromoteNewMaster(c, shardIndex, "", req.PreferredNodeID, opts)
if err != nil {
helper.ResponseError(c, err)
return
}

unpauseOldMaster := func() {
if !opts.WaitForSync {
return
}
if e := oldMaster.UnpauseClient(c); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", oldMaster.Addr())).Error("Failed to unpause old master")
}
}

if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
unpauseOldMaster()
helper.ResponseError(c, err)
return
}
helper.ResponseOK(c, gin.H{"new_master_id": newMasterNodeID})

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if e := oldMaster.SyncClusterInfo(c, cluster); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", oldMaster.Addr())).Warn("Failed to sync cluster info to old master")
}
}()
go func() {
defer wg.Done()
if e := newMaster.SyncClusterInfo(c, cluster); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", newMaster.Addr())).Warn("Failed to sync cluster info to new master")
}
}()
wg.Wait()

unpauseOldMaster()
helper.ResponseOK(c, gin.H{"new_master_id": newMaster.ID()})
}
3 changes: 2 additions & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (srv *Server) initHandlers() {
c.Set(consts.ContextKeyStore, srv.store)
c.Next()
}, middleware.RedirectIfNotLeader)
handler := api.NewHandler(srv.store)
waitForSync := srv.config.Controller != nil && srv.config.Controller.FailOver != nil && srv.config.Controller.FailOver.WaitForSync
handler := api.NewHandler(srv.store, waitForSync)

engine.Any("/debug/pprof/*profile", PProf)
engine.GET("/metrics", gin.WrapH(promhttp.Handler()))
Expand Down
11 changes: 5 additions & 6 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,17 @@ func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error {
}

func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
shardIdx int, masterNodeID, preferredNodeID string,
) (string, error) {
shardIdx int, masterNodeID, preferredNodeID string, opts FailoverOptions) (oldMasterNode Node, newMasterNode Node, err error) {
shard, err := cluster.GetShard(shardIdx)
if err != nil {
return "", err
return nil, nil, err
}
newMasterNodeID, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID)
oldMaster, newMaster, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID, opts)
if err != nil {
return "", err
return nil, nil, err
}
cluster.Shards[shardIdx] = shard
return newMasterNodeID, nil
return oldMaster, newMaster, nil
}

func (cluster *Cluster) SyncToNodes(ctx context.Context) error {
Expand Down
39 changes: 37 additions & 2 deletions store/cluster_mock_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

package store

import "context"
import (
"context"
"time"
)

// ClusterMockNode is a mock implementation of the Node interface,
// it is used for testing purposes.
//
// NOTE: the scraped diff duplicated the Sequence field (old and new hunk
// lines fused); this is the deduplicated post-change struct.
type ClusterMockNode struct {
	*ClusterNode

	Sequence         uint64
	MasterReplOffset uint64 // used when simulating master in GetReplicationInfo
	SlaveOffset      uint64 // used when simulating slave offset in GetReplicationInfo
	SlaveAddr        string // when master, slave Addr for matching; empty means use mock.Addr()
}

var _ Node = (*ClusterMockNode)(nil)
Expand All @@ -53,3 +59,32 @@ func (mock *ClusterMockNode) SyncClusterInfo(ctx context.Context, cluster *Clust
// Reset is a no-op for the mock node; it always reports success.
func (mock *ClusterMockNode) Reset(ctx context.Context) error {
	return nil
}

// PauseClient is a no-op for the mock node; the timeout is ignored and nil
// is always returned.
func (mock *ClusterMockNode) PauseClient(ctx context.Context, timeout time.Duration) error {
	return nil
}

// UnpauseClient is a no-op for the mock node; it always returns nil.
func (mock *ClusterMockNode) UnpauseClient(ctx context.Context) error {
	return nil
}

// GetReplicationInfo returns synthetic replication state built from the
// mock's configured offsets. For a master it reports a single slave at
// SlaveOffset, addressed by SlaveAddr (falling back to the mock's own
// address when SlaveAddr is empty). For a slave it reports both the master
// and slave replication offsets as SlaveOffset. It never returns an error.
func (mock *ClusterMockNode) GetReplicationInfo(ctx context.Context) (*ReplicationInfo, error) {
	if !mock.IsMaster() {
		return &ReplicationInfo{
			Role:             RoleSlave,
			MasterReplOffset: mock.SlaveOffset,
			SlaveReplOffset:  mock.SlaveOffset,
		}, nil
	}
	slaveAddr := mock.SlaveAddr
	if slaveAddr == "" {
		slaveAddr = mock.Addr()
	}
	info := &ReplicationInfo{
		Role:             RoleMaster,
		MasterReplOffset: mock.MasterReplOffset,
		Slaves:           []SlaveReplInfo{{Addr: slaveAddr, Offset: mock.SlaveOffset}},
	}
	return info, nil
}
Loading
Loading