Connection Management¶
The AMPS binder uses a layered connection abstraction to manage AMPS client connections with support for high availability, authentication, and custom stores.
Architecture¶
AmpsConnectionFactoryProvider
│
▼
AmpsConnectionFactory ─── creates ──→ AmpsConnection
│ │
│ ├── subscribe()
│ ├── publish()
│ ├── setHeartbeat()
│ ├── setAckBatchSize()
│ ├── setAckTimeout()
│ └── close()
│
├── MessageStoreProvider (publish store)
├── BookmarkStoreProvider (bookmark store)
├── AmpsHeaderConverter (header encoding)
└── Authenticator (optional)
Interfaces¶
AmpsConnection¶
Wraps an AMPS HAClient and provides methods for subscribing, publishing, configuring heartbeats, acknowledgment batching, and closing connections.
AmpsConnectionFactory¶
Creates AmpsConnection instances. Each consumer binding and producer binding gets its own connection(s).
AmpsConnectionFactoryProvider¶
Creates AmpsConnectionFactory from the binder configuration. This is the top-level entry point resolved by the binder during initialization.
HA Client¶
The binder uses the AMPS High-Availability Client (HAClient). Key behaviors:
-
Server Chooser — A
DefaultServerChooseris configured with all broker URLs from thebrokersproperty. If one server becomes unavailable, the client automatically fails over to the next. -
Reconnection Strategy — An
ExponentialDelayStrategyis used, doubling the reconnection delay on each failure up to themaxReconnectTimecap. -
Unique Client Names — Each connection gets a unique name in the format
{name}_{PID}_{sequence}, ensuring no client name collisions on the AMPS server. -
Heartbeats — The binder-level
heartBeatIntervalkeeps connections alive and enables quick detection of stale connections.
Connection Lifecycle¶
Consumer Connections¶
When a consumer binding starts:
- The binder creates
concurrencynumber of AMPS connections (default is 1). - Each connection:
- Connects and logs on to the AMPS server.
- Configures heartbeat, ACK batch size, and ACK timeout.
- Issues the subscription command (Subscribe, SOW, or SOWAndSubscribe).
- As messages arrive, they are converted to Spring
Message<byte[]>and dispatched to the bound channel.
When stopping:
- All connections are flushed (publishes and ACKs).
- Connections are closed gracefully.
Producer Connections¶
When a producer binding starts:
- A single AMPS connection is created.
- The connection is opened to the AMPS server.
When a message is published:
- The payload is extracted (String or byte[]).
- If
publishAmpsHeaderis enabled, the correlation ID is set from encoded headers. - The message is published to the destination topic with the configured ACK type and timeout.
When stopping:
- The connection is flushed and closed.
Custom Stores¶
Publish Store¶
The publish store buffers outgoing messages for at-least-once delivery. If the connection fails, buffered messages are replayed on reconnection.
- Default:
MemoryPublishStorewithpublishStoreSizeentries. - Custom: Provide a
MessageStoreProviderbean and setpublishMessageStoreProviderBeanName.
@Bean
public MessageStoreProvider myPublishStore() {
return () -> new SOWStore("./publish-store", 1);
}
Bookmark Store¶
The bookmark store persists consumed message bookmarks for durable subscriptions. On reconnection, the client can resume from the last acknowledged bookmark.
- Default:
MemoryBookmarkStore(bookmarks lost on restart). - Custom: Provide a
BookmarkStoreProviderbean and setsubscriptionBookmarkStoreProviderBeanName.
@Bean
public BookmarkStoreProvider myBookmarkStore() {
return () -> new LoggedBookmarkStore("./bookmark-store");
}
Header Converter¶
The AmpsHeaderConverter encodes/decodes Spring message headers to/from AMPS correlation IDs.
- Default:
AmpsHeaderConverterImpl— serializes headers to JSON, then Base64-encodes as the correlation ID. - Custom: Provide an
AmpsHeaderConverterbean and setampsHeaderConverterBeanName.
See Custom Header Propagation for details.
Bean Resolution¶
The binder resolves extension beans (stores, converter, authenticator) from the Spring ApplicationContext. Due to Spring Cloud Stream's DefaultBinderFactory creating child application contexts, the binder tries two name variants:
{beanName}{beanName}_child
This ensures beans declared at the parent level are correctly resolved in multi-binder scenarios.