package micro
import "github.com/nats-io/nats.go/micro"
Example¶
Code:
{ nc, err := nats.Connect("127.0.0.1:4222") if err != nil { log.Fatal(err) } defer nc.Close() // endpoint handler - in this case, HandlerFunc is used, // which is a built-in implementation of Handler interface echoHandler := func(req micro.Request) { req.Respond(req.Data()) } // second endpoint incrementHandler := func(req micro.Request) { val, err := strconv.Atoi(string(req.Data())) if err != nil { req.Error("400", "request data should be a number", nil) return } responseData := val + 1 req.Respond([]byte(strconv.Itoa(responseData))) } // third endpoint multiplyHandler := func(req micro.Request) { val, err := strconv.Atoi(string(req.Data())) if err != nil { req.Error("400", "request data should be a number", nil) return } responseData := val * 2 req.Respond([]byte(strconv.Itoa(responseData))) } config := micro.Config{ Name: "IncrementService", Version: "0.1.0", Description: "Increment numbers", // base handler - for simple services with single endpoints this is sufficient Endpoint: µ.EndpointConfig{ Subject: "echo", Handler: micro.HandlerFunc(echoHandler), }, } svc, err := micro.AddService(nc, config) if err != nil { log.Fatal(err) } defer svc.Stop() // add a group to aggregate endpoints under common prefix numbers := svc.AddGroup("numbers") // register endpoints in a group err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) if err != nil { log.Fatal(err) } err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler)) if err != nil { log.Fatal(err) } // send a request to a service resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second) if err != nil { log.Fatal(err) } responseVal, err := strconv.Atoi(string(resp.Data)) if err != nil { log.Fatal(err) } fmt.Println(responseVal) }
Index ¶
- Constants
- Variables
- func ControlSubject(verb Verb, name, id string) (string, error)
- type Config
- type DoneHandler
- type Endpoint
- type EndpointConfig
- type EndpointInfo
- type EndpointOpt
- func WithEndpointMetadata(metadata map[string]string) EndpointOpt
- func WithEndpointQueueGroup(queueGroup string) EndpointOpt
- func WithEndpointQueueGroupDisabled() EndpointOpt
- func WithEndpointSubject(subject string) EndpointOpt
- type EndpointStats
- type ErrHandler
- type Group
- type GroupOpt
- type Handler
- type HandlerFunc
- type Headers
- type Info
- type NATSError
- type Ping
- type Request
- type RespondOpt
- type Service
- type ServiceIdentity
- type Stats
- type StatsHandler
- type Verb
Examples ¶
Constants ¶
const ( // Queue Group name used across all services DefaultQueueGroup = "q" // APIPrefix is the root of all control subjects APIPrefix = "$SRV" )
const ( ErrorHeader = "Nats-Service-Error" ErrorCodeHeader = "Nats-Service-Error-Code" )
Service Error headers
const ( InfoResponseType = "io.nats.micro.v1.info_response" PingResponseType = "io.nats.micro.v1.ping_response" StatsResponseType = "io.nats.micro.v1.stats_response" )
Variables ¶
var ( ErrRespond = errors.New("NATS error when sending response") ErrMarshalResponse = errors.New("marshaling response") ErrArgRequired = errors.New("argument required") )
var ( // ErrConfigValidation is returned when service configuration is invalid ErrConfigValidation = errors.New("validation") // ErrVerbNotSupported is returned when invalid [Verb] is used (PING, INFO, STATS) ErrVerbNotSupported = errors.New("unsupported verb") // ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name ErrServiceNameRequired = errors.New("service name is required to generate ID control subject") )
Common errors returned by the Service framework.
Functions ¶
func ControlSubject ¶
ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Info or Stats). Depending on whether kind and id are provided, ControlSubject will return one of the following:
- verb only: subject used to monitor all available services
- verb and kind: subject used to monitor services with the provided name
- verb, name and id: subject used to monitor an instance of a service with the provided ID
Example¶
Code:play
package main import ( "fmt" "github.com/nats-io/nats.go/micro" ) func main() { // subject used to get PING from all services subjectPINGAll, _ := micro.ControlSubject(micro.PingVerb, "", "") fmt.Println(subjectPINGAll) // subject used to get PING from services with provided name subjectPINGName, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "") fmt.Println(subjectPINGName) // subject used to get PING from a service with provided name and ID subjectPINGInstance, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "123") fmt.Println(subjectPINGInstance) }
Output:
$SRV.PING $SRV.PING.CoolService $SRV.PING.CoolService.123
Types ¶
type Config ¶
type Config struct { // Name represents the name of the service. Name string `json:"name"` // Endpoint is an optional endpoint configuration. // More complex, multi-endpoint services can be configured using // Service.AddGroup and Service.AddEndpoint methods. Endpoint *EndpointConfig `json:"endpoint"` // Version is a SemVer compatible version string. Version string `json:"version"` // Description of the service. Description string `json:"description"` // Metadata annotates the service Metadata map[string]string `json:"metadata,omitempty"` // QueueGroup can be used to override the default queue group name. QueueGroup string `json:"queue_group"` // QueueGroupDisabled disables the queue group for the service. QueueGroupDisabled bool `json:"queue_group_disabled"` // StatsHandler is a user-defined custom function. // used to calculate additional service stats. StatsHandler StatsHandler // DoneHandler is invoked when all service subscription are stopped. DoneHandler DoneHandler // ErrorHandler is invoked on any nats-related service error. ErrorHandler ErrHandler }
Config is a configuration of a service.
type DoneHandler ¶
type DoneHandler func(Service)
DoneHandler is a function used to configure a custom done handler for a service.
type Endpoint ¶
type Endpoint struct { EndpointConfig Name string // contains filtered or unexported fields }
Endpoint manages a service endpoint.
type EndpointConfig ¶
type EndpointConfig struct { // Subject on which the endpoint is registered. Subject string // Handler used by the endpoint. Handler Handler // Metadata annotates the service Metadata map[string]string `json:"metadata,omitempty"` // QueueGroup can be used to override the default queue group name. QueueGroup string `json:"queue_group"` // QueueGroupDisabled disables the queue group for the endpoint. QueueGroupDisabled bool `json:"queue_group_disabled"` }
type EndpointInfo ¶
type EndpointInfo struct { Name string `json:"name"` Subject string `json:"subject"` QueueGroup string `json:"queue_group"` Metadata map[string]string `json:"metadata"` }
type EndpointOpt ¶
type EndpointOpt func(*endpointOpts) error
func WithEndpointMetadata ¶
func WithEndpointMetadata(metadata map[string]string) EndpointOpt
func WithEndpointQueueGroup ¶
func WithEndpointQueueGroup(queueGroup string) EndpointOpt
func WithEndpointQueueGroupDisabled ¶
func WithEndpointQueueGroupDisabled() EndpointOpt
func WithEndpointSubject ¶
func WithEndpointSubject(subject string) EndpointOpt
Example¶
Code:
{
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}
config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
}
srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
// endpoint will be registered under "service.echo" subject
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo"))
if err != nil {
log.Fatal(err)
}
}
type EndpointStats ¶
type EndpointStats struct { Name string `json:"name"` Subject string `json:"subject"` QueueGroup string `json:"queue_group"` NumRequests int `json:"num_requests"` NumErrors int `json:"num_errors"` LastError string `json:"last_error"` ProcessingTime time.Duration `json:"processing_time"` AverageProcessingTime time.Duration `json:"average_processing_time"` Data json.RawMessage `json:"data,omitempty"` }
EndpointStats contains stats for a specific endpoint.
type ErrHandler ¶
ErrHandler is a function used to configure a custom error handler for a service,
type Group ¶
type Group interface { // AddGroup creates a new group, prefixed by this group's prefix. AddGroup(string, ...GroupOpt) Group // AddEndpoint registers new endpoints on a service. // The endpoint's subject will be prefixed with the group prefix. AddEndpoint(string, Handler, ...EndpointOpt) error }
Group allows for grouping endpoints on a service.
Endpoints created using AddEndpoint will be grouped under common prefix (group name) New groups can also be derived from a group using AddGroup.
type GroupOpt ¶
type GroupOpt func(*groupOpts)
func WithGroupQueueGroup ¶
func WithGroupQueueGroupDisabled ¶
func WithGroupQueueGroupDisabled() GroupOpt
type Handler ¶
type Handler interface { Handle(Request) }
Handler is used to respond to service requests.
Code:play
Example¶
package main
import (
"log"
"strconv"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/micro"
)
type rectangle struct {
height int
width int
}
// Handle is an implementation of micro.Handler used to
// calculate the area of a rectangle
func (r rectangle) Handle(req micro.Request) {
area := r.height * r.width
req.Respond([]byte(strconv.Itoa(area)))
}
func main() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
rec := rectangle{10, 5}
config := micro.Config{
Name: "RectangleAreaService",
Version: "0.1.0",
Endpoint: µ.EndpointConfig{
Subject: "area.rectangle",
Handler: rec,
},
}
svc, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer svc.Stop()
}
func ContextHandler ¶
ContextHandler is a helper function used to utilize context.Context
in request handlers.
Code:
Example¶
{
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
handler := func(ctx context.Context, req micro.Request) {
select {
case <-ctx.Done():
req.Error("400", "context canceled", nil)
default:
req.Respond([]byte("ok"))
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := micro.Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: µ.EndpointConfig{
Subject: "echo",
Handler: micro.ContextHandler(ctx, handler),
},
}
srv, _ := micro.AddService(nc, config)
defer srv.Stop()
}
type HandlerFunc ¶
type HandlerFunc func(Request)
HandlerFunc is a function implementing Handler. It allows using a function as a request handler, without having to implement Handle on a separate type.
func (HandlerFunc) Handle ¶
func (fn HandlerFunc) Handle(req Request)
type Headers ¶
type Headers nats.Header
Headers is a wrapper around *nats.Header
func (Headers) Get ¶
Get gets the first value associated with the given key. It is case-sensitive.
func (Headers) Values ¶
Values returns all values associated with the given key. It is case-sensitive.
type Info ¶
type Info struct { ServiceIdentity Type string `json:"type"` Description string `json:"description"` Endpoints []EndpointInfo `json:"endpoints"` }
Info is the basic information about a service type.
type NATSError ¶
NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.
func (*NATSError) Error ¶
type Ping ¶
type Ping struct { ServiceIdentity Type string `json:"type"` }
Ping is the response type for PING monitoring endpoint.
type Request ¶
type Request interface { // Respond sends the response for the request. // Additional headers can be passed using [WithHeaders] option. Respond([]byte, ...RespondOpt) error // RespondJSON marshals the given response value and responds to the request. // Additional headers can be passed using [WithHeaders] option. RespondJSON(any, ...RespondOpt) error // Error prepares and publishes error response from a handler. // A response error should be set containing an error code and description. // Optionally, data can be set as response payload. Error(code, description string, data []byte, opts ...RespondOpt) error // Data returns request data. Data() []byte // Headers returns request headers. Headers() Headers // Subject returns underlying NATS message subject. Subject() string // Reply returns underlying NATS message reply subject. Reply() string }
Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.
type RespondOpt ¶
type RespondOpt func(*nats.Msg)
RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.
func WithHeaders ¶
func WithHeaders(headers Headers) RespondOpt
WithHeaders can be used to configure response with custom headers.
type Service ¶
type Service interface { // AddEndpoint registers endpoint with given name on a specific subject. AddEndpoint(string, Handler, ...EndpointOpt) error // AddGroup returns a Group interface, allowing for more complex endpoint topologies. // A group can be used to register endpoints with given prefix. AddGroup(string, ...GroupOpt) Group // Info returns the service info. Info() Info // Stats returns statistics for the service endpoint and all monitoring endpoints. Stats() Stats // Reset resets all statistics (for all endpoints) on a service instance. Reset() // Stop drains the endpoint subscriptions and marks the service as stopped. Stop() error // Stopped informs whether [Stop] was executed on the service. Stopped() bool }
Service exposes methods to operate on a service instance.
func AddService ¶
AddService adds a microservice.
It will enable internal common services (PING, STATS and INFO).
Request handlers have to be registered separately using Service.AddEndpoint.
A service name, version and Endpoint configuration are required to add a service.
AddService returns a Service interface, allowing service management.
Each service is assigned a unique ID.
Code:
Example¶
{
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}
config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
Description: "Send back what you receive",
// DoneHandler can be set to customize behavior on stopping a service.
DoneHandler: func(srv micro.Service) {
info := srv.Info()
fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
},
// ErrorHandler can be used to customize behavior on service execution error.
ErrorHandler: func(srv micro.Service, err *micro.NATSError) {
info := srv.Info()
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},
// optional base handler
Endpoint: µ.EndpointConfig{
Subject: "echo",
Handler: micro.HandlerFunc(echoHandler),
},
}
srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer srv.Stop()
}
type ServiceIdentity ¶
type ServiceIdentity struct { Name string `json:"name"` ID string `json:"id"` Version string `json:"version"` Metadata map[string]string `json:"metadata"` }
ServiceIdentity contains fields helping to identity a service instance.
type Stats ¶
type Stats struct { ServiceIdentity Type string `json:"type"` Started time.Time `json:"started"` Endpoints []*EndpointStats `json:"endpoints"` }
Stats is the type returned by STATS monitoring endpoint. It contains stats of all registered endpoints.
type StatsHandler ¶
StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.
type Verb ¶
type Verb int64
Verb represents a name of the monitoring service.
Verbs being used to set up a specific control subject.
func (Verb) String ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
micro/test |
- Version
- v1.41.2 (latest)
- Published
- Apr 17, 2025
- Platform
- linux/amd64
- Imports
- 10 packages
- Last checked
- 1 day ago –
Tools for package owners.