package micro

import "github.com/nats-io/nats.go/micro"

Example

Code:

{
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// endpoint handler - in this case, HandlerFunc is used,
	// which is a built-in implementation of Handler interface
	echoHandler := func(req micro.Request) {
		req.Respond(req.Data())
	}

	// second endpoint
	incrementHandler := func(req micro.Request) {
		val, err := strconv.Atoi(string(req.Data()))
		if err != nil {
			req.Error("400", "request data should be a number", nil)
			return
		}

		responseData := val + 1
		req.Respond([]byte(strconv.Itoa(responseData)))
	}

	// third endpoint
	multiplyHandler := func(req micro.Request) {
		val, err := strconv.Atoi(string(req.Data()))
		if err != nil {
			req.Error("400", "request data should be a number", nil)
			return
		}

		responseData := val * 2
		req.Respond([]byte(strconv.Itoa(responseData)))
	}

	config := micro.Config{
		Name:        "IncrementService",
		Version:     "0.1.0",
		Description: "Increment numbers",

		// base handler - for simple services with single endpoints this is sufficient
		Endpoint: &micro.EndpointConfig{
			Subject: "echo",
			Handler: micro.HandlerFunc(echoHandler),
		},
	}
	svc, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer svc.Stop()

	// add a group to aggregate endpoints under common prefix
	numbers := svc.AddGroup("numbers")

	// register endpoints in a group
	err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler))
	if err != nil {
		log.Fatal(err)
	}
	err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler))
	if err != nil {
		log.Fatal(err)
	}

	// send a request to a service
	resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	responseVal, err := strconv.Atoi(string(resp.Data))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(responseVal)
}

Index

Examples

Constants

const (
	// Queue Group name used across all services
	DefaultQueueGroup = "q"

	// APIPrefix is the root of all control subjects
	APIPrefix = "$SRV"
)
const (
	ErrorHeader     = "Nats-Service-Error"
	ErrorCodeHeader = "Nats-Service-Error-Code"
)

Service Error headers

const (
	InfoResponseType  = "io.nats.micro.v1.info_response"
	PingResponseType  = "io.nats.micro.v1.ping_response"
	StatsResponseType = "io.nats.micro.v1.stats_response"
)

Variables

var (
	ErrRespond         = errors.New("NATS error when sending response")
	ErrMarshalResponse = errors.New("marshaling response")
	ErrArgRequired     = errors.New("argument required")
)
var (
	// ErrConfigValidation is returned when service configuration is invalid
	ErrConfigValidation = errors.New("validation")

	// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, INFO, STATS)
	ErrVerbNotSupported = errors.New("unsupported verb")

	// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
	ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)

Common errors returned by the Service framework.

Functions

func ControlSubject

func ControlSubject(verb Verb, name, id string) (string, error)

ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Info or Stats). Depending on whether kind and id are provided, ControlSubject will return one of the following:

Example

Code:play 

package main

import (
	"fmt"

	"github.com/nats-io/nats.go/micro"
)

func main() {

	// subject used to get PING from all services
	subjectPINGAll, _ := micro.ControlSubject(micro.PingVerb, "", "")
	fmt.Println(subjectPINGAll)

	// subject used to get PING from services with provided name
	subjectPINGName, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "")
	fmt.Println(subjectPINGName)

	// subject used to get PING from a service with provided name and ID
	subjectPINGInstance, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "123")
	fmt.Println(subjectPINGInstance)

}

Output:

$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123

Types

type Config

type Config struct {
	// Name represents the name of the service.
	Name string `json:"name"`

	// Endpoint is an optional endpoint configuration.
	// More complex, multi-endpoint services can be configured using
	// Service.AddGroup and Service.AddEndpoint methods.
	Endpoint *EndpointConfig `json:"endpoint"`

	// Version is a SemVer compatible version string.
	Version string `json:"version"`

	// Description of the service.
	Description string `json:"description"`

	// Metadata annotates the service
	Metadata map[string]string `json:"metadata,omitempty"`

	// QueueGroup can be used to override the default queue group name.
	QueueGroup string `json:"queue_group"`

	// QueueGroupDisabled disables the queue group for the service.
	QueueGroupDisabled bool `json:"queue_group_disabled"`

	// StatsHandler is a user-defined custom function.
	// used to calculate additional service stats.
	StatsHandler StatsHandler

	// DoneHandler is invoked when all service subscription are stopped.
	DoneHandler DoneHandler

	// ErrorHandler is invoked on any nats-related service error.
	ErrorHandler ErrHandler
}

Config is a configuration of a service.

type DoneHandler

type DoneHandler func(Service)

DoneHandler is a function used to configure a custom done handler for a service.

type Endpoint

