package micro

import "github.com/nats-io/nats.go/micro"

Example

Code:

{
	s := RunServerOnPort(-1)
	defer s.Shutdown()

	nc, err := nats.Connect(s.ClientURL())
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Service handler is a function which takes Service.Request as argument.
	// req.Respond or req.Error should be used to respond to the request.
	incrementHandler := func(req *Request) {
		val, err := strconv.Atoi(string(req.Data()))
		if err != nil {
			req.Error("400", "request data should be a number", nil)
			return
		}

		responseData := val + 1
		req.Respond([]byte(strconv.Itoa(responseData)))
	}

	config := Config{
		Name:        "IncrementService",
		Version:     "0.1.0",
		Description: "Increment numbers",
		Endpoint: Endpoint{
			// service handler
			Handler: incrementHandler,
			// a unique subject serving as a service endpoint
			Subject: "numbers.increment",
		},
	}
	// Multiple instances of the service with the same name can be created.
	// Requests to a service with the same name will be load-balanced.
	for i := 0; i < 5; i++ {
		svc, err := AddService(nc, config)
		if err != nil {
			log.Fatal(err)
		}
		// Deferred inside the loop on purpose: the Stop calls run only when
		// this example returns, so none of the instances is stopped before
		// the request below is sent.
		defer svc.Stop()
	}

	// send a request to a service
	resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	responseVal, err := strconv.Atoi(string(resp.Data))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(responseVal)

	//
	// Output: 4
	//
}

Output:

4

Index

Examples

Constants

const (
	// Queue Group name used across all services
	QG = "q"

	// APIPrefix is the root of all control subjects
	APIPrefix = "$SRV"
)
const (
	ErrorHeader     = "Nats-Service-Error"
	ErrorCodeHeader = "Nats-Service-Error-Code"
)

Service Error headers

Variables

var (
	ErrRespond         = errors.New("NATS error when sending response")
	ErrMarshalResponse = errors.New("marshaling response")
	ErrArgRequired     = errors.New("argument required")
)
var (
	// ErrConfigValidation is returned when service configuration is invalid
	ErrConfigValidation = errors.New("validation")

	// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, SCHEMA, INFO, STATS)
	ErrVerbNotSupported = errors.New("unsupported verb")

	// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
	ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)

Common errors returned by the Service framework.

Functions

func ControlSubject

func ControlSubject(verb Verb, name, id string) (string, error)

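ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). Depending on whether name and id are provided, ControlSubject will return one of the following:

- verb only: subject addressing all services (e.g. $SRV.PING)
- verb and name: subject addressing services with the provided name (e.g. $SRV.PING.CoolService)
- verb, name and id: subject addressing the service instance with the provided name and ID (e.g. $SRV.PING.CoolService.123)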

Example

Code:

{

	// subject used to get PING from all services
	subjectPINGAll, _ := ControlSubject(PingVerb, "", "")
	fmt.Println(subjectPINGAll)

	// subject used to get PING from services with provided name
	subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "")
	fmt.Println(subjectPINGName)

	// subject used to get PING from a service with provided name and ID
	subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123")
	fmt.Println(subjectPINGInstance)

	// Output:
	// $SRV.PING
	// $SRV.PING.CoolService
	// $SRV.PING.CoolService.123
}

Output:

$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123

Types

type Config

type Config struct {
	Name         string   `json:"name"`
	Version      string   `json:"version"`
	Description  string   `json:"description"`
	Schema       Schema   `json:"schema"`
	Endpoint     Endpoint `json:"endpoint"`
	StatsHandler StatsHandler
	DoneHandler  DoneHandler
	ErrorHandler ErrHandler
}

Config is a configuration of a service.

type DoneHandler

type DoneHandler func(Service)

DoneHandler is a function used to configure a custom done handler for a service.

type Endpoint

type Endpoint struct {
	Subject string `json:"subject"`
	Handler RequestHandler
}

Endpoint is used to configure a subject and handler for a service.

type ErrHandler

type ErrHandler func(Service, *NATSError)

ErrHandler is a function used to configure a custom error handler for a service.

type Headers

type Headers nats.Header

Headers is a wrapper around nats.Header

func (Headers) Get

func (h Headers) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Headers) Values

func (h Headers) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.

type Info

type Info struct {
	ServiceIdentity
	Description string `json:"description"`
	Subject     string `json:"subject"`
}

Info is the basic information about a service type.

type NATSError

type NATSError struct {
	Subject     string
	Description string
}

NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.

func (*NATSError) Error

func (e *NATSError) Error() string

type Ping

type Ping ServiceIdentity

Ping is the response type for PING monitoring endpoint.

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.

func (*Request) Data

func (r *Request) Data() []byte

func (*Request) Error

func (r *Request) Error(code, description string, data []byte, opts ...RespondOpt) error

Error prepares and publishes error response from a handler. A response error should be set containing an error code and description. Optionally, data can be set as response payload.

Example

Code:

{
	handler := func(req *Request) {
		// respond with an error
		// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
		if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
			log.Fatal(err)
		}
	}

	fmt.Printf("%T", handler)
}

func (*Request) Headers

func (r *Request) Headers() Headers

func (*Request) Respond

func (r *Request) Respond(response []byte, opts ...RespondOpt) error
Example

Code:

{
	handler := func(req *Request) {
		// respond to the request
		if err := req.Respond(req.Data()); err != nil {
			log.Fatal(err)
		}
	}

	fmt.Printf("%T", handler)
}

func (*Request) RespondJSON

func (r *Request) RespondJSON(response interface{}, opts ...RespondOpt) error
Example

Code:

{
	type Point struct {
		X int `json:"x"`
		Y int `json:"y"`
	}

	handler := func(req *Request) {
		resp := Point{5, 10}
		// respond to the request
		// response will be serialized to {"x":5,"y":10}
		if err := req.RespondJSON(resp); err != nil {
			log.Fatal(err)
		}
	}

	fmt.Printf("%T", handler)
}

type RequestHandler

type RequestHandler func(*Request)

RequestHandler is a function used as a Handler for a service.

type RespondOpt

type RespondOpt func(*nats.Msg)

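RespondOpt is a function that modifies the *nats.Msg it is given. Options can be passed to Request.Respond, Request.RespondJSON and Request.Error (see WithHeaders).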

func WithHeaders

func WithHeaders(headers Headers) RespondOpt

type Schema

type Schema struct {
	Request  string `json:"request"`
	Response string `json:"response"`
}

Schema can be used to configure a schema for a service. It is also returned by the SCHEMA monitoring service (if set).

type SchemaResp

type SchemaResp struct {
	ServiceIdentity
	Schema Schema `json:"schema"`
}

SchemaResp is the response value for SCHEMA requests.

type Service

type Service interface {
	// Info returns the service info.
	Info() Info

	// Stats returns statistics for the service endpoint and all monitoring endpoints.
	Stats() Stats

	// Reset resets all statistics on a service instance.
	Reset()

	// Stop drains the endpoint subscriptions and marks the service as stopped.
	Stop() error

	// Stopped informs whether [Stop] was executed on the service.
	Stopped() bool
}

func AddService

func AddService(nc *nats.Conn, config Config) (Service, error)

AddService adds a microservice. It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as the actual service handler on the subject provided in config.Endpoint. A service name, version and Endpoint configuration are required to add a service. AddService returns a Service interface, allowing service management. Each service is assigned a unique ID.

Example

Code:

{
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	echoHandler := func(req *Request) {
		req.Respond(req.Data())
	}

	config := Config{
		Name:        "EchoService",
		Version:     "v1.0.0",
		Description: "Send back what you receive",
		Endpoint: Endpoint{
			Subject: "echo",
			Handler: echoHandler,
		},

		// DoneHandler can be set to customize behavior on stopping a service.
		DoneHandler: func(srv Service) {
			info := srv.Info()
			fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
		},

		// ErrorHandler can be used to customize behavior on service execution error.
		ErrorHandler: func(srv Service, err *NATSError) {
			info := srv.Info()
			fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
		},
	}

	srv, err := AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer srv.Stop()
}

type ServiceIdentity

type ServiceIdentity struct {
	Name    string `json:"name"`
	ID      string `json:"id"`
	Version string `json:"version"`
}

ServiceIdentity contains fields helping to identify a service instance.

type Stats

type Stats struct {
	ServiceIdentity
	NumRequests           int             `json:"num_requests"`
	NumErrors             int             `json:"num_errors"`
	LastError             string          `json:"last_error"`
	ProcessingTime        time.Duration   `json:"processing_time"`
	AverageProcessingTime time.Duration   `json:"average_processing_time"`
	Started               string          `json:"started"`
	Data                  json.RawMessage `json:"data,omitempty"`
}

Stats is the type returned by STATS monitoring endpoint. It contains stats for a specific endpoint (either request handler or monitoring endpoints).

type StatsHandler

type StatsHandler func(Endpoint) interface{}

StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.

type Verb

type Verb int64

Verb represents a name of the monitoring service.

const (
	PingVerb Verb = iota
	StatsVerb
	InfoVerb
	SchemaVerb
)

Verbs being used to set up a specific control subject.

func (Verb) String

func (s Verb) String() string

Source Files

request.go service.go

Version
v1.22.1
Published
Dec 22, 2022
Platform
windows/amd64
Imports
8 packages
Last checked
20 minutes ago

Tools for package owners.