raft – github.com/coreos/raft Index | Files | Directories

package raft

import "github.com/coreos/raft"

Index

Constants

const (
	Debug = 1
	Trace = 2
)
const (
	StateChangeEventType  = "stateChange"
	LeaderChangeEventType = "leaderChange"
	TermChangeEventType   = "termChange"
	CommitEventType       = "commit"
	AddPeerEventType      = "addPeer"
	RemovePeerEventType   = "removePeer"

	HeartbeatIntervalEventType        = "heartbeatInterval"
	ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"

	HeartbeatEventType = "heartbeat"
)
const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)
const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)
const (
	// DefaultHeartbeatInterval is the interval that the leader will send
	// AppendEntriesRequests to followers to maintain leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond

	DefaultElectionTimeout = 150 * time.Millisecond
)
const ElectionTimeoutThresholdPercent = 0.8

ElectionTimeoutThresholdPercent specifies the threshold at which the server will dispatch warning events that the heartbeat RTT is too close to the election timeout.

Variables

var CommandTimeoutError = errors.New("raft: Command timeout")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")

Functions

func LogLevel

func LogLevel() int

func RegisterCommand

func RegisterCommand(command Command)

Registers a command by storing a reference to an instance of it.

func SetLogLevel

func SetLogLevel(level int)

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	Term         uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
	CommitIndex  uint64
	LeaderName   string
	Entries      []*protobuf.LogEntry
}

The request sent to a server to append entries to the log.

func (*AppendEntriesRequest) Decode

func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error)

Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*AppendEntriesRequest) Encode

func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error)

Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	// contains filtered or unexported fields
}

The response returned from a server appending entries to the log.

func (*AppendEntriesResponse) CommitIndex

func (aer *AppendEntriesResponse) CommitIndex() uint64

func (*AppendEntriesResponse) Decode

func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)

Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*AppendEntriesResponse) Encode

func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)

Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

func (*AppendEntriesResponse) Index

func (aer *AppendEntriesResponse) Index() uint64

func (*AppendEntriesResponse) Success

func (aer *AppendEntriesResponse) Success() bool

func (*AppendEntriesResponse) Term

func (aer *AppendEntriesResponse) Term() uint64

type Command

type Command interface {
	CommandName() string
}

Command represents an action to be taken on the replicated state machine.

type CommandApply

type CommandApply interface {
	Apply(Context) (interface{}, error)
}

CommandApply represents the interface to apply a command to the server.

type CommandEncoder

type CommandEncoder interface {
	Encode(w io.Writer) error
	Decode(r io.Reader) error
}

type Config

type Config struct {
	CommitIndex uint64 `json:"commitIndex"`
	// TODO decide what we need to store in peer struct
	Peers []*Peer `json:"peers"`
}

type Context

type Context interface {
	Server() Server
	CurrentTerm() uint64
	CurrentIndex() uint64
	CommitIndex() uint64
}

Context represents the current state of the server. It is passed into a command when the command is being applied since the server methods are locked.

type DefaultJoinCommand

type DefaultJoinCommand struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

Join command

func (*DefaultJoinCommand) Apply

func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error)

func (*DefaultJoinCommand) CommandName

func (c *DefaultJoinCommand) CommandName() string

The name of the Join command in the log

func (*DefaultJoinCommand) NodeName

func (c *DefaultJoinCommand) NodeName() string

type DefaultLeaveCommand

type DefaultLeaveCommand struct {
	Name string `json:"name"`
}

Leave command

func (*DefaultLeaveCommand) Apply

func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error)

func (*DefaultLeaveCommand) CommandName

func (c *DefaultLeaveCommand) CommandName() string

The name of the Leave command in the log

func (*DefaultLeaveCommand) NodeName

func (c *DefaultLeaveCommand) NodeName() string

type Event

type Event interface {
	Type() string
	Source() interface{}
	Value() interface{}
	PrevValue() interface{}
}

Event represents an action that occurred within the Raft library. Listeners can subscribe to event types by using the Server.AddEventListener() function.

type EventListener

type EventListener func(Event)

