TnsAI

Channels

The Channels module (TnsAI.Channels) is a multi-channel messaging gateway that connects TnsAI agents to external messaging platforms. It uses an adapter pattern with SPI discovery so new platforms can be added without modifying the core routing logic.

Built-in adapters: Telegram, CLI (REPL + JSON), Email (IMAP poll + SMTP send), Slack (Socket Mode), Discord (Gateway), WhatsApp (Cloud API). The Writing a New Channel Adapter section shows how to plug in your own platform via the SPI. The CLI adapter is streaming-capable — see StreamingChannelAdapter below.

Architecture

Messaging Platform --> ChannelAdapter --> UnifiedMessage --> MessageRouter --> Agent Layer
                                                                                  |
Messaging Platform <-- ChannelAdapter <-- UnifiedResponse <---------------------+

The key design principles:

  • Platform-agnostic message model: All platforms convert to/from UnifiedMessage and UnifiedResponse records
  • SPI discovery: Adapters are loaded via ServiceLoader from META-INF/services
  • Optional dependencies: Platform SDKs (Telegram Bot API, etc.) are optional Maven dependencies
  • Capability-aware routing: Each adapter declares what it supports via ChannelCapabilities

Quick Start

// 1. Create registry and discover adapters on classpath
ChannelRegistry registry = new ChannelRegistry();
registry.discoverAdapters();

// 2. Create message router
MessageRouter router = new MessageRouter(registry);
router.setMessageHandler((message, callback) -> {
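    // Process with your agent (`agent` is your own agent instance, not part of this module).
    // Note: message.text() is null for media-only messages.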
    String response = agent.chat(message.text());
    callback.send(UnifiedResponse.builder()
        .channelId(message.channelId())
        .conversationId(message.conversationId())
        .text(response)
        .build());
});

// 3. Start all adapters
registry.startAll(router);

Package Layout

com.tnsai.channels
+-- api/          Core interfaces and message models
+-- annotation/   @Channel annotation
+-- enums/        ChannelState lifecycle enum
+-- model/        Attachment, MessageMeta, ResponseHints, ChannelError
+-- registry/     SPI-based adapter discovery and lifecycle
+-- routing/      MessageRouter, SessionRegistry
+-- telegram/     Telegram Bot API adapter
+-- cli/          Local REPL + JSON adapter (streaming-capable reference)
+-- email/        IMAP poll + SMTP send adapter
+-- slack/        Slack Socket Mode adapter
+-- discord/      Discord Gateway adapter
+-- whatsapp/     WhatsApp Cloud API adapter

Core Interfaces

ChannelAdapter

The main SPI contract every messaging platform must implement:

public interface ChannelAdapter {
    String id();                           // Unique identifier (e.g. "telegram")
    String displayName();                  // Human-readable name
    ChannelCapabilities capabilities();    // What the channel supports
    void start(ChannelContext context);    // Begin listening for messages
    void stop();                           // Tear down and release resources
    void send(UnifiedResponse response);  // Send a response back through the channel
    boolean isRunning();                   // Whether the adapter is operational
}

Lifecycle: ChannelRegistry calls start(context) once with a ChannelContext for delivering inbound events. Adapters call context.onMessage(unified) when a message arrives from the platform.
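
As an illustration of that lifecycle, here is a minimal sketch of an in-memory adapter. It assumes simplified stand-ins for the module's types: the nested Msg record and Context interface are hypothetical, trimmed versions of UnifiedMessage and ChannelContext, and simulateInbound is an invented hook that plays the role of a platform's receive callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed stand-ins so the sketch compiles on its own;
// the real types live in com.tnsai.channels.api.
final class LifecycleSketch {

    record Msg(String channelId, String conversationId, String text) {}

    interface Context {
        void onMessage(Msg message);
    }

    // An in-memory "platform": start() stores the context, and
    // simulateInbound() stands in for the platform's receive callback.
    static final class LoopbackAdapter {
        private volatile Context context;   // null until start() is called

        String id() { return "loopback"; }

        void start(Context ctx) { this.context = ctx; }

        void stop() { this.context = null; }

        boolean isRunning() { return context != null; }

        // Converts a native event into the unified record, then hands it over.
        void simulateInbound(String conversationId, String text) {
            Context ctx = context;          // read once to avoid racing with stop()
            if (ctx == null) {
                return;                     // not started or already stopped: drop
            }
            ctx.onMessage(new Msg(id(), conversationId, text));
        }
    }

    // Runs one start -> inbound -> stop cycle and returns what the context saw.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        LoopbackAdapter adapter = new LoopbackAdapter();
        adapter.simulateInbound("c1", "ignored");   // before start: dropped
        adapter.start(m -> seen.add(m.channelId() + ":" + m.conversationId() + ":" + m.text()));
        adapter.simulateInbound("c1", "hello");
        adapter.stop();
        adapter.simulateInbound("c1", "late");      // after stop: dropped
        return seen;
    }
}
```

Only the message delivered between start() and stop() reaches the context. A real adapter would do the same conversion inside its platform SDK's listener.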

ChannelAdapterFactory

SPI factory for lazy adapter creation with classpath availability checks:

public interface ChannelAdapterFactory {
    String getType();                    // Channel type identifier
    String getDisplayName();             // Human-readable name
    ChannelAdapter create(Object config); // Create adapter with platform-specific config
    boolean isAvailable();               // Whether required SDK classes are on classpath
}

Factories are registered in META-INF/services/com.tnsai.channels.api.ChannelAdapterFactory and support configuration-driven, lazy instantiation.
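
One plausible way to implement the isAvailable() check is a non-initialising Class.forName lookup. The sketch below uses only the JDK; the Factory interface and SdkBackedFactory record are hypothetical reductions of ChannelAdapterFactory for illustration, and the module's real factories may perform the check differently.

```java
import java.util.List;
import java.util.stream.Collectors;

final class AvailabilitySketch {

    // Returns true when the named class can be located, without initialising it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, AvailabilitySketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Hypothetical stand-in for ChannelAdapterFactory, reduced to two methods.
    interface Factory {
        String getType();
        boolean isAvailable();
    }

    // A factory whose availability depends on one marker class from the platform SDK.
    record SdkBackedFactory(String type, String sdkClass) implements Factory {
        public String getType() { return type; }
        public boolean isAvailable() { return isOnClasspath(sdkClass); }
    }

    // Keeps only the factories whose SDK is present, mirroring a filtered discovery.
    static List<String> availableTypes(List<? extends Factory> all) {
        return all.stream()
                .filter(Factory::isAvailable)
                .map(Factory::getType)
                .collect(Collectors.toList());
    }
}
```

Checking for a marker class keeps the optional SDK out of the factory's own linkage, so the factory class itself can load even when the SDK is absent.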

ChannelContext

Callback interface provided to adapters for delivering inbound events to the gateway:

public interface ChannelContext {
    void onMessage(UnifiedMessage message);     // Inbound message arrived
    void onTyping(String conversationId);       // Remote user is typing
    void onError(ChannelError error);           // Adapter encountered an error
}

ChannelDiscovery

Utility for discovering available adapter factories via ServiceLoader:

// Only factories whose SDK is on the classpath
List<ChannelAdapterFactory> factories = ChannelDiscovery.discoverFactories();

// All registered factories, regardless of availability
List<ChannelAdapterFactory> all = ChannelDiscovery.discoverAllFactories();

StreamingChannelAdapter (mixin)

Since 0.10.2. A mixin interface for adapters that want per-token delivery while the agent's reply streams, instead of waiting for the assembled UnifiedResponse:

public interface StreamingChannelAdapter extends ChannelAdapter {
    void sendChunk(UnifiedChunk chunk);
}

Because it extends ChannelAdapter, every StreamingChannelAdapter is also a regular adapter. Gateway code typically does an instanceof check and dispatches chunks through sendChunk(...) as they arrive, then still calls send(UnifiedResponse) once with the assembled reply so that non-streaming downstreams (logging, audit) see the full text. Adapters that don't implement the mixin keep working unchanged: for them, send(UnifiedResponse) remains the single delivery point.
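
The instanceof dispatch described above might look roughly like the following. This is a sketch under assumptions, not the gateway's actual code: Adapter and StreamingAdapter are hypothetical stand-ins that carry plain strings instead of UnifiedResponse and UnifiedChunk.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamDispatchSketch {

    // Hypothetical stand-ins for ChannelAdapter / StreamingChannelAdapter.
    interface Adapter {
        void send(String fullText);
    }

    interface StreamingAdapter extends Adapter {
        void sendChunk(String delta, boolean done);
    }

    // Forwards each delta to streaming-capable adapters as it arrives,
    // then always delivers the assembled reply through send().
    static void deliver(Adapter adapter, List<String> deltas) {
        StringBuilder assembled = new StringBuilder();
        for (String delta : deltas) {
            assembled.append(delta);
            if (adapter instanceof StreamingAdapter streaming) {
                streaming.sendChunk(delta, false);
            }
        }
        if (adapter instanceof StreamingAdapter finisher) {
            finisher.sendChunk("", true);   // final empty chunk marks end of stream
        }
        adapter.send(assembled.toString());
    }

    static List<String> demoStreaming() {
        List<String> log = new ArrayList<>();
        StreamingAdapter adapter = new StreamingAdapter() {
            public void sendChunk(String delta, boolean done) {
                log.add("chunk[" + delta + "," + done + "]");
            }
            public void send(String fullText) {
                log.add("send[" + fullText + "]");
            }
        };
        deliver(adapter, List.of("Hel", "lo"));
        return log;
    }

    static List<String> demoPlain() {
        List<String> log = new ArrayList<>();
        deliver(text -> log.add("send[" + text + "]"), List.of("Hel", "lo"));
        return log;
    }
}
```

The plain adapter sees a single send() call, while the streaming adapter sees every delta, a terminal chunk, and then the assembled reply.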

The reference implementation is CliChannel (TNS-440 / #335):

  • REPL mode prints the assistant: prefix once on the first chunk, then concatenates deltas inline.
  • JSON mode emits one {"type":"chunk","content":"...","done":...} record per chunk.
  • The post-stream send(UnifiedResponse) is suppressed (the body was already rendered chunk-by-chunk); a suppressNextSend latch resets after one consumption so subsequent standalone send() calls render normally.
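
The one-shot suppression in the last bullet can be sketched with an AtomicBoolean. This is an illustration only: the class below is hypothetical, it renders into a StringBuilder rather than a terminal, and arming the latch on the done chunk is an assumption (the source does not say when CliChannel arms it).

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class SuppressLatchSketch {
    private final AtomicBoolean suppressNextSend = new AtomicBoolean(false);
    private final StringBuilder out = new StringBuilder();

    // Renders each delta inline; on the final chunk, arms the latch so the
    // follow-up send() does not print the same body a second time.
    void sendChunk(String delta, boolean done) {
        out.append(delta);
        if (done) {
            out.append('\n');
            suppressNextSend.set(true);
        }
    }

    // getAndSet(false) reads and clears the latch in one atomic step,
    // so exactly one send() is skipped after each stream.
    void send(String fullText) {
        if (suppressNextSend.getAndSet(false)) {
            return;
        }
        out.append(fullText).append('\n');
    }

    String rendered() {
        return out.toString();
    }

    static String demo() {
        SuppressLatchSketch cli = new SuppressLatchSketch();
        cli.sendChunk("Hi", false);
        cli.sendChunk("!", false);
        cli.sendChunk("", true);
        cli.send("Hi!");          // suppressed: already rendered chunk-by-chunk
        cli.send("standalone");   // latch was consumed, so this renders normally
        return cli.rendered();
    }
}
```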

For the Sona-side gateway plumbing that fans framework Agent.streamChatWithTools chunks through this mixin, see Sona → How It Works.

UnifiedChunk

Since 0.10.2. The delta record carried by StreamingChannelAdapter.sendChunk:

public record UnifiedChunk(
    String conversationId,         // Same key as the parent UnifiedResponse
    String delta,                  // Text fragment (may be empty for control chunks)
    boolean done,                  // true for the final chunk in a reply
    Map<String, Object> metadata,  // Free-form per-chunk metadata; null treated as empty
    Instant timestamp              // When the chunk was emitted
) { ... }

The compact constructor normalises a null delta to "" and null metadata to Map.of(), defensively copies metadata, and requires conversationId and timestamp to be present. Two factory methods cover the common cases:

UnifiedChunk.text(conversationId, "hello");   // non-terminal text chunk
UnifiedChunk.done(conversationId);             // final empty chunk with done=true

done is the streaming sentinel, not the lifecycle terminator — the matching UnifiedResponse still arrives via send(...) afterward unless the adapter explicitly suppresses it (as CliChannel does).
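
For readers unfamiliar with compact constructors, the normalisation rules described for UnifiedChunk could be written as below. This is a sketch, not the module's source: Chunk is a hypothetical stand-in, "requires" is interpreted here as a non-null check, and Map.copyOf is one possible way to make the defensive copy (it rejects null keys and values, which the real record may or may not do).

```java
import java.time.Instant;
import java.util.Map;
import java.util.Objects;

final class ChunkSketch {

    // Hypothetical stand-in for UnifiedChunk with the same component order.
    record Chunk(String conversationId, String delta, boolean done,
                 Map<String, Object> metadata, Instant timestamp) {

        // Compact constructor: runs before the fields are assigned, so
        // reassigning the parameters normalises what the record stores.
        Chunk {
            Objects.requireNonNull(conversationId, "conversationId");
            Objects.requireNonNull(timestamp, "timestamp");
            delta = (delta == null) ? "" : delta;
            metadata = (metadata == null) ? Map.of() : Map.copyOf(metadata);
        }

        // Non-terminal text chunk.
        static Chunk text(String conversationId, String delta) {
            return new Chunk(conversationId, delta, false, null, Instant.now());
        }

        // Final empty chunk with done=true.
        static Chunk done(String conversationId) {
            return new Chunk(conversationId, "", true, null, Instant.now());
        }
    }
}
```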

Message Models

UnifiedMessage (Inbound)

Platform-agnostic inbound message. Every adapter converts its native format into this record before handing it to the router.

public record UnifiedMessage(
    String channelId,          // Adapter id (e.g. "telegram")
    String senderId,           // Platform-specific user id
    String senderName,         // Human-readable sender name
    String conversationId,     // Platform-specific chat/channel id
    String text,               // Message text (null for media-only)
    List<Attachment> attachments,
    MessageMeta meta,          // Reply-to, thread id, platform message id
    Instant timestamp
) {
    // Convenience method: unique session key
    public String sessionKey();  // Returns "channelId:conversationId"
}

Validation: channelId, senderId, and conversationId must be non-blank. Attachments are defensively copied.

Build with the fluent builder:

UnifiedMessage message = UnifiedMessage.builder()
    .channelId("telegram")
    .senderId("123456")
    .senderName("Alice")
    .conversationId("789")
    .text("Hello, agent!")
    .timestamp(Instant.now())
    .build();

UnifiedResponse (Outbound)

Platform-agnostic outbound response. The gateway produces this after agent processing; the adapter converts it into the native platform format.

public record UnifiedResponse(
    String channelId,            // Target adapter id
    String conversationId,       // Target chat/channel
    String text,                 // Response text
    List<Attachment> attachments, // Files to send
    ResponseHints hints          // Delivery hints
) {}

Build with the fluent builder:

UnifiedResponse response = UnifiedResponse.builder()
    .channelId("telegram")
    .conversationId("789")
    .text("Here is your answer.")
    .hints(ResponseHints.withStreaming())
    .build();

Attachment

A file or media item attached to a message or response:

public record Attachment(
    String fileName,
    String mimeType,
    String url,            // Remote URL (null for local data)
    byte[] data,           // Raw bytes (null for remote)
    long sizeBytes,        // -1 if unknown
    AttachmentType type    // IMAGE, AUDIO, VIDEO, DOCUMENT, VOICE, STICKER, OTHER
) {
    // Factory methods
    static Attachment ofUrl(String fileName, String mimeType, String url, AttachmentType type);
    static Attachment ofData(String fileName, String mimeType, byte[] data, AttachmentType type);
}

MessageMeta

Optional platform-specific metadata:

public record MessageMeta(
    String replyToMessageId,    // Message this is replying to
    String threadId,            // Thread identifier
    String platformMessageId,   // Native message id from source platform
    Map<String, String> extra   // Arbitrary platform-specific data
) {
    static MessageMeta of(String platformMessageId);
    static MessageMeta reply(String platformMessageId, String replyToMessageId);
}

ResponseHints

Controls how the adapter delivers a response:

public record ResponseHints(
    boolean streaming,           // Token-by-token delivery if supported
    boolean showTyping,          // Send typing indicator first
    boolean splitLongMessages,   // Auto-split text exceeding channel max length
    String replyToMessageId      // Native message id to reply to
) {
    static ResponseHints defaults();        // showTyping=true, splitLongMessages=true
    static ResponseHints withStreaming();    // streaming=true, showTyping=true, splitLongMessages=true
    static ResponseHints replyTo(String messageId);
}

ChannelError

Error record with severity classification:

public record ChannelError(
    String channelId,
    Severity severity,    // TRANSIENT, DEGRADED, FATAL
    String message,
    Throwable cause,
    Instant timestamp
) {
    static ChannelError transient_(String channelId, String message, Throwable cause);
    static ChannelError degraded(String channelId, String message, Throwable cause);
    static ChannelError fatal(String channelId, String message, Throwable cause);
}

ChannelCapabilities

Each adapter declares what it supports. The router and response formatter use this to adapt behavior per channel.

public record ChannelCapabilities(
    boolean threads,           // Threaded conversations
    boolean reactions,         // Emoji reactions
    boolean fileUpload,        // File/document uploads
    boolean voice,             // Voice messages
    boolean typing,            // Typing indicators
    boolean editing,           // Message editing (for streaming)
    boolean streaming,         // Token-by-token streaming via editing
    long maxMessageLength      // Max text length per message
) {}

Build with the fluent builder:

ChannelCapabilities capabilities = ChannelCapabilities.builder()
    .threads(false)
    .reactions(true)
    .fileUpload(true)
    .voice(true)
    .typing(true)
    .editing(true)
    .streaming(true)
    .maxMessageLength(4096)
    .build();

ChannelState

Lifecycle state machine for adapters:

REGISTERED --> STARTING --> RUNNING --> STOPPING --> STOPPED
                  |            |
                  v            v
               FAILED       DEGRADED

State        Meaning
REGISTERED   Registered but not yet started
STARTING     Currently connecting to platform
RUNNING      Accepting messages
DEGRADED     Temporarily impaired (rate limited, reconnecting)
STOPPING     Shutting down
STOPPED      No longer accepting messages
FAILED       Fatal error during start or operation
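
The lifecycle above can be encoded as a transition table. The sketch below is hypothetical and not the module's ChannelState enum. The edges REGISTERED to STARTING, STARTING to RUNNING or FAILED, RUNNING to DEGRADED or STOPPING, and STOPPING to STOPPED come from the diagram. RUNNING to FAILED is inferred from the "during start or operation" wording, and the edges out of DEGRADED are assumptions, since the diagram does not draw them.

```java
import java.util.Map;
import java.util.Set;

final class StateSketch {

    enum State { REGISTERED, STARTING, RUNNING, DEGRADED, STOPPING, STOPPED, FAILED }

    // Allowed "from -> to" edges; see the comments for which ones are assumed.
    private static final Map<State, Set<State>> ALLOWED = Map.of(
        State.REGISTERED, Set.of(State.STARTING),
        State.STARTING,   Set.of(State.RUNNING, State.FAILED),
        // RUNNING -> FAILED is inferred from "fatal error during ... operation"
        State.RUNNING,    Set.of(State.DEGRADED, State.STOPPING, State.FAILED),
        // Assumption: a degraded adapter can recover, stop, or fail
        State.DEGRADED,   Set.of(State.RUNNING, State.STOPPING, State.FAILED),
        State.STOPPING,   Set.of(State.STOPPED),
        State.STOPPED,    Set.of(),
        State.FAILED,     Set.of());

    // True when the table permits moving directly from one state to the other.
    static boolean canTransition(State from, State to) {
        return ALLOWED.getOrDefault(from, Set.of()).contains(to);
    }
}
```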

Registry and Routing

ChannelRegistry

Discovers and manages adapter lifecycle. Adapters can be loaded via SPI or registered programmatically.

public class ChannelRegistry {
    void discoverAdapters();                               // Load via ServiceLoader
    void register(ChannelAdapter adapter);                 // Register programmatically
    void startAdapter(String channelId, ChannelContext context);
    void startAll(ChannelContext context);
    void stopAdapter(String channelId);
    void stopAll();
    Optional<ChannelAdapter> get(String channelId);
    ChannelState getState(String channelId);
    Collection<ChannelAdapter> getAllAdapters();
    Collection<String> getRegisteredIds();
}

Thread-safe: backed by ConcurrentHashMap. Duplicate registrations are logged and ignored.
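
A common way to get "first registration wins" semantics on a ConcurrentHashMap is putIfAbsent. The following is a minimal sketch of that idea, not the registry's source: the class is hypothetical, adapters are stored as plain Object, and register returns a boolean purely so the outcome is observable here (the documented method returns void).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

final class RegistrySketch {
    private static final Logger LOG = Logger.getLogger(RegistrySketch.class.getName());

    private final Map<String, Object> adapters = new ConcurrentHashMap<>();

    // putIfAbsent is atomic: if two threads race on the same id, exactly one
    // wins and the other sees the existing value and is ignored.
    boolean register(String id, Object adapter) {
        Object existing = adapters.putIfAbsent(id, adapter);
        if (existing != null) {
            LOG.warning("Duplicate adapter id ignored: " + id);
            return false;
        }
        return true;
    }

    Object get(String id) {
        return adapters.get(id);
    }

    int size() {
        return adapters.size();
    }
}
```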

MessageRouter

The central routing hub that bridges the channel layer with the agent layer. Implements ChannelContext so it receives all inbound events from adapters.

public class MessageRouter implements ChannelContext {

    MessageRouter(ChannelRegistry registry);

    // Connect to agent layer
    void setMessageHandler(BiConsumer<UnifiedMessage, ResponseCallback> handler);

    // Proactive outbound messaging
    void sendProactive(String channelId, String conversationId, String text);

    SessionRegistry getSessions();

    // Callback interface for agent responses
    @FunctionalInterface
    interface ResponseCallback {
        void send(UnifiedResponse response);
    }
}

The message handler receives the inbound message and a ResponseCallback for sending the agent's response back to the originating channel. This keeps TnsAI.Channels decoupled from specific agent implementations.

SessionRegistry

Tracks active conversations with idle reaping:

public class SessionRegistry {
    void touch(String sessionKey, String channelId, String conversationId);
    Optional<ChannelSession> get(String sessionKey);
    Collection<ChannelSession> getAll();
    void remove(String sessionKey);
    int reapIdle(Duration maxIdle);    // Remove sessions idle longer than threshold
    int size();

    record ChannelSession(
        String sessionKey,        // "channelId:conversationId"
        String channelId,
        String conversationId,
        Instant createdAt,
        Instant lastActive
    ) {}
}
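
Idle reaping can be sketched as a scan that removes entries whose last activity is older than a cutoff. The class below is a hypothetical simplification: it tracks only the last-active instant per session key, and it takes the current time as a parameter (unlike the documented reapIdle(Duration)) so the behaviour is deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReapSketch {
    private final Map<String, Instant> lastActive = new ConcurrentHashMap<>();

    // Same "channelId:conversationId" shape as the documented session key.
    static String sessionKey(String channelId, String conversationId) {
        return channelId + ":" + conversationId;
    }

    void touch(String key, Instant now) {
        lastActive.put(key, now);
    }

    // Removes sessions idle longer than maxIdle and returns how many were removed.
    int reapIdle(Duration maxIdle, Instant now) {
        Instant cutoff = now.minus(maxIdle);
        int removed = 0;
        for (Map.Entry<String, Instant> e : lastActive.entrySet()) {
            // remove(key, value) only succeeds if the entry was not touched
            // concurrently, so a session that just became active survives.
            if (e.getValue().isBefore(cutoff) && lastActive.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return lastActive.size();
    }
}
```

A caller would typically run the reaper on a schedule, for example once a minute with a maxIdle of several minutes.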

@Channel Annotation

Marks a class as a channel adapter for registry discovery:

@Channel(id = "telegram", displayName = "Telegram")
public class TelegramAdapter implements ChannelAdapter { ... }

Element       Type     Description
id            String   Unique channel identifier (required)
displayName   String   Human-readable name for logs and UI

Reference Implementation: Telegram

The TelegramAdapter is the reference adapter implementation, showing how to build a complete channel integration.

Class: com.tnsai.channels.telegram.TelegramAdapter

Features:

  • Long-polling via com.pengrad.telegrambot.TelegramBot
  • Extracts text, photos, documents, voice, audio, video, stickers
  • Sends typing indicators before responses
  • Auto-splits long messages at newline/space boundaries (max 4096 chars)
  • Reply-to support via ReplyParameters
  • Token resolution from TELEGRAM_BOT_TOKEN env var or telegram.bot.token system property

@Channel(id = "telegram", displayName = "Telegram")
public class TelegramAdapter implements ChannelAdapter {

    @Override
    public ChannelCapabilities capabilities() {
        return ChannelCapabilities.builder()
            .threads(false).reactions(true).fileUpload(true)
            .voice(true).typing(true).editing(true)
            .streaming(true).maxMessageLength(4096)
            .build();
    }

    // ... start(), stop(), send(), isRunning()
}

Factory (TelegramAdapterFactory): Reports isAvailable() == true only when com.pengrad.telegrambot.TelegramBot is on the classpath.

Writing a New Channel Adapter

To add a new messaging platform — e.g., a Microsoft Teams adapter or an SMS gateway:

  1. Create a package: com.tnsai.channels.myplatform/
  2. Implement ChannelAdapter, annotate with @Channel(id = "myplatform", displayName = "My Platform")
  3. Implement ChannelAdapterFactory with isAvailable() checking for the platform SDK
  4. Register in META-INF/services/com.tnsai.channels.api.ChannelAdapterFactory
  5. Add the platform SDK as an <optional> dependency in pom.xml

Key requirements:

  • Thread safety: adapters may receive concurrent messages
  • Convert all inbound messages to UnifiedMessage before calling context.onMessage()
  • Convert UnifiedResponse back to the platform's native format in send()
  • Handle long messages by respecting maxMessageLength from capabilities
  • Report errors via context.onError() with appropriate Severity
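
For the long-message requirement, one possible splitting strategy prefers a newline, then a space, and falls back to a hard cut. The helper below is a hypothetical sketch of that strategy, not the Telegram adapter's actual splitter. It counts UTF-16 chars, which may differ from how a given platform measures length.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {

    // Splits text into parts of at most maxLen chars, preferring to break at
    // the last newline in the window, then the last space, else a hard cut.
    static List<String> split(String text, int maxLen) {
        if (maxLen <= 0) {
            throw new IllegalArgumentException("maxLen must be positive");
        }
        List<String> parts = new ArrayList<>();
        String rest = text;
        while (rest.length() > maxLen) {
            int cut = rest.lastIndexOf('\n', maxLen);
            if (cut <= 0) {
                cut = rest.lastIndexOf(' ', maxLen);
            }
            boolean atBoundary = cut > 0;
            if (!atBoundary) {
                cut = maxLen;               // no usable boundary: hard cut
            }
            parts.add(rest.substring(0, cut));
            // Drop the boundary character itself when we split on one.
            rest = rest.substring(atBoundary ? cut + 1 : cut);
        }
        if (!rest.isEmpty()) {
            parts.add(rest);
        }
        return parts;
    }
}
```

In send(), an adapter would call something like split(response.text(), (int) capabilities().maxMessageLength()) and deliver each part as its own platform message.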