type Endpoint struct {
	EndpointConfig
	Name string
	// contains filtered or unexported fields
}

Endpoint manages a service endpoint.

type EndpointConfig

type EndpointConfig struct {
	// Subject on which the endpoint is registered.
	Subject string

	// Handler used by the endpoint.
	Handler Handler

	// Metadata annotates the service
	Metadata map[string]string `json:"metadata,omitempty"`

	// QueueGroup can be used to override the default queue group name.
	QueueGroup string `json:"queue_group"`

	// QueueGroupDisabled disables the queue group for the endpoint.
	QueueGroupDisabled bool `json:"queue_group_disabled"`
}

type EndpointInfo

type EndpointInfo struct {
	Name       string            `json:"name"`
	Subject    string            `json:"subject"`
	QueueGroup string            `json:"queue_group"`
	Metadata   map[string]string `json:"metadata"`
}

type EndpointOpt

type EndpointOpt func(*endpointOpts) error

func WithEndpointMetadata

func WithEndpointMetadata(metadata map[string]string) EndpointOpt

func WithEndpointQueueGroup

func WithEndpointQueueGroup(queueGroup string) EndpointOpt

func WithEndpointQueueGroupDisabled

func WithEndpointQueueGroupDisabled() EndpointOpt

func WithEndpointSubject

func WithEndpointSubject(subject string) EndpointOpt
Example

Code:

{
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	echoHandler := func(req micro.Request) {
		req.Respond(req.Data())
	}

	config := micro.Config{
		Name:    "EchoService",
		Version: "1.0.0",
	}

	srv, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}

	// endpoint will be registered under "service.echo" subject
	err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo"))
	if err != nil {
		log.Fatal(err)
	}
}

type EndpointStats

type EndpointStats struct {
	Name                  string          `json:"name"`
	Subject               string          `json:"subject"`
	QueueGroup            string          `json:"queue_group"`
	NumRequests           int             `json:"num_requests"`
	NumErrors             int             `json:"num_errors"`
	LastError             string          `json:"last_error"`
	ProcessingTime        time.Duration   `json:"processing_time"`
	AverageProcessingTime time.Duration   `json:"average_processing_time"`
	Data                  json.RawMessage `json:"data,omitempty"`
}

EndpointStats contains stats for a specific endpoint.

type ErrHandler

type ErrHandler func(Service, *NATSError)

ErrHandler is a function used to configure a custom error handler for a service,

type Group

type Group interface {
	// AddGroup creates a new group, prefixed by this group's prefix.
	AddGroup(string, ...GroupOpt) Group

	// AddEndpoint registers new endpoints on a service.
	// The endpoint's subject will be prefixed with the group prefix.
	AddEndpoint(string, Handler, ...EndpointOpt) error
}

Group allows for grouping endpoints on a service.

Endpoints created using AddEndpoint will be grouped under common prefix (group name) New groups can also be derived from a group using AddGroup.

type GroupOpt

type GroupOpt func(*groupOpts)

func WithGroupQueueGroup

func WithGroupQueueGroup(queueGroup string) GroupOpt

func WithGroupQueueGroupDisabled

func WithGroupQueueGroupDisabled() GroupOpt

type Handler

type Handler interface {
	Handle(Request)
}

Handler is used to respond to service requests.

Example

Code:play 

package main

import (
	"log"
	"strconv"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/micro"
)

type rectangle struct {
	height int
	width  int
}

// Handle is an implementation of micro.Handler used to
// calculate the area of a rectangle
func (r rectangle) Handle(req micro.Request) {
	area := r.height * r.width
	req.Respond([]byte(strconv.Itoa(area)))
}

func main() {
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	rec := rectangle{10, 5}

	config := micro.Config{
		Name:    "RectangleAreaService",
		Version: "0.1.0",
		Endpoint: &micro.EndpointConfig{
			Subject: "area.rectangle",
			Handler: rec,
		},
	}
	svc, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer svc.Stop()
}

func ContextHandler

func ContextHandler(ctx context.Context, handler func(context.Context, Request)) Handler

ContextHandler is a helper function used to utilize context.Context in request handlers.

Example

Code:

