Event Driven Architecture

A transport-agnostic, resilient event bus for Authula, built on Watermill. It supports seven transports and provides automatic reconnection, concurrent handler execution, and graceful shutdown.

Architecture

Services publish events to the EventBus, which uses Watermill PubSub under the hood. The EventBus provides features like automatic ID assignment, timestamping, multiplexing, concurrent handler execution, retry logic, and panic recovery. Supported transports include GoChannel, Redis, Kafka, RabbitMQ, NATS, PostgreSQL, and SQLite. Handlers process events concurrently, and the EventBus manages reconnection, graceful shutdown, and message acknowledgment.

Event Flow Overview:

  1. Services (A, B, C, ...)
    • Each service can publish events to the EventBus.
    • Services can also subscribe to specific event types and handle them.
  2. EventBus (built on Watermill)
    • Receives published events from services.
    • Assigns automatic IDs and timestamps.
    • Supports multiplexing and concurrent handler execution.
    • Handles retries, panic recovery, reconnection, graceful shutdown, and message acknowledgment.
  3. Pub/Sub Transport Layer
    • The EventBus communicates with the underlying transport (GoChannel, Redis, Kafka, RabbitMQ, NATS, PostgreSQL, or SQLite).
    • The transport layer is responsible for delivering events between services.

Summary:
Services interact with the EventBus to publish and subscribe to events. The EventBus manages event delivery, reliability, and concurrency, while the transport layer handles the actual message passing between services.

Quick Start

// Initialize provider (Redis example)
config := authulaconfig.WithEventBus(authulamodels.EventBusConfig{
	Provider: authulaevents.ProviderRedis.String(),
	Redis: &authulamodels.RedisConfig{
		URL:           os.Getenv(authulaenv.EnvRedisURL),
		ConsumerGroup: os.Getenv(authulaenv.EnvEventBusConsumerGroup),
	},
})

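// Then, inside a plugin, subscribe through the plugin context passed to Init:
func (p *SomePlugin) Init(ctx *models.PluginContext) error {
	// Inside the closure, the handler's ctx parameter shadows the plugin context.
	_, err := ctx.EventBus.Subscribe("event_name", func(ctx context.Context, event models.Event) error {
		// Handle the event here; return an error to signal failure.
		return nil
	})
	return err
}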

Supported Providers

| Provider   | Required Env Var | Optional Config             | Best For                   |
|------------|------------------|-----------------------------|----------------------------|
| GoChannel  | None             | BufferSize                  | Testing, single-node apps  |
| Redis      | REDIS_URL        | URL, ConsumerGroup          | Production, caching layers |
| Kafka      | KAFKA_BROKERS    | Brokers, ConsumerGroup      | High throughput, streaming |
| RabbitMQ   | RABBITMQ_URL     | URL                         | Complex routing, AMQP      |
| NATS       | NATS_URL         | URL                         | Cloud-native, simplicity   |
| PostgreSQL | POSTGRES_URL     | URL                         | Existing DB infrastructure |
| SQLite     | None             | DBPath (default: events.db) | Embedded, edge deployments |

Configuration

Environment variables take precedence over config values:

.env

REDIS_URL="redis://localhost:6379"
EVENT_BUS_CONSUMER_GROUP="consumer_group"

// With explicit config
authulaconfig.WithEventBus(authulamodels.EventBusConfig{
	Provider: authulaevents.ProviderRedis.String(),
	Redis: &authulamodels.RedisConfig{
		URL:           os.Getenv(authulaenv.EnvRedisURL),
		ConsumerGroup: os.Getenv(authulaenv.EnvEventBusConsumerGroup),
	},
})

Best Practices

  • Handler Idempotency: Events may be redelivered; design handlers to handle duplicates safely
  • Topic Naming: Use dot-notation (e.g., user.created, session.expired)
  • Error Handling: Return an error to trigger a retry and nil on success; panics are caught and logged
  • Concurrency: Default limit is 100 concurrent handlers; tune via MaxConcurrentHandlers
  • Resource Cleanup: Always call bus.Close() for graceful shutdown

Resilience Features

The EventBus automatically handles:

  • Reconnection: Exponential backoff (500ms to 30s) with jitter on transport failures
  • Panics: Recovered per handler; other handlers continue processing
  • Graceful Shutdown: Waits for active handlers to complete before closing
  • Message Acknowledgment: Automatic ack/nack based on handler success
