Custom I/O handlers

Purpose

This guide explains how to implement custom I/O systems for Cabriolet, enabling support for custom storage backends, network streams, encrypted files, and other specialized scenarios.

IOSystem Architecture

Understanding the I/O Layer

Cabriolet uses an abstraction layer for all I/O operations, allowing you to customize how files are read and written. The architecture consists of:

  • IOSystem: Factory for creating handles

  • Handle: Abstract interface for I/O operations

  • FileHandle: Standard file system implementation

  • MemoryHandle: In-memory implementation

┌─────────────┐
│  IOSystem   │  Factory
└──────┬──────┘
       │ creates
       ▼
┌─────────────┐
│   Handle    │  Abstract Interface
└──────┬──────┘
       │
   ┌───┴────┬──────────┬────────────┐
   ▼        ▼          ▼            ▼
FileHandle  MemoryHandle  CustomHandle  NetworkHandle

Handle Interface

All handles must implement these methods:

class Handle
  def read(size)
    # Read up to size bytes
  end

  def write(data)
    # Write data
  end

  def seek(offset, whence = IO::SEEK_SET)
    # Seek to position
  end

  def tell
    # Return current position
  end

  def eof?
    # Check if at end of file
  end

  def close
    # Close the handle
  end

  def size
    # Return total size
  end
end
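A complete minimal implementation of this interface can simply delegate to Ruby's StringIO. The SimpleMemoryHandle name below is illustrative (Cabriolet ships its own MemoryHandle); it shows the full contract in one place:

```ruby
require 'stringio'

# Minimal handle satisfying the interface above by delegating to StringIO.
# Illustrative sketch only -- use Cabriolet's built-in MemoryHandle in practice.
class SimpleMemoryHandle
  def initialize(data)
    @io = StringIO.new(data.dup.force_encoding(Encoding::BINARY))
  end

  def read(size)
    @io.read(size)  # Returns up to size bytes, or nil at EOF
  end

  def write(data)
    @io.write(data)
  end

  def seek(offset, whence = IO::SEEK_SET)
    @io.seek(offset, whence)
    @io.tell
  end

  def tell
    @io.tell
  end

  def eof?
    @io.eof?
  end

  def size
    @io.size
  end

  def close
    @io.close
  end
end
```

Because every method delegates, the handle inherits StringIO's semantics for edge cases such as zero-length reads and reads past EOF.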

Implementing Custom Handles

Network Stream Handle

Create a handle that reads from network streams:

require 'net/http'
require 'uri'

class NetworkHandle
  attr_reader :uri

  def initialize(url)
    @uri = URI(url)
    @position = 0
    @buffer = String.new(encoding: Encoding::BINARY)
    @eof = false
    @total_size = fetch_size
  end

  def read(size)
    # Fetch more data if needed
    while @buffer.bytesize - @position < size && !@eof
      fetch_chunk
    end

    # Read from buffer
    data = @buffer[@position, size]
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    case whence
    when IO::SEEK_SET
      @position = offset
    when IO::SEEK_CUR
      @position += offset
    when IO::SEEK_END
      @position = @total_size + offset
    end

    @position = [@position, 0].max
    @position
  end

  def tell
    @position
  end

  def eof?
    @position >= @total_size
  end

  def size
    @total_size
  end

  def close
    @buffer.clear
  end

  private

  def fetch_size
    response = Net::HTTP.start(@uri.host, @uri.port, use_ssl: @uri.scheme == 'https') do |http|
      http.head(@uri.request_uri)
    end
    response['content-length'].to_i
  end

  def fetch_chunk(chunk_size = 1024 * 1024)
    range_start = @buffer.bytesize
    range_end = range_start + chunk_size - 1

    response = Net::HTTP.start(@uri.host, @uri.port, use_ssl: @uri.scheme == 'https') do |http|
      request = Net::HTTP::Get.new(@uri)
      request['Range'] = "bytes=#{range_start}-#{range_end}"
      http.request(request)
    end

    if response.is_a?(Net::HTTPPartialContent)
      @buffer << response.body
    else
      @eof = true
    end
  end
end

# Usage
handle = NetworkHandle.new('https://example.com/archive.cab')
decompressor = Cabriolet::CAB::Decompressor.new(handle)
decompressor.extract_all('output')

Encrypted File Handle

Handle encrypted files transparently:

require 'openssl'

class EncryptedHandle
  def initialize(filename, key)
    @file = File.open(filename, 'rb')
    @cipher = OpenSSL::Cipher.new('AES-256-CBC')
    @cipher.decrypt
    @cipher.key = key

    # Read and process IV (first 16 bytes)
    @iv = @file.read(16)
    @cipher.iv = @iv

    @position = 0
    @decrypted_buffer = String.new(encoding: Encoding::BINARY)
    @encrypted_size = @file.size - 16
  end

  def read(size)
    # Decrypt more data if needed; loop on the underlying file's EOF so we
    # stop once the ciphertext is exhausted
    while @decrypted_buffer.bytesize - @position < size && !@file.eof?
      decrypt_chunk
    end

    data = @decrypted_buffer[@position, size]
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    # Seeking backwards in a CBC stream requires re-decrypting from the start
    case whence
    when IO::SEEK_SET
      new_pos = offset
    when IO::SEEK_CUR
      new_pos = @position + offset
    when IO::SEEK_END
      new_pos = size + offset
    end

    reset_decryption if new_pos < @position

    # Decrypt up to the new position
    while @decrypted_buffer.bytesize < new_pos && !@file.eof?
      decrypt_chunk
    end

    @position = new_pos
  end

  def tell
    @position
  end

  def eof?
    # At EOF only when the file is exhausted and the buffer fully consumed
    @file.eof? && @position >= @decrypted_buffer.bytesize
  end

  def size
    # Encrypted size (may need padding adjustment)
    @encrypted_size
  end

  def close
    @file.close
    @decrypted_buffer.clear
  end

  private

  def decrypt_chunk(chunk_size = 64 * 1024)
    encrypted_data = @file.read(chunk_size)
    return unless encrypted_data

    decrypted = @cipher.update(encrypted_data)
    decrypted << @cipher.final if @file.eof?

    @decrypted_buffer << decrypted
  end

  def reset_decryption
    @file.seek(16)  # Skip IV
    @cipher.reset
    @cipher.iv = @iv  # Restore the IV explicitly after resetting cipher state
    @decrypted_buffer.clear
    @position = 0
  end
end

# Usage (for real deployments, derive the key with a KDF such as PBKDF2
# rather than a bare digest of the password)
key = OpenSSL::Digest::SHA256.digest('my_password')
handle = EncryptedHandle.new('encrypted_archive.cab.enc', key)
decompressor = Cabriolet::CAB::Decompressor.new(handle)

Database BLOB Handle

Read from database BLOB fields:

require 'pg'

class PostgresBlobHandle
  def initialize(connection, table, column, id)
    @conn = connection
    @table = table
    @column = column
    @id = id
    @position = 0

    # Get total size in bytes (table/column names must come from trusted input)
    result = @conn.exec_params(
      "SELECT OCTET_LENGTH(#{@column}) AS size FROM #{@table} WHERE id = $1",
      [@id]
    )
    @size = result[0]['size'].to_i
  end

  def read(size)
    # Read chunk from database (SUBSTRING counts from 1 in PostgreSQL)
    result = @conn.exec_params(
      "SELECT SUBSTRING(#{@column} FROM $1 FOR $2) AS data FROM #{@table} WHERE id = $3",
      [@position + 1, size, @id]
    )

    # bytea values arrive escaped in text format; unescape to raw binary
    data = result[0]['data']
    data = @conn.unescape_bytea(data) if data
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    case whence
    when IO::SEEK_SET
      @position = offset
    when IO::SEEK_CUR
      @position += offset
    when IO::SEEK_END
      @position = @size + offset
    end

    @position = [@position, 0].max
    @position
  end

  def tell
    @position
  end

  def eof?
    @position >= @size
  end

  def size
    @size
  end

  def close
    # Connection managed externally
  end
end

# Usage
conn = PG.connect(dbname: 'archives')
handle = PostgresBlobHandle.new(conn, 'archives', 'data', 123)
decompressor = Cabriolet::CAB::Decompressor.new(handle)

Cloud Storage Handle

Access files from cloud storage:

require 'aws-sdk-s3'

class S3Handle
  def initialize(bucket, key, region: 'us-east-1')
    @s3 = Aws::S3::Client.new(region: region)
    @bucket = bucket
    @key = key
    @position = 0

    # Get object size
    response = @s3.head_object(bucket: @bucket, key: @key)
    @size = response.content_length

    # Buffer for caching
    @buffer = String.new(encoding: Encoding::BINARY)
    @buffer_start = 0
  end

  def read(size)
    # Check if we need to fetch more data
    buffer_end = @buffer_start + @buffer.bytesize
    needed_end = @position + size

    if @position < @buffer_start || needed_end > buffer_end
      fetch_range(@position, size)
    end

    # Read from buffer
    offset = @position - @buffer_start
    data = @buffer[offset, size]
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    case whence
    when IO::SEEK_SET
      @position = offset
    when IO::SEEK_CUR
      @position += offset
    when IO::SEEK_END
      @position = @size + offset
    end

    @position = [@position, 0].max
    @position
  end

  def tell
    @position
  end

  def eof?
    @position >= @size
  end

  def size
    @size
  end

  def close
    @buffer.clear
  end

  private

  def fetch_range(start, size, cache_size = 1024 * 1024)
    # Fetch more than requested for buffering, clamped to the object size
    fetch_size = [size, cache_size].max
    range_end = [start + fetch_size, @size].min - 1
    range = "bytes=#{start}-#{range_end}"

    response = @s3.get_object(
      bucket: @bucket,
      key: @key,
      range: range
    )

    @buffer = response.body.read
    @buffer_start = start
  end
end

# Usage
handle = S3Handle.new('my-bucket', 'archives/data.cab')
decompressor = Cabriolet::CAB::Decompressor.new(handle)

Custom IOSystem Implementation

Creating a Complete IOSystem

Implement a custom IOSystem factory:

class CustomIOSystem
  def initialize
    @handles = {}
  end

  def open(filename, mode)
    # Determine handle type based on filename pattern
    handle = case filename
    when /^https?:\/\//
      NetworkHandle.new(filename)
    when /^s3:\/\/(.+?)\/(.+)/
      bucket, key = $1, $2
      S3Handle.new(bucket, key)
    when /^db:\/\/(.+?)\/(.+?)\/(.+?)\/(.+)/
      # Format: db://host/database/table/id
      host, db, table, id = $1, $2, $3, $4
      conn = PG.connect(host: host, dbname: db)
      PostgresBlobHandle.new(conn, table, 'data', id)
    when /\.enc$/
      # Encrypted file: derive a 32-byte AES-256 key from the environment secret
      secret = ENV['ENCRYPTION_KEY'] || raise('No encryption key')
      EncryptedHandle.new(filename, OpenSSL::Digest::SHA256.digest(secret))
    else
      # Default to file system
      Cabriolet::System::FileHandle.new(filename, mode)
    end

    @handles[handle.object_id] = handle
    handle
  end

  def close(handle)
    handle.close
    @handles.delete(handle.object_id)
  end

  def close_all
    @handles.values.each(&:close)
    @handles.clear
  end
end

# Usage
io_system = CustomIOSystem.new

# Works with different sources
decompressor1 = Cabriolet::CAB::Decompressor.new(
  'https://example.com/archive.cab',
  io_system: io_system
)

decompressor2 = Cabriolet::CAB::Decompressor.new(
  's3://my-bucket/archive.cab',
  io_system: io_system
)

decompressor3 = Cabriolet::CAB::Decompressor.new(
  'encrypted_archive.cab.enc',
  io_system: io_system
)

Advanced Patterns

Caching Layer

Add caching to any handle:

class CachedHandle
  def initialize(inner_handle, cache_size: 10 * 1024 * 1024)
    @inner = inner_handle
    @cache_size = cache_size
    @cache = {}
    @position = 0
  end

  def read(size)
    # Check cache
    cache_key = (@position / @cache_size).floor

    unless @cache[cache_key]
      # Load cache block
      @inner.seek(cache_key * @cache_size)
      @cache[cache_key] = @inner.read(@cache_size)

      # Evict the oldest block (simple FIFO; a true LRU would reorder on access)
      @cache.delete(@cache.keys.first) if @cache.size > 10
    end

    # Read from cache; may return fewer than size bytes at a block boundary
    offset = @position % @cache_size
    block = @cache[cache_key]
    data = block ? block[offset, size] : nil
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    # Track position locally: reads reposition the inner handle themselves,
    # so SEEK_CUR must be relative to our own position, not the inner one
    case whence
    when IO::SEEK_SET
      @position = offset
    when IO::SEEK_CUR
      @position += offset
    when IO::SEEK_END
      @position = @inner.size + offset
    end
    @position
  end

  def tell
    @position
  end

  def eof?
    # The inner handle's position lags behind cached reads, so compare
    # our own position against the total size instead of delegating
    @position >= @inner.size
  end

  def size
    @inner.size
  end

  def close
    @cache.clear
    @inner.close
  end
end

# Wrap any handle with caching
network_handle = NetworkHandle.new('https://example.com/large.cab')
cached_handle = CachedHandle.new(network_handle)
decompressor = Cabriolet::CAB::Decompressor.new(cached_handle)

Progress Tracking

Monitor I/O progress:

class ProgressHandle
  def initialize(inner_handle, callback: nil)
    @inner = inner_handle
    @callback = callback || ->(bytes, total) { puts "#{bytes}/#{total}" }
    @bytes_read = 0
    @total_size = inner_handle.size
  end

  def read(size)
    data = @inner.read(size)
    if data
      @bytes_read += data.bytesize
      @callback.call(@bytes_read, @total_size)
    end
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    @inner.seek(offset, whence)
  end

  def tell
    @inner.tell
  end

  def eof?
    @inner.eof?
  end

  def size
    @total_size
  end

  def close
    @inner.close
  end
end

# Usage with progress bar
require 'ruby-progressbar'

progress_bar = ProgressBar.create(
  title: 'Extracting',
  total: File.size('archive.cab'),
  format: '%t: |%B| %p%% %e'
)

handle = ProgressHandle.new(
  Cabriolet::System::FileHandle.new('archive.cab', 'rb'),
  callback: ->(bytes, total) { progress_bar.progress = bytes }
)

decompressor = Cabriolet::CAB::Decompressor.new(handle)
decompressor.extract_all('output')

Compression-Aware Handle

Handle pre-compressed storage:

require 'zlib'

class GzipHandle
  def initialize(filename)
    @filename = filename  # Kept so seek can reopen the stream from the start
    @gzip = Zlib::GzipReader.new(File.open(filename, 'rb'))
    @position = 0
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  def read(size)
    # Decompress on-the-fly; GzipReader#read returns nil at EOF
    while @buffer.bytesize - @position < size && !@gzip.eof?
      chunk = @gzip.read(64 * 1024)
      @buffer << chunk if chunk
    end

    data = @buffer[@position, size]
    @position += data.bytesize if data
    data
  end

  def seek(offset, whence = IO::SEEK_SET)
    # Can only seek forward in compressed stream
    case whence
    when IO::SEEK_SET
      if offset < @position
        # Must restart
        @gzip.close
        @gzip = Zlib::GzipReader.new(File.open(@filename, 'rb'))
        @buffer.clear
        @position = 0
      end

      # Read and discard to reach position
      skip_bytes = offset - @position
      read(skip_bytes) if skip_bytes > 0
    when IO::SEEK_CUR
      read(offset) if offset > 0
    end

    @position
  end

  def tell
    @position
  end

  def eof?
    @gzip.eof? && @position >= @buffer.bytesize
  end

  def size
    # Unknown for compressed streams
    -1
  end

  def close
    @gzip.close
  end
end

Testing Custom Handles

Unit Testing

RSpec.describe NetworkHandle do
  # Assumes a reachable test archive; stub the HTTP layer (e.g. with WebMock) in CI
  let(:url) { 'https://example.com/test.cab' }
  let(:handle) { NetworkHandle.new(url) }

  describe '#read' do
    it 'reads data from network' do
      data = handle.read(100)
      expect(data).to be_a(String)
      expect(data.bytesize).to eq(100)
    end
  end

  describe '#seek' do
    it 'seeks to position' do
      handle.seek(1000)
      expect(handle.tell).to eq(1000)
    end

    it 'supports relative seeking' do
      handle.seek(100)
      handle.seek(50, IO::SEEK_CUR)
      expect(handle.tell).to eq(150)
    end
  end

  describe '#eof?' do
    it 'detects end of file' do
      handle.seek(0, IO::SEEK_END)
      expect(handle.eof?).to be true
    end
  end
end

Best practices

Performance

  1. Buffer reads: Read in chunks, not byte-by-byte

  2. Cache aggressively: Store frequently accessed data

  3. Minimize seeks: Sequential reads are faster

  4. Use async I/O: For network and slow storage

  5. Profile performance: Measure actual bottlenecks
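The first rule is worth quantifying. This sketch counts I/O calls for byte-by-byte versus chunked reads over a 1 MB in-memory source; StringIO stands in for any slow backend, where each call would carry real latency:

```ruby
require 'stringio'

# Count how many read calls it takes to drain a source at a given chunk size.
def count_reads(io, chunk_size)
  io.rewind
  calls = 0
  calls += 1 while io.read(chunk_size)  # read returns nil at EOF
  calls
end

source = StringIO.new('x' * (1024 * 1024))
puts count_reads(source, 1)          # => 1048576 calls
puts count_reads(source, 64 * 1024)  # => 16 calls
```

Over a network backend, the difference between sixteen round trips and a million is the difference between a usable handle and an unusable one.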

Error handling

  1. Validate inputs: Check parameters before operations

  2. Handle network errors: Retry with exponential backoff

  3. Clean up resources: Always close handles

  4. Provide context: Include position and operation in errors

  5. Support recovery: Allow resuming after errors
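Rules 2 and 3 can be combined in a small wrapper handle, following the same decorator pattern as CachedHandle and ProgressHandle above. RetryHandle and its parameters are illustrative, not part of Cabriolet; it retries transient read failures with exponential backoff and otherwise delegates:

```ruby
# Sketch of retry-with-backoff as a wrapper handle (names are hypothetical).
class RetryHandle
  def initialize(inner, retries: 3, base_delay: 0.5)
    @inner = inner
    @retries = retries
    @base_delay = base_delay
  end

  def read(size)
    with_retries { @inner.read(size) }
  end

  def seek(offset, whence = IO::SEEK_SET)
    @inner.seek(offset, whence)
  end

  def tell  = @inner.tell unless respond_to?(:tell)

  def eof?
    @inner.eof?
  end

  def size
    @inner.size
  end

  def close
    @inner.close
  end

  private

  def with_retries
    attempts = 0
    begin
      yield
    rescue IOError, SystemCallError => e
      attempts += 1
      raise e if attempts > @retries  # Give up, preserving the original error
      sleep(@base_delay * (2**(attempts - 1)))  # 0.5s, 1s, 2s, ...
      retry
    end
  end
end
```

Only read is wrapped here; for network handles you would typically wrap seek-triggered fetches the same way.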

Compatibility

  1. Follow interface: Implement all required methods

  2. Handle edge cases: Empty files, zero-length reads

  3. Support standard modes: 'r', 'rb', 'w', 'wb'

  4. Document limitations: Explain constraints clearly

  5. Test thoroughly: Cover all code paths
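A quick way to enforce the first compatibility rule is a respond_to? check before wiring a handle in. The method list below mirrors the Handle interface shown earlier; write is omitted since read-only handles may skip it:

```ruby
require 'stringio'

# Methods every read-capable handle must expose (per the Handle interface above).
REQUIRED_HANDLE_METHODS = %i[read seek tell eof? size close].freeze

# Returns true if the object exposes the full read-side handle contract.
def handle_compliant?(handle)
  REQUIRED_HANDLE_METHODS.all? { |m| handle.respond_to?(m) }
end

puts handle_compliant?(StringIO.new('x'))  # StringIO satisfies the contract
puts handle_compliant?(Object.new)         # A bare object does not
```

Running this check in a shared spec keeps every custom handle honest about the interface before it ever reaches a Decompressor.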