Custom I/O handlers
Purpose
This guide explains how to implement custom I/O systems for Cabriolet, enabling support for custom storage backends, network streams, encrypted files, and other specialized scenarios.
IOSystem Architecture
Understanding the I/O Layer
Cabriolet uses an abstraction layer for all I/O operations, allowing you to customize how files are read and written. The architecture consists of:
- IOSystem: Factory for creating handles
- Handle: Abstract interface for I/O operations
- FileHandle: Standard file system implementation
- MemoryHandle: In-memory implementation
┌─────────────┐
│ IOSystem │ Factory
└──────┬──────┘
│ creates
▼
┌─────────────┐
│ Handle │ Abstract Interface
└──────┬──────┘
│
┌───┴────┬──────────┬────────────┐
▼ ▼ ▼ ▼
FileHandle MemoryHandle CustomHandle NetworkHandle
Handle Interface
All handles must implement these methods:
class Handle
def read(size)
# Read up to size bytes
end
def write(data)
# Write data
end
def seek(offset, whence = IO::SEEK_SET)
# Seek to position
end
def tell
# Return current position
end
def eof?
# Check if at end of file
end
def close
# Close the handle
end
def size
# Return total size
end
end
Implementing Custom Handles
Network Stream Handle
Create a handle that reads from network streams:
require 'net/http'
require 'uri'
class NetworkHandle
attr_reader :uri
def initialize(url)
@uri = URI(url)
@position = 0
@buffer = String.new(encoding: Encoding::BINARY)
@total_size = fetch_size
end
def read(size)
# Fetch more data if needed
while @buffer.bytesize - @position < size && !@eof
fetch_chunk
end
# Read from buffer
data = @buffer[@position, size]
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
case whence
when IO::SEEK_SET
@position = offset
when IO::SEEK_CUR
@position += offset
when IO::SEEK_END
@position = @total_size + offset
end
@position = [@position, 0].max
@position
end
def tell
@position
end
def eof?
@position >= @total_size
end
def size
@total_size
end
def close
@buffer.clear
end
private
def fetch_size
response = Net::HTTP.start(@uri.host, @uri.port) do |http|
http.head(@uri.path)
end
response['content-length'].to_i
end
def fetch_chunk(chunk_size = 1024 * 1024)
range_start = @buffer.bytesize
range_end = range_start + chunk_size - 1
response = Net::HTTP.start(@uri.host, @uri.port) do |http|
request = Net::HTTP::Get.new(@uri)
request['Range'] = "bytes=#{range_start}-#{range_end}"
http.request(request)
end
if response.is_a?(Net::HTTPPartialContent)
@buffer << response.body
else
@eof = true
end
end
end
# Usage
handle = NetworkHandle.new('https://example.com/archive.cab')
decompressor = Cabriolet::CAB::Decompressor.new(handle)
decompressor.extract_all('output')
Encrypted File Handle
Handle encrypted files transparently:
require 'openssl'
class EncryptedHandle
def initialize(filename, key)
@file = File.open(filename, 'rb')
@cipher = OpenSSL::Cipher.new('AES-256-CBC')
@cipher.decrypt
@cipher.key = key
# Read and process IV (first 16 bytes)
@iv = @file.read(16)
@cipher.iv = @iv
@position = 0
@decrypted_buffer = String.new(encoding: Encoding::BINARY)
@encrypted_size = @file.size - 16
end
def read(size)
# Decrypt more data if needed
while @decrypted_buffer.bytesize - @position < size && !eof?
decrypt_chunk
end
data = @decrypted_buffer[@position, size]
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
# Seeking in encrypted files requires re-decryption
case whence
when IO::SEEK_SET
new_pos = offset
when IO::SEEK_CUR
new_pos = @position + offset
when IO::SEEK_END
new_pos = size + offset
end
if new_pos < @position
# Need to restart decryption
reset_decryption
end
# Decrypt up to new position
while @decrypted_buffer.bytesize < new_pos && !eof?
decrypt_chunk
end
@position = new_pos
end
def tell
@position
end
def eof?
@file.eof?
end
def size
# Encrypted size (may need padding adjustment)
@encrypted_size
end
def close
@file.close
@decrypted_buffer.clear
end
private
def decrypt_chunk(chunk_size = 64 * 1024)
encrypted_data = @file.read(chunk_size)
return unless encrypted_data
decrypted = @cipher.update(encrypted_data)
decrypted << @cipher.final if @file.eof?
@decrypted_buffer << decrypted
end
def reset_decryption
@file.seek(16) # Skip IV
@cipher.reset
@decrypted_buffer.clear
@position = 0
end
end
# Usage
key = OpenSSL::Digest::SHA256.digest('my_password')
handle = EncryptedHandle.new('encrypted_archive.cab.enc', key)
decompressor = Cabriolet::CAB::Decompressor.new(handle)
Database BLOB Handle
Read from database BLOB fields:
require 'pg'
class PostgresBlobHandle
def initialize(connection, table, column, id)
@conn = connection
@table = table
@column = column
@id = id
@position = 0
# Get total size
result = @conn.exec_params(
"SELECT LENGTH(#{@column}) FROM #{@table} WHERE id = $1",
[@id]
)
@size = result[0]['length'].to_i
end
def read(size)
# Read chunk from database
result = @conn.exec_params(
"SELECT SUBSTRING(#{@column} FROM $1 FOR $2) as data FROM #{@table} WHERE id = $3",
[@position + 1, size, @id] # PostgreSQL uses 1-based indexing
)
data = result[0]['data']
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
case whence
when IO::SEEK_SET
@position = offset
when IO::SEEK_CUR
@position += offset
when IO::SEEK_END
@position = @size + offset
end
@position = [@position, 0].max
@position
end
def tell
@position
end
def eof?
@position >= @size
end
def size
@size
end
def close
# Connection managed externally
end
end
# Usage
conn = PG.connect(dbname: 'archives')
handle = PostgresBlobHandle.new(conn, 'archives', 'data', 123)
decompressor = Cabriolet::CAB::Decompressor.new(handle)
Cloud Storage Handle
Access files from cloud storage:
require 'aws-sdk-s3'
class S3Handle
def initialize(bucket, key, region: 'us-east-1')
@s3 = Aws::S3::Client.new(region: region)
@bucket = bucket
@key = key
@position = 0
# Get object size
response = @s3.head_object(bucket: @bucket, key: @key)
@size = response.content_length
# Buffer for caching
@buffer = String.new(encoding: Encoding::BINARY)
@buffer_start = 0
end
def read(size)
# Check if we need to fetch more data
buffer_end = @buffer_start + @buffer.bytesize
needed_end = @position + size
if @position < @buffer_start || needed_end > buffer_end
fetch_range(@position, size)
end
# Read from buffer
offset = @position - @buffer_start
data = @buffer[offset, size]
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
case whence
when IO::SEEK_SET
@position = offset
when IO::SEEK_CUR
@position += offset
when IO::SEEK_END
@position = @size + offset
end
@position = [@position, 0].max
@position
end
def tell
@position
end
def eof?
@position >= @size
end
def size
@size
end
def close
@buffer.clear
end
private
def fetch_range(start, size, cache_size = 1024 * 1024)
# Fetch more than requested for buffering
fetch_size = [size, cache_size].max
range = "bytes=#{start}-#{start + fetch_size - 1}"
response = @s3.get_object(
bucket: @bucket,
key: @key,
range: range
)
@buffer = response.body.read
@buffer_start = start
end
end
# Usage
handle = S3Handle.new('my-bucket', 'archives/data.cab')
decompressor = Cabriolet::CAB::Decompressor.new(handle)
Custom IOSystem Implementation
Creating a Complete IOSystem
Implement a custom IOSystem factory:
class CustomIOSystem
def initialize
@handles = {}
end
def open(filename, mode)
# Determine handle type based on filename pattern
handle = case filename
when /^https?:\/\//
NetworkHandle.new(filename)
when /^s3:\/\/(.+?)\/(.+)/
bucket, key = $1, $2
S3Handle.new(bucket, key)
when /^db:\/\/(.+?)\/(.+?)\/(.+?)\/(.+)/
# Format: db://host/database/table/id
db, table, id = $1, $2, $3
conn = PG.connect(dbname: db)
PostgresBlobHandle.new(conn, table, 'data', id)
when /\.enc$/
# Encrypted file
key = ENV['ENCRYPTION_KEY'] || raise('No encryption key')
EncryptedHandle.new(filename, key)
else
# Default to file system
Cabriolet::System::FileHandle.new(filename, mode)
end
@handles[handle.object_id] = handle
handle
end
def close(handle)
handle.close
@handles.delete(handle.object_id)
end
def close_all
@handles.values.each(&:close)
@handles.clear
end
end
# Usage
io_system = CustomIOSystem.new
# Works with different sources
decompressor1 = Cabriolet::CAB::Decompressor.new(
'https://example.com/archive.cab',
io_system: io_system
)
decompressor2 = Cabriolet::CAB::Decompressor.new(
's3://my-bucket/archive.cab',
io_system: io_system
)
decompressor3 = Cabriolet::CAB::Decompressor.new(
'encrypted_archive.cab.enc',
io_system: io_system
)
Advanced Patterns
Caching Layer
Add caching to any handle:
class CachedHandle
def initialize(inner_handle, cache_size: 10 * 1024 * 1024)
@inner = inner_handle
@cache_size = cache_size
@cache = {}
@position = 0
end
def read(size)
# Check cache
cache_key = (@position / @cache_size).floor
unless @cache[cache_key]
# Load cache block
@inner.seek(cache_key * @cache_size)
@cache[cache_key] = @inner.read(@cache_size)
# Limit cache size (LRU)
if @cache.size > 10
@cache.delete(@cache.keys.first)
end
end
# Read from cache
offset = @position % @cache_size
data = @cache[cache_key][offset, size]
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
@inner.seek(offset, whence)
@position = @inner.tell
end
def tell
@position
end
def eof?
@inner.eof?
end
def size
@inner.size
end
def close
@cache.clear
@inner.close
end
end
# Wrap any handle with caching
network_handle = NetworkHandle.new('https://example.com/large.cab')
cached_handle = CachedHandle.new(network_handle)
decompressor = Cabriolet::CAB::Decompressor.new(cached_handle)
Progress Tracking
Monitor I/O progress:
class ProgressHandle
def initialize(inner_handle, callback: nil)
@inner = inner_handle
@callback = callback || ->(bytes, total) { puts "#{bytes}/#{total}" }
@bytes_read = 0
@total_size = inner_handle.size
end
def read(size)
data = @inner.read(size)
if data
@bytes_read += data.bytesize
@callback.call(@bytes_read, @total_size)
end
data
end
def seek(offset, whence = IO::SEEK_SET)
@inner.seek(offset, whence)
end
def tell
@inner.tell
end
def eof?
@inner.eof?
end
def size
@total_size
end
def close
@inner.close
end
end
# Usage with progress bar
require 'ruby-progressbar'
progress_bar = ProgressBar.create(
title: 'Extracting',
total: File.size('archive.cab'),
format: '%t: |%B| %p%% %e'
)
handle = ProgressHandle.new(
Cabriolet::System::FileHandle.new('archive.cab', 'rb'),
callback: ->(bytes, total) { progress_bar.progress = bytes }
)
decompressor = Cabriolet::CAB::Decompressor.new(handle)
decompressor.extract_all('output')
Compression-Aware Handle
Handle pre-compressed storage:
require 'zlib'
class GzipHandle
def initialize(filename)
@gzip = Zlib::GzipReader.new(File.open(filename, 'rb'))
@position = 0
@buffer = String.new(encoding: Encoding::BINARY)
end
def read(size)
# Decompress on-the-fly
while @buffer.bytesize - @position < size && !@gzip.eof?
@buffer << @gzip.read(64 * 1024)
end
data = @buffer[@position, size]
@position += data.bytesize if data
data
end
def seek(offset, whence = IO::SEEK_SET)
# Can only seek forward in compressed stream
case whence
when IO::SEEK_SET
if offset < @position
# Must restart
@gzip.close
@gzip = Zlib::GzipReader.new(File.open(@filename, 'rb'))
@buffer.clear
@position = 0
end
# Read and discard to reach position
skip_bytes = offset - @position
read(skip_bytes) if skip_bytes > 0
when IO::SEEK_CUR
read(offset) if offset > 0
end
@position
end
def tell
@position
end
def eof?
@gzip.eof? && @position >= @buffer.bytesize
end
def size
# Unknown for compressed streams
-1
end
def close
@gzip.close
end
end
Testing Custom Handles
Unit Testing
RSpec.describe NetworkHandle do
let(:url) { 'https://example.com/test.cab' }
let(:handle) { NetworkHandle.new(url) }
describe '#read' do
it 'reads data from network' do
data = handle.read(100)
expect(data).to be_a(String)
expect(data.bytesize).to eq(100)
end
end
describe '#seek' do
it 'seeks to position' do
handle.seek(1000)
expect(handle.tell).to eq(1000)
end
it 'supports relative seeking' do
handle.seek(100)
handle.seek(50, IO::SEEK_CUR)
expect(handle.tell).to eq(150)
end
end
describe '#eof?' do
it 'detects end of file' do
handle.seek(0, IO::SEEK_END)
expect(handle.eof?).to be true
end
end
end
Best practices
Performance
- Buffer reads: Read in chunks, not byte-by-byte
- Cache aggressively: Store frequently accessed data
- Minimize seeks: Sequential reads are faster
- Use async I/O: For network and slow storage
- Profile performance: Measure actual bottlenecks