package session
import "go.dedis.ch/dela/mino/minogrpc/session"
Package session defines an abstraction of a session during a distributed RPC.
During a stream-based distributed RPC in minogrpc, the stream is kept alive during the whole protocol to act as a health check so that resources can be cleaned eventually, or if something goes wrong. The session manages this state while also managing the relays to other participants that the node must forward the messages to. Basically, a session has one or several relays open to the parent nodes and zero, one or multiple relays to other participants depending on the routing of the messages.
The package implements a unicast and a stream relay. Stream relay is only used when the orchestrator of a protocol is connecting to the first participant. Unicast is then used so that the sender of a message can receive feedbacks on the status of the message.
Documentation Last Review: 07.10.20202
Index ¶
- Constants
- type Address
- func NewAddress(host string) Address
- func NewAddressFromURL(addr url.URL) (a Address, err error)
- func NewOrchestratorAddress(addr mino.Address) Address
- func (a Address) ConnectionType() mino.AddressConnectionType
- func (a Address) Equal(other mino.Address) bool
- func (a Address) GetDialAddress() string
- func (a Address) GetHostname() (string, error)
- func (a Address) MarshalText() ([]byte, error)
- func (a Address) String() string
- type AddressFactory
- type ConnectionManager
- type NonBlockingQueue
- func (q *NonBlockingQueue) Channel() <-chan router.Packet
- func (q *NonBlockingQueue) Push(msg router.Packet) error
- type PacketStream
- type Queue
- type Relay
- func NewRelay( stream PacketStream, gw mino.Address, ctx serde.Context, conn grpc.ClientConnInterface, md metadata.MD, ) Relay
- func NewStreamRelay(gw mino.Address, stream PacketStream, ctx serde.Context) Relay
- type Session
Constants ¶
const HandshakeKey = "handshake"
HandshakeKey is the key to the handshake store in the headers.
Types ¶
type Address ¶
type Address struct {
// contains filtered or unexported fields
}
Address is a representation of the network Address of a participant. The overlay implementation requires a difference between an orchestrator and its source address, where the former initiates a protocol and the latter participates.
See session.wrapAddress for the abstraction provided to a caller external to the overlay module.
- implements mino.Address
func NewAddress ¶
NewAddress creates a new address.
func NewAddressFromURL ¶
NewAddressFromURL creates a new address given a URL.
func NewOrchestratorAddress ¶
NewOrchestratorAddress creates a new address which will be considered as the initiator of a protocol.
func (Address) ConnectionType ¶
func (a Address) ConnectionType() mino.AddressConnectionType
ConnectionType returns how to connect to the other host
func (Address) Equal ¶
Equal implements 'mino.Address'. It returns true if both addresses are exactly similar, in the sense that an orchestrator won't match a follower address with the same host.
func (Address) GetDialAddress ¶
GetDialAddress returns a string formatted to be understood by grpc.Dial() functions.
func (Address) GetHostname ¶
GetHostname parses the address to extract the hostname.
func (Address) MarshalText ¶
MarshalText implements mino.Address. It returns the text format of the address that can later be deserialized.
func (Address) String ¶
String implements fmt.Stringer. It returns a string representation of the address.
type AddressFactory ¶
AddressFactory is a factory for addresses.
- implements mino.AddressFactory
func (AddressFactory) FromText ¶
func (f AddressFactory) FromText(buf []byte) mino.Address
FromText implements mino.AddressFactory. It returns an instance of an address from a byte slice.
type ConnectionManager ¶
type ConnectionManager interface { Len() int Acquire(mino.Address) (grpc.ClientConnInterface, error) Release(mino.Address) }
ConnectionManager is an interface required by the session to open and release connections to the relays.
type NonBlockingQueue ¶
NonBlockingQueue is an implementation of a queue that makes sure pushing a message will never hang. The queue will fill a buffer if the channel is not drained and will drop messages when the limit is reached.
- implements session.Queue
func (*NonBlockingQueue) Channel ¶
func (q *NonBlockingQueue) Channel() <-chan router.Packet
Channel implements session.Queue. It returns a channel that will be populated with incoming messages. The queue uses a buffer when the channel is busy therefore this channel should listened to as much as possible to drain the messages. At some point when the size of the buffer reaches a limit, messages will be dropped.
func (*NonBlockingQueue) Push ¶
func (q *NonBlockingQueue) Push(msg router.Packet) error
Push implements session.Queue. It appends the message to the queue without blocking. The message is dropped if the queue is at maximum capacity by returning an error.
type PacketStream ¶
type PacketStream interface { Context() context.Context Send(*ptypes.Packet) error Recv() (*ptypes.Packet, error) }
PacketStream is a gRPC stream to send and receive protobuf packets.
type Queue ¶
Queue is an interface to queue messages.
type Relay ¶
type Relay interface { // GetDistantAddress returns the address of the peer at the other end of the // relay. GetDistantAddress() mino.Address // Stream returns the stream that is holding the relay. Stream() PacketStream // Send sends a packet through the relay. Send(ctx context.Context, p router.Packet) (*ptypes.Ack, error) // Close closes the relay and clean the resources. Close() error }
Relay is the interface of the relays spawn by the session when trying to contact a child node.
func NewRelay ¶
func NewRelay( stream PacketStream, gw mino.Address, ctx serde.Context, conn grpc.ClientConnInterface, md metadata.MD, ) Relay
NewRelay returns a new relay that will send messages to the gateway through unicast requests.
func NewStreamRelay ¶
NewStreamRelay creates a new relay that will send the packets through the stream.
type Session ¶
type Session interface { mino.Sender mino.Receiver // GetNumParents returns the number of active parents for the session. GetNumParents() int // Listen takes a stream that will determine when to close the session. Listen(parent Relay, table router.RoutingTable, ready chan struct{}) // SetPassive sets a new passive parent. A passive parent is part of the // parent relays, but the stream does not listen to, and thus it is not // removed from the map if it closed. SetPassive(parent Relay, table router.RoutingTable) // Close shutdowns the session so that future calls to receive will return // an error. Close() // RecvPacket takes a packet and the address of the distant peer that have // sent it, then pass it to the correct relay according to the routing // table. RecvPacket(from mino.Address, p *ptypes.Packet) (*ptypes.Ack, error) }
Session is an interface for a stream session that allows to send messages to the parent and relays, while receiving the ones for the local address.
func NewSession ¶
func NewSession( md metadata.MD, me mino.Address, msgFac serde.Factory, pktFac router.PacketFactory, ctx serde.Context, connMgr ConnectionManager, ) Session
NewSession creates a new session for the provided parent relay.
Source Files ¶
addr.go mod.go queue.go
- Version
- v0.1.0 (latest)
- Published
- Apr 10, 2024
- Platform
- linux/amd64
- Imports
- 21 packages
- Last checked
- 1 month ago –
Tools for package owners.