t140llm

A TypeScript library that converts LLM streaming responses into T.140 real-time text format for SIP, WebRTC, and (S)RTP applications.

Low Latency

Approximately 10% lower latency than traditional WebSocket streaming.

🔄 Multiple Transports

Supports WebSocket, direct RTP/SRTP, and UNIX socket transports.

🛡️ Forward Error Correction

Built-in FEC for packet-loss recovery in network transmissions.

🔧 Customizable

Supports custom transport implementations and metadata handling.

🔌 Compatible

Works with OpenAI, Anthropic, Mistral AI, Cohere, Ollama, and the Vercel AI SDK.

📝 T.140 Compliant

Formats text according to the T.140 standard for real-time text applications.

🧠 Reasoning Streams

Extracts and processes LLM reasoning alongside text for transparency.

🛠️ Tool Integration

Handles tool calls and tool results from modern LLM interactions.

🔀 Multi-Channel

Supports separate transport channels for text, reasoning, and tools.

Installation

Install t140llm using npm:

npm install t140llm

Requirements

  • Node.js ≥ 10.18.1
  • npm ≥ 6.13.4

Dependencies

The library has the following key dependencies:

  • ws - For WebSocket functionality
  • node-unix-socket - For UNIX socket communication
  • werift-rtp - For RTP/SRTP packet handling

API Reference

Core Functions

processAIStream

processAIStream(stream: TextDataStream, websocketUrl?: string, options?: ProcessorOptions): Promise<void>

Processes an AI text stream and sends it to a WebSocket server.

Parameters:

  • stream: TextDataStream - The AI text stream to process
  • websocketUrl: string (optional) - The WebSocket server URL
  • options: ProcessorOptions (optional) - Processing options

Returns:

A Promise that resolves when the stream has been completely processed.

processAIStreamToRtp

processAIStreamToRtp(stream: TextDataStream, remoteAddress: string, remotePort?: number, rtpConfig?: RtpConfig): Promise<void>

Processes an AI text stream and sends it directly using RTP.

Parameters:

  • stream: TextDataStream - The AI text stream to process
  • remoteAddress: string - The remote IP address
  • remotePort: number (optional) - The remote port (default: 5004)
  • rtpConfig: RtpConfig (optional) - RTP configuration options

Returns:

A Promise that resolves when the stream has been completely processed.

processAIStreamToSrtp

processAIStreamToSrtp(stream: TextDataStream, remoteAddress: string, srtpConfig: SrtpConfig, remotePort?: number): Promise<void>

Processes an AI text stream and sends it using Secure RTP (SRTP).

Parameters:

  • stream: TextDataStream - The AI text stream to process
  • remoteAddress: string - The remote IP address
  • srtpConfig: SrtpConfig - SRTP configuration including security keys
  • remotePort: number (optional) - The remote port (default: 5004)

Returns:

A Promise that resolves when the stream has been completely processed.

processAIStreamToDirectSocket

processAIStreamToDirectSocket(stream: TextDataStream, socketPath?: string, rtpConfig?: RtpConfig): Promise<void>

Processes an AI text stream and sends it to a UNIX socket.

Parameters:

  • stream: TextDataStream - The AI text stream to process
  • socketPath: string (optional) - Path to the UNIX socket
  • rtpConfig: RtpConfig (optional) - RTP configuration options

Returns:

A Promise that resolves when the stream has been completely processed.

Pre-Connection Functions

createT140WebSocketConnection

createT140WebSocketConnection(websocketUrl?: string): TransportStream

Creates a WebSocket transport connection without starting data transmission.

Parameters:

  • websocketUrl: string (optional) - The WebSocket server URL

Returns:

A TransportStream instance ready for use with processAIStream.
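For example, the connection can be opened before the LLM request is made and then handed to processAIStream via the transport option, following the same pattern as the Pre-Connection Example below (llmStream stands in for any TextDataStream):

import { createT140WebSocketConnection, processAIStream } from 't140llm';

// Open the WebSocket connection ahead of the LLM request
const transport = createT140WebSocketConnection('ws://localhost:8080');

// Later, once the LLM stream is available:
await processAIStream(llmStream, null, {
  preCreateConnection: true,
  transport,
});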

createT140RtpTransport

createT140RtpTransport(remoteAddress: string, remotePort?: number, rtpConfig?: RtpConfig): TransportStream

Creates an RTP transport connection without starting data transmission.

Parameters:

  • remoteAddress: string - The remote IP address
  • remotePort: number (optional) - The remote port
  • rtpConfig: RtpConfig (optional) - RTP configuration options

Returns:

A TransportStream instance ready for use with processAIStream.

createT140SrtpTransport

createT140SrtpTransport(remoteAddress: string, srtpConfig: SrtpConfig, remotePort?: number): TransportStream

Creates an SRTP transport connection without starting data transmission.

Parameters:

  • remoteAddress: string - The remote IP address
  • srtpConfig: SrtpConfig - SRTP configuration including security keys
  • remotePort: number (optional) - The remote port

Returns:

A TransportStream instance ready for use with processAIStream.
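The same pattern applies to SRTP; a minimal sketch (the keys are illustrative and must match what the receiver expects; llmStream stands in for any TextDataStream):

import { createT140SrtpTransport, processAIStream } from 't140llm';
import { randomBytes } from 'crypto';

// Establish keys and the transport before the LLM request
const transport = createT140SrtpTransport('192.168.1.100', {
  masterKey: randomBytes(16),  // 128 bits, for illustration
  masterSalt: randomBytes(14), // 112 bits, for illustration
}, 5004);

await processAIStream(llmStream, null, {
  preCreateConnection: true,
  transport,
});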

createDirectSocketTransport

createDirectSocketTransport(socketPath?: string): TransportStream

Creates a direct socket transport connection without starting data transmission.

Parameters:

  • socketPath: string (optional) - Path to the UNIX socket

Returns:

A TransportStream instance ready for use with processAIStream.
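And likewise for UNIX sockets; a minimal sketch (llmStream again stands in for any TextDataStream):

import { createDirectSocketTransport, processAIStream } from 't140llm';

// Open the socket transport ahead of time
const transport = createDirectSocketTransport('/tmp/t140llm.sock');

await processAIStream(llmStream, null, {
  preCreateConnection: true,
  transport,
});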

Interfaces

TransportStream

Interface for custom transport implementations. Required for creating custom transports.

interface TransportStream {
  send(data: Buffer, callback?: (error?: Error) => void): void;
  close?(): void;
}

TextDataStream

Interface for streaming data sources such as LLM outputs.

interface TextDataStream {
  on(event: 'data', listener: (chunk: any) => void): this;
  on(event: 'end', listener: () => void): this;
  on(event: 'error', listener: (err: Error) => void): this;
  on(event: 'metadata', listener: (metadata: LLMMetadata) => void): this;
}
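Any EventEmitter that emits these events can serve as a stream source. A minimal hand-rolled sketch (the chunk shape here is illustrative; each provider emits its own format):

import { EventEmitter } from 'events';
import { processAIStream } from 't140llm';

// A stand-in stream that emits two text chunks and then ends
const stream = new EventEmitter();
setImmediate(() => {
  stream.emit('data', { text: 'Hello, ' });
  stream.emit('data', { text: 'world!' });
  stream.emit('end');
});

await processAIStream(stream);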

LLMMetadata

Interface for LLM metadata such as tool calls, reasoning, and other structured data.

interface LLMMetadata {
  type: 'tool_call' | 'tool_result' | 'custom' | 'reasoning' | string;
  content: any;
  id?: string;
}
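For instance, a tool call like the one in the Metadata Handling Example below would reach the metadata callback shaped roughly like this (field contents are illustrative):

const metadata: LLMMetadata = {
  type: 'tool_call',
  id: 'toolu_123', // illustrative id
  content: {
    name: 'get_weather',
    arguments: { location: 'San Francisco, CA' },
  },
};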

ProcessorOptions

Configuration options for stream processors.

interface ProcessorOptions {
  processBackspaces?: boolean;
  handleMetadata?: boolean;
  metadataCallback?: (metadata: LLMMetadata) => void;
  sendMetadataOverTransport?: boolean;
  preCreateConnection?: boolean;
  transport?: TransportStream; // pre-created or custom transport (see examples)
}

RtpConfig

Configuration interface for RTP.

interface RtpConfig {
  payloadType?: number;
  ssrc?: number;
  sequenceNumber?: number;
  timestamp?: number;
  enableFec?: boolean;
  fecInterval?: number;
  processBackspaces?: boolean;
  rateLimit?: number;
  redundancy?: number;
  handleMetadata?: boolean;
  metadataCallback?: (metadata: LLMMetadata) => void;
  sendMetadataOverTransport?: boolean;
}

SrtpConfig

Configuration for SRTP security, extends RtpConfig.

interface SrtpConfig extends RtpConfig {
  masterKey: Buffer;
  masterSalt: Buffer;
  profile?: number;
  isSRTCP?: boolean;
}

Examples

LLM Provider Support

t140llm supports various LLM providers, including OpenAI, Anthropic Claude, Mistral AI, Cohere, and Ollama. Here are examples for each:

With OpenAI

import { processAIStream } from 't140llm';
import OpenAI from 'openai';

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream and convert to T.140
processAIStream(stream);

With Anthropic Claude

import { processAIStream } from 't140llm';
import Anthropic from "@anthropic-ai/sdk";

// Initialize Anthropic client
const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Create a streaming response
const stream = await anthropic.messages.create({
  model: "claude-3-sonnet-20240229",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream with t140llm
await processAIStream(stream, 'ws://localhost:8080');

With Mistral AI

import { processAIStream } from 't140llm';
import MistralClient from "@mistralai/mistralai";

// Initialize Mistral client
const mistral = new MistralClient({
  apiKey: process.env.MISTRAL_API_KEY,
});

// Create a streaming response
const stream = await mistral.chat({
  model: "mistral-large-latest",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream and convert to T.140
processAIStream(stream);

With Cohere

import { processAIStream } from 't140llm';
import { CohereClient } from "cohere-ai";

// Initialize Cohere client
const cohere = new CohereClient({
  token: process.env.COHERE_API_KEY,
});

// Create a streaming response
const stream = await cohere.chatStream({
  model: "command",
  message: "Write a short story.",
});

// Process the stream and convert to T.140
processAIStream(stream);

With Ollama

import { processAIStream } from 't140llm';
import { EventEmitter } from 'events';

// For Ollama, you'd typically use fetch
async function streamFromOllama() {
  const response = await fetch('http://localhost:11434/api/generate', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'llama3',
      prompt: 'Write a short story',
      stream: true,
    }),
  });

  if (!response.ok) {
    throw new Error(`Ollama API error: ${response.statusText}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  
  // Create an event emitter to simulate a stream
  const stream = new EventEmitter();
  
  // Process chunks as they arrive
  const processChunks = async () => {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        stream.emit('end');
        break;
      }
      const text = decoder.decode(value, { stream: true });
      const lines = text.split('\n').filter(line => line.trim());
      
      for (const line of lines) {
        try {
          const json = JSON.parse(line);
          stream.emit('data', json);
        } catch (e) {
          console.error('Error parsing Ollama response:', e);
        }
      }
    }
  };
  
  processChunks().catch(err => stream.emit('error', err));
  return stream;
}

// Get the stream and process it
const stream = await streamFromOllama();
processAIStream(stream);

Direct RTP Example

import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Tell me about real-time communication' }],
});

// Process the stream using RTP to a specific IP and port
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
  enableFec: true,  // Enable Forward Error Correction
  processBackspaces: true,  // Process backspace characters
});

SRTP Secure Transport Example

import { processAIStreamToSrtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
import { randomBytes } from 'crypto';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Generate secure keys for SRTP
const masterKey = randomBytes(16);  // 128 bits
const masterSalt = randomBytes(14); // 112 bits

const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Tell me about secure communications' }],
});

// Process the stream using secure SRTP
await processAIStreamToSrtp(response, '192.168.1.100', {
  masterKey,
  masterSalt,
  enableFec: true,
});

Direct Socket Example

import { processAIStreamToDirectSocket } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Tell me about UNIX sockets' }],
});

// Process the stream using a direct UNIX socket
await processAIStreamToDirectSocket(response, '/tmp/t140llm.sock');

Pre-Connection Example

import { createT140RtpTransport, processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

// Pre-create the transport before starting the LLM request
const transport = createT140RtpTransport('192.168.1.100', 5004, {
  enableFec: true,
});

// Later, when you're ready to process the stream:
const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Tell me about latency optimization' }],
});

// Use the pre-created transport
await processAIStream(response, null, {
  preCreateConnection: true,
  transport: transport,
});

Reasoning Stream Example

import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Create a streaming response with reasoning enabled
const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Solve this math problem: 2x + 5 = 13' }],
});

// Set up separate handling for reasoning and text
const handleMetadata = (metadata) => {
  if (metadata.type === 'reasoning') {
    console.log('Reasoning process:', metadata.content);
    // You could send this to a different UI element to show
    // the LLM's step-by-step thinking process
  }
};

// Process the stream with reasoning handling
await processAIStream(response, 'ws://localhost:8080', {
  handleMetadata: true,
  metadataCallback: handleMetadata,
  sendMetadataOverTransport: true, // Send reasoning over the WebSocket transport too
});

Metadata Handling Example

import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const response = await client.messages.create({
  model: 'claude-3-opus-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: "What's the weather in San Francisco?" }],
  tools: [
    {
      name: 'get_weather',
      description: 'Get the current weather in a location',
      input_schema: {
        type: 'object',
        properties: {
          location: {
            type: 'string',
            description: 'The city and state, e.g. San Francisco, CA',
          },
          unit: { 
            type: 'string', 
            enum: ['celsius', 'fahrenheit'],
            description: 'The temperature unit to use'
          },
        },
        required: ['location'],
      },
    },
  ],
});

// Process the stream with metadata handling
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
  handleMetadata: true,
  metadataCallback: (metadata) => {
    if (metadata.type === 'tool_call') {
      console.log('Tool call received:', metadata.content);
      // Here you would handle the tool call and send back results
    }
  },
  sendMetadataOverTransport: true, // Send metadata in the RTP stream
});

Custom Transport Example

import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';

// Implement a custom transport
const customTransport = {
  send: (data, callback) => {
    // Custom logic to transport the data
    console.log('Sending data:', data.toString('utf8'));
    
    // For example, you might send to a proprietary protocol
    // myCustomProtocol.send(data);
    
    // Call the callback when done
    if (callback) callback();
  },
  close: () => {
    // Custom cleanup logic
    console.log('Transport closed');
  }
};

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const response = await client.messages.create({
  model: 'claude-3-sonnet-20240229',
  max_tokens: 1000,
  stream: true,
  messages: [{ role: 'user', content: 'Tell me about custom protocols' }],
});

// Use the custom transport
await processAIStream(response, null, {
  transport: customTransport,
});

Advanced Usage

Separate Transports for Reasoning

You can use separate transport channels for text and reasoning data:

import { createDirectSocketTransport, processAIStreamToDirectSocket } from 't140llm';
import * as fs from 'fs';

// Create two separate socket paths
const textSocketPath = '/tmp/llm-text.sock';
const reasoningSocketPath = '/tmp/llm-reasoning.sock';

// Clean up any existing sockets
if (fs.existsSync(textSocketPath)) fs.unlinkSync(textSocketPath);
if (fs.existsSync(reasoningSocketPath)) fs.unlinkSync(reasoningSocketPath);

// Create a dedicated transport for reasoning data
const reasoningTransport = createDirectSocketTransport(reasoningSocketPath);

// Metadata handler that forwards reasoning chunks to the second socket
const handleMetadata = (metadata) => {
  if (metadata.type === 'reasoning') {
    reasoningTransport.send(Buffer.from(JSON.stringify({
      type: 'reasoning',
      content: metadata.content,
    }) + '\n'));
  }
};

// Process the main text stream with the reasoning handler attached
// (llmStream is a TextDataStream from any of the providers above)
await processAIStreamToDirectSocket(llmStream, textSocketPath, {
  handleMetadata: true,
  metadataCallback: handleMetadata,
});

// This allows different clients to connect to each socket:
// - One client can show the final LLM text output
// - Another client can show the reasoning process

Forward Error Correction (FEC)

The library includes built-in Forward Error Correction to recover from packet loss during transmission. Enable it by setting enableFec: true in your RTP configuration:

await processAIStreamToRtp(stream, remoteAddress, remotePort, {
  enableFec: true,
  fecInterval: 5, // Send FEC packets every 5 data packets
});

Backspace Processing

Some LLMs might emit backspace characters during text generation. The library can handle these automatically:

await processAIStream(stream, websocketUrl, {
  processBackspaces: true,
});

Rate Limiting

You can control the transmission rate of packets:

await processAIStreamToRtp(stream, remoteAddress, remotePort, {
  rateLimit: 30, // Limit to 30 packets per second
});

Redundancy

For improved reliability, you can enable packet redundancy:

await processAIStreamToRtp(stream, remoteAddress, remotePort, {
  redundancy: 2, // Send each packet twice
});

Custom RTP Configuration

You can customize various RTP parameters:

await processAIStreamToRtp(stream, remoteAddress, remotePort, {
  payloadType: 98,     // Custom payload type (default: 96)
  ssrc: 12345678,      // Custom SSRC (default: random)
  sequenceNumber: 100, // Starting sequence number (default: random)
  timestamp: 0,        // Starting timestamp (default: current time)
});