A TypeScript library that converts LLM streaming responses into T.140 real-time text format for SIP, WebRTC, and (S)RTP applications.
- Provides approximately 10% lower latency compared to traditional WebSockets.
- Supports WebSocket, direct RTP/SRTP, and UNIX socket transports.
- Built-in FEC (Forward Error Correction) for packet-loss recovery in network transmissions.
- Supports custom transport implementations and metadata handling.
- Works with OpenAI, Anthropic, Mistral AI, Cohere, Ollama, and the Vercel AI SDK.
- Formats text according to the T.140 standard for real-time text applications.
- Extracts and processes LLM reasoning alongside text for transparency.
- Handles tool calls and tool results from modern LLM interactions.
- Supports separate transport channels for text, reasoning, and tool data.
Install t140llm using npm:
npm install t140llm
The library has the following key dependencies:
- ws - For WebSocket functionality
- node-unix-socket - For UNIX socket communication
- werift-rtp - For RTP/SRTP packet handling

processAIStream(stream, websocketUrl?, options?)
Processes an AI text stream and sends it to a WebSocket server.
- stream: TextDataStream - The AI text stream to process
- websocketUrl: string (optional) - The WebSocket server URL
- options: ProcessorOptions (optional) - Processing options
Returns a Promise that resolves when the stream has been completely processed.
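A minimal sketch, assuming stream is a streaming response from one of the providers shown later in this document:

import { processAIStream } from 't140llm';

// Convert the stream to T.140 and send it to a local WebSocket server
await processAIStream(stream, 'ws://localhost:8080');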
processAIStreamToRtp(stream, remoteAddress, remotePort?, rtpConfig?)
Processes an AI text stream and sends it directly using RTP.
- stream: TextDataStream - The AI text stream to process
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port (default: 5004)
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a Promise that resolves when the stream has been completely processed.
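A minimal sketch, assuming a T.140-capable receiver is listening at the given address and port:

import { processAIStreamToRtp } from 't140llm';

// Send T.140 RTP packets to 192.168.1.100:5004 (5004 is also the default)
await processAIStreamToRtp(stream, '192.168.1.100', 5004);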
processAIStreamToSrtp(stream, remoteAddress, srtpConfig, remotePort?)
Processes an AI text stream and sends it using Secure RTP (SRTP).
- stream: TextDataStream - The AI text stream to process
- remoteAddress: string - The remote IP address
- srtpConfig: SrtpConfig - SRTP configuration including security keys
- remotePort: number (optional) - The remote port (default: 5004)
Returns a Promise that resolves when the stream has been completely processed.
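A minimal sketch; the key and salt sizes match the SRTP example later in this document:

import { processAIStreamToSrtp } from 't140llm';
import { randomBytes } from 'crypto';

// SRTP needs a 128-bit master key and a 112-bit master salt
const masterKey = randomBytes(16);
const masterSalt = randomBytes(14);

await processAIStreamToSrtp(stream, '192.168.1.100', { masterKey, masterSalt });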
processAIStreamToDirectSocket(stream, socketPath?, rtpConfig?)
Processes an AI text stream and sends it to a UNIX socket.
- stream: TextDataStream - The AI text stream to process
- socketPath: string (optional) - Path to the UNIX socket
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a Promise that resolves when the stream has been completely processed.
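A minimal sketch, assuming the socket path is writable and not already in use:

import { processAIStreamToDirectSocket } from 't140llm';

// Write T.140 packets to a local UNIX socket
await processAIStreamToDirectSocket(stream, '/tmp/t140llm.sock');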
Creates a WebSocket transport connection without starting data transmission.
- websocketUrl: string (optional) - The WebSocket server URL
Returns a TransportStream instance ready for use with processAIStream.
createT140RtpTransport(remoteAddress, remotePort?, rtpConfig?)
Creates an RTP transport connection without starting data transmission.
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a TransportStream instance ready for use with processAIStream.
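A minimal sketch of pre-creating the transport to take connection setup off the critical path, mirroring the latency-optimization example later in this document:

import { createT140RtpTransport, processAIStream } from 't140llm';

// Open the RTP transport before making the LLM request
const transport = createT140RtpTransport('192.168.1.100', 5004);

// ...once the provider stream exists, reuse the already-open transport
await processAIStream(stream, null, {
  preCreateConnection: true,
  transport,
});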
Creates an SRTP transport connection without starting data transmission.
- remoteAddress: string - The remote IP address
- srtpConfig: SrtpConfig - SRTP configuration including security keys
- remotePort: number (optional) - The remote port
Returns a TransportStream instance ready for use with processAIStream.
Creates a direct socket transport connection without starting data transmission.
- socketPath: string (optional) - Path to the UNIX socket
Returns a TransportStream instance ready for use with processAIStream.
The library exposes several key types:
- TransportStream - Interface for custom transport implementations; required for creating custom transports.
- TextDataStream - Interface for streaming data sources such as LLM outputs.
- Metadata interface - LLM metadata such as tool calls, reasoning, and other structured data, as delivered to metadataCallback.
- ProcessorOptions - Configuration options for stream processors.
- RtpConfig - Configuration interface for RTP.
- SrtpConfig - Configuration for SRTP security; extends RtpConfig.
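As a sketch, the custom transport interface can be satisfied by a plain object exposing send and close; the parameter types here are assumptions inferred from the custom-transport example later in this document:

// Minimal object satisfying the custom transport interface (assumed types)
const transport = {
  send: (data: Buffer, callback?: () => void) => {
    // deliver `data` over your own channel, then signal completion
    if (callback) callback();
  },
  close: () => {
    // release any resources held by the transport
  },
};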
t140llm supports various LLM providers including OpenAI, Anthropic Claude, Mistral AI, Cohere, and Ollama. Here are examples with different providers:
import { processAIStream } from 't140llm';
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
// Create a streaming response
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: "Write a short story." }],
stream: true,
});
// Process the stream and convert to T.140
await processAIStream(stream);
import { processAIStream } from 't140llm';
import Anthropic from "@anthropic-ai/sdk";
// Initialize Anthropic client
const anthropic = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Create a streaming response
const stream = await anthropic.messages.create({
model: "claude-3-sonnet-20240229",
messages: [{ role: "user", content: "Write a short story." }],
stream: true,
});
// Process the stream with t140llm
await processAIStream(stream, 'ws://localhost:8080');
import { processAIStream } from 't140llm';
import MistralClient from "@mistralai/mistralai";
// Initialize Mistral client
const mistral = new MistralClient({
apiKey: process.env.MISTRAL_API_KEY,
});
// Create a streaming response
const stream = mistral.chatStream({
model: "mistral-large-latest",
messages: [{ role: "user", content: "Write a short story." }],
});
// Process the stream and convert to T.140
await processAIStream(stream);
import { processAIStream } from 't140llm';
import { CohereClient } from "cohere-ai";
// Initialize Cohere client
const cohere = new CohereClient({
token: process.env.COHERE_API_KEY,
});
// Create a streaming response
const stream = await cohere.chatStream({
model: "command",
message: "Write a short story.",
});
// Process the stream and convert to T.140
await processAIStream(stream);
import { processAIStream } from 't140llm';
import { EventEmitter } from 'events';
// For Ollama, you'd typically use fetch
async function streamFromOllama() {
const response = await fetch('http://localhost:11434/api/generate', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'llama3',
prompt: 'Write a short story',
stream: true,
}),
});
if (!response.ok) {
throw new Error(`Ollama API error: ${response.statusText}`);
}
// fetch can return a null body; guard before streaming
if (!response.body) {
throw new Error('Ollama response has no body');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
// Create an event emitter to simulate a stream
const stream = new EventEmitter();
// Process chunks as they arrive
const processChunks = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
stream.emit('end');
break;
}
const text = decoder.decode(value, { stream: true });
const lines = text.split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const json = JSON.parse(line);
stream.emit('data', json);
} catch (e) {
console.error('Error parsing Ollama response:', e);
}
}
}
};
processChunks().catch(err => stream.emit('error', err));
return stream;
}
// Get the stream and process it
const stream = await streamFromOllama();
await processAIStream(stream);
import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about real-time communication' }],
});
// Process the stream using RTP to a specific IP and port
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
enableFec: true, // Enable Forward Error Correction
processBackspaces: true, // Process backspace characters
});
import { processAIStreamToSrtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
import { randomBytes } from 'crypto';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Generate secure keys for SRTP
const masterKey = randomBytes(16); // 128 bits
const masterSalt = randomBytes(14); // 112 bits
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about secure communications' }],
});
// Process the stream using secure SRTP
await processAIStreamToSrtp(response, '192.168.1.100', {
masterKey,
masterSalt,
enableFec: true,
});
import { processAIStreamToDirectSocket } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about UNIX sockets' }],
});
// Process the stream using a direct UNIX socket
await processAIStreamToDirectSocket(response, '/tmp/t140llm.sock');
import { createT140RtpTransport, processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
// Pre-create the transport before starting the LLM request
const transport = createT140RtpTransport('192.168.1.100', 5004, {
enableFec: true,
});
// Later, when you're ready to process the stream:
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about latency optimization' }],
});
// Use the pre-created transport
await processAIStream(response, null, {
preCreateConnection: true,
transport: transport,
});
import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Create a streaming response with reasoning enabled
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Solve this math problem: 2x + 5 = 13' }],
});
// Set up separate handling for reasoning and text
const handleMetadata = (metadata) => {
if (metadata.type === 'reasoning') {
console.log('Reasoning process:', metadata.content);
// You could send this to a different UI element to show
// the LLM's step-by-step thinking process
}
};
// Process the stream with reasoning handling
await processAIStream(response, 'ws://localhost:8080', {
handleMetadata: true,
metadataCallback: handleMetadata,
sendMetadataOverWebsocket: true, // Send reasoning over WebSocket too
});
import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-opus-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: "What's the weather in San Francisco?" }],
tools: [
{
name: 'get_weather',
description: 'Get the current weather in a location',
input_schema: {
type: 'object',
properties: {
location: {
type: 'string',
description: 'The city and state, e.g. San Francisco, CA',
},
unit: {
type: 'string',
enum: ['celsius', 'fahrenheit'],
description: 'The temperature unit to use'
},
},
required: ['location'],
},
},
],
});
// Process the stream with metadata handling
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
handleMetadata: true,
metadataCallback: (metadata) => {
if (metadata.type === 'tool_call') {
console.log('Tool call received:', metadata.content);
// Here you would handle the tool call and send back results
}
},
sendMetadataOverTransport: true, // Send metadata in the RTP stream
});
import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
// Implement a custom transport
const customTransport = {
send: (data, callback) => {
// Custom logic to transport the data
console.log('Sending data:', data.toString('utf8'));
// For example, you might send to a proprietary protocol
// myCustomProtocol.send(data);
// Call the callback when done
if (callback) callback();
},
close: () => {
// Custom cleanup logic
console.log('Transport closed');
}
};
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about custom protocols' }],
});
// Use the custom transport
await processAIStream(response, null, {
transport: customTransport,
});
You can use separate transport channels for text and reasoning data:
import { processAIStreamToDirectSocket } from 't140llm';
import { EventEmitter } from 'events';
import * as fs from 'fs';
// Create two separate socket paths
const textSocketPath = '/tmp/llm-text.sock';
const reasoningSocketPath = '/tmp/llm-reasoning.sock';
// Clean up any existing sockets
if (fs.existsSync(textSocketPath)) fs.unlinkSync(textSocketPath);
if (fs.existsSync(reasoningSocketPath)) fs.unlinkSync(reasoningSocketPath);
// Create a separate socket for reasoning data
const reasoningSocket = processAIStreamToDirectSocket(
new EventEmitter(), // Empty stream to initialize socket
reasoningSocketPath
);
// Create custom reasoning metadata handler that sends to separate socket
const handleMetadata = (metadata) => {
if (metadata.type === 'reasoning') {
reasoningSocket.write(JSON.stringify({
type: 'reasoning',
content: metadata.content
}) + '\n');
}
};
// Process the main text stream with the reasoning handler
// (llmStream is an LLM provider stream like those in the examples above)
const textSocket = processAIStreamToDirectSocket(
llmStream,
textSocketPath,
{
handleMetadata: true,
metadataCallback: handleMetadata
}
);
// This allows different clients to connect to each socket:
// - One client can show the final LLM text output
// - Another client can show the reasoning process
The library includes built-in Forward Error Correction to recover from packet loss during transmission. Enable it by setting enableFec: true in your RTP configuration:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
enableFec: true,
fecInterval: 5, // Send FEC packets every 5 data packets
});
Some LLMs might emit backspace characters during text generation. The library can handle these automatically:
await processAIStream(stream, websocketUrl, {
processBackspaces: true,
});
You can control the transmission rate of packets:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
rateLimit: 30, // Limit to 30 packets per second
});
For improved reliability, you can enable packet redundancy:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
redundancy: 2, // Send each packet twice
});
You can customize various RTP parameters:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
payloadType: 98, // Custom payload type (default: 96)
ssrc: 12345678, // Custom SSRC (default: random)
sequenceNumber: 100, // Starting sequence number (default: random)
timestamp: 0, // Starting timestamp (default: current time)
});