EventListener is a function that can receive event notifications.

type HTTPMuxer

type HTTPMuxer interface {
	HandleFunc(string, func(http.ResponseWriter, *http.Request))
}

type HTTPTransporter

type HTTPTransporter struct {
	DisableKeepAlives bool

	Transport *http.Transport
	// contains filtered or unexported fields
}

An HTTPTransporter is a default transport layer used to communicate between multiple servers.

func NewHTTPTransporter

func NewHTTPTransporter(prefix string) *HTTPTransporter

Creates a new HTTP transporter with the given path prefix.

func (*HTTPTransporter) AppendEntriesPath

func (t *HTTPTransporter) AppendEntriesPath() string

Retrieves the AppendEntries path.

func (*HTTPTransporter) Install

func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)

Applies Raft routes to an HTTP router for a given server.

func (*HTTPTransporter) Prefix

func (t *HTTPTransporter) Prefix() string

Retrieves the path prefix used by the transporter.

func (*HTTPTransporter) RequestVotePath

func (t *HTTPTransporter) RequestVotePath() string

Retrieves the RequestVote path.

func (*HTTPTransporter) SendAppendEntriesRequest

func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse

Sends an AppendEntries RPC to a peer.

func (*HTTPTransporter) SendSnapshotRecoveryRequest

func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse

Sends a SnapshotRequest RPC to a peer.

func (*HTTPTransporter) SendSnapshotRequest

func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse

Sends a SnapshotRequest RPC to a peer.

func (*HTTPTransporter) SendVoteRequest

func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse

Sends a RequestVote RPC to a peer.

func (*HTTPTransporter) SnapshotPath

func (t *HTTPTransporter) SnapshotPath() string

Retrieves the Snapshot path.

func (*HTTPTransporter) SnapshotRecoveryPath

func (t *HTTPTransporter) SnapshotRecoveryPath() string

Retrieves the SnapshotRecovery path.

type JoinCommand

type JoinCommand interface {
	Command
	NodeName() string
}

Join command interface

type LeaveCommand

type LeaveCommand interface {
	Command
	NodeName() string
}

Leave command interface

type Log

type Log struct {
	ApplyFunc func(*LogEntry, Command) (interface{}, error)
	// contains filtered or unexported fields
}

A log is a collection of log entries that are persisted to durable storage.

func (*Log) CommitIndex

func (l *Log) CommitIndex() uint64

The last committed index in the log.

type LogEntry

type LogEntry struct {
	Position int64 // position in the log file
	// contains filtered or unexported fields
}

A log entry stores a single item in the log.

func (*LogEntry) Command

func (e *LogEntry) Command() []byte

func (*LogEntry) CommandName

func (e *LogEntry) CommandName() string

func (*LogEntry) Decode

func (e *LogEntry) Decode(r io.Reader) (int, error)

Decodes the log entry from a buffer. Returns the number of bytes read and any error that occurs.

func (*LogEntry) Encode

func (e *LogEntry) Encode(w io.Writer) (int, error)

Encodes the log entry to a buffer. Returns the number of bytes written and any error that may have occurred.

func (*LogEntry) Index

func (e *LogEntry) Index() uint64

func (*LogEntry) Term

func (e *LogEntry) Term() uint64

type NOPCommand

type NOPCommand struct {
}

NOP command

func (NOPCommand) Apply

func (c NOPCommand) Apply(server Server) (interface{}, error)

func (NOPCommand) CommandName

func (c NOPCommand) CommandName() string

The name of the NOP command in the log

func (NOPCommand) Decode

func (c NOPCommand) Decode(r io.Reader) error

func (NOPCommand) Encode

func (c NOPCommand) Encode(w io.Writer) error

type Peer

type Peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
	// contains filtered or unexported fields
}

A peer is a reference to another server involved in the consensus protocol.

func (*Peer) LastActivity

func (p *Peer) LastActivity() time.Time

LastActivity returns the last time any response was received from the peer.

type RequestVoteRequest

type RequestVoteRequest struct {
	Term          uint64
	LastLogIndex  uint64
	LastLogTerm   uint64
	CandidateName string
	// contains filtered or unexported fields
}

The request sent to a server to vote for a candidate to become a leader.

func (*RequestVoteRequest) Decode

func (req *RequestVoteRequest) Decode(r io.Reader) (int, error)

Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*RequestVoteRequest) Encode

func (req *RequestVoteRequest) Encode(w io.Writer) (int, error)

Encodes the RequestVoteRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type RequestVoteResponse

type RequestVoteResponse struct {
	Term        uint64
	VoteGranted bool
	// contains filtered or unexported fields
}

The response returned from a server after a vote for a candidate to become a leader.

func (*RequestVoteResponse) Decode

func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error)

Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*RequestVoteResponse) Encode

func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error)

Encodes the RequestVoteResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type Server

type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatInterval() time.Duration
	SetHeartbeatInterval(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectiongString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Init() error
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
	FlushCommitIndex()
}

A server is involved in the consensus protocol and can act as a follower, candidate or a leader.

func NewServer

func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error)

Creates a new server with a log at the given path. transporter must not be nil. stateMachine can be nil if snapshotting and log compaction is to be disabled. context can be anything (including nil) and is not used by the raft package except returned by Server.Context(). connectionString can be anything.

type Snapshot

type Snapshot struct {
	LastIndex uint64 `json:"lastIndex"`
	LastTerm  uint64 `json:"lastTerm"`

	// Cluster configuration.
	Peers []*Peer `json:"peers"`
	State []byte  `json:"state"`
	Path  string  `json:"path"`
}

Snapshot represents an in-memory representation of the current state of the system.

type SnapshotRecoveryRequest

type SnapshotRecoveryRequest struct {
	LeaderName string
	LastIndex  uint64
	LastTerm   uint64
	Peers      []*Peer
	State      []byte
}

The request sent to a server to start from the snapshot.

func (*SnapshotRecoveryRequest) Decode

func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error)

Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotRecoveryRequest) Encode

func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error)

Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type SnapshotRecoveryResponse

type SnapshotRecoveryResponse struct {
	Term        uint64
	Success     bool
	CommitIndex uint64
}

The response returned from a server appending entries to the log.

func (*SnapshotRecoveryResponse) Decode

func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error)

Decodes the SnapshotRecoveryResponse from a buffer.

func (*SnapshotRecoveryResponse) Encode

func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error)

Encode writes the response to a writer. Returns the number of bytes written and any error that occurs.

type SnapshotRequest

type SnapshotRequest struct {
	LeaderName string
	LastIndex  uint64
	LastTerm   uint64
}

The request sent to a server to start from the snapshot.

func (*SnapshotRequest) Decode

func (req *SnapshotRequest) Decode(r io.Reader) (int, error)

Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotRequest) Encode

func (req *SnapshotRequest) Encode(w io.Writer) (int, error)

Encodes the SnapshotRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type SnapshotResponse

type SnapshotResponse struct {
	Success bool `json:"success"`
}

The response returned if the follower entered snapshot state

func (*SnapshotResponse) Decode

func (resp *SnapshotResponse) Decode(r io.Reader) (int, error)

Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotResponse) Encode

func (resp *SnapshotResponse) Encode(w io.Writer) (int, error)

Encodes the SnapshotResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type StateMachine

type StateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

StateMachine is the interface for allowing the host application to save and recovery the state machine. This makes it possible to make snapshots and compact the log.

type Transporter

type Transporter interface {
	SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
	SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
	SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
	SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
}

Transporter is the interface for allowing the host application to transport requests to other nodes.

Source Files

append_entries.go command.go commands.go config.go context.go debug.go event.go event_dispatcher.go http_transporter.go log.go log_entry.go peer.go request_vote.go server.go snapshot.go statemachine.go test.go transporter.go util.go

Directories

PathSynopsis
protobuf
Version
v0.0.0-20140324040310-67dca7288f16 (latest)
Published
Mar 24, 2014
Platform
linux/amd64
Imports
20 packages
Last checked
3 hours ago

Tools for package owners.