{
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	handler := func(ctx context.Context, req micro.Request) {
		select {
		case <-ctx.Done():
			req.Error("400", "context canceled", nil)
		default:
			req.Respond([]byte("ok"))
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	config := micro.Config{
		Name:    "EchoService",
		Version: "0.1.0",
		Endpoint: &micro.EndpointConfig{
			Subject: "echo",
			Handler: micro.ContextHandler(ctx, handler),
		},
	}

	srv, _ := micro.AddService(nc, config)
	defer srv.Stop()
}

type HandlerFunc

type HandlerFunc func(Request)

HandlerFunc is a function implementing Handler. It allows using a function as a request handler, without having to implement Handle on a separate type.

func (HandlerFunc) Handle

func (fn HandlerFunc) Handle(req Request)

type Headers

type Headers nats.Header

Headers is a wrapper around *nats.Header

func (Headers) Get

func (h Headers) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Headers) Values

func (h Headers) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.

type Info

type Info struct {
	ServiceIdentity
	Type        string         `json:"type"`
	Description string         `json:"description"`
	Endpoints   []EndpointInfo `json:"endpoints"`
}

Info is the basic information about a service type.

type NATSError

type NATSError struct {
	Subject     string
	Description string
}

NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.

func (*NATSError) Error

func (e *NATSError) Error() string

type Ping

type Ping struct {
	ServiceIdentity
	Type string `json:"type"`
}

Ping is the response type for PING monitoring endpoint.

type Request

type Request interface {
	// Respond sends the response for the request.
	// Additional headers can be passed using [WithHeaders] option.
	Respond([]byte, ...RespondOpt) error

	// RespondJSON marshals the given response value and responds to the request.
	// Additional headers can be passed using [WithHeaders] option.
	RespondJSON(any, ...RespondOpt) error

	// Error prepares and publishes error response from a handler.
	// A response error should be set containing an error code and description.
	// Optionally, data can be set as response payload.
	Error(code, description string, data []byte, opts ...RespondOpt) error

	// Data returns request data.
	Data() []byte

	// Headers returns request headers.
	Headers() Headers

	// Subject returns underlying NATS message subject.
	Subject() string

	// Reply returns underlying NATS message reply subject.
	Reply() string
}

Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.

type RespondOpt

type RespondOpt func(*nats.Msg)

RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.

func WithHeaders

func WithHeaders(headers Headers) RespondOpt

WithHeaders can be used to configure response with custom headers.

type Service

type Service interface {
	// AddEndpoint registers endpoint with given name on a specific subject.
	AddEndpoint(string, Handler, ...EndpointOpt) error

	// AddGroup returns a Group interface, allowing for more complex endpoint topologies.
	// A group can be used to register endpoints with given prefix.
	AddGroup(string, ...GroupOpt) Group

	// Info returns the service info.
	Info() Info

	// Stats returns statistics for the service endpoint and all monitoring endpoints.
	Stats() Stats

	// Reset resets all statistics (for all endpoints) on a service instance.
	Reset()

	// Stop drains the endpoint subscriptions and marks the service as stopped.
	Stop() error

	// Stopped informs whether [Stop] was executed on the service.
	Stopped() bool
}

Service exposes methods to operate on a service instance.

func AddService

func AddService(nc *nats.Conn, config Config) (Service, error)

AddService adds a microservice. It will enable internal common services (PING, STATS and INFO). Request handlers have to be registered separately using Service.AddEndpoint. A service name, version and Endpoint configuration are required to add a service. AddService returns a Service interface, allowing service management. Each service is assigned a unique ID.

Example

Code:

{
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	echoHandler := func(req micro.Request) {
		req.Respond(req.Data())
	}

	config := micro.Config{
		Name:        "EchoService",
		Version:     "1.0.0",
		Description: "Send back what you receive",
		// DoneHandler can be set to customize behavior on stopping a service.
		DoneHandler: func(srv micro.Service) {
			info := srv.Info()
			fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
		},

		// ErrorHandler can be used to customize behavior on service execution error.
		ErrorHandler: func(srv micro.Service, err *micro.NATSError) {
			info := srv.Info()
			fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
		},

		// optional base handler
		Endpoint: &micro.EndpointConfig{
			Subject: "echo",
			Handler: micro.HandlerFunc(echoHandler),
		},
	}

	srv, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer srv.Stop()
}

type ServiceIdentity

type ServiceIdentity struct {
	Name     string            `json:"name"`
	ID       string            `json:"id"`
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
}

ServiceIdentity contains fields helping to identity a service instance.

type Stats

type Stats struct {
	ServiceIdentity
	Type      string           `json:"type"`
	Started   time.Time        `json:"started"`
	Endpoints []*EndpointStats `json:"endpoints"`
}

Stats is the type returned by STATS monitoring endpoint. It contains stats of all registered endpoints.

type StatsHandler

type StatsHandler func(*Endpoint) any

StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.

type Verb

type Verb int64

Verb represents a name of the monitoring service.

const (
	PingVerb Verb = iota
	StatsVerb
	InfoVerb
)

Verbs being used to set up a specific control subject.

func (Verb) String

func (s Verb) String() string

Source Files

request.go service.go

Directories

PathSynopsis
micro/test
Version
v1.41.2 (latest)
Published
Apr 17, 2025
Platform
linux/amd64
Imports
10 packages
Last checked
1 day ago

Tools for package owners.