An AsyncAPI Golang Code generator that generates all Go code from the broker to the application/user. Just plug your application to your favorite message broker!
❤️ Support is greatly appreciated and contributions are welcomed!
Inspired by the popular deepmap/oapi-codegen.
- Supported functionalities
- Usage
- Concepts
- Examples
- Supported Brokers
- CLI options
- Advanced topics
- Contributing and support
- AsyncAPI versions:
  - 2.6.0
  - 3.0.0
- Brokers:
  - Kafka
  - NATS / NATS JetStream
  - Custom
- Formats:
  - JSON
- Logging:
  - Elastic Common Schema (JSON)
  - Text (Human readable)
  - Custom
- Others:
  - Versioning support
In order to use this library in your code, please execute the following lines:
# Install the tool
go install github.com/lerenn/asyncapi-codegen/cmd/asyncapi-codegen@latest
# Generate the code from the asyncapi file
asyncapi-codegen -i ./asyncapi.yaml -p <your-package> -o ./asyncapi.gen.go
# Install dependencies needed by the generated code
go get -u github.com/lerenn/asyncapi-codegen/pkg/extensions
You can also specify the generation part by adding a go generate instruction
at the beginning of your file:
//go:generate go run github.com/lerenn/asyncapi-codegen/cmd/asyncapi-codegen@<version> -i ./asyncapi.yaml -p <your-package> -o ./asyncapi.gen.go
You can also use the dockerized version of this tool:
docker run -v .:/code -w /code lerenn/asyncapi-codegen asyncapi-codegen -i ./asyncapi.yaml -p <your-package> -o ./asyncapi.gen.go
Let's imagine a message broker centric architecture: you have the application that you are developing on the right and the potential user(s) on the left.
Since the communication is bidirectional, both of them can talk to each other through the broker. They can even communicate with themselves, in case of multiple users or application replication.
For more information about this, please refer to the official AsyncAPI concepts.
- Yellow parts: when using the codegen tool, you will generate the code that will act as an adapter (called controller) between the user, the broker, and the application.
- Red parts: you will need to fill these parts between user, broker and application. These will allow message production and reception with the generated code.
- Orange parts: these parts will be available in this repository if you use an already supported broker. However, you can also implement them yourself if the broker is not supported yet.
Here is a list of examples, from basic to advanced.
Please note that the examples are separated in different subdirectories per broker.
It is strongly advised to read them in the order presented, following your AsyncAPI version.
- HelloWorld:
- Ping:
In order to connect your broker to the autogenerated code, you will need to create a controller that will be used to publish and subscribe to messages.
You can use one of the already supported brokers or implement your own.
In order to use Kafka as a broker, you can use the following code:
broker, _ := kafka.NewController([]string{"<host>:<port>", /* additional hosts */}, /* options */)
Here are the options that you can use with the Kafka controller:
- WithGroupID: specify the group ID that will be used by the controller. If not specified, default queue name (asyncapi) will be used.
- WithPartition: specify the partition that will be used by the controller. If not specified, default partition (0) will be used.
- WithMaxBytes: specify the maximum size of a message that will be received. If not specified, default value (10e6, meaning 10MB) will be used.
- WithLogger: specify the logger that will be used by the controller. If not specified, a silent logger is used that won't log anything.
- WithAutoCommit: specify if the broker should use auto-commit for incoming messages or manual commits. Note that commits are managed by the broker implementation regardless; with manual commits they are executed after the message is completely processed. Subscribers retain the option to manually handle errors via the ErrorHandler, to use mechanisms such as dead letter or retry topics. The default value is true.
- WithSasl: specify the SASL mechanism used to connect to the broker. By default, no mechanism is used.
- WithTLS: specify the TLS config used to connect to the broker. By default, no TLS config is used.
- WithConnectionTest: specify if the controller should make a connection test on creation. The default value is true.
To use a TLS connection and/or authentication when connecting to the Kafka broker, the following options can be used:
// Plain mechanism
kafkaController, err := kafka.NewController([]string{"<host>:<port>"},
kafka.WithGroupID(queueGroupID),
kafka.WithSasl(plain.Mechanism{Username: "<user>", Password: "<password>"}),
)
// Sha256 mechanism
sha256Mechanism, err := scram.Mechanism(scram.SHA256, "<user>", "<password>")
if err != nil {
// handle error
}
kafkaController, err := kafka.NewController([]string{"<host>:<port>"},
kafka.WithGroupID(queueGroupID),
kafka.WithSasl(sha256Mechanism),
)
// Sha512 mechanism
sha512Mechanism, err := scram.Mechanism(scram.SHA512, "<user>", "<password>")
if err != nil {
// handle error
}
kafkaController, err := kafka.NewController([]string{"<host>:<port>"},
kafka.WithGroupID(queueGroupID),
kafka.WithSasl(sha512Mechanism),
)
// TLS
// configure tls.config
myTLSConfig := &tls.Config{}
kafkaController, err := kafka.NewController([]string{"<host>:<port>"},
kafka.WithGroupID(queueGroupID),
kafka.WithTLS(myTLSConfig),
)
In order to use NATS as a broker, you can use the following code:
// Create the NATS controller
broker, _ := nats.NewController("nats://<host>:<port>")
defer broker.Close()
// Add NATS controller to a new App controller
ctrl, err := NewAppController(broker, /* options */)
//...
Here are the options that you can use with the NATS controller:
- WithLogger: specify the logger that will be used by the controller. If not specified, a silent logger is used that won't log anything.
- WithQueueGroup: specify the queue group that will be used by the controller. If not specified, default queue name (asyncapi) will be used.
- WithConnectionOpts: specify connection options for establishing the connection with NATS; see the NATS options for more information. If not specified, no options will be used.
To use a TLS connection and/or authentication when connecting to the NATS broker, the following NATS options can be used:
import (
	"crypto/tls"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats"
	// import the NATS Go client for its connection options
	natsio "github.com/nats-io/nats.go"
)

func main() {
	// configure tls.Config as needed
	myTLSConfig := &tls.Config{}
	natsController, err := nats.NewController("nats://<host>:<port>",
		nats.WithQueueGroup(queueGroupID),
		nats.WithConnectionOpts(natsio.UserCredentials("<user.jwt>", "<user.nk>"), natsio.Secure(myTLSConfig)),
	)
	if err != nil {
		// handle error
	}
	defer natsController.Close()
}
In order to use NATS JetStream as a broker, you can use the following code:
// Create the NATS controller
broker, _ := natsjetstream.NewController("nats://<host>:<port>", /* options */)
defer broker.Close()
// Add NATS controller to a new App controller
ctrl, err := NewAppController(broker)
//...
It is important to either create/update a stream with WithStreamConfig or to use WithStream to specify the stream that will be used by the broker.
Consumer for the user controller can be either created/updated with WithConsumerConfig or WithConsumer.
- the messages will be ack'd from the consumer even though the subscription was not setup (this will be logged)
In order to connect your application and your user to your broker, we need to provide a controller to it. Here is the interface that you need to satisfy:
import (
	"context"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions"
)

type BrokerController interface {
	// Publish a message to the broker
	Publish(ctx context.Context, channel string, mw extensions.BrokerMessage) error
	// Subscribe to messages from the broker
	Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan any, err error)
}
Note the extensions.BrokerMessage structure: it is provided to abstract away the event broker technology.
By writing your own controller that satisfies this interface, you will be able to connect your broker to the generated code.
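As an illustration of a custom broker, here is a minimal in-memory sketch that follows the shape of the interface above. It is not part of the library: BrokerMessage is a local stand-in for extensions.BrokerMessage (its fields are assumed), MemoryBroker and roundTrip are hypothetical names, and the way stop ends a subscription is an assumption.

```go
package main

import (
	"context"
	"sync"
)

// BrokerMessage is a local stand-in for extensions.BrokerMessage (assumed shape).
type BrokerMessage struct {
	Headers map[string][]byte
	Payload []byte
}

// MemoryBroker is a hypothetical in-process broker, handy for unit tests.
type MemoryBroker struct {
	mu   sync.Mutex
	subs map[string][]chan BrokerMessage
}

// NewMemoryBroker returns a broker without any subscription.
func NewMemoryBroker() *MemoryBroker {
	return &MemoryBroker{subs: make(map[string][]chan BrokerMessage)}
}

// Publish copies the message to every current subscriber of the channel.
func (b *MemoryBroker) Publish(ctx context.Context, channel string, msg BrokerMessage) error {
	b.mu.Lock()
	targets := append([]chan BrokerMessage(nil), b.subs[channel]...)
	b.mu.Unlock()

	for _, ch := range targets {
		select {
		case ch <- msg:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// Subscribe registers a buffered channel for the given broker channel.
// Assumption: the caller closes (or sends on) stop to end the subscription.
func (b *MemoryBroker) Subscribe(ctx context.Context, channel string) (chan BrokerMessage, chan any, error) {
	msgs := make(chan BrokerMessage, 16)
	stop := make(chan any, 1)

	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], msgs)
	b.mu.Unlock()

	go func() {
		// Wait for the end of the subscription, then unregister it.
		select {
		case <-stop:
		case <-ctx.Done():
		}
		b.mu.Lock()
		defer b.mu.Unlock()
		for i, ch := range b.subs[channel] {
			if ch == msgs {
				b.subs[channel] = append(b.subs[channel][:i], b.subs[channel][i+1:]...)
				break
			}
		}
	}()
	return msgs, stop, nil
}

// roundTrip publishes one payload on the "ping" channel and reads it back.
func roundTrip(payload string) (string, error) {
	ctx := context.Background()
	b := NewMemoryBroker()

	msgs, stop, err := b.Subscribe(ctx, "ping")
	if err != nil {
		return "", err
	}
	defer close(stop)

	if err := b.Publish(ctx, "ping", BrokerMessage{Payload: []byte(payload)}); err != nil {
		return "", err
	}
	got := <-msgs
	return string(got.Payload), nil
}
```

To plug such a broker into the generated code, the method signatures would have to use the real extensions.BrokerMessage type instead of the local stand-in.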
The default options for asyncapi-codegen will generate everything; user, application, and type definitions but you can generate subsets of those via the -generate flag. It defaults to user,application,types but you can specify any combination of those.
Here are the universal parts that you can generate:
- application: generate the application boilerplate. application requires the types in the same package to compile.
- user: generate the user boilerplate. It, too, requires the types to be present in its package.
- types: all type definitions for all types in the AsyncAPI spec. This will be everything under #components, as well as request parameter, request body, and response type objects.
The package name is the name of the package that will be used in the generated code. It is important to have the same package name for the user, application, and types in order to compile the code.
The input file is the path to the AsyncAPI specification file that will be used to generate the code. It can be either a JSON or a YAML file.
Also, you can specify dependencies by separating them with a comma:
asyncapi-codegen -i ./asyncapi.yaml,./dependency1.yaml,./dependency2.yaml -p <your-package> -o ./asyncapi.gen.go
A dependency can also be an OpenAPI document: only its components are kept,
so its schemas can be referenced (e.g. $ref: './openapi.yaml#/components/schemas/Pet')
even when the document contains fields AsyncAPI does not allow, such as an
array-typed servers.
The output file is the path to the file that will be generated by the tool. It will contain the generated code.
By default, the generated code will be formatted using gofmt. If you want to
disable this feature, you can use the -f flag.
By default, the generation will use the keys exactly as they are written in the
AsyncAPI specification. This is also called the none conversion. You can also
convert your keys to snake, camel, pascal, or kebab case.
Given this schema:
Payload:
  type: object
  properties:
    This_is a-Property:
      type: string
Here is the JSON that is sent for each option:
- No conversion (none): { "This_is a-Property": "value" }
- Camel case (camel): { "thisIsAProperty": "value" }
- Pascal case (pascal): { "ThisIsAProperty": "value" }
- Kebab case (kebab): { "this-is-a-property": "value" }
- Snake case (snake): { "this_is_a_property": "value" }
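The conversions listed above can be approximated with a few lines of standard-library Go. The sketch below is only an illustration, not the generator's actual implementation: ConvertKey, splitWords and capitalize are hypothetical helpers, and words are assumed to be separated by any character that is neither a letter nor a digit (so an existing camelCase key is not split further).

```go
package main

import (
	"strings"
	"unicode"
)

// splitWords cuts a key on every rune that is neither a letter nor a digit.
func splitWords(key string) []string {
	return strings.FieldsFunc(key, func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
}

// capitalize lowercases a word, then upper-cases its first rune.
func capitalize(word string) string {
	if word == "" {
		return word
	}
	runes := []rune(strings.ToLower(word))
	runes[0] = unicode.ToUpper(runes[0])
	return string(runes)
}

// ConvertKey is a hypothetical helper that rewrites a property key according
// to one of the conventions: none, camel, pascal, kebab or snake.
func ConvertKey(key, convention string) string {
	words := splitWords(key)
	switch convention {
	case "snake":
		return strings.ToLower(strings.Join(words, "_"))
	case "kebab":
		return strings.ToLower(strings.Join(words, "-"))
	case "camel", "pascal":
		var sb strings.Builder
		for i, w := range words {
			if i == 0 && convention == "camel" {
				// Camel case keeps the first word entirely lowercase.
				sb.WriteString(strings.ToLower(w))
				continue
			}
			sb.WriteString(capitalize(w))
		}
		return sb.String()
	default:
		// "none" (or anything unknown): the key is left untouched.
		return key
	}
}
```

With this sketch, ConvertKey("This_is a-Property", "snake") yields this_is_a_property, matching the table above.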
You can use middlewares that are executed when receiving and publishing
messages. You can add one or multiple middlewares using the WithMiddlewares
function in the initialization of the App or User controller:
// Create a new app controller with middlewares
ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(myMiddleware1, myMiddleware2 /*, ... */))
Here is the function signature that should be satisfied:
func(ctx context.Context, msg *extensions.BrokerMessage, next extensions.NextMiddleware) error
Note: the context given to next is the one that will be passed to the following middlewares, and finally to the generated code (and subscription callback).
import (
	"context"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions"
	// ...
)

func myMiddleware(ctx context.Context, _ *extensions.BrokerMessage, _ extensions.NextMiddleware) error {
	// Execute this middleware only if this is a received message
	extensions.IfContextValueEquals(ctx, extensions.ContextKeyIsDirection, "reception", func() {
		// Do specific stuff if message is received
	})
	return nil
}
You can discriminate on even more criteria. Please see the Context section.
import (
	"context"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions"
	// ...
)

func myMiddleware(_ context.Context, msg *extensions.BrokerMessage, _ extensions.NextMiddleware) error {
	msg.Headers["additional"] = "some-info"
	return nil
}
import (
	"context"
	"fmt"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions"
	// ...
)

func myMiddleware(_ context.Context, msg *extensions.BrokerMessage, _ extensions.NextMiddleware) error {
	if msg.Headers["author"] != "me" {
		return fmt.Errorf("this is not me, aborting...")
	}
	return nil
}
By default, middlewares will be executed right before the operation. If there is
a need to execute code before and/or after the operation, you can call the next
argument that represents the next middleware that should be executed or the
operation corresponding code if this was the last middleware.
Here is an example:
import (
	"context"
	"fmt"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions"
	// ...
)

func surroundingMiddleware(ctx context.Context, _ *extensions.BrokerMessage, next extensions.NextMiddleware) error {
	// Pre-operation
	fmt.Println("This will be displayed BEFORE the reception/publication")

	// Calling next middleware or reception/publication code
	// The given context will be the one propagated to other middlewares and operation source code
	err := next(ctx)

	// Post-operation
	fmt.Println("This will be displayed AFTER the reception/publication")
	return err
}
When receiving the context from generated code (either in subscription, middleware, logging, etc), you can get some information embedded in context.
To get this information, please use the functions from
github.com/lerenn/asyncapi-codegen/pkg/extensions:
// Execute this middleware only if this is from "ping" channel
extensions.IfContextValueEquals(ctx, extensions.ContextKeyIsChannel, "ping", func() {
// Do specific stuff if the channel is ping
})
You can find other keys in the package pkg/extensions.
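To make the mechanism more concrete, here is a standard-library sketch of how such a context check can work. It does not use the real package: contextKey, keyChannel, ifContextValueEquals and handledOnPing are hypothetical stand-ins that mimic the helper and keys mentioned above.

```go
package main

import "context"

// contextKey is a private key type, a stand-in for the keys of pkg/extensions.
type contextKey string

// keyChannel is a hypothetical key holding the channel name.
const keyChannel contextKey = "channel"

// ifContextValueEquals runs fn only when ctx stores the expected string under key.
func ifContextValueEquals(ctx context.Context, key contextKey, expected string, fn func()) {
	if v, ok := ctx.Value(key).(string); ok && v == expected {
		fn()
	}
}

// handledOnPing reports whether the callback runs for a message that was
// received on the given channel.
func handledOnPing(channel string) bool {
	// The generated code is assumed to store the channel in the context.
	ctx := context.WithValue(context.Background(), keyChannel, channel)

	ran := false
	ifContextValueEquals(ctx, keyChannel, "ping", func() { ran = true })
	return ran
}
```

Using a dedicated key type, as in this sketch, avoids collisions with values stored in the context by other packages.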
You can have 2 types of logging:
- Controller logging: logs the internal operations of the controller (subscription, malformed messages, etc);
- Publication/Reception logging: logs every publication or reception of messages.
To log the internal operations of the controller, the only thing you have to do is
to initialize the controller with a logger, with the function WithLogger():
import (
	log "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers"
	// ...
)

func main() {
	// Create a new app controller with an Elastic Common Schema JSON compatible logger
	ctrl, _ := NewAppController(/* Broker of your choice */, WithLogger(log.NewECS()))
	// ...
}
You can find all loggers in the directory pkg/extensions/loggers.
By default, the provided loggers (NewText and NewECS) output every message.
You can restrict them to a minimum severity level with the WithLevel option:
import(
log "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers"
)
// Only warnings and errors will be printed, info messages are discarded.
logger := log.NewECS(log.WithLevel(log.LevelWarning))
Available levels are LevelInfo (default, logs everything), LevelWarning
(warnings and errors) and LevelError (errors only).
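The threshold behaviour described above can be pictured with a tiny standard-library sketch. The names below (level, filteringLogger, keptWith) are hypothetical and only mirror the three levels of the text; this is not the code of the provided loggers.

```go
package main

// level orders severities from least to most important
// (stand-ins for LevelInfo, LevelWarning and LevelError).
type level int

const (
	levelInfo level = iota
	levelWarning
	levelError
)

// filteringLogger keeps only the entries at or above its threshold.
type filteringLogger struct {
	threshold level
	entries   []string
}

// log records msg unless its level is below the threshold.
func (l *filteringLogger) log(lv level, msg string) {
	if lv < l.threshold {
		return // below the threshold: discarded
	}
	l.entries = append(l.entries, msg)
}

// keptWith logs one entry per level and returns how many were kept.
func keptWith(threshold level) int {
	l := &filteringLogger{threshold: threshold}
	l.log(levelInfo, "info")
	l.log(levelWarning, "warning")
	l.log(levelError, "error")
	return len(l.entries)
}
```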
To log published and received messages, you'll have to pass a logger as a middleware in order to execute it on every published and received message:
import (
	log "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers"
	"github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares"
	// ...
)

func main() {
	// Create a new app controller with a middleware for logging incoming/outgoing messages
	loggingMiddleware := middlewares.Logging(log.NewECS())
	ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(loggingMiddleware))
	// ...
}
It is possible to use your own logger with the generated code: all you have to do is implement the following interface:
type Logger interface {
	// Info logs information based on a message and key-value elements
	Info(ctx context.Context, msg string, info ...extensions.LogInfo)
	// Error logs error based on a message and key-value elements
	Error(ctx context.Context, msg string, info ...extensions.LogInfo)
}
Here is a basic implementation example:
type SimpleLogger struct{}

// formatLog flattens the key-value elements and the context into a single string
func (logger SimpleLogger) formatLog(ctx context.Context, info ...extensions.LogInfo) string {
	var formattedLogInfo string
	for _, i := range info {
		formattedLogInfo = fmt.Sprintf("%s, %s: %+v", formattedLogInfo, i.Key, i.Value)
	}
	return fmt.Sprintf("%s, context: %+v", formattedLogInfo, ctx)
}

func (logger SimpleLogger) Info(ctx context.Context, msg string, info ...extensions.LogInfo) {
	log.Printf("INFO: %s%s", msg, logger.formatLog(ctx, info...))
}

func (logger SimpleLogger) Error(ctx context.Context, msg string, info ...extensions.LogInfo) {
	log.Printf("ERROR: %s%s", msg, logger.formatLog(ctx, info...))
}
You can then create a controller with a logger using similar lines:
// Create a new app controller with the custom logger
ctrl, _ := NewAppController(
	/* Broker of your choice */,
	WithLogger(SimpleLogger{}), /* Used as internal logger */
	WithMiddlewares(middlewares.Logging(SimpleLogger{})), /* Used to log incoming/outgoing messages */
)
The generated code passes the contextual information (channel, correlation ID,
direction, provider, broker message, version) directly to your logger as
explicit LogInfo arguments at every log call. Your logger therefore only has
to log the msg and the info it is given — there is no need to unwrap any
context key:
func (logger SimpleLogger) Info(ctx context.Context, msg string, info ...extensions.LogInfo) {
// 'info' already contains the contextual values (channel, correlationID, …)
// ... log msg with info
}
Migration note (from versions where loggers enriched from context): the contextual values are no longer read from the context.Context by the built-in Text/ECS loggers; they are now supplied as explicit LogInfo arguments by the generated code, using the generic keys returned by extensions.LogInfosFromContext (channel, correlationID, brokerMessage, direction, provider, version). This means:
- The built-in Text and ECS loggers now emit those generic keys instead of their previous custom names (e.g. ECS no longer maps to trace.id, event.action, event.original).
- Custom loggers receive everything as arguments and no longer need to call extensions.LogInfosFromContext themselves.
The values are still stored in the context.Context (it stays useful for other purposes), so a custom logger that wants to keep reading from context can still call extensions.LogInfosFromContext(ctx).
If you need to perform a migration or support multiple versions of your
AsyncAPI specifications, you can use the versioning package:
import (
	"context"

	"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats"
	"github.com/lerenn/asyncapi-codegen/pkg/extensions/versioning"
	v1 "path/to/asyncapi/spec/version/1"
	v2 "path/to/asyncapi/spec/version/2"
)

func main() {
	// Create a broker (here from NATS)
	broker, _ := nats.NewController("nats://nats:4222")
	defer broker.Close()

	// Add a version wrapper to the broker
	vw := versioning.NewWrapper(broker)

	// Create application for version 1
	appV1, _ := v1.NewAppController(vw, /* controller options */)
	defer appV1.Close(context.Background())

	// Create application for version 2
	appV2, _ := v2.NewAppController(vw, /* controller options */)
	defer appV2.Close(context.Background())

	// ...
}
Then you can use each application independently:
err := appV1.SubscribeHello(context.Background(), func(ctx context.Context, msg v1.HelloMessage) {
	// Stuff for version 1
})
// ... handle err, then reuse the variable for the second subscription
err = appV2.SubscribeHello(context.Background(), func(ctx context.Context, msg v2.HelloMessage) {
	// Stuff for version 2
})
That way, you can support multiple different versions with the same broker.
The versioning feature will add an application-version header to each
message in order to have the correct version of the application on each of
them.
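Conceptually, the header lets a single subscription be routed to the right version of the application. The sketch below illustrates that idea with the standard library only; it is not how the versioning package is implemented, and message, versionRouter and demoDispatch are hypothetical names (headers are simplified to strings).

```go
package main

import "fmt"

// message is a simplified stand-in for a broker message with string headers.
type message struct {
	Headers map[string]string
	Payload string
}

// versionHeader is the header name mentioned in the text above.
const versionHeader = "application-version"

// versionRouter dispatches a message to the handler registered for its version.
type versionRouter struct {
	handlers map[string]func(message) string
}

// register associates a handler with a version.
func (r *versionRouter) register(version string, h func(message) string) {
	if r.handlers == nil {
		r.handlers = make(map[string]func(message) string)
	}
	r.handlers[version] = h
}

// dispatch reads the version header and calls the matching handler.
func (r *versionRouter) dispatch(m message) (string, error) {
	v, ok := m.Headers[versionHeader]
	if !ok {
		return "", fmt.Errorf("message has no %q header", versionHeader)
	}
	h, ok := r.handlers[v]
	if !ok {
		return "", fmt.Errorf("no handler registered for version %q", v)
	}
	return h(m), nil
}

// demoDispatch routes a message tagged with the given version through a
// router that knows two versions.
func demoDispatch(version string) (string, error) {
	var r versionRouter
	r.register("1.0.0", func(m message) string { return "v1:" + m.Payload })
	r.register("2.0.0", func(m message) string { return "v2:" + m.Payload })
	return r.dispatch(message{
		Headers: map[string]string{versionHeader: version},
		Payload: "hello",
	})
}
```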
If messages can have no application-version, you can use the option WithDefaultVersion
to add a default version to non-tagged messages.
vw := versioning.NewWrapper(broker, versioning.WithDefaultVersion("1.1.4"))
Also, if you don't want to use this header to carry the application version,
you can specify your own header with the option WithVersionHeaderKey.
vw := versioning.NewWrapper(broker, versioning.WithVersionHeaderKey("my-version-key"))
If you want to use the version of the AsyncAPI document used, you can access the constant
AsyncAPIVersion that is generated with the types. It is generated as follows:
const AsyncAPIVersion = "{{ .Info.Version }}"
These extension properties apply to "Operation Objects" and override the Go name of the functions generated for that operation. When omitted, the default derived name is used.
- x-go-subscribe-func: overrides the SubscribeTo<Operation> method name.
- x-go-received-func: overrides the <Operation>Received subscriber callback name.
- x-go-reply-func: overrides the ReplyTo<Operation> method name.
- x-go-send-func: overrides the Send{As,To}<Operation> method name.
- x-go-request-func: overrides the Request{As,To}<Operation> method name.
For example,
operations:
  ping:
    action: receive
    channel:
      $ref: '#/channels/ping'
    x-go-subscribe-func: ListenForPing
    x-go-received-func: OnPing
    x-go-send-func: PublishPing
    x-go-request-func: AskPing
These extension properties apply to "Schema Objects" in AsyncAPI spec.
- x-go-type: Overrides the default Go type with the specified Go type name. For example,
schemas:
  Object:
    properties:
      flag:
        type: integer
        x-go-type: uint8
will be generated as
type Object struct {
	Flag uint8 `json:"flag"`
}
- x-go-name: Overrides the Go identifier generated for a type or a struct field, while keeping the original property key in the JSON tag. For example,
schemas:
  event:
    type: object
    x-go-name: Event
    properties:
      event_id:
        type: integer
        x-go-name: EventID
will be generated as
type Event struct {
	EventID *int64 `json:"event_id,omitempty"`
}
- x-go-type-import: Specifies the import package for x-go-type. This has two properties, name and path. path is the package import path, e.g. github.com/google/uuid. name is the package import name, which is optional. For example,
schemas:
  Object:
    properties:
      flag:
        type: integer
        x-go-type: mypackage.Flag
        x-go-type-import:
          path: abc.xyz/repo/mypackage
will be generated as
import (
	"abc.xyz/repo/mypackage"
)

// ...

type Object struct {
	Flag mypackage.Flag `json:"flag"`
}
while
schemas:
  Object:
    properties:
      flag:
        type: integer
        x-go-type: alias.Flag
        x-go-type-import:
          path: abc.xyz/repo/mypackage
          name: alias
will be generated as
import (
	alias "abc.xyz/repo/mypackage"
)

// ...

type Object struct {
	Flag alias.Flag `json:"flag"`
}
- x-omitempty: Controls the addition of the omitempty tag in JSON tags of generated Go structures. When set to false, the omitempty tag won't be added even for optional fields. For example,
schemas:
  User:
    type: object
    properties:
      id:
        type: string
      name:
        type: string
        x-omitempty: false
will be generated as
type User struct {
	Id   *string `json:"id,omitempty"`
	Name *string `json:"name"`
}
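The practical effect of x-omitempty shows up when the struct is serialized. The sketch below reuses the User struct from the last example and marshals it with both fields unset, using only encoding/json; encodeEmptyUser is a hypothetical helper added for the illustration.

```go
package main

import "encoding/json"

// User mirrors the generated struct shown above.
type User struct {
	Id   *string `json:"id,omitempty"`
	Name *string `json:"name"`
}

// encodeEmptyUser marshals a User whose optional fields are all unset.
func encodeEmptyUser() (string, error) {
	b, err := json.Marshal(User{})
	return string(b), err
}
```

Here encodeEmptyUser returns {"name":null}: the id key is dropped because of omitempty, while name is still emitted as an explicit null.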
When an optional property declares a default value, a SetDefaults() method is
generated on the corresponding struct. Calling it fills the fields that are still
unset (nil) with their default, leaving already-set fields untouched:
schemas:
  Settings:
    type: object
    properties:
      retries:
        type: integer
        default: 3
generates:
func (t *Settings) SetDefaults() {
	if t.Retries == nil {
		v := int64(3)
		t.Retries = &v
	}
}
Defaults are supported for scalar properties (string, boolean, integer, number).
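A typical use is to call SetDefaults() right after decoding a payload. The sketch below assumes the Settings struct has the shape implied by the generated method above (a *int64 field; the JSON tag is an assumption), and retriesFor is a hypothetical helper.

```go
package main

import "encoding/json"

// Settings is assumed to match the struct generated for the schema above.
type Settings struct {
	Retries *int64 `json:"retries,omitempty"`
}

// SetDefaults reproduces the generated method shown above.
func (t *Settings) SetDefaults() {
	if t.Retries == nil {
		v := int64(3)
		t.Retries = &v
	}
}

// retriesFor decodes a JSON document, applies the defaults and returns
// the effective number of retries.
func retriesFor(raw string) (int64, error) {
	var s Settings
	if err := json.Unmarshal([]byte(raw), &s); err != nil {
		return 0, err
	}
	s.SetDefaults()
	return *s.Retries, nil
}
```

Because the field is a pointer, an explicit 0 in the payload is kept as-is, while a missing key falls back to the default of 3.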
When an operation (or its channel) declares more than one message, a dedicated send function is generated for each one, named after the operation and the message:
channels:
  events:
    address: app.events
    messages:
      userCreated: { $ref: '#/components/messages/userCreated' }
      userDeleted: { $ref: '#/components/messages/userDeleted' }
operations:
  sendEvents:
    action: send
    channel: { $ref: '#/channels/events' }
generates SendAsSendEventsOperationForUserCreated and
...ForUserDeleted, each sending its own message type. Operations with a single
message are unchanged.
On the receive side, a subscribe function is likewise generated per message, named after the operation and the message, with one callback per message type:
operations:
  receiveEvents:
    action: receive
    channel: { $ref: '#/channels/events' }
generates SubscribeToReceiveEventsOperationForUserCreated(ctx, fn) and
...ForUserDeleted(ctx, fn) (plus the matching UnsubscribeFrom...For...
functions and per-message methods on the subscriber interface). A single broker
subscription is opened per channel and multiplexed to the registered handlers.
Each received message is discriminated with a try-each-until-valid strategy:
it is unmarshalled and validated against every expected message schema (using
go-playground/validator, so this
dependency must be available in your module) and dispatched to the first one
that matches. This is heuristic and can be ambiguous when several message
schemas overlap — it works best when the messages have distinguishing required
or enum/const fields. A message that matches none of the expected types is
nacked and surfaced to the error handler as
extensions.ErrNoMatchingMessage. Operations with a single message are
unchanged.
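The try-each-until-valid strategy can be sketched as follows. To stay self-contained, this illustration swaps go-playground/validator for hand-written required-field checks; the payload types, the candidates list, discriminate and errNoMatchingMessage (a stand-in for extensions.ErrNoMatchingMessage) are all hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
)

// errNoMatchingMessage is a stand-in for extensions.ErrNoMatchingMessage.
var errNoMatchingMessage = errors.New("no matching message")

// Hypothetical payloads: both share userId but have a distinct required field.
type userCreated struct {
	UserID string `json:"userId"`
	Email  string `json:"email"`
}

type userDeleted struct {
	UserID string `json:"userId"`
	Reason string `json:"reason"`
}

// candidate pairs a message name with a decode-and-validate function.
type candidate struct {
	name  string
	match func(payload []byte) bool
}

// candidates are tried in order: the first valid one wins.
var candidates = []candidate{
	{"userCreated", func(p []byte) bool {
		var m userCreated
		return json.Unmarshal(p, &m) == nil && m.UserID != "" && m.Email != ""
	}},
	{"userDeleted", func(p []byte) bool {
		var m userDeleted
		return json.Unmarshal(p, &m) == nil && m.UserID != "" && m.Reason != ""
	}},
}

// discriminate returns the name of the first message type the payload
// validates against, or errNoMatchingMessage when none fits.
func discriminate(raw string) (string, error) {
	for _, c := range candidates {
		if c.match([]byte(raw)) {
			return c.name, nil
		}
	}
	return "", errNoMatchingMessage
}
```

A payload carrying userId, email and reason together would be dispatched as userCreated in this sketch, which illustrates why overlapping schemas make the heuristic ambiguous.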
When a channel parameter declares a location runtime expression, the generated
send functions auto-fill that parameter from the outgoing message when the
caller leaves it empty. The parameter stays overridable: if you set it
explicitly, your value is used as-is.
channels:
  user:
    address: app.user.{userId}
    parameters:
      userId:
        location: $message.payload#/userId
    messages:
      userSignedUp:
        $ref: '#/components/messages/userSignedUp'
With the spec above, SendAs... derives userId from msg.Payload.UserId, so
the message is published on app.user.<userId> without passing the parameter
manually. Supported locations are $message.payload#/... and
$message.header#/..., and the targeted field must be a string.
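The auto-fill rule can be summarized in a few lines of Go. This is only a sketch of the behaviour described above, not the generated code: the struct names, userAddress and resolveUserAddress are hypothetical.

```go
package main

import "strings"

// Hypothetical shapes for the generated message and channel parameters.
type userSignedUpPayload struct{ UserId string }
type userSignedUpMessage struct{ Payload userSignedUpPayload }
type userChannelParameters struct{ UserId string }

// userAddress is the channel address from the spec above.
const userAddress = "app.user.{userId}"

// resolveUserAddress fills {userId} from the message payload when the caller
// left the parameter empty; an explicit parameter is used as-is.
func resolveUserAddress(params userChannelParameters, msg userSignedUpMessage) string {
	if params.UserId == "" {
		params.UserId = msg.Payload.UserId
	}
	return strings.ReplaceAll(userAddress, "{userId}", params.UserId)
}
```

With a payload whose UserId is 42, an empty parameter resolves to app.user.42, while an explicit UserId of admin resolves to app.user.admin.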
You can use an error handler that will be executed when the processing of a message
fails. To add a custom ErrorHandler to your controller, use the WithErrorHandler
function in the initialization of the App or User controller:
// Create a new app controller with ErrorHandler
ctrl, _ := NewAppController(/* Broker of your choice */, WithErrorHandler(myErrorHandler), ...)
Here is the function signature that should be satisfied:
func(ctx context.Context, topic string, msg *AcknowledgeableBrokerMessage, err error)
Note: The default ErrorHandler is a Noop ErrorHandler doing nothing. By using an ErrorHandler you can add custom behavior, for example to move messages to retry or dead letter topics/queues. Acks and Naks will be executed after the ErrorHandler; you can use the AcknowledgeableBrokerMessage in the handler to Ack/Nak the message manually.
// Create a new app controller with Logging ErrorHandler
ctrl, _ := NewAppController(/* Broker of your choice */, WithErrorHandler(errorhandlers.Logging(mylogger)), ...)
func(ctx context.Context, topic string, msg *extensions.AcknowledgeableBrokerMessage, err error) {
	// check error or move message to some other queue/topic
	handleTheErrorSomehow()
	// Ack or Nak the message (call one of them, not both)
	msg.Ack()
	// msg.Nak()
}
You can use go-playground/validator to validate the fields content against the contract.
The following tags are currently supported:
| AsyncAPI | Validator tag | Comment |
|---|---|---|
| required | required | For full support, the flag --force-pointers is necessary |
| minLength | min | |
| maxLength | max | |
| minimum | gte | |
| maximum | lte | |
| exclusiveMinimum | gt | |
| exclusiveMaximum | lt | |
| uniqueItems | unique | Only for arrays |
| enum | oneof | Only string enums are supported |
You can validate manually wherever you hold a generated payload, or plug in the provided opt-in middleware to reject invalid messages automatically:
import (
"github.com/go-playground/validator/v10"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares"
)
ctrl, _ := NewUserController(broker, WithMiddlewares(
	middlewares.Validation(validator.New(), func() any { return &UserMessagePayload{} }),
))
The middleware unmarshals the message payload and validates it against the generated tags; if validation fails it returns an error, so the message is neither delivered (on reception) nor sent (on publication).
If you find a bug or a missing feature, please raise an issue on the GitHub repository!
Also, please do not hesitate to propose any improvement or bug fix through a PR. Any contribution is warmly welcomed!
And if you find this project useful, please support it through the Support feature on GitHub.