A TypeScript library that converts LLM streaming responses into T.140 real-time text format for SIP, WebRTC, and (S)RTP applications.
- Approximately 10% lower latency compared to traditional WebSockets.
- WebSocket, direct RTP/SRTP, and UNIX socket support.
- Built-in FEC (Forward Error Correction) for packet loss recovery in network transmissions.
- Support for custom transport implementations and metadata handling.
- Works with OpenAI, Anthropic, Mistral AI, Cohere, Ollama, and the Vercel AI SDK.
- Formats text according to the T.140 standard for real-time text applications.
- Extracts and processes LLM reasoning alongside text for transparency.
- Handles tool calls and tool results from modern LLM interactions.
- Support for separate transport channels for text, reasoning, and tools.
- Hides RTP packets within cover media for added security and obfuscation.
- Dynamically creates steganography algorithms using LLMs.
- Extends the steganography system with custom algorithms and transport layers.
Install t140llm using npm:
npm install t140llm
The library has the following key dependencies:
- ws - For WebSocket functionality
- node-unix-socket - For UNIX socket communication
- werift-rtp - For RTP/SRTP packet handling

processAIStream(stream, websocketUrl?, options?)
Processes an AI text stream and sends it to a WebSocket server.
- stream: TextDataStream - The AI text stream to process
- websocketUrl: string (optional) - The WebSocket server URL
- options: ProcessorOptions (optional) - Processing options
Returns a Promise that resolves when the stream has been completely processed.
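For illustration, a minimal sketch of a call; here, stream stands in for any TextDataStream, such as the provider streams shown in the examples below:

import { processAIStream } from 't140llm';

// stream: any TextDataStream (e.g. an OpenAI/Anthropic streaming response,
// or the EventEmitter bridge shown in the Ollama example below)
await processAIStream(stream, 'ws://localhost:8080', {
  processBackspaces: true, // optional (see Advanced Features below)
});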
processAIStreamToRtp(stream, remoteAddress, remotePort?, rtpConfig?)
Processes an AI text stream and sends it directly using RTP.
- stream: TextDataStream - The AI text stream to process
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port (default: 5004)
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a Promise that resolves when the stream has been completely processed.
processAIStreamToSrtp(stream, remoteAddress, srtpConfig, remotePort?)
Processes an AI text stream and sends it using Secure RTP (SRTP).
- stream: TextDataStream - The AI text stream to process
- remoteAddress: string - The remote IP address
- srtpConfig: SrtpConfig - SRTP configuration including security keys
- remotePort: number (optional) - The remote port (default: 5004)
Returns a Promise that resolves when the stream has been completely processed.
processAIStreamToDirectSocket(stream, socketPath?, rtpConfig?)
Processes an AI text stream and sends it to a UNIX socket.
- stream: TextDataStream - The AI text stream to process
- socketPath: string (optional) - Path to the UNIX socket
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a Promise that resolves when the stream has been completely processed.
Creates a WebSocket transport connection without starting data transmission.
- websocketUrl: string (optional) - The WebSocket server URL
Returns a TransportStream instance ready for use with processAIStream.
createT140RtpTransport(remoteAddress, remotePort?, rtpConfig?)
Creates an RTP transport connection without starting data transmission.
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port
- rtpConfig: RtpConfig (optional) - RTP configuration options
Returns a TransportStream instance ready for use with processAIStream.
Creates an SRTP transport connection without starting data transmission.
- remoteAddress: string - The remote IP address
- srtpConfig: SrtpConfig - SRTP configuration including security keys
- remotePort: number (optional) - The remote port
Returns a TransportStream instance ready for use with processAIStream.
Creates a direct socket transport connection without starting data transmission.
- socketPath: string (optional) - Path to the UNIX socket
Returns a TransportStream instance ready for use with processAIStream.
- TransportStream - Interface for custom transport implementations. Required for creating custom transports.
- TextDataStream - Interface for streaming data sources such as LLM outputs.
- Interface for LLM metadata such as tool calls, reasoning, and other structured data.
- ProcessorOptions - Configuration options for stream processors.
- RtpConfig - Configuration interface for RTP.
- SrtpConfig - Configuration for SRTP security; extends RtpConfig.
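For illustration, any EventEmitter that emits 'data' chunks followed by 'end' (and 'error' on failure) can serve as a TextDataStream source, mirroring the Ollama bridge shown later in this README. The helper below and its { text } chunk shape are assumptions, not part of the library:

import { EventEmitter } from 'events';

// Hypothetical helper: bridge an async iterable of text chunks into the
// 'data' / 'end' / 'error' emitter convention used throughout this README.
// The { text } chunk shape is an assumption; provider-native chunks also
// work, as the provider examples below show.
function toTextDataStream(chunks: AsyncIterable<string>): EventEmitter {
  const stream = new EventEmitter();
  (async () => {
    try {
      for await (const text of chunks) {
        stream.emit('data', { text });
      }
      stream.emit('end');
    } catch (err) {
      stream.emit('error', err);
    }
  })();
  return stream;
}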
Configuration interface for steganography.
- enabled: boolean - Enable/disable steganography
- encodeMode: 'llm' | 'fixed' - Use an LLM-generated or a built-in algorithm
- coverMedia: Buffer[] - Array of cover media buffers
- prompt: string - Custom prompt for LLM algorithm generation
- algorithm: string - Custom algorithm code
- seed: string - Random seed for deterministic algorithm generation
- encodingRatio: number - Percentage of each packet to encode (0-100)
- llmProvider: any - Custom LLM provider for algorithm generation

StegTransport - A transport wrapper that adds steganography capabilities.
- encode - Apply steganography to hide data in cover media
- decode - Extract hidden data from steganographically modified media
- getConfig - Get the current steganography configuration
- updateConfig - Update the steganography configuration

RtpConfigWithSteg - Extended RTP configuration with steganography support.
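For a minimal, non-subclassed use of StegTransport (the constructor shape matches the advanced example later in this README; the inner transport here is a placeholder):

import { StegTransport } from 't140llm';
import { readFileSync } from 'fs';

// Inner transport: any object with send(data, callback?) and close()
const inner = {
  send: (data, callback) => {
    // deliver the steganographically encoded packet here
    if (callback) callback();
  },
  close: () => {},
};

const steg = new StegTransport(inner, {
  enabled: true,
  encodeMode: 'fixed', // use a built-in algorithm
  coverMedia: [readFileSync('./cover-image-1.jpg')],
});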
Creates an RTP transport with steganography support.
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port
- config: RtpConfigWithSteg (optional) - RTP configuration with steganography options
Returns a T140RtpTransport instance with steganography capabilities.
processAIStreamToStegRtp(stream, remoteAddress, remotePort?, config?)
Processes an AI text stream and sends it using RTP with steganography.
- stream: TextDataStream - The AI text stream to process
- remoteAddress: string - The remote IP address
- remotePort: number (optional) - The remote port
- config: RtpConfigWithSteg (optional) - RTP configuration with steganography options
Returns a T140RtpTransport instance with steganography capabilities.
t140llm supports various LLM providers including OpenAI, Anthropic Claude, Mistral AI, Cohere, and Ollama. Here are examples with different providers:
import { processAIStream } from 't140llm';
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
// Create a streaming response
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: "Write a short story." }],
stream: true,
});
// Process the stream and convert to T.140
processAIStream(stream);
import { processAIStream } from 't140llm';
import Anthropic from "@anthropic-ai/sdk";
// Initialize Anthropic client
const anthropic = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Create a streaming response
const stream = await anthropic.messages.create({
model: "claude-3-sonnet-20240229",
messages: [{ role: "user", content: "Write a short story." }],
stream: true,
});
// Process the stream with t140llm
await processAIStream(stream, 'ws://localhost:8080');
import { processAIStream } from 't140llm';
import MistralClient from "@mistralai/mistralai";
// Initialize Mistral client
const mistral = new MistralClient({
apiKey: process.env.MISTRAL_API_KEY,
});
// Create a streaming response
const stream = await mistral.chatStream({
model: "mistral-large-latest",
messages: [{ role: "user", content: "Write a short story." }],
});
// Process the stream and convert to T.140
processAIStream(stream);
import { processAIStream } from 't140llm';
import { CohereClient } from "cohere-ai";
// Initialize Cohere client
const cohere = new CohereClient({
token: process.env.COHERE_API_KEY,
});
// Create a streaming response
const stream = await cohere.chatStream({
model: "command",
message: "Write a short story.",
});
// Process the stream and convert to T.140
processAIStream(stream);
import { processAIStream } from 't140llm';
import { EventEmitter } from 'events';
// For Ollama, you'd typically use fetch
async function streamFromOllama() {
const response = await fetch('http://localhost:11434/api/generate', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'llama3',
prompt: 'Write a short story',
stream: true,
}),
});
if (!response.ok) {
throw new Error(`Ollama API error: ${response.statusText}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
// Create an event emitter to simulate a stream
const stream = new EventEmitter();
// Process chunks as they arrive
const processChunks = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
stream.emit('end');
break;
}
const text = decoder.decode(value);
const lines = text.split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const json = JSON.parse(line);
stream.emit('data', json);
} catch (e) {
console.error('Error parsing Ollama response:', e);
}
}
}
};
processChunks().catch(err => stream.emit('error', err));
return stream;
}
// Get the stream and process it
const stream = await streamFromOllama();
processAIStream(stream);
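The Vercel AI SDK (listed in the features above) can be bridged the same way as Ollama. A minimal sketch, assuming the 'ai' and '@ai-sdk/openai' packages; the { text } chunk shape is an assumption:

import { processAIStream } from 't140llm';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { EventEmitter } from 'events';

const result = await streamText({
  model: openai('gpt-4'),
  prompt: 'Write a short story.',
});

// Bridge the SDK's async-iterable text stream into a 'data'/'end' emitter
const stream = new EventEmitter();
(async () => {
  for await (const text of result.textStream) {
    stream.emit('data', { text });
  }
  stream.emit('end');
})().catch(err => stream.emit('error', err));

processAIStream(stream);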
import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about real-time communication' }],
});
// Process the stream using RTP to a specific IP and port
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
enableFec: true, // Enable Forward Error Correction
processBackspaces: true, // Process backspace characters
});
import { processAIStreamToSrtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
import { randomBytes } from 'crypto';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Generate secure keys for SRTP
const masterKey = randomBytes(16); // 128 bits
const masterSalt = randomBytes(14); // 112 bits
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about secure communications' }],
});
// Process the stream using secure SRTP
await processAIStreamToSrtp(response, '192.168.1.100', {
masterKey,
masterSalt,
enableFec: true,
});
import { processAIStreamToDirectSocket } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about UNIX sockets' }],
});
// Process the stream using a direct UNIX socket
await processAIStreamToDirectSocket(response, '/tmp/t140llm.sock');
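On the receiving side, any process can connect to that socket and read the transmitted bytes; a minimal sketch using Node's built-in net module:

import * as net from 'net';

// Connect to the same UNIX socket path the sender writes to
const client = net.createConnection('/tmp/t140llm.sock', () => {
  console.log('Connected to t140llm socket');
});

client.on('data', (chunk) => {
  // chunk contains the raw transmitted packet bytes; parse as needed
  process.stdout.write(chunk.toString('utf8'));
});

client.on('end', () => console.log('Stream finished'));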
import { createT140RtpTransport, processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
// Pre-create the transport before starting the LLM request
const transport = createT140RtpTransport('192.168.1.100', 5004, {
enableFec: true,
});
// Later, when you're ready to process the stream:
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about latency optimization' }],
});
// Use the pre-created transport
await processAIStream(response, null, {
preCreateConnection: true,
transport: transport,
});
import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Create a streaming response with reasoning enabled
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Solve this math problem: 2x + 5 = 13' }],
});
// Set up separate handling for reasoning and text
const handleMetadata = (metadata) => {
if (metadata.type === 'reasoning') {
console.log('Reasoning process:', metadata.content);
// You could send this to a different UI element to show
// the LLM's step-by-step thinking process
}
};
// Process the stream with reasoning handling
await processAIStream(response, 'ws://localhost:8080', {
handleMetadata: true,
metadataCallback: handleMetadata,
sendMetadataOverWebsocket: true, // Send reasoning over WebSocket too
});
import { processAIStreamToRtp } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-opus-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: "What's the weather in San Francisco?" }],
tools: [
{
name: 'get_weather',
description: 'Get the current weather in a location',
input_schema: {
type: 'object',
properties: {
location: {
type: 'string',
description: 'The city and state, e.g. San Francisco, CA',
},
unit: {
type: 'string',
enum: ['celsius', 'fahrenheit'],
description: 'The temperature unit to use'
},
},
required: ['location'],
},
},
],
});
// Process the stream with metadata handling
await processAIStreamToRtp(response, '192.168.1.100', 5004, {
handleMetadata: true,
metadataCallback: (metadata) => {
if (metadata.type === 'tool_call') {
console.log('Tool call received:', metadata.content);
// Here you would handle the tool call and send back results
}
},
sendMetadataOverTransport: true, // Send metadata in the RTP stream
});
import { processAIStream } from 't140llm';
import Anthropic from '@anthropic-ai/sdk';
// Implement a custom transport
const customTransport = {
send: (data, callback) => {
// Custom logic to transport the data
console.log('Sending data:', data.toString('utf8'));
// For example, you might send to a proprietary protocol
// myCustomProtocol.send(data);
// Call the callback when done
if (callback) callback();
},
close: () => {
// Custom cleanup logic
console.log('Transport closed');
}
};
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await client.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about custom protocols' }],
});
// Use the custom transport
await processAIStream(response, null, {
transport: customTransport,
});
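The same send/close shape works for real protocols too. For example, a sketch of a UDP-based custom transport using Node's dgram module (the peer address and port are placeholders):

import { createSocket } from 'dgram';

const udpSocket = createSocket('udp4');

const udpTransport = {
  send: (data, callback) => {
    // Forward each packet over UDP to a fixed peer
    udpSocket.send(data, 5004, '192.168.1.100', () => {
      if (callback) callback();
    });
  },
  close: () => {
    udpSocket.close();
  },
};

// Use it anywhere a custom transport is accepted:
// await processAIStream(response, null, { transport: udpTransport });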
Use an LLM to dynamically generate steganography algorithms:
import { processAIStreamToStegRtp } from 't140llm';
import OpenAI from 'openai';
import Anthropic from '@anthropic-ai/sdk';
import { readFileSync } from 'fs';
// Initialize OpenAI for algorithm generation
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
// Load some cover media (in this example, image files)
const coverMedia = [
readFileSync('./cover-image-1.jpg'),
readFileSync('./cover-image-2.jpg'),
readFileSync('./cover-image-3.jpg'),
];
// Generate a steganography algorithm
const prompt = `Create a steganography algorithm in JavaScript that:
1. Takes a data Buffer and cover media Buffer
2. Hides the data within the cover media
3. Returns the modified cover with hidden data
4. Includes a corresponding decode function
5. Uses pure JavaScript with no external dependencies`;
// Get the algorithm from OpenAI
const response = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: prompt }],
temperature: 0.5,
});
const algorithm = response.choices[0].message.content;
// Create a streaming request from Claude or any other LLM
const claude = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
const textStream = await claude.messages.create({
model: 'claude-3-sonnet-20240229',
max_tokens: 1000,
stream: true,
messages: [{ role: 'user', content: 'Tell me about steganography techniques' }],
});
// Process the stream with the LLM-generated steganography algorithm
const transport = processAIStreamToStegRtp(textStream, '192.168.1.100', 5004, {
steganography: {
enabled: true,
encodeMode: 'llm',
algorithm, // Use the LLM-generated algorithm
coverMedia,
seed: 'unique-random-seed-123'
},
enableFec: true, // Enable Forward Error Correction for reliability
});
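On the receiving end, the decode method listed in the steganography API above can recover the hidden packets, provided the receiver shares the sender's algorithm and seed. A hedged sketch; the single-argument decode call and the stegPacket variable are assumptions:

import { StegTransport } from 't140llm';

// The receiver must be configured with the same algorithm and seed as the sender
const receiver = new StegTransport(
  { send: (data, callback) => { if (callback) callback(); }, close: () => {} },
  {
    enabled: true,
    encodeMode: 'llm',
    algorithm, // the same LLM-generated algorithm used by the sender
    seed: 'unique-random-seed-123',
  }
);

// stegPacket: a Buffer received from the network (placeholder)
const hiddenRtpPacket = receiver.decode(stegPacket);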
Create a custom steganography transport with advanced features:
import { StegTransport, createT140RtpTransport, processAIStream } from 't140llm';
import OpenAI from 'openai';
// Create a custom steganography transport that extends the base StegTransport
class EnhancedStegTransport extends StegTransport {
constructor(innerTransport, config) {
super(innerTransport, config);
this.stats = {
packetsEncoded: 0,
bytesHidden: 0,
avgBitChanges: 0
};
}
// Override the encode method to add statistics
encode(data, cover) {
console.log(`Encoding ${data.length} bytes into ${cover.length} bytes cover`);
this.stats.packetsEncoded++;
this.stats.bytesHidden += data.length;
// Call the parent encode method
const encoded = super.encode(data, cover);
// Calculate bit changes for statistics
const changes = this.countBitChanges(cover, encoded);
this.stats.avgBitChanges =
(this.stats.avgBitChanges * (this.stats.packetsEncoded - 1) + changes) /
this.stats.packetsEncoded;
return encoded;
}
// Helper to count how many bits were changed during encoding
countBitChanges(original, modified) {
let count = 0;
const len = Math.min(original.length, modified.length);
for (let i = 0; i < len; i++) {
const xor = original[i] ^ modified[i];
for (let bit = 0; bit < 8; bit++) {
if ((xor >> bit) & 1) count++;
}
}
return count;
}
// Add a method to get statistics
getStats() {
return { ...this.stats };
}
}
// Generate some cover media
function generateCoverMedia(count = 5, size = 10240) {
const media = [];
for (let i = 0; i < count; i++) {
const buffer = Buffer.alloc(size);
for (let j = 0; j < buffer.length; j++) {
buffer[j] = Math.floor(Math.random() * 256);
}
media.push(buffer);
}
return media;
}
// Create a custom pattern-based algorithm
const customAlgorithm = `
function encode(data, cover) {
// Ensure cover is large enough (32-byte header + 32 cover bytes per data byte)
if (cover.length < data.length * 32 + 32) {
throw new Error('Cover media too small for data payload');
}
// Create a copy of the cover
const result = Buffer.from(cover);
// Store data length in first 4 bytes
for (let i = 0; i < 4; i++) {
result[i] = (data.length >> (i * 8)) & 0xFF;
}
// Embed data using pattern-based approach
const patternSize = 4; // 4 bytes per bit
const patternCount = Math.min(data.length * 8, Math.floor((cover.length - 32) / patternSize));
for (let i = 0; i < patternCount; i++) {
const byteIndex = Math.floor(i / 8);
const bitIndex = i % 8;
const bit = (data[byteIndex] >> bitIndex) & 1;
// Position in cover
const coverPos = 32 + (i * patternSize);
// Apply pattern change for bit
if (bit === 1) {
// For '1' bit, make gradient ascending
for (let j = 0; j < patternSize - 1; j++) {
if (result[coverPos + j] >= result[coverPos + j + 1]) {
result[coverPos + j + 1] = result[coverPos + j] + 1;
}
}
} else {
// For '0' bit, make gradient descending
for (let j = 0; j < patternSize - 1; j++) {
if (result[coverPos + j] <= result[coverPos + j + 1]) {
result[coverPos + j + 1] = Math.max(0, result[coverPos + j] - 1);
}
}
}
}
return result;
}
function decode(stegData) {
// Extract data length from first 4 bytes
let dataLength = 0;
for (let i = 0; i < 4; i++) {
dataLength |= stegData[i] << (i * 8);
}
// Validate data length
if (dataLength <= 0 || dataLength > (stegData.length - 32) / 32) {
throw new Error('Invalid data length in steganographic content');
}
// Create result buffer
const result = Buffer.alloc(dataLength);
// Extract data using pattern detection
const patternSize = 4;
const patternCount = Math.min(dataLength * 8, Math.floor((stegData.length - 32) / patternSize));
for (let i = 0; i < patternCount; i++) {
const byteIndex = Math.floor(i / 8);
const bitIndex = i % 8;
// Position in stego data
const stegPos = 32 + (i * patternSize);
// Detect pattern
let ascending = true;
for (let j = 0; j < patternSize - 1; j++) {
if (stegData[stegPos + j] >= stegData[stegPos + j + 1]) {
ascending = false;
break;
}
}
// If pattern is ascending, the bit is 1
if (ascending) {
result[byteIndex] |= (1 << bitIndex);
}
}
return result;
}`;
// Create a simple logging inner transport
const loggingTransport = {
send: (data, callback) => {
console.log(`Sending ${data.length} bytes of data`);
if (callback) callback();
},
close: () => {
console.log('Transport closed');
}
};
// Create an enhanced steganography transport
const coverMedia = generateCoverMedia();
const stegTransport = new EnhancedStegTransport(loggingTransport, {
enabled: true,
encodeMode: 'fixed',
algorithm: customAlgorithm,
coverMedia,
seed: 'custom-steg-example'
});
// Create an RTP transport using our custom steg transport
const rtpTransport = createT140RtpTransport('192.168.1.100', 5004, {
customTransport: stegTransport,
payloadType: 96,
ssrc: 12345
});
// Use the transport with an AI stream (example)
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: "Tell me about steganography." }],
stream: true,
});
// Process the AI stream
await processAIStream(stream, null, {
transport: rtpTransport
});
// Log stats at the end
console.log('Steganography stats:', stegTransport.getStats());
You can use separate transport channels for text and reasoning data:
import { processAIStreamToDirectSocket } from 't140llm';
import { EventEmitter } from 'events';
import * as fs from 'fs';
// Create two separate socket paths
const textSocketPath = '/tmp/llm-text.sock';
const reasoningSocketPath = '/tmp/llm-reasoning.sock';
// Clean up any existing sockets
if (fs.existsSync(textSocketPath)) fs.unlinkSync(textSocketPath);
if (fs.existsSync(reasoningSocketPath)) fs.unlinkSync(reasoningSocketPath);
// Create a separate socket for reasoning data
const reasoningSocket = processAIStreamToDirectSocket(
new EventEmitter(), // Empty stream to initialize socket
reasoningSocketPath
);
// Create custom reasoning metadata handler that sends to separate socket
const handleMetadata = (metadata) => {
if (metadata.type === 'reasoning') {
reasoningSocket.write(JSON.stringify({
type: 'reasoning',
content: metadata.content
}) + '\n');
}
};
// Process the main text stream with the reasoning handler
// (llmStream is your LLM streaming response; see the provider examples above)
const textSocket = processAIStreamToDirectSocket(
llmStream,
textSocketPath,
{
handleMetadata: true,
metadataCallback: handleMetadata
}
);
// This allows different clients to connect to each socket:
// - One client can show the final LLM text output
// - Another client can show the reasoning process
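A reasoning client can consume the newline-delimited JSON written to the second socket; a minimal sketch using Node's net module:

import * as net from 'net';

const reasoningClient = net.createConnection('/tmp/llm-reasoning.sock');

let buffered = '';
reasoningClient.on('data', (chunk) => {
  buffered += chunk.toString('utf8');
  const lines = buffered.split('\n');
  buffered = lines.pop() || ''; // keep any partial trailing line
  for (const line of lines) {
    if (!line.trim()) continue;
    const { content } = JSON.parse(line);
    console.log('Reasoning:', content);
  }
});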
The library includes built-in Forward Error Correction (FEC) to recover from packet loss during transmission. Enable it by setting enableFec: true in your RTP configuration:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
enableFec: true,
fecInterval: 5, // Send FEC packets every 5 data packets
});
Some LLMs might emit backspace characters during text generation. The library can handle these automatically:
await processAIStream(stream, websocketUrl, {
processBackspaces: true,
});
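For illustration, a toy stream that emits a correction; with processBackspaces enabled the backspaces erase previously sent characters instead of being transmitted literally. The { text } chunk shape and the synchronous listener attachment are assumptions:

import { processAIStream } from 't140llm';
import { EventEmitter } from 'events';

const stream = new EventEmitter();

// Start processing first so listeners are attached (assumed synchronous)
processAIStream(stream, 'ws://localhost:8080', {
  processBackspaces: true,
});

stream.emit('data', { text: 'Helo' });
stream.emit('data', { text: '\b\bllo world' }); // erases 'lo', sends 'llo world'
stream.emit('end');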
You can control the transmission rate of packets:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
rateLimit: 30, // Limit to 30 packets per second
});
For improved reliability, you can enable packet redundancy:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
redundancy: 2, // Send each packet twice
});
You can customize various RTP parameters:
await processAIStreamToRtp(stream, remoteAddress, remotePort, {
payloadType: 98, // Custom payload type (default: 96)
ssrc: 12345678, // Custom SSRC (default: random)
sequenceNumber: 100, // Starting sequence number (default: random)
timestamp: 0, // Starting timestamp (default: current time)
});