Event Driven Architecture
A transport-agnostic, resilient event bus for Authula built on Watermill. Supports 7 message brokers with automatic reconnection, concurrent handler execution, and graceful shutdown.
Architecture
Services publish events to the EventBus, which uses Watermill PubSub under the hood. The EventBus provides features like automatic ID assignment, timestamping, multiplexing, concurrent handler execution, retry logic, and panic recovery. Supported transports include GoChannel, Redis, Kafka, RabbitMQ, NATS, PostgreSQL, and SQLite. Handlers process events concurrently, and the EventBus manages reconnection, graceful shutdown, and message acknowledgment.
Event Flow Overview:
-
Services (A, B, C, ...)
- Each service can publish events to the EventBus.
- Services can also subscribe to specific event types and handle them.
-
EventBus (built on Watermill)
- Receives published events from services.
- Assigns automatic IDs and timestamps.
- Supports multiplexing and concurrent handler execution.
- Handles retries, panic recovery, reconnection, graceful shutdown, and message acknowledgment.
-
Pub/Sub Transport Layer
- The EventBus communicates with the underlying transport (GoChannel, Redis, Kafka, RabbitMQ, NATS, PostgreSQL, or SQLite).
- The transport layer is responsible for delivering events between services.
Summary:
Services interact with the EventBus to publish and subscribe to events. The EventBus manages event delivery, reliability, and concurrency, while the transport layer handles the actual message passing between services.
Quick Start
// Initialize provider (Redis example)
config := authulaconfig.WithEventBus(authulamodels.EventBusConfig{
Provider: authulaevents.ProviderRedis.String(),
Redis: &authulamodels.RedisConfig{
URL: os.Getenv(authulaenv.EnvRedisURL),
ConsumerGroup: os.Getenv(authulaenv.EnvEventBusConsumerGroup),
},
})
// Then inside of plugins, you can use it like so:
func (p *SomePlugin) Init(ctx *models.PluginContext) error {
_, err := p.ctx.EventBus.Subscribe("event_name", func(ctx context.Context, event models.Event) error {
return nil
})
if err != nil {
return err
}
return nil
}Supported Providers
| Provider | Required Env Var | Optional Config | Best For |
|---|---|---|---|
| GoChannel | None | BufferSize | Testing, single-node apps |
| Redis | REDIS_URL | URL, ConsumerGroup | Production, caching layers |
| Kafka | KAFKA_BROKERS | Brokers, ConsumerGroup | High throughput, streaming |
| RabbitMQ | RABBITMQ_URL | URL | Complex routing, AMQP |
| NATS | NATS_URL | URL | Cloud-native, simplicity |
| PostgreSQL | POSTGRES_URL | URL | Existing DB infrastructure |
| SQLite | None | DBPath (default: events.db) | Embedded, edge deployments |
Configuration
Environment variables take precedence over config values:
REDIS_URL="redis://localhost:6379"
EVENT_BUS_CONSUMER_GROUP="consumer_group"// With explicit config
authulaconfig.WithEventBus(authulamodels.EventBusConfig{
Provider: authulaevents.ProviderRedis.String(),
Redis: &authulamodels.RedisConfig{
URL: os.Getenv(authulaenv.EnvRedisURL),
ConsumerGroup: os.Getenv(authulaenv.EnvEventBusConsumerGroup),
},
})Best Practices
- Handler Idempotency: Events may be redelivered; design handlers to handle duplicates safely
- Topic Naming: Use dot-notation (e.g.,
user.created,session.expired) - Error Handling: Return errors for retry, nil for success; panics are caught and logged
- Concurrency: Default limit is 100 concurrent handlers; tune via
MaxConcurrentHandlers - Resource Cleanup: Always call
bus.Close()for graceful shutdown
Resilience Features
The EventBus automatically handles:
- Reconnection: Exponential backoff (500ms to 30s) with jitter on transport failures
- Panics: Recovered per handler; other handlers continue processing
- Graceful Shutdown: Waits for active handlers to complete before closing
- Message Acknowledgment: Automatic ack/nack based on handler success
