HELIX-HAL: Instrument Driver Interface Contract
Hardware Abstraction Layer — Rust Trait System & Driver Architecture Specification
Document ID: HELIX-HAL-0001
Status: Normative
Revision: 1.3.0
Authors: Embedded Systems Engineering Group, HELIX OS
Applies To: helix-hal ≥ 2.4, helix-core ≥ 2.4
Companion Specs: HELIX-ARCH-0002 (Zero-Copy Memory), HELIX-COMP-0001 (Audit Ledger)
Purpose. This document is the definitive contract between the HELIX OS core runtime and any instrument driver — whether first-party, third-party, or site-developed. A driver that correctly implements every trait and invariant defined here will integrate with HELIX OS's telemetry pipeline, backpressure system, audit ledger, and hot-reload infrastructure without modification to the core. A driver that violates any invariant will be isolated, unloaded, and replaced without disrupting other instruments or the core runtime.
This document is written for engineers implementing drivers. It is not a tutorial for instrument operators.
Table of Contents
- Architectural Overview
- Core Trait Definitions
- Error Handling Taxonomy
- Backpressure Architecture
- Async Runtime Contract
- Hot-Reload Architecture
- Reference Driver Implementations
- Driver Validation & Certification
- Appendix A — Full Trait Reference
- Appendix B — Driver Manifest Schema
1. Architectural Overview
The HAL sits between raw hardware protocols and the HELIX OS data pipeline. Its job is to present every instrument — regardless of whether it speaks OPC-UA, Modbus TCP, a proprietary serial protocol, or a vendor SDK — as a uniform source of TelemetryFrame values flowing into the zero-copy ring buffer defined in HELIX-ARCH-0002.
┌──────────────────────────────────────────────────────────────────────────┐
│                          HELIX OS Core Runtime                           │
│                                                                          │
│  ┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐  │
│  │ Ring Buffer      │     │ Audit Ledger     │     │ Alarm Bus        │  │
│  │ (HELIX-ARCH-0002)│     │ (HELIX-COMP-0001)│     │ (async MPSC)     │  │
│  └────────┬─────────┘     └────────┬─────────┘     └────────┬─────────┘  │
│           │                        │                        │            │
│           └────────────────────────┼────────────────────────┘            │
│                                    │                                     │
│                     ┌──────────────┴───────────────┐                     │
│                     │ HAL Supervisor               │                     │
│                     │ · Driver registry            │                     │
│                     │ · Lifecycle management       │                     │
│                     │ · Backpressure coordinator   │                     │
│                     │ · Hot-reload orchestrator    │                     │
│                     └──────────────┬───────────────┘                     │
│                                    │                                     │
└────────────────────────────────────┼─────────────────────────────────────┘
                                     │  Trait boundary — the ABI contract
                                     │  Everything below is driver territory
            ┌────────────────────────┼─────────────────────────┐
            ▼                        ▼                         ▼
┌──────────────────────┐ ┌───────────────────────┐ ┌──────────────────────┐
│ sartorius.so         │ │ illumina-dragen.so    │ │ opcua-generic.so     │
│                      │ │                       │ │                      │
│ impl Instrument      │ │ impl Instrument       │ │ impl Instrument      │
│ impl TelemetrySource │ │ impl TelemetrySource  │ │ impl TelemetrySource │
│ impl Commandable     │ │ (read-only instrument)│ │ impl Commandable     │
│ impl Calibratable    │ │                       │ │                      │
└───────────┬──────────┘ └───────────┬───────────┘ └───────────┬──────────┘
            │                        │                         │
            ▼                        ▼                         ▼
[Sartorius               [Illumina NovaSeq X /     [Any OPC-UA Server
 Biostat B|DCU            DRAGEN Platform           Bioreactor, Analyzer,
 Ethernet/RS-232]         PCIe + proprietary SDK]   PLC, MES Gateway]
The non-negotiable principle. The HELIX OS core has zero knowledge of instrument-specific protocols, vendor SDKs, or wire formats. All of that complexity lives in the driver. The core knows only the traits defined in this document. A driver can wrap a vendor SDK written in C++ through FFI, implement a pure-Rust async TCP client, or read from a Linux kernel character device; the core does not care, as long as the trait contract is honored.
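To make the principle concrete, here is a std-only sketch in which the "core" holds drivers only as trait objects and never sees the protocol behind them. `Driver`, `ModbusStub`, `VendorSdkStub`, and `poll_all` are hypothetical stand-ins that collapse the async HAL traits into a single synchronous method; they illustrate the shape of the boundary, not the HAL API.

```rust
/// Hypothetical stand-in for the HAL traits: identity plus one reading.
pub trait Driver {
    fn instrument_id(&self) -> &str;
    fn read(&mut self) -> Option<f64>;
}

/// Pretends to decode a Modbus holding register (value scaled by 10).
pub struct ModbusStub {
    pub id: String,
    pub register: u16,
}

/// Pretends to call into a C++ vendor SDK through FFI.
pub struct VendorSdkStub {
    pub id: String,
    pub last: f64,
}

impl Driver for ModbusStub {
    fn instrument_id(&self) -> &str {
        &self.id
    }
    fn read(&mut self) -> Option<f64> {
        Some(f64::from(self.register) / 10.0)
    }
}

impl Driver for VendorSdkStub {
    fn instrument_id(&self) -> &str {
        &self.id
    }
    fn read(&mut self) -> Option<f64> {
        Some(self.last)
    }
}

/// The "core": it treats every driver identically through the trait.
pub fn poll_all(drivers: &mut [Box<dyn Driver>]) -> Vec<(String, f64)> {
    drivers
        .iter_mut()
        .filter_map(|d| d.read().map(|v| (d.instrument_id().to_string(), v)))
        .collect()
}
```

Adding a third protocol means adding a third `impl Driver`; `poll_all` does not change.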
2. Core Trait Definitions
2.1 The Instrument Trait
The Instrument trait is the base capability that every driver must implement. It handles connection lifecycle, identity, and capability advertisement.
// helix-hal/src/traits/instrument.rs
use async_trait::async_trait;
use std::collections::HashMap;
// Types defined in this file are deliberately not imported (that is E0255).
use crate::{
InstrumentError, InstrumentId, InstrumentAlarm, ConnectionConfig,
};
/// The foundational trait for all HELIX OS instrument drivers.
///
/// # Implementation Requirements
///
/// 1. `connect()` MUST be idempotent. Calling it on an already-connected
/// instrument MUST return `Ok(())` without side effects.
///
/// 2. `disconnect()` MUST be safe to call at any time, including before
/// `connect()` has been called successfully. It must release all OS
/// resources (file descriptors, sockets, DMA buffers) acquired by `connect()`.
///
/// 3. `health_check()` MUST complete within 500ms. Drivers that communicate
/// with instruments over a network must use a timeout. A health check that
/// blocks indefinitely will cause the HAL supervisor to forcibly unload the driver.
///
/// 4. `instrument_id()` MUST return a stable identifier across reconnections.
/// The ID is derived from the instrument's serial number or MAC address,
/// never from a runtime-assigned handle.
///
/// 5. `capabilities()` MUST return a consistent value for the lifetime of the
/// driver instance. Capabilities cannot change after initialization.
#[async_trait]
pub trait Instrument: Send + Sync + 'static {
/// Returns the stable, unique identifier for this instrument.
///
/// The format is `{vendor}:{model}:{serial}`, all lowercase, with hyphens
/// replacing spaces. Example: `sartorius:biostat-b:sn-2024-00417`
///
/// This ID is embedded in every `TelemetryFrame.instrument_id` produced
/// by this driver and in every ledger entry referencing this instrument.
fn instrument_id(&self) -> &InstrumentId;
/// Returns a human-readable display name for operator UIs.
/// Example: "Sartorius Biostat B — BRX-07 (Upstream Suite A)"
fn display_name(&self) -> &str;
/// Returns the set of capabilities this driver supports.
///
/// The HAL supervisor uses this to determine which optional traits
/// (`TelemetrySource`, `Commandable`, `Calibratable`) are safe to call on this driver.
/// Advertising a capability without implementing the trait is a
/// contract violation that will be detected during driver certification (§8).
fn capabilities(&self) -> InstrumentCapabilities;
/// Establish the connection to the physical instrument.
///
/// This method is responsible for:
/// - Opening the TCP socket, serial port, or USB device
/// - Performing vendor-specific handshake and authentication
/// - Validating the instrument's firmware version against the driver's
/// supported range (`DriverInfo::min_firmware_version` and
/// `max_firmware_version`; see Appendix B, Driver Manifest Schema)
/// - Allocating any driver-internal buffers (must use the allocator
/// provided by the HAL supervisor, NOT the system allocator, to enable
/// memory accounting and limit enforcement)
///
/// On success, the instrument transitions to `InstrumentStatus::Connected`.
///
/// # Timeout
/// The HAL supervisor enforces a 30-second timeout on this call.
/// Drivers that require longer connection times must implement a
/// background connection task and return `Ok(())` immediately,
/// reporting progress through `status()` (for example, remaining in
/// `InstrumentStatus::Connecting` until the background task finishes).
async fn connect(&mut self, config: &ConnectionConfig) -> Result<(), InstrumentError>;
/// Gracefully disconnect from the physical instrument.
///
/// Must release all OS resources. Must be callable multiple times
/// without panicking. Must not block for longer than 5 seconds —
/// use a timeout when sending a graceful disconnect command to the instrument.
async fn disconnect(&mut self) -> Result<(), InstrumentError>;
/// Returns the current connection and operational status.
fn status(&self) -> InstrumentStatus;
/// Perform an active health check against the physical instrument.
///
/// This is more than a status query — it should actively ping the instrument
/// or query a known diagnostic register. The HAL supervisor calls this
/// every 30 seconds (configurable) to detect silent connection failures
/// (e.g., a TCP connection that is open but the instrument has locked up).
///
/// # Timeout
/// MUST complete within 500ms. The HAL supervisor enforces this.
async fn health_check(&self) -> Result<HealthReport, InstrumentError>;
/// Returns driver metadata for display in the HELIX OS instrument registry
/// and for inclusion in audit ledger entries.
fn driver_info(&self) -> DriverInfo;
}
/// Capability flags. Determine which optional traits are active.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InstrumentCapabilities {
pub streams_telemetry: bool, // impl TelemetrySource
pub accepts_commands: bool, // impl Commandable
pub supports_calibration: bool, // impl Calibratable
pub supports_simulation_mode: bool, // Can produce synthetic data for testing
pub has_onboard_logging: bool, // Instrument has its own local data store
}
/// Current lifecycle and operational state of an instrument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InstrumentStatus {
Uninitialized,
Connecting,
Connected,
Streaming,
Degraded { reason_code: u16 },
Disconnecting,
Disconnected,
FaultIsolated, // Driver is loaded but instrument is quarantined
}
#[derive(Debug, Clone)]
pub struct HealthReport {
pub status: InstrumentStatus,
pub round_trip_latency_us: u64,
pub firmware_version: String,
pub instrument_uptime_seconds: u64,
pub active_alarms: Vec<InstrumentAlarm>,
pub diagnostics: HashMap<String, String>, // Vendor-specific key-value pairs
}
#[derive(Debug, Clone)]
pub struct DriverInfo {
pub driver_name: String, // "helix-sartorius-biostat"
pub driver_version: String, // SemVer: "2.4.1"
pub driver_author: String,
pub supported_protocols: Vec<String>, // ["Sartorius-DCU", "Modbus-TCP"]
pub min_firmware_version: String,
pub max_firmware_version: Option<String>,
}
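The lifecycle invariants above can be exercised without hardware. The sketch below is a synchronous, std-only stand-in: `MockDriver`, `Lifecycle`, and `format_instrument_id` are hypothetical names, `open_handles` simulates the OS resources that `connect()` acquires, and the real trait methods are `async`. It shows a stable `{vendor}:{model}:{serial}` identifier, an idempotent `connect()`, and a `disconnect()` that is safe before `connect()` and safe to repeat.

```rust
/// Hypothetical subset of `InstrumentStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lifecycle {
    Uninitialized,
    Connected,
    Disconnected,
}

/// Builds `{vendor}:{model}:{serial}`: each part lowercased, spaces to hyphens.
pub fn format_instrument_id(vendor: &str, model: &str, serial: &str) -> String {
    [vendor, model, serial]
        .iter()
        .map(|part| part.trim().to_lowercase().replace(' ', "-"))
        .collect::<Vec<_>>()
        .join(":")
}

pub struct MockDriver {
    id: String,
    state: Lifecycle,
    /// Stand-in for sockets, file descriptors, or DMA buffers.
    open_handles: u32,
}

impl MockDriver {
    pub fn new(vendor: &str, model: &str, serial: &str) -> Self {
        // Derived once from the serial number, so it survives reconnections.
        Self {
            id: format_instrument_id(vendor, model, serial),
            state: Lifecycle::Uninitialized,
            open_handles: 0,
        }
    }

    pub fn instrument_id(&self) -> &str {
        &self.id
    }

    pub fn state(&self) -> Lifecycle {
        self.state
    }

    pub fn open_handles(&self) -> u32 {
        self.open_handles
    }

    /// Idempotent: calling it on a connected driver has no side effects.
    pub fn connect(&mut self) -> Result<(), String> {
        if self.state == Lifecycle::Connected {
            return Ok(());
        }
        self.open_handles += 1; // e.g. open the TCP socket
        self.state = Lifecycle::Connected;
        Ok(())
    }

    /// Safe in any state, including before `connect()`; releases everything.
    pub fn disconnect(&mut self) -> Result<(), String> {
        self.open_handles = 0;
        self.state = Lifecycle::Disconnected;
        Ok(())
    }
}
```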
2.2 The TelemetrySource Trait
The TelemetrySource trait defines how a driver produces a continuous stream of TelemetryFrame values and feeds them into the HELIX OS ring buffer.
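One concrete obligation of this trait is frame integrity: the invariants in the trait documentation require each `TelemetryFrame` to carry a CRC-32C (Castagnoli) computed over bytes 0..56. A minimal bitwise implementation follows, assuming the standard reflected CRC-32C parameters. `frame_crc` is a hypothetical helper; the full frame layout, including where the CRC is stored, is defined by HELIX-ARCH-0002 and is not assumed here. A table-driven or hardware-accelerated implementation would be preferable in a production driver.

```rust
/// Bitwise CRC-32C: reflected polynomial 0x82F63B78, initial value and
/// final XOR 0xFFFFFFFF. Slow but dependency-free.
pub fn crc32c(data: &[u8]) -> u32 {
    let mut crc: u32 = !0;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // All-ones mask when the low bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0x82F6_3B78 & mask);
        }
    }
    !crc
}

/// Hypothetical helper: CRC over the first 56 bytes of an encoded frame.
/// Returns `None` if the buffer is too short to contain them.
pub fn frame_crc(frame: &[u8]) -> Option<u32> {
    frame.get(..56).map(crc32c)
}
```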
// helix-hal/src/traits/telemetry.rs
use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::{Instrument, TelemetryFrame, InstrumentError, DropPolicy};
/// Implemented by instruments that produce continuous measurement data.
///
/// # The Streaming Model
///
/// The driver does NOT write directly to the ring buffer. Instead, it sends
/// `TelemetryFrame` values to a bounded `mpsc` channel provided by the HAL
/// supervisor. The supervisor's ring buffer writer task drains this channel
/// and writes to the zero-copy ring buffer. This indirection:
///
/// 1. Isolates driver crashes — a panicking driver does not corrupt the ring buffer.
/// 2. Enables the backpressure system (§4) to apply independently per driver.
/// 3. Allows the supervisor to insert audit ledger entries for dropped frames
/// without the driver needing to know about the ledger.
///
/// # Invariants
///
/// - `stream_telemetry()` MUST NOT return until `stop_signal` fires, the
/// instrument signals end of data, or an unrecoverable error occurs. It is
/// a long-running async task.
///
/// - `stream_telemetry()` MUST NOT block the async executor. All blocking I/O
/// (serial reads, socket reads, vendor SDK calls that block) must be
/// performed in `tokio::task::spawn_blocking` (see §5.3).
///
/// - Every `TelemetryFrame` sent to the `tx` channel MUST have its `crc32c`
/// field correctly computed over bytes 0..56 of the frame. The ring buffer
/// writer validates this; frames with invalid CRCs are dropped and logged.
///
/// - `TelemetryFrame.sequence` MUST be strictly monotonic per channel,
/// increasing by exactly one for each frame the driver produces on that
/// channel. The driver is responsible for maintaining per-channel sequence
/// counters. Gaps in the sequence number trigger a `DropEvent` alarm in the core.
#[async_trait]
pub trait TelemetrySource: Instrument {
/// Returns the list of measurement channels this instrument produces.
///
/// Example for a Sartorius Biostat B (`range`, `resolution`, and
/// `is_writable` omitted for brevity):
/// [
/// ChannelDescriptor { id: 0, name: "DO", unit: "mg/L", nominal_rate_hz: 1.0 },
/// ChannelDescriptor { id: 1, name: "pH", unit: "[pH]", nominal_rate_hz: 1.0 },
/// ChannelDescriptor { id: 2, name: "temperature", unit: "Cel", nominal_rate_hz: 1.0 },
/// ChannelDescriptor { id: 3, name: "agitation", unit: "/min", nominal_rate_hz: 2.0 },
/// ChannelDescriptor { id: 4, name: "feed_rate", unit: "L/h", nominal_rate_hz: 1.0 },
/// ]
fn channels(&self) -> Vec<ChannelDescriptor>;
/// Start streaming telemetry. This is a long-running async task.
///
/// # Parameters
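/// - `tx`: The bounded sender channel. The driver sends `TelemetryFrame`
/// values here. The channel capacity determines backpressure behavior (§4).
/// When the channel is full (`try_send()` returns `TrySendError::Full`),
/// the driver MUST handle the frame according to `StreamConfig.drop_policy`
/// (see §4.3). Note that `send().await` does not fail on a full channel; it
/// waits. `SendError` (or `TrySendError::Closed`) means the supervisor has
/// dropped the receiver.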
///
/// - `config`: Stream configuration including sampling rates, channel mask,
/// and backpressure drop policy.
///
/// - `stop_signal`: A one-shot channel that fires when the HAL supervisor
/// wants the stream to stop. The driver MUST check this signal at every
/// iteration of its polling loop and return within 2 seconds of receiving it.
///
/// # Return
/// - `Ok(StreamStopReason::Requested)`: Normal stop via `stop_signal`.
/// - `Ok(StreamStopReason::InstrumentEOF)`: Instrument signaled end of data
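/// (e.g., a sequencer run is complete).
/// - `Ok(StreamStopReason::RunComplete)`: Batch run finished; the instrument
/// returned to idle.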
/// - `Err(InstrumentError::*)`: Unrecoverable streaming error. The HAL
/// supervisor will attempt reconnection per the configured retry policy.
async fn stream_telemetry(
&mut self,
tx: mpsc::Sender<TelemetryFrame>,
config: StreamConfig,
stop_signal: tokio::sync::oneshot::Receiver<()>,
) -> Result<StreamStopReason, InstrumentError>;
/// Returns the current streaming statistics for monitoring and diagnostics.
///
/// The HAL supervisor polls this every 10 seconds. Values are published
/// to the HELIX OS metrics endpoint and included in health reports.
fn streaming_stats(&self) -> StreamingStats;
}
/// Describes a single measurement channel produced by an instrument.
#[derive(Debug, Clone)]
pub struct ChannelDescriptor {
/// Channel index, 0-based. Matches `TelemetryFrame.channel_id`.
pub id: u16,
/// Human-readable channel name. Used in UI and audit records.
pub name: String,
/// UCUM unit string. Examples: "Cel" (degrees Celsius), "[pH]", "L/h", "/min" (RPM), "mg/L"
pub unit: String,
/// Nominal measurement rate in Hz. Actual rate may vary; this is advisory.
pub nominal_rate_hz: f64,
/// Measurement range. Used by the UI for axis scaling and alarm configuration.
pub range: ChannelRange,
/// Resolution (smallest meaningful change). Used for SPC alarm sensitivity.
pub resolution: f64,
/// Whether this channel supports setpoint writing via `Commandable`.
pub is_writable: bool,
}
#[derive(Debug, Clone, Copy)]
pub struct ChannelRange {
pub min: f64,
pub max: f64,
}
/// Configuration for a telemetry stream.
#[derive(Debug, Clone)]
pub struct StreamConfig {
/// Which channels to include. Empty = all channels.
pub channel_mask: Vec<u16>,
/// Override sampling rate (Hz) per channel. None = instrument default.
pub rate_overrides: std::collections::HashMap<u16, f64>,
/// How to handle a full `tx` channel (where `send().await` would wait for capacity).
pub drop_policy: DropPolicy,
/// Enable simulation mode (synthetic data). Requires `InstrumentCapabilities.supports_simulation_mode`.
pub simulation_mode: bool,
}
/// Why a telemetry stream terminated normally.
#[derive(Debug, Clone, Copy)]
pub enum StreamStopReason {
Requested, // Stopped by HAL supervisor via stop_signal
InstrumentEOF, // Instrument signaled end of run (e.g., sequencer complete)
RunComplete, // Batch run finished; instrument returned to idle
}
#[derive(Debug, Clone, Copy, Default)]
pub struct StreamingStats {
pub frames_sent: u64,
pub frames_dropped_backpressure: u64,
pub frames_dropped_crc_error: u64,
pub channel_sequence_gaps: u64,
pub last_frame_timestamp_ns: u64,
pub current_rate_hz: f64,
}
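The per-channel sequencing, full-channel handling, and stop-signal rules above can be combined into one send path. This std-only sketch substitutes `std::sync::mpsc::sync_channel` (whose `try_send` reports `Full` and `Disconnected`) for the tokio channel, and a plain `Receiver<()>` polled with `try_recv` for the oneshot `stop_signal`. `Frame`, `StreamState`, `LoopExit`, and `run` are hypothetical. The sketch hard-codes a drop-newest behavior because the actual `DropPolicy` options are defined in §4.3, and it assumes that a frame dropped under backpressure still consumes its sequence number, so the core sees the gap.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, TrySendError};

/// Hypothetical, trimmed-down frame (the real layout is in HELIX-ARCH-0002).
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub channel_id: u16,
    pub sequence: u64,
    pub value: f64,
}

#[derive(Debug, Default)]
pub struct Stats {
    pub frames_sent: u64,
    pub frames_dropped_backpressure: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum LoopExit {
    Requested,
    InstrumentEof,
    ReceiverGone,
}

#[derive(Debug, Default)]
pub struct StreamState {
    next_sequence: HashMap<u16, u64>,
    pub stats: Stats,
}

impl StreamState {
    /// Returns the next sequence number for `channel_id`, starting at 0.
    fn take_sequence(&mut self, channel_id: u16) -> u64 {
        let slot = self.next_sequence.entry(channel_id).or_insert(0);
        let sequence = *slot;
        *slot += 1;
        sequence
    }

    /// Sends one frame without blocking. Returns `false` once the
    /// supervisor has dropped the receiver.
    pub fn emit(&mut self, tx: &SyncSender<Frame>, channel_id: u16, value: f64) -> bool {
        let sequence = self.take_sequence(channel_id);
        match tx.try_send(Frame { channel_id, sequence, value }) {
            Ok(()) => {
                self.stats.frames_sent += 1;
                true
            }
            // Full: drop this frame. Its sequence number is already
            // consumed, so the core observes the gap.
            Err(TrySendError::Full(_)) => {
                self.stats.frames_dropped_backpressure += 1;
                true
            }
            Err(TrySendError::Disconnected(_)) => false,
        }
    }
}

/// Streams `samples`, checking the stop signal before every send.
pub fn run(
    state: &mut StreamState,
    tx: &SyncSender<Frame>,
    stop: &Receiver<()>,
    samples: &[(u16, f64)],
) -> LoopExit {
    for &(channel_id, value) in samples {
        match stop.try_recv() {
            // A dropped stop sender is treated as a stop request as well.
            Ok(()) | Err(TryRecvError::Disconnected) => return LoopExit::Requested,
            Err(TryRecvError::Empty) => {}
        }
        if !state.emit(tx, channel_id, value) {
            return LoopExit::ReceiverGone;
        }
    }
    // Sample source exhausted, analogous to `StreamStopReason::InstrumentEOF`.
    LoopExit::InstrumentEof
}
```

Because `emit` never blocks, the loop reaches the stop-signal check on every iteration, which is what keeps the 2-second shutdown bound achievable.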
2.3 The Commandable Trait
// helix-hal/src/traits/commandable.rs
use async_trait::async_trait;
use crate::{Instrument, InstrumentError};
/// Implemented by instruments that accept setpoint or control commands.
///
/// # Safety — The Write Path
///
/// The `dispatch_command()` method is the ONLY path through which HELIX OS
/// writes to a physical instrument. It is a regulated write path: every call
/// produces an audit ledger entry (via the `audit_sink` parameter) regardless
/// of whether the command succeeds or fails.
///
/// **A driver MUST NOT write to instrument hardware via any path other than
/// `dispatch_command()`.** Background reconnection logic that re-applies last
/// known setpoints is the only exception, and it must still log via the
/// driver's internal command history, which is replayed to the audit ledger
/// on reconnection.
///
/// # Invariant: Acknowledge Before Returning
///
/// The method must return `Ok(CommandResult)` only after the instrument has
/// acknowledged receipt of the command (e.g., the instrument has echoed the
/// setpoint back via its status response). Returning `Ok` on a fire-and-forget
/// UDP datagram without acknowledgment is a contract violation.
///
/// Acknowledgment is distinct from execution. Returning `Ok` does not mean
/// the new setpoint has been reached — only that the instrument has accepted
/// the command and will attempt to execute it.
#[async_trait]
pub trait Commandable: Instrument {
/// Dispatch a command to the instrument and await acknowledgment.
///
/// # Parameters
/// - `command`: The command to execute (see `InstrumentCommand` below).
/// - `audit_sink`: A channel for the driver to submit an audit record.
/// The driver MUST send exactly one `CommandAuditRecord` per `dispatch_command()`
/// call, regardless of success or failure. The audit record must be sent
/// AFTER the instrument acknowledgment (or after determining the command failed),
/// never before.
///
/// # Timeout
/// The HAL supervisor enforces a 10-second timeout on `dispatch_command()`.
/// Instruments that require longer execution times should acknowledge within
/// 10 seconds and report execution progress via telemetry.
async fn dispatch_command(
&mut self,
command: InstrumentCommand,
audit_sink: tokio::sync::mpsc::Sender<CommandAuditRecord>,
) -> Result<CommandResult, InstrumentError>;
/// Returns the list of commands this instrument supports.
///
/// The HAL supervisor uses this to validate commands before dispatching.
/// Commands not in this list are rejected before reaching `dispatch_command()`.
fn supported_commands(&self) -> Vec<CommandDescriptor>;
/// Returns the last known setpoint state for all writable channels.
///
/// Used by the HAL supervisor to restore setpoints after reconnection.
/// Values must match what the instrument currently has configured —
/// not what was last sent. If the instrument does not support setpoint
/// readback, return `Err(InstrumentError::OperationNotSupported)`.
async fn read_setpoints(&self) -> Result<SetpointSnapshot, InstrumentError>;
}
/// A command to dispatch to an instrument.
///
/// All commands are strongly typed. There is no "raw bytes" escape hatch —
/// if a vendor protocol requires a command that doesn't map to this enum,
/// the driver must translate it internally.
#[derive(Debug, Clone)]
pub enum InstrumentCommand {
/// Set a process parameter to a new target value.
SetSetpoint {
channel_id: u16,
value: f64,
unit: String,
/// The human-readable rationale for this change.
/// Required for GMP-regulated parameters (see `CommandDescriptor::is_gmp_regulated`
/// and `requires_rationale`). An empty string is rejected for regulated channels.
rationale: String,
},
/// Enable or disable a specific control loop.
SetControlLoopEnabled {
channel_id: u16,
enabled: bool,
rationale: String,
},
/// Trigger an in-place operation (e.g., auto-zero, auto-calibration).
TriggerOperation {
operation: InstrumentOperation,
parameters: std::collections::HashMap<String, String>,
rationale: String,
},
/// Change the operational mode of the instrument.
SetMode {
mode: InstrumentMode,
rationale: String,
},
/// Acknowledge an instrument alarm. Does not clear the underlying condition.
AcknowledgeAlarm {
alarm_id: u32,
acknowledged_by: String, // LDAP DN of the acknowledging user
comment: String,
},
}
/// The result of a successfully acknowledged command.
#[derive(Debug, Clone)]
pub struct CommandResult {
/// The value the instrument confirmed as its new setpoint (may differ
/// from the requested value due to quantization or clamping).
pub acknowledged_value: Option<f64>,
/// Instrument-reported execution timestamp (if available from the instrument's
/// own clock). None if the instrument does not report execution timestamps.
pub instrument_timestamp: Option<u64>,
/// Any warnings the instrument reported alongside the acknowledgment.
/// Example: "Setpoint accepted but exceeds recommended operating range."
pub warnings: Vec<String>,
}
/// The audit record that MUST be submitted for every command dispatch.
#[derive(Debug, Clone)]
pub struct CommandAuditRecord {
pub instrument_id: String,
pub command: InstrumentCommand,
pub result: Result<CommandResult, String>, // Ok = acknowledged, Err = failure reason
pub dispatch_timestamp_ns: u64,
pub acknowledge_timestamp_ns: u64, // Time of acknowledgment, or of failure determination when `result` is Err
pub round_trip_latency_us: u64,
}
#[derive(Debug, Clone)]
pub struct CommandDescriptor {
pub command_type: &'static str, // "SetSetpoint", "TriggerOperation", etc.
pub channel_ids: Vec<u16>, // Which channels this command applies to
pub requires_rationale: bool,
pub is_gmp_regulated: bool, // Marks the command as GMP-relevant (every dispatch is audited regardless)
pub min_value: Option<f64>,
pub max_value: Option<f64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InstrumentOperation {
AutoZero,
AutoCalibrate,
PurgeLines,
SelfTest,
EmergencyStop,
ResetAlarms,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InstrumentMode {
Run,
Hold,
CleanInPlace,
SterilizeInPlace,
Standby,
}
#[derive(Debug, Clone)]
pub struct SetpointSnapshot {
pub channel_setpoints: std::collections::HashMap<u16, f64>,
pub snapshot_timestamp_ns: u64,
}
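The audit obligations of `dispatch_command()` come down to one ordering rule: wait for the acknowledgment (or the failure), then send exactly one record. The std-only sketch below shows that ordering. `Link`, `SetSetpoint`, `AuditRecord`, `ClampingLink`, `SilentLink`, and `dispatch` are hypothetical simplifications of the types above, and a std `Sender` stands in for the tokio `audit_sink`.

```rust
use std::sync::mpsc::Sender;

#[derive(Debug, Clone, PartialEq)]
pub struct SetSetpoint {
    pub channel_id: u16,
    pub value: f64,
    pub rationale: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct AuditRecord {
    pub command: SetSetpoint,
    /// Ok = acknowledged value, Err = failure reason.
    pub result: Result<f64, String>,
}

/// Stand-in for the instrument link: returns the setpoint the instrument
/// echoed back (possibly clamped), or an error if no acknowledgment came.
pub trait Link {
    fn send_and_await_echo(&mut self, channel_id: u16, value: f64) -> Result<f64, String>;
}

/// An instrument that clamps setpoints to `max` before echoing them.
pub struct ClampingLink {
    pub max: f64,
}

impl Link for ClampingLink {
    fn send_and_await_echo(&mut self, _channel_id: u16, value: f64) -> Result<f64, String> {
        Ok(value.min(self.max))
    }
}

/// An instrument that never acknowledges.
pub struct SilentLink;

impl Link for SilentLink {
    fn send_and_await_echo(&mut self, _channel_id: u16, _value: f64) -> Result<f64, String> {
        Err("no acknowledgment within budget".to_string())
    }
}

pub fn dispatch(
    link: &mut dyn Link,
    command: SetSetpoint,
    audit: &Sender<AuditRecord>,
) -> Result<f64, String> {
    // 1. Write, then wait until the instrument acknowledges (or fails).
    let result = link.send_and_await_echo(command.channel_id, command.value);
    // 2. Only now emit the audit record: exactly one per call, either outcome.
    //    This sketch ignores a closed sink; a real driver must not lose it silently.
    let _ = audit.send(AuditRecord { command, result: result.clone() });
    result
}
```

Returning the echoed value rather than the requested one is what lets `CommandResult.acknowledged_value` reflect clamping by the instrument.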
2.4 The Calibratable Trait
// helix-hal/src/traits/calibratable.rs
use async_trait::async_trait;
use crate::{Instrument, InstrumentError};
/// Implemented by instruments that support calibration operations.
///
/// # Regulatory Context
///
/// Calibration records are a primary target of FDA inspection. Every calibration
/// event — including failed calibrations — must produce a `CalibrationRecord`
/// that is persisted to the audit ledger. The driver is not responsible for
/// ledger persistence; it returns `CalibrationRecord` values which the HAL
/// supervisor persists. The driver is responsible for the accuracy of the
/// calibration data it returns.
#[async_trait]
pub trait Calibratable: Instrument {
/// Read the current calibration status for all calibratable channels.
async fn calibration_status(&self) -> Result<Vec<CalibrationStatus>, InstrumentError>;
/// Execute a single-point or multi-point calibration procedure.
///
/// # Parameters
/// - `channel_id`: The channel to calibrate.
/// - `points`: One or more reference standard values. The instrument will
/// be asked to record its measured value at each point.
///
/// # Blocking Behavior
/// Calibration can take 30–600 seconds (DO probe polarization, pH two-point
/// calibration, etc.). This method is `async` and must yield to the executor
/// while waiting for the instrument. Use `tokio::time::sleep` between polls,
/// not a busy-wait loop.
///
/// # Invariant
/// This method MUST NOT return `Ok` if the calibration did not complete.
/// Partial calibrations are surfaced as `Err(InstrumentError::CalibrationDrift)`.
async fn perform_calibration(
&mut self,
channel_id: u16,
points: Vec<CalibrationPoint>,
) -> Result<CalibrationRecord, InstrumentError>;
/// Verify calibration against known reference standards without modifying
/// the instrument's calibration coefficients. Used for out-of-tolerance checks.
async fn verify_calibration(
&self,
channel_id: u16,
reference_value: f64,
) -> Result<CalibrationVerification, InstrumentError>;
}
#[derive(Debug, Clone)]
pub struct CalibrationPoint {
pub reference_value: f64,
pub unit: String,
pub standard_lot_number: String, // Required for GMP traceability
pub standard_expiry_date: String, // ISO 8601 date
pub standard_certificate_id: String,
}
#[derive(Debug, Clone)]
pub struct CalibrationRecord {
pub channel_id: u16,
pub calibration_type: String, // "two-point", "single-point", "slope-intercept"
pub points: Vec<CalibrationDataPoint>,
pub resulting_slope: f64,
pub resulting_intercept: f64,
pub r_squared: f64, // Linearity measure — must be >= 0.9999 for GMP channels
pub pass_fail: CalibrationOutcome,
pub calibration_timestamp_ns: u64,
pub next_calibration_due_ns: u64, // Based on site's calibration interval policy
pub instrument_firmware_at_calibration: String,
}
#[derive(Debug, Clone)]
pub struct CalibrationDataPoint {
pub reference_value: f64,
pub measured_value: f64,
pub deviation_percent: f64,
pub within_tolerance: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CalibrationOutcome {
Pass,
PassWithWarning, // Within tolerance but close to limit — flag for QA review
Fail, // Outside tolerance — instrument must be taken offline
}
#[derive(Debug, Clone)]
pub struct CalibrationStatus {
pub channel_id: u16,
pub last_calibration_timestamp_ns: u64,
pub next_due_timestamp_ns: u64,
pub current_outcome: CalibrationOutcome,
pub days_until_due: i64,
pub current_slope: f64,
pub current_intercept: f64,
}
#[derive(Debug, Clone)]
pub struct CalibrationVerification {
pub expected: f64,
pub measured: f64,
pub absolute_error: f64,
pub percent_error: f64,
pub within_tolerance: bool,
pub tolerance_band_percent: f64,
}
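The numeric fields of `CalibrationRecord` and `CalibrationDataPoint` can be derived with ordinary least squares. This sketch assumes the fit is `measured = slope × reference + intercept`, that `deviation_percent` is relative to the reference value, and a hypothetical outcome rule (fail outside the tolerance band or below a minimum r², warn when the worst point uses more than 80% of the band). `Fit`, `fit`, `deviation_percent`, `Outcome`, and `classify` are illustrative names; site calibration procedures define the real thresholds.

```rust
/// Result of a least-squares fit of measured against reference values.
#[derive(Debug, Clone, Copy)]
pub struct Fit {
    pub slope: f64,
    pub intercept: f64,
    pub r_squared: f64,
}

/// Fits `measured = slope * reference + intercept` over (reference, measured)
/// pairs. Returns `None` for fewer than two points or identical references.
pub fn fit(points: &[(f64, f64)]) -> Option<Fit> {
    if points.len() < 2 {
        return None;
    }
    let n = points.len() as f64;
    let mean_x = points.iter().map(|&(x, _)| x).sum::<f64>() / n;
    let mean_y = points.iter().map(|&(_, y)| y).sum::<f64>() / n;
    let sxx: f64 = points.iter().map(|&(x, _)| (x - mean_x).powi(2)).sum();
    let syy: f64 = points.iter().map(|&(_, y)| (y - mean_y).powi(2)).sum();
    let sxy: f64 = points.iter().map(|&(x, y)| (x - mean_x) * (y - mean_y)).sum();
    if sxx == 0.0 {
        return None;
    }
    let slope = sxy / sxx;
    let intercept = mean_y - slope * mean_x;
    // For a simple linear fit, r² = sxy² / (sxx * syy). A constant response
    // (syy == 0) is fitted exactly by a flat line, so report 1.0.
    let r_squared = if syy == 0.0 { 1.0 } else { sxy * sxy / (sxx * syy) };
    Some(Fit { slope, intercept, r_squared })
}

/// Relative deviation in percent; `None` for a zero reference, where a
/// relative error is undefined (e.g. a zero-point standard).
pub fn deviation_percent(reference: f64, measured: f64) -> Option<f64> {
    if reference == 0.0 {
        None
    } else {
        Some((measured - reference) / reference * 100.0)
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Pass,
    PassWithWarning,
    Fail,
}

/// Hypothetical classification rule (thresholds come from the caller).
pub fn classify(deviations_pct: &[f64], r_squared: f64, tolerance_pct: f64, min_r_squared: f64) -> Outcome {
    let worst = deviations_pct.iter().fold(0.0_f64, |acc, d| acc.max(d.abs()));
    if worst > tolerance_pct || r_squared < min_r_squared {
        Outcome::Fail
    } else if worst > 0.8 * tolerance_pct {
        Outcome::PassWithWarning
    } else {
        Outcome::Pass
    }
}
```

With exactly two points the fitted line passes through both, so r² is always 1 in this sketch; a linearity gate such as r² ≥ 0.9999 only carries information for calibrations with three or more points.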
2.5 The DriverManifest Trait
Every driver shared library must export a DriverManifest via a C-ABI function. This is the entry point for the hot-reload system.
// helix-hal/src/traits/manifest.rs
/// Metadata and factory function exported by every driver shared library.
///
/// The `helix_driver_manifest` function is the ONLY symbol the
/// `libloading`-based loader looks up in a driver `.so`/`.dylib`. It must be
/// exported with C linkage (`#[no_mangle] extern "C"`) under exactly the
/// symbol name given below.
///
/// # C ABI Symbol
/// The driver library MUST export this symbol:
/// `extern "C" fn helix_driver_manifest() -> *const DriverManifest`
///
/// The returned pointer must be valid for the lifetime of the library.
/// It is typically a reference to a `static` item.
pub struct DriverManifest {
/// ABI version this driver was compiled against.
/// MUST equal `HELIX_HAL_ABI_VERSION` from the `helix-hal` crate
/// version the driver was compiled with. A mismatch causes the loader
/// to reject the driver before calling any other functions.
pub abi_version: u32,
/// Human-readable driver name. Matches `DriverInfo.driver_name`.
pub driver_name: &'static str,
/// SemVer driver version string.
pub driver_version: &'static str,
/// Comma-separated list of instrument model strings this driver supports.
/// Example: "sartorius-biostat-b,sartorius-biostat-q-plus"
pub supported_models: &'static str,
/// Minimum HELIX OS version required.
pub min_helix_version: &'static str,
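/// Factory function. Called once per driver instance creation.
/// Returns a `Box<dyn Instrument>` converted with `Box::into_raw`. The
/// caller (HAL supervisor) takes ownership of the returned pointer. A null
/// pointer signals that the instance could not be created (for example,
/// invalid configuration JSON).
///
/// Note: `*mut dyn Instrument` is a fat pointer (data + vtable) with no C
/// equivalent. Only the calling convention is C; the pointer layout is
/// Rust's, so in practice the driver and the core must be built with the
/// same Rust toolchain and `helix-hal` version.
pub create_instance: unsafe extern "C" fn(
config_json: *const std::os::raw::c_char,
config_len: usize,
) -> *mut dyn Instrument,
/// Destroy function. Called when the driver instance is being dropped.
/// Must reconstitute and drop the `Box<dyn Instrument>` created by `create_instance`.
pub destroy_instance: unsafe extern "C" fn(instance: *mut dyn Instrument),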
/// Checksum of the driver binary. Blake3 hash of the .so/.dylib file.
/// The HAL supervisor verifies this against the driver registry
/// (which is ledger-backed) before loading.
pub binary_checksum: [u8; 32],
}
/// The ABI version constant. Increment this whenever the trait definitions
/// in this document change in a way that breaks binary compatibility.
/// Drivers compiled against an older ABI version will be rejected.
pub const HELIX_HAL_ABI_VERSION: u32 = 4;
/// Helper macro for drivers to implement the manifest export correctly.
///
/// Requirements on the driver crate:
/// - `$driver_type` implements `Instrument` and provides an inherent
///   `fn new(config: $config_type) -> Self`.
/// - `$config_type` implements `serde::de::DeserializeOwned`, and the driver
///   crate depends on `serde_json`.
///
/// Usage in a driver crate:
/// ```rust,ignore
/// helix_hal::export_driver!(MySartoriusDriver, MySartoriusConfig, "sartorius-biostat-b");
/// ```
#[macro_export]
macro_rules! export_driver {
    ($driver_type:ty, $config_type:ty, $models:literal) => {
        #[no_mangle]
        pub extern "C" fn helix_driver_manifest() -> *const $crate::DriverManifest {
            static MANIFEST: $crate::DriverManifest = $crate::DriverManifest {
                abi_version: $crate::HELIX_HAL_ABI_VERSION,
                driver_name: env!("CARGO_PKG_NAME"),
                driver_version: env!("CARGO_PKG_VERSION"),
                supported_models: $models,
                min_helix_version: "2.4.0",
                create_instance: __helix_create_instance,
                destroy_instance: __helix_destroy_instance,
                binary_checksum: [0u8; 32], // Filled in by helix-driver-sign at build time
            };
            &MANIFEST
        }

        // A panic cannot unwind out of an `extern "C"` function; it aborts the
        // whole process. Invalid input therefore yields a null pointer instead.
        #[allow(improper_ctypes_definitions)] // fat pointer; see `DriverManifest`
        unsafe extern "C" fn __helix_create_instance(
            config_json: *const std::os::raw::c_char,
            config_len: usize,
        ) -> *mut dyn $crate::Instrument {
            // Null data pointer carrying the driver's vtable.
            let null = std::ptr::null_mut::<$driver_type>() as *mut dyn $crate::Instrument;
            if config_json.is_null() {
                return null;
            }
            let config_bytes = unsafe {
                std::slice::from_raw_parts(config_json as *const u8, config_len)
            };
            let config: $config_type = match serde_json::from_slice(config_bytes) {
                Ok(config) => config,
                Err(_) => return null,
            };
            let driver: Box<dyn $crate::Instrument> = Box::new(<$driver_type>::new(config));
            Box::into_raw(driver)
        }

        #[allow(improper_ctypes_definitions)]
        unsafe extern "C" fn __helix_destroy_instance(instance: *mut dyn $crate::Instrument) {
            if !instance.is_null() {
                // Rebuild the Box<dyn Instrument>; its vtable runs the driver's Drop.
                drop(unsafe { Box::from_raw(instance) });
            }
        }
    };
}
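On the loading side, the supervisor must reject a bad manifest before it calls `create_instance`. This sketch shows admission checks in the order the contract implies: ABI version first, then the manifest checksum against the ledger-backed registry entry, then the requested model. `ManifestHeader`, `Rejection`, and `admit` are hypothetical; a real loader would read these fields through the pointer returned by `helix_driver_manifest` (for example, via `libloading`) and would also hash the file itself, which is omitted here.

```rust
/// Hypothetical mirror of the `DriverManifest` fields the checks read.
pub struct ManifestHeader<'a> {
    pub abi_version: u32,
    pub supported_models: &'a str,
    pub binary_checksum: [u8; 32],
}

#[derive(Debug, PartialEq, Eq)]
pub enum Rejection {
    AbiMismatch { expected: u32, found: u32 },
    ChecksumMismatch,
    ModelNotSupported,
}

pub fn admit(
    header: &ManifestHeader<'_>,
    expected_abi: u32,
    registry_checksum: &[u8; 32],
    model: &str,
) -> Result<(), Rejection> {
    // 1. ABI first: a mismatch is rejected before any driver function runs.
    if header.abi_version != expected_abi {
        return Err(Rejection::AbiMismatch { expected: expected_abi, found: header.abi_version });
    }
    // 2. The checksum must match the registry entry.
    if &header.binary_checksum != registry_checksum {
        return Err(Rejection::ChecksumMismatch);
    }
    // 3. The requested model must appear in the comma-separated list.
    if !header.supported_models.split(',').any(|m| m.trim() == model) {
        return Err(Rejection::ModelNotSupported);
    }
    Ok(())
}
```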
3. Error Handling Taxonomy
3.1 InstrumentError Enum
The InstrumentError enum is the unified error type for all HAL operations. It is designed to be:
- Actionable — every variant implies a specific recovery action for the HAL supervisor.
- Auditable — every variant carries enough context to generate a meaningful ledger entry.
- Non-lossy — vendor-specific error codes are preserved in structured fields, not stringified.
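The "actionable" property can be read as a dispatch table from error kind to recovery. The sketch below encodes the recovery rules documented on the first five variants of the enum that follows: a backoff of 1 s doubling to a 60 s cap with ±20% jitter, a single retry of a timeout at twice the budget, and escalation after three consecutive CRC failures. `ErrorKind`, `Recovery`, `backoff`, and `recovery_for` are hypothetical. The jitter value is passed in so the function stays deterministic, retransmission requests are simplified to discarding the frame, and using `max_retries` as the persistence threshold for protocol violations is an assumption.

```rust
use std::time::Duration;

/// Hypothetical, payload-free mirror of the first five `InstrumentError` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    ConnectionLost,
    ConnectionRefused,
    Timeout { budget_ms: u64 },
    ProtocolViolation,
    ChecksumFailure,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Recovery {
    Reconnect { after: Duration },
    RetryWithBudget { budget_ms: u64 },
    ReopenConnection,
    DiscardFrame,
    FaultIsolate,
}

/// 1 s doubling per attempt, capped at 60 s, scaled by `jitter` clamped to [-0.2, 0.2].
pub fn backoff(attempt: u32, jitter: f64) -> Duration {
    let base_secs = 2f64.powi(attempt.min(16) as i32).min(60.0);
    Duration::from_secs_f64(base_secs * (1.0 + jitter.clamp(-0.2, 0.2)))
}

/// `attempt` counts prior consecutive failures of the same kind.
pub fn recovery_for(err: ErrorKind, attempt: u32, max_retries: u32, jitter: f64) -> Recovery {
    match err {
        ErrorKind::ConnectionLost => Recovery::Reconnect { after: backoff(attempt, jitter) },
        ErrorKind::ConnectionRefused if attempt >= max_retries => Recovery::FaultIsolate,
        ErrorKind::ConnectionRefused => Recovery::Reconnect { after: backoff(attempt, jitter) },
        // First timeout: one retry at twice the budget; second: treat as lost.
        ErrorKind::Timeout { budget_ms } if attempt == 0 => {
            Recovery::RetryWithBudget { budget_ms: budget_ms * 2 }
        }
        ErrorKind::Timeout { .. } => recovery_for(ErrorKind::ConnectionLost, 0, max_retries, jitter),
        // The supervisor writes the PROTOCOL_VIOLATION ledger entry in every case.
        ErrorKind::ProtocolViolation if attempt >= max_retries => Recovery::FaultIsolate,
        ErrorKind::ProtocolViolation => Recovery::ReopenConnection,
        // The third consecutive CRC failure escalates to ConnectionLost.
        ErrorKind::ChecksumFailure if attempt >= 2 => {
            recovery_for(ErrorKind::ConnectionLost, 0, max_retries, jitter)
        }
        ErrorKind::ChecksumFailure => Recovery::DiscardFrame,
    }
}
```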
// helix-hal/src/error.rs
use thiserror::Error;
/// The unified error type for all HAL instrument operations.
///
/// # Variant Selection Guidelines for Driver Authors
///
/// The choice of variant determines how the HAL supervisor responds. Choose
/// the variant that most precisely describes the failure. Do not use
/// `ProtocolViolation` for a timeout (use `Timeout`), and do not use
/// `Timeout` for a protocol violation. Incorrect variant selection causes
/// the supervisor to apply the wrong recovery policy, which can range
/// from a harmful retry loop to missing a genuine instrument fault.
#[derive(Debug, Error, Clone)]
pub enum InstrumentError {
// ── Connection & Communication Errors ────────────────────────────────────
/// The physical or logical connection to the instrument was lost.
///
/// Recovery: HAL supervisor will attempt reconnection with exponential backoff
/// (initial: 1s, max: 60s, jitter: ±20%). The instrument enters
/// `InstrumentStatus::Disconnected` until reconnection succeeds.
///
/// Ledger entry: `CONNECTION_LOST` event with `last_frame_sequence` and
/// `connection_lost_timestamp_ns`.
#[error("Connection lost to {instrument_id}: {reason}")]
ConnectionLost {
instrument_id: String,
reason: String,
/// Sequence number of the last successfully received frame.
/// Used to detect data gaps after reconnection.
last_frame_sequence: u64,
/// `true` if the instrument sent an explicit disconnect signal;
/// `false` if the loss was detected by a timeout or I/O error.
was_graceful: bool,
},
/// A connection attempt failed before it could be established.
///
/// Recovery: Exponential backoff retry. After `max_retries` attempts,
/// the instrument is moved to `FaultIsolated` and an alarm is raised.
#[error("Connection refused by {instrument_id} at {endpoint}: {reason}")]
ConnectionRefused {
instrument_id: String,
endpoint: String,
reason: String,
vendor_error_code: Option<u32>,
},
/// An operation exceeded its deadline.
///
/// Recovery: Log the timeout; retry once with a 2× timeout budget.
/// On second timeout, treat as `ConnectionLost`.
#[error("Timeout on {operation} for {instrument_id} (budget: {budget_ms}ms)")]
Timeout {
instrument_id: String,
operation: String, // "connect", "health_check", "dispatch_command", etc.
budget_ms: u64,
},
// ── Data Integrity & Protocol Errors ─────────────────────────────────────
/// The instrument sent a response that violates its documented protocol.
///
/// This is distinct from a data integrity error (bad measurement value).
/// A `ProtocolViolation` means the framing, sequence, or message structure
/// of the protocol is wrong. A checksum or CRC mismatch on an otherwise
/// well-formed frame is reported as `ChecksumFailure` instead.
///
/// Recovery: Close and reopen the connection. Log the offending bytes
/// in `received_bytes` for driver debugging. If violations persist across
/// reconnections, move to `FaultIsolated` — the instrument may have a
/// firmware bug or the driver may have a protocol implementation error.
///
/// Ledger entry: `PROTOCOL_VIOLATION` event. This ALWAYS generates a ledger
/// entry regardless of whether reconnection succeeds, because it may
/// indicate tampered instrument firmware in a GMP context.
#[error("Protocol violation from {instrument_id} on {endpoint}: {description}")]
ProtocolViolation {
instrument_id: String,
endpoint: String,
description: String,
/// The raw bytes that caused the violation (up to 256 bytes).
/// Used for driver debugging and forensic analysis.
received_bytes: Vec<u8>,
expected_description: String,
},
/// A received frame failed its checksum or CRC validation.
///
/// Recovery: Request retransmission if the protocol supports it.
/// If retransmission is not supported, discard the frame and log a gap.
/// Three consecutive CRC failures on the same channel trigger `ConnectionLost`.
#[error("Checksum failure from {instrument_id} channel {channel_id}: \
expected {expected:#010x}, got {actual:#010x}")]
ChecksumFailure {
instrument_id: String,
channel_id: u16,
expected: u32,
actual: u32,
},
// ── Measurement & Calibration Errors ─────────────────────────────────────
/// A sensor or measurement channel has drifted outside its calibration bounds.
///
/// This error is generated by the driver when it detects that a measurement
/// value is implausible given the instrument's operating context (e.g., a
/// DO probe reading -2 mg/L, or a pH probe with response time >5 minutes).
/// It is NOT generated for normal out-of-specification process values —
/// those are reported as `TelemetryFrame.alarm_bitmap` flags.
///
/// Recovery: The affected channel is marked `DEGRADED` in the instrument's
/// `InstrumentStatus`. The channel continues streaming, with `quality_flags`
/// set to uncertain quality (0x0040, the 16-bit OPC DA "Uncertain" code).
/// An alarm is raised to the operator.
///
/// Ledger entry: `CALIBRATION_DRIFT_DETECTED` with full channel context.
#[error("Calibration drift on {instrument_id} channel {channel_id}: \
current value {current_value:.4} {unit} is {sigma_deviation:.1}σ from expected")]
CalibrationDrift {
instrument_id: String,
channel_id: u16,
current_value: f64,
expected_range_min: f64,
expected_range_max: f64,
sigma_deviation: f64,
unit: String,
last_calibration_timestamp_ns: u64,
},
/// A calibration procedure failed to meet acceptance criteria.
///
/// Recovery: The instrument channel must be taken offline for manual
/// recalibration. This error moves the channel to `FaultIsolated`.
/// A calibration failure is a significant GMP event and ALWAYS generates
/// a mandatory deviation record.
#[error("Calibration failure on {instrument_id} channel {channel_id}: {reason}")]
CalibrationFailed {
instrument_id: String,
channel_id: u16,
reason: String,
calibration_record: Option<Box<crate::CalibrationRecord>>,
},
// ── Command & Control Errors ──────────────────────────────────────────────
/// A command was dispatched but the instrument rejected it.
///
/// Recovery: Do not retry automatically. Surface the rejection to the
/// operator. Log the rejection to the audit ledger as a `COMMAND_REJECTED`
/// event — this is a GMP-significant event because it may indicate a safety
/// interlock, a setpoint out of instrument range, or an unauthorized command.
#[error("Command rejected by {instrument_id}: {rejection_reason}")]
CommandRejected {
instrument_id: String,
command_description: String,
rejection_reason: String,
vendor_rejection_code: Option<u32>,
},
/// A command was acknowledged but the instrument failed to execute it.
///
/// Distinct from `CommandRejected`: the instrument said "OK" but then
/// reported an execution failure (e.g., motor stall, valve stuck).
#[error("Command execution failure on {instrument_id}: {failure_reason}")]
CommandExecutionFailure {
instrument_id: String,
command_description: String,
failure_reason: String,
},
// ── Resource & Configuration Errors ──────────────────────────────────────
/// The driver configuration is invalid or missing required fields.
///
/// Recovery: Do not retry. This is a configuration error that requires
/// human intervention. Move the driver to `FaultIsolated`.
#[error("Invalid driver configuration for {driver_name}: {field} — {reason}")]
InvalidConfiguration {
driver_name: String,
field: String,
reason: String,
},
/// The instrument's firmware version is not supported by this driver.
///
/// Recovery: Log the version mismatch. Connect in read-only mode if
/// `InstrumentCapabilities.accepts_commands = true` was requested but the
/// firmware version does not support the command protocol. Alert the
/// instrument administrator.
#[error("Unsupported firmware {firmware_version} on {instrument_id} \
(driver supports {min_version}–{max_version})")]
FirmwareVersionMismatch {
instrument_id: String,
firmware_version: String,
min_version: String,
max_version: String,
},
/// An operation was requested that this instrument does not support.
///
/// Recovery: Return immediately; retrying cannot succeed. Calling
/// `dispatch_command` on an instrument that does not implement `Commandable`
/// is already a compile-time error, so this variant covers operations the
/// trait surface allows but the instrument's protocol does not support.
#[error("{instrument_id} does not support operation: {operation}")]
OperationNotSupported {
instrument_id: String,
operation: String,
},
/// A catch-all for vendor-SDK errors that do not map to a specific variant.
///
/// Use this sparingly — every new vendor error pattern that recurs more
/// than twice should be promoted to its own variant. The `context` map
/// must include at minimum `vendor_error_code` and `vendor_error_message`.
#[error("Vendor SDK error from {instrument_id}: {message}")]
VendorSdkError {
instrument_id: String,
message: String,
context: std::collections::HashMap<String, String>,
},
}
impl InstrumentError {
/// Returns the recovery action the HAL supervisor should take.
/// This is the primary dispatch key for the supervisor's error handler.
pub fn recovery_action(&self) -> RecoveryAction {
match self {
Self::ConnectionLost { .. } => RecoveryAction::ReconnectWithBackoff,
Self::ConnectionRefused { .. } => RecoveryAction::ReconnectWithBackoff,
Self::Timeout { .. } => RecoveryAction::RetryOnceWithDoubleTimeout,
Self::ProtocolViolation { .. } => RecoveryAction::ReconnectAndLogForensics,
Self::ChecksumFailure { .. } => RecoveryAction::RequestRetransmission,
Self::CalibrationDrift { .. } => RecoveryAction::DegradeChannelAndAlert,
Self::CalibrationFailed { .. } => RecoveryAction::IsolateChannelAndRaiseMandatoryDeviation,
Self::CommandRejected { .. } => RecoveryAction::SurfaceToOperatorNoRetry,
Self::CommandExecutionFailure { .. } => RecoveryAction::SurfaceToOperatorNoRetry,
Self::InvalidConfiguration { .. } => RecoveryAction::IsolateDriver,
Self::FirmwareVersionMismatch { .. } => RecoveryAction::ConnectReadOnlyAndAlert,
Self::OperationNotSupported { .. } => RecoveryAction::ReturnImmediately,
Self::VendorSdkError { .. } => RecoveryAction::ReconnectWithBackoff,
}
}
/// Returns true if this error MUST generate an audit ledger entry
/// regardless of whether recovery succeeds.
pub fn requires_audit_entry(&self) -> bool {
matches!(self,
Self::ProtocolViolation { .. }
| Self::CalibrationDrift { .. }
| Self::CalibrationFailed { .. }
| Self::CommandRejected { .. }
| Self::CommandExecutionFailure { .. }
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryAction {
ReconnectWithBackoff,
RetryOnceWithDoubleTimeout,
ReconnectAndLogForensics,
RequestRetransmission,
DegradeChannelAndAlert,
IsolateChannelAndRaiseMandatoryDeviation,
SurfaceToOperatorNoRetry,
IsolateDriver,
ConnectReadOnlyAndAlert,
ReturnImmediately,
}
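The `Timeout` recovery rule above is easy to get subtly wrong. It says to log the timeout, retry once with a doubled budget, and escalate to `ConnectionLost` on the second timeout. A common mistake is to keep doubling the budget on every retry. Below is a minimal synchronous sketch that uses only the standard library. It assumes the operation can be modelled as a closure that receives its budget in milliseconds. `Outcome`, `Escalation`, and `run_with_timeout_policy` are hypothetical names and are not part of helix-hal.

```rust
/// Result of one attempt at an instrument operation (hypothetical stand-in).
#[derive(Debug, PartialEq)]
pub enum Outcome<T> {
    Done(T),
    TimedOut,
}

/// What the supervisor ends up with after applying the Timeout rule
/// (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
pub enum Escalation<T> {
    /// Succeeded on the first attempt or on the single retry.
    Succeeded { value: T, attempts: u32 },
    /// Both attempts timed out: handle as `ConnectionLost`.
    TreatAsConnectionLost { last_budget_ms: u64 },
}

/// Sketch of `RecoveryAction::RetryOnceWithDoubleTimeout`: exactly one
/// retry, with twice the original budget, then escalation.
pub fn run_with_timeout_policy<T>(
    budget_ms: u64,
    mut attempt: impl FnMut(u64) -> Outcome<T>,
) -> Escalation<T> {
    if let Outcome::Done(value) = attempt(budget_ms) {
        return Escalation::Succeeded { value, attempts: 1 };
    }
    // First timeout: a real supervisor would log it here.
    let retry_budget = budget_ms.saturating_mul(2);
    match attempt(retry_budget) {
        Outcome::Done(value) => Escalation::Succeeded { value, attempts: 2 },
        // Second timeout: no further retries; escalate.
        Outcome::TimedOut => Escalation::TreatAsConnectionLost { last_budget_ms: retry_budget },
    }
}
```

The closure sees the budget it was given, which makes it easy to check that the retry really uses 2× and not some other multiple.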
3.2 Error Recovery Policy
The HAL supervisor implements a hierarchical recovery state machine. The transition from one state to another is entirely determined by InstrumentError::recovery_action() — the supervisor never pattern-matches on error variants directly.
InstrumentStatus::Connected / Streaming
│
│ InstrumentError emitted
▼
┌───────────────────────────────────────────────────────────────┐
│ RecoveryAction Dispatcher │
│ │
│ ReconnectWithBackoff ──────────────────────────────────┐ │
│ RetryOnceWithDoubleTimeout ─────► retry once ─► if fail ┤ │
│ ReconnectAndLogForensics ──────────────────────────────┐ │ │
│ │ │ │
│ DegradeChannelAndAlert ──► InstrumentStatus::Degraded │ │ │
│ │ │ │
│ IsolateChannelAndRaiseMandatoryDeviation │ │ │
│ ──► Channel to FaultIsolated + Deviation record │ │ │
│ │ │ │
│ SurfaceToOperatorNoRetry ──► Alarm + Audit entry │ │ │
│ │ │ │
│ IsolateDriver ──────────────────────────────────────┐ │ │ │
│ │ │ │ │
│ ConnectReadOnlyAndAlert ──► reconnect, no commands │ │ │ │
└──────────────────────────────────────────────────────┼──┼─┼───┘
│ │ │
┌────────────────────────┘ │ │
▼ │ │
┌──────────────────┐ │ │
│ FaultIsolated │ ◄─────────────────┘ │
│ (driver loaded, │ │
│ instrument │ │
│ quarantined) │ │
└──────────────────┘ │
│
┌───────────────────────────────────────┐│
│ Exponential Backoff Reconnection ││
│ Initial: 1s → 2s → 4s → ... → 60s ││
│ Jitter: ±20% (prevents thundering herd)││
│ Max attempts: configurable (default 10)◄┘
└───────────────────────────────────────┘
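The diagram's backoff schedule can be computed as shown below. It uses a 1 s initial delay, doubling on each attempt, a 60 s cap, ±20% jitter, and 10 attempts by default. This is a sketch, not the supervisor's implementation. `BackoffPolicy` and `delay` are hypothetical names. The random jitter draw is passed in as a value in [-1.0, 1.0] so that the example stays deterministic and needs no random-number crate.

```rust
use std::time::Duration;

/// Backoff parameters from the Section 3.2 diagram (hypothetical type).
pub struct BackoffPolicy {
    pub initial: Duration,
    pub max: Duration,
    /// Jitter as a fraction of the nominal delay (0.2 = ±20%).
    pub jitter_fraction: f64,
    pub max_attempts: u32,
}

impl Default for BackoffPolicy {
    fn default() -> Self {
        Self {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(60),
            jitter_fraction: 0.2,
            max_attempts: 10,
        }
    }
}

impl BackoffPolicy {
    /// Delay before reconnection attempt `attempt` (0-based), or `None` once
    /// `max_attempts` is exhausted and the instrument should be FaultIsolated.
    ///
    /// `jitter_unit` is expected in [-1.0, 1.0]; a real supervisor would draw
    /// it at random so that many instruments do not reconnect in lockstep.
    pub fn delay(&self, attempt: u32, jitter_unit: f64) -> Option<Duration> {
        if attempt >= self.max_attempts {
            return None;
        }
        // initial * 2^attempt, capped at `max`; checked arithmetic avoids overflow.
        let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
        let nominal = self
            .initial
            .checked_mul(factor)
            .map_or(self.max, |d| d.min(self.max));
        let scale = 1.0 + self.jitter_fraction * jitter_unit.clamp(-1.0, 1.0);
        Some(nominal.mul_f64(scale))
    }
}
```

With the defaults, attempts 0 through 9 produce these nominal delays before jitter is applied: 1, 2, 4, 8, 16, 32, 60, 60, 60, and 60 seconds.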
3.3 Error Propagation to the Audit Ledger
Errors for which requires_audit_entry() returns true are handled specially by the HAL supervisor: the error event is written to the audit ledger even if the recovery action succeeds and the instrument returns to normal operation. A protocol violation that self-heals is still a protocol violation that happened, and it must be traceable.
// helix-hal/src/supervisor.rs (excerpt)
async fn handle_instrument_error(
&self,
error: &InstrumentError,
instrument_id: &InstrumentId,
) {
// Always log to the metrics system
self.metrics.increment_counter(
"hal.instrument.errors",
&[("instrument", instrument_id.as_str()), ("error_type", error.variant_name())],
);
// Conditionally log to the audit ledger
if error.requires_audit_entry() {
let entry = AuditEvent::InstrumentError {
instrument_id: instrument_id.clone(),
error_description: error.to_string(),
error_detail: serde_json::to_value(error).unwrap_or_default(),
timestamp_ns: crate::time::monotonic_ns(),
};
// This send is fire-and-forget — the ledger writer has its own
// bounded queue. If the ledger queue is full, the error is written
// to the emergency WAL (a separate file that is replayed on restart).
let _ = self.audit_sink.try_send(entry);
}
// Dispatch recovery action
match error.recovery_action() {
RecoveryAction::ReconnectWithBackoff => self.schedule_reconnect(instrument_id),
RecoveryAction::IsolateChannelAndRaiseMandatoryDeviation => {
self.isolate_channel(instrument_id);
self.raise_mandatory_deviation(instrument_id, error);
}
// ... other actions
}
}
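The excerpt discards the result of `try_send`. Its comment, however, says that a full ledger queue diverts the entry to the emergency WAL, and the excerpt does not show where that diversion happens. The sketch below assumes the sink performs the diversion itself. It replaces the async queue with `std::sync::mpsc::sync_channel` to stay dependency-free. `AuditSink`, `AuditRecord`, and the in-memory `emergency_wal` vector are hypothetical stand-ins.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Stand-in for an audit event (hypothetical; the real type is `AuditEvent`).
#[derive(Debug, Clone, PartialEq)]
pub struct AuditRecord(pub String);

/// Bounded audit queue with an emergency-WAL fallback (hypothetical type).
pub struct AuditSink {
    queue: SyncSender<AuditRecord>,
    /// In-memory stand-in for the WAL file that is replayed on restart.
    pub emergency_wal: Vec<AuditRecord>,
}

impl AuditSink {
    /// Returns the sink and the receiving end the ledger writer would drain.
    pub fn new(capacity: usize) -> (Self, Receiver<AuditRecord>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { queue: tx, emergency_wal: Vec::new() }, rx)
    }

    /// Never blocks the caller. `TrySendError` hands the record back, so a
    /// full queue or a stopped ledger writer diverts it to the WAL instead
    /// of silently losing it.
    pub fn record(&mut self, record: AuditRecord) {
        match self.queue.try_send(record) {
            Ok(()) => {}
            Err(TrySendError::Full(r)) | Err(TrySendError::Disconnected(r)) => {
                self.emergency_wal.push(r)
            }
        }
    }
}
```

`TrySendError` returns ownership of the rejected entry, so the fallback needs no cloning. The supervisor still never waits on the ledger writer.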
4. Backpressure Architecture
4.1 The Producer-Fast Consumer-Slow Problem
A modern bioreactor sampling at 10 Hz across 8 channels produces 80 TelemetryFrame values per second. An Illumina NovaSeq X in real-time base-calling mode can produce bursts of >100,000 quality score events per second. Under normal operation, the ring buffer writer (consumer) can drain these faster than the driver (producer) generates them.
The problem arises during three predictable scenarios:
- Audit ledger write pressure. An NVMe drive written with O_SYNC has a P99 write latency of ~200µs. At 10,000 telemetry events/second, serialized writes at that latency would need about 2 s of I/O per second of wall-clock time, so the ledger writer becomes the bottleneck. If the ring buffer fills, the driver's channel fills next.
- GPU render pipeline stall. The render thread stalls (dropping render frames) during a viewport resize or shader recompilation. During the stall, telemetry frames queue up in the channel.
- Digital twin ODE divergence. If the ODE solver encounters a stiff system and reduces its step size, its consumption rate becomes erratic: it may briefly consume frames faster than usual, then slow to a crawl. During the slow phase, the driver keeps producing at its native rate and bursts ahead of the consumer.
In all three scenarios, the driver continues producing data at the instrument's native rate. The question is: what happens to the excess?
4.2 Bounded Channel Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ HAL Supervisor │
│ │
│ Per-driver bounded channel: │
│ mpsc::channel(capacity = CHANNEL_CAPACITY) │
│ │
│ CHANNEL_CAPACITY = max( │
│ BACKPRESSURE_BUFFER_SECONDS × channel_max_rate_hz, │
│ MIN_CHANNEL_CAPACITY // = 256 │
│ ) │
│ │
│ Default BACKPRESSURE_BUFFER_SECONDS = 5.0 │
│ Example: 10 Hz × 8 channels × 5s = 400 frames minimum │
│ │
│ ┌──────────────────────┐ ┌──────────────────────────────┐ │
│ │ Driver Task │ │ Ring Buffer Writer Task │ │
│ │ (per instrument) │ │ (per instrument) │ │
│ │ │ mpsc │ │ │
│ │ produces frames ──►─┼─────────┼─► drains channel │ │
│ │ at instrument rate │ bounded │ writes to ring buffer │ │
│ │ │ channel │ writes to audit ledger │ │
│ │ on try_send Err: │ │ at sustainable rate │ │
│ │ apply drop_policy │ │ │ │
│ └──────────────────────┘ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
The channel capacity defines the backpressure absorption window — the amount of time the driver can continue producing at full rate before it must start making drop decisions. A larger window consumes more memory but provides more time for the consumer to catch up.
For GMP-regulated instruments, the channel capacity MUST be large enough to absorb the maximum expected consumer stall duration without any frame drops. The validated hardware configuration list specifies minimum NVMe performance characteristics that keep P99 consumer stall times below the channel absorption window.
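The capacity formula and the GMP absorption requirement reduce to two small functions. Both names in this sketch are hypothetical. The rate is read as the aggregate frame rate across all channels, because that is how the diagram's worked example (10 Hz × 8 channels × 5 s = 400) treats `channel_max_rate_hz`. `absorbs_stall` expresses the rule that capacity must cover the worst expected consumer stall at full rate.

```rust
/// Lower bound on per-driver channel capacity, from the Section 4.2 diagram.
pub const MIN_CHANNEL_CAPACITY: usize = 256;

/// capacity = max(buffer_seconds × aggregate frame rate, MIN_CHANNEL_CAPACITY).
/// `aggregate_rate_hz` is total frames/second across all channels (assumption).
pub fn channel_capacity(buffer_seconds: f64, aggregate_rate_hz: f64) -> usize {
    let window = (buffer_seconds * aggregate_rate_hz).ceil() as usize;
    window.max(MIN_CHANNEL_CAPACITY)
}

/// GMP check (hypothetical helper): true if `capacity` frames can absorb a
/// consumer stall of `max_stall_seconds` at full rate without any drop.
pub fn absorbs_stall(capacity: usize, max_stall_seconds: f64, aggregate_rate_hz: f64) -> bool {
    (max_stall_seconds * aggregate_rate_hz).ceil() as usize <= capacity
}
```

For a 10 Hz instrument with a single channel, the 5 s window is only 50 frames, so the 256-frame floor takes over.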
4.3 Drop Policy Hierarchy
// helix-hal/src/backpressure.rs
/// What the driver should do when `tx.try_send(frame)` returns `Err(TrySendError::Full(_))` (channel full).
///
/// The drop policy is configured per-instrument in `helix.toml`. The default
/// for all instruments is `DropOldest` because preserving the most recent
/// data is almost always preferable in process monitoring — a stale pH reading
/// from 10 seconds ago is less useful than the current one, even if we lost
/// some readings in between.
///
/// The exception is sequencing instruments (Illumina, Oxford Nanopore) where
/// data must be processed in order and dropping frames is not acceptable.
/// These instruments use `Backpressure` mode and the driver is expected to
/// slow down or buffer internally.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
/// Drop the oldest frame in the channel to make room for the new one.
///
/// Implementation: The driver calls `channel.try_send(frame)`. On `Err`,
/// it calls the HAL supervisor's `force_drop_oldest(channel_id)` helper,
/// which pops the head of the channel (logging a drop event to the ledger)
/// before retrying.
///
/// Use for: All process instruments (bioreactors, analyzers).
/// Regulatory impact: Each drop is logged to the audit ledger as a
/// `TELEMETRY_DROP_BACKPRESSURE` event. Persistent drops trigger an alarm.
DropOldest,
/// Drop the new (incoming) frame.
///
/// The channel is at capacity; the buffered data is considered more
/// important than the new arrival.
///
/// Use for: Instruments where temporal ordering within the buffer matters
/// more than recency (e.g., a chromatography instrument during a gradient).
DropNewest,
/// Apply backpressure to the driver by making `stream_telemetry` await
/// channel availability (using `tx.send(frame).await`).
///
/// This slows the driver's polling loop. Use ONLY for instruments that
/// can tolerate a slower polling rate without missing data at the hardware
/// level (i.e., the instrument has its own internal buffer).
///
/// NEVER use for instruments with fixed-rate DMA output that cannot be
/// throttled — data will be lost at the hardware level.
///
/// Use for: Instruments with hardware-side buffering (Illumina, Oxford Nanopore).
Backpressure,
/// Drop frames from a specified quality tier first, then fall back to `DropOldest`.
///
/// Frames with `quality_flags` below `threshold_quality` are dropped first.
/// Only when no low-quality frames remain does the policy fall back to
/// dropping the oldest regardless of quality.
///
/// Use for: Instruments with variable-quality outputs where bad readings
/// should be sacrificed before good ones.
DropLowQualityFirst {
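/// Quality code threshold in the 16-bit OPC DA encoding, where
/// Bad (0x00) < Uncertain (0x40) < Good (0xC0). Frames whose code is
/// below this threshold are "low quality."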
threshold_quality: u16,
},
}
/// Internal drop event logged when a frame is discarded due to backpressure.
/// Written to the audit ledger as a `TELEMETRY_DROP_BACKPRESSURE` entry.
#[derive(Debug, Clone)]
pub struct DropEvent {
pub instrument_id: String,
pub channel_id: u16,
pub dropped_sequence: u64,
pub drop_reason: DropReason,
pub channel_depth_at_drop: usize,
pub channel_capacity: usize,
pub timestamp_ns: u64,
}
#[derive(Debug, Clone, Copy)]
pub enum DropReason {
ChannelFull,
PolicyDropOldest,
PolicyDropNewest,
PolicyDropLowQuality { quality_code: u16 },
SupervisorShutdown,
}
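The non-blocking drop policies can be modelled on a plain bounded `VecDeque`. This sketch covers only the policy semantics. It does not model the supervisor's `force_drop_oldest` helper or the tokio channel. `Frame`, `Policy`, and `push_with_policy` are hypothetical names. `Backpressure` is omitted because it awaits instead of dropping. As an assumption, `DropLowQualityFirst` here picks its victim only from buffered frames, and the incoming frame is always enqueued. Quality codes follow the 16-bit OPC DA ordering (Bad 0x00 < Uncertain 0x40 < Good 0xC0), under which a lower code means lower quality.

```rust
use std::collections::VecDeque;

/// Minimal frame stand-in: sequence number and quality code.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Frame {
    pub seq: u64,
    pub quality: u16,
}

/// Mirror of the dropping `DropPolicy` variants (hypothetical).
#[derive(Debug, Clone, Copy)]
pub enum Policy {
    DropOldest,
    DropNewest,
    DropLowQualityFirst { threshold_quality: u16 },
}

/// Push `frame` into a queue bounded at `cap` (assumed >= 1). Returns the
/// sequence number of the discarded frame, if any, for a `DropEvent`.
pub fn push_with_policy(q: &mut VecDeque<Frame>, cap: usize, frame: Frame, policy: Policy) -> Option<u64> {
    if q.len() < cap {
        q.push_back(frame);
        return None;
    }
    match policy {
        // Keep the buffer untouched and discard the arrival.
        Policy::DropNewest => Some(frame.seq),
        Policy::DropOldest => {
            let dropped = q.pop_front().map(|f| f.seq);
            q.push_back(frame);
            dropped
        }
        Policy::DropLowQualityFirst { threshold_quality } => {
            // Oldest buffered frame below the threshold, if any...
            let victim = q.iter().position(|f| f.quality < threshold_quality);
            let dropped = match victim {
                Some(i) => q.remove(i).map(|f| f.seq),
                // ...otherwise fall back to DropOldest.
                None => q.pop_front().map(|f| f.seq),
            };
            q.push_back(frame);
            dropped
        }
    }
}
```

The returned sequence number is the value a caller would copy into `DropEvent::dropped_sequence`.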
4.4 Backpressure Signaling to the Driver
When the channel fill level exceeds a configurable warning threshold (default: 75% full), the HAL supervisor signals the driver via a watch channel. The driver can use this signal to:
- Temporarily reduce its polling rate (for instruments that support it).
- Increase its internal aggregation window (e.g., average 10 samples instead of sending each one).
- Log a diagnostic event without changing behavior.
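The second option, increasing the aggregation window, can be as simple as a running mean that emits one value per window. This is a sketch with a hypothetical `Aggregator` type. How a driver maps signals to window sizes (for instance, 1 sample under Normal and 10 under Critical) is an assumption, not something this specification fixes.

```rust
/// Averages `window` consecutive samples into one value, so the driver sends
/// one frame per window instead of one per sample (hypothetical type).
pub struct Aggregator {
    window: usize,
    sum: f64,
    count: usize,
}

impl Aggregator {
    pub fn new(window: usize) -> Self {
        assert!(window > 0, "window must be at least 1");
        Self { window, sum: 0.0, count: 0 }
    }

    /// Change the window when the backpressure signal changes.
    /// Samples already accumulated are kept.
    pub fn set_window(&mut self, window: usize) {
        assert!(window > 0, "window must be at least 1");
        self.window = window;
    }

    /// Add one sample; returns the mean once the window is full.
    pub fn push(&mut self, sample: f64) -> Option<f64> {
        self.sum += sample;
        self.count += 1;
        if self.count >= self.window {
            let mean = self.sum / self.count as f64;
            self.sum = 0.0;
            self.count = 0;
            Some(mean)
        } else {
            None
        }
    }
}
```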
// helix-hal/src/backpressure.rs (continued)
/// Backpressure signal sent from the HAL supervisor to the driver task.
/// Delivered via `tokio::sync::watch::Sender<BackpressureSignal>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureSignal {
/// Channel is below 75% capacity. Normal operation.
Normal,
/// Channel is between 75% and 90% capacity. Advisory — driver may
/// optionally reduce rate.
Warning { fill_percent: u8 },
/// Channel is above 90% capacity. Critical — drops are imminent.
/// Driver should aggressively reduce rate or increase aggregation.
Critical { fill_percent: u8 },
/// Channel is full. Drops are occurring. The driver's `drop_policy`
/// is being applied on every frame.
Dropping { frames_dropped_since_last_signal: u64 },
}
/// Example: how a driver's polling loop responds to backpressure.
/// This is a pattern, not a required implementation.
async fn example_polling_loop(
tx: tokio::sync::mpsc::Sender<TelemetryFrame>,
mut bp_rx: tokio::sync::watch::Receiver<BackpressureSignal>,
mut stop: tokio::sync::oneshot::Receiver<()>,
) {
use std::time::Duration;
use tokio::time::{interval, interval_at, Instant, Interval};
// A new `Interval` completes its first tick immediately. Starting the
// replacement one period from now avoids an extra poll on every rate change.
fn every(period: Duration) -> Interval {
interval_at(Instant::now() + period, period)
}
let mut poll_interval = interval(Duration::from_millis(100)); // 10 Hz
loop {
tokio::select! {
_ = poll_interval.tick() => {
// Poll the instrument, produce a frame, send to `tx`
// ...
}
// Resolves only when the supervisor publishes a new signal, so it never delays polling
Ok(()) = bp_rx.changed() => {
let period_ms = match *bp_rx.borrow() {
BackpressureSignal::Normal => 100, // Restore 10 Hz
BackpressureSignal::Warning { .. } => 200, // 5 Hz, if the instrument supports it
BackpressureSignal::Critical { .. } | BackpressureSignal::Dropping { .. } => 1000, // 1 Hz
};
poll_interval = every(Duration::from_millis(period_ms));
}
_ = &mut stop => break,
}
}
}
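On the supervisor side, the mapping from channel occupancy to a `BackpressureSignal` could look like the sketch below. The enum mirrors the one above so that the sketch compiles on its own. `classify` is hypothetical. The variant docs leave the exact boundaries open, so the sketch assumes the following: exactly 75% already counts as `Warning`, exactly 90% already counts as `Critical`, and `Dropping` is reported only when the channel is full and drops have been counted.

```rust
/// Mirror of `BackpressureSignal` from Section 4.4.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureSignal {
    Normal,
    Warning { fill_percent: u8 },
    Critical { fill_percent: u8 },
    Dropping { frames_dropped_since_last_signal: u64 },
}

/// Map channel depth to a signal (hypothetical helper; boundaries assumed).
pub fn classify(depth: usize, capacity: usize, dropped_since_last: u64) -> BackpressureSignal {
    assert!(capacity > 0, "capacity must be non-zero");
    if depth >= capacity && dropped_since_last > 0 {
        return BackpressureSignal::Dropping {
            frames_dropped_since_last_signal: dropped_since_last,
        };
    }
    // Integer percentage, rounded down, with depth clamped to capacity.
    let fill_percent = (depth.min(capacity) * 100 / capacity) as u8;
    match fill_percent {
        0..=74 => BackpressureSignal::Normal,
        75..=89 => BackpressureSignal::Warning { fill_percent },
        _ => BackpressureSignal::Critical { fill_percent },
    }
}
```

The supervisor would publish the result through the watch sender only when it differs from the previous value. Otherwise every `changed()` wake-up in the driver loop would rebuild its interval for nothing.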
5. Async Runtime Contract
5.1 Task Budgets and Cooperative Yielding
HELIX OS runs all driver tasks on a shared Tokio multi-thread runtime with worker_threads = num_physical_cores / 2 (the other half are reserved for CPU-bound simulation workloads). A driver task that monopolizes a worker thread degrades every other instrument on the same runtime.
The cooperative yielding rule: A driver's stream_telemetry() implementation MUST yield to the executor at least once per polling iteration. Awaiting Interval::tick() (from tokio::time::interval) at the top of a polling loop satisfies this. A driver that runs a tight loop without any .await will starve other tasks.
// BAD — monopolizes the worker thread
loop {
let frame = instrument.read_blocking(); // Blocks the thread!
tx.try_send(frame);
}
// GOOD — yields to the executor each iteration via the interval tick
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
interval.tick().await; // Yields here, other tasks can run
let frame = instrument.try_read_nonblocking()?;
if let Some(frame) = frame {
tx.send(frame).await?;
}
}
5.2 Cancellation Safety
All async_trait methods that cross the cancellation boundary (i.e., are called within tokio::select! or tokio::time::timeout) MUST be cancellation-safe. This means:
- No &mut self state changes before the final .await. If the method acquires a lock or modifies state, it must either do so after all awaits or use a separate cleanup mechanism.
- All acquired resources (sockets, file descriptors) must be released in Drop, not after an .await. If the future is dropped at an .await point, Drop runs but no code after the .await runs.
- Instrument commands are cancellation-safe at the dispatch level, not the execution level. If dispatch_command() is cancelled after the command was sent to the instrument but before the acknowledgment was received, the HAL supervisor will re-verify the instrument's setpoint state on reconnection.
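The second rule can be checked directly. Dropping a future that is suspended at an `.await` runs the destructors of the values it holds, but the code after that point never resumes. The std-only sketch below polls an async block by hand until its first `.await` and then drops it. This is what happens to the losing branch of `tokio::select!`, or to a future cancelled by `tokio::time::timeout`. `NoopWake`, `YieldOnce`, `SocketGuard`, and `cancel_at_first_await` are illustrative names.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on the first poll, Ready on the second: stands in for an I/O
/// `.await` that has not completed yet.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Resource whose release lives in `Drop`, as the rule requires.
struct SocketGuard {
    released: Rc<Cell<bool>>,
}
impl Drop for SocketGuard {
    fn drop(&mut self) {
        self.released.set(true);
    }
}

/// Returns (guard released, code after the await ran).
pub fn cancel_at_first_await() -> (bool, bool) {
    let released = Rc::new(Cell::new(false));
    let after_await = Rc::new(Cell::new(false));
    let op = {
        let released = Rc::clone(&released);
        let after_await = Rc::clone(&after_await);
        async move {
            let _guard = SocketGuard { released };
            YieldOnce(false).await;
            // Cleanup written here would be skipped on cancellation.
            after_await.set(true);
        }
    };
    let mut op = Box::pin(op);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(op.as_mut().poll(&mut cx).is_pending());
    drop(op); // cancellation while suspended at the .await
    (released.get(), after_await.get())
}
```

The result is `(true, false)`. The guard's `Drop` ran, and the line after the `.await` did not.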
5.3 Blocking I/O Policy
All blocking I/O in drivers MUST use tokio::task::spawn_blocking. Blocking the async executor thread causes latency spikes across all concurrent instrument streams.
// helix-hal/src/patterns.rs
/// The correct pattern for calling a blocking vendor SDK function from an async context.
///
/// `spawn_blocking` moves the closure to a dedicated thread pool (separate from the
/// async worker threads). The async task suspends and the worker thread is freed
/// to process other tasks while the blocking call is in progress.
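pub async fn call_blocking_sdk<F, T>(instrument_id: &str, f: F) -> Result<T, InstrumentError>
where
F: FnOnce() -> Result<T, InstrumentError> + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn_blocking(f)
.await
// A `JoinError` means the closure panicked (or was cancelled during runtime
// shutdown before it started), so there is no SDK result to return.
.map_err(|join_err| InstrumentError::VendorSdkError {
instrument_id: instrument_id.to_string(),
message: format!("spawn_blocking task failed: {join_err}"),
// The `VendorSdkError` contract requires both keys in `context`.
context: std::collections::HashMap::from([
("vendor_error_code".to_string(), "none".to_string()),
("vendor_error_message".to_string(), join_err.to_string()),
]),
})?
}
// Usage in a driver:
async fn read_measurement_from_sdk(&self) -> Result<f64, InstrumentError> {
let sdk_handle = self.sdk_handle.clone(); // Arc-wrapped SDK handle
let instrument_id = sdk_handle.instrument_id();
call_blocking_sdk(&instrument_id, move || {
sdk_handle.read_dissolved_oxygen() // Blocks for up to 50ms
.map_err(|e| InstrumentError::VendorSdkError {
instrument_id: sdk_handle.instrument_id(),
message: e.to_string(),
// Placeholder code: this excerpt does not show how the SDK exposes one.
context: std::collections::HashMap::from([
("vendor_error_code".to_string(), "unavailable".to_string()),
("vendor_error_message".to_string(), e.to_string()),
]),
})
}).await
}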
6. Hot-Reload Architecture
6.1 Shared Library ABI Contract
Drivers are distributed as signed .so (Linux) or .dylib (macOS) shared libraries. The hot-reload system loads these at runtime using the libloading crate. The ABI contract between the loader and the library is minimal by design — a large ABI surface is a compatibility liability.
The only exported symbol the loader looks for:
// In the driver library (not the HELIX core)
#[no_mangle]
pub extern "C" fn helix_driver_manifest() -> *const helix_hal::DriverManifest {
&MANIFEST // Pointer to a static DriverManifest (see §2.5)
}
No other symbols are required. All trait object dispatch goes through fat pointers obtained from create_instance, not through named symbol lookups.
Lifetime hazard of trait-object dispatch. A dyn Instrument fat pointer pairs a data pointer with a pointer to a vtable stored in the driver library's read-only data, and the method code that vtable points to also lives in the library. When the driver library is unloaded, that memory is unmapped, so any use of a dyn Instrument fat pointer after the library is unloaded (including running its Drop) is a use-after-free. The hot-reload protocol (§6.4) prevents this by draining all in-flight operations before unloading.
6.2 The libloading Integration Layer
// helix-hal/src/loader.rs
use libloading::{Library, Symbol};
use std::path::Path;
use crate::{DriverManifest, Instrument, InstrumentError, HELIX_HAL_ABI_VERSION};
/// A loaded driver library and its active instance.
///
/// The `Library` object MUST outlive the `Box<dyn Instrument>` created from it.
/// This invariant is enforced by storing both in `LoadedDriver` and dropping
/// the instance before the library in `Drop`.
pub struct LoadedDriver {
/// The driver instance (fat pointer into library memory).
/// Wrapped in Option so we can drop it before the library.
instance: Option<Box<dyn Instrument>>,
/// The library handle. MUST be dropped AFTER `instance`.
library: Library,
/// Path to the library file. Used for reload and audit records.
library_path: std::path::PathBuf,
/// The manifest (pointer into library .rodata). Valid while `library` is live.
manifest: &'static DriverManifest,
/// Blake3 hash of the library file at load time.
/// Compared against the driver registry on each reload.
library_hash: [u8; 32],
}
impl LoadedDriver {
/// Load a driver library from the given path.
///
/// # Safety
/// This function loads a shared library and calls its `helix_driver_manifest`
/// function. The following safety requirements must be satisfied:
///
/// 1. The library MUST have been signed by a HELIX driver signing key.
/// The loader verifies the Blake3 hash against the driver registry
/// (a ledger-backed source of truth) before calling any library functions.
///
/// 2. The library MUST have been compiled against the same `helix-hal` crate
/// version (or a compatible ABI version). The ABI version check in this
/// function provides a safety net, but the best defense is the driver
/// certification process (§8).
///
/// 3. The library MUST NOT have been modified after signing. The Blake3 hash
/// verification provides this guarantee.
///
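/// Violating any of these requirements can cause undefined behavior. The loader
/// enforces requirements 1 and 3 with the hash check, which completes before any
/// library code is executed. Requirement 2 is enforced by the ABI version check,
/// which can only run after `Library::new` (which runs the library's
/// initializers) and after `helix_driver_manifest` has been called; it therefore
/// relies on the hash check having admitted only trusted code.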
pub unsafe fn load(
path: &Path,
config_json: &str,
driver_registry: &dyn DriverRegistry,
) -> Result<Self, DriverLoadError> {
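// Step 1: Compute and verify the library hash BEFORE loading.
// Loading an unverified library is a security violation.
// Step 2 re-opens the file by path, so the driver directory must not be
// writable by untrusted users between this check and the load (TOCTOU).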
let library_bytes = std::fs::read(path)
.map_err(|e| DriverLoadError::IoError(e.to_string()))?;
let library_hash = *blake3::hash(&library_bytes).as_bytes();
let expected_hash = driver_registry
.get_expected_hash(path)
.ok_or(DriverLoadError::NotInRegistry(path.to_path_buf()))?;
if library_hash != expected_hash {
return Err(DriverLoadError::HashMismatch {
path: path.to_path_buf(),
expected: expected_hash,
actual: library_hash,
});
}
// Step 2: Load the library. After this point, library code can run.
let library = Library::new(path)
.map_err(|e| DriverLoadError::LibraryLoad(e.to_string()))?;
// Step 3: Look up the manifest function.
let manifest_fn: Symbol<unsafe extern "C" fn() -> *const DriverManifest> = library
.get(b"helix_driver_manifest\0")
.map_err(|_| DriverLoadError::ManifestSymbolNotFound)?;
let manifest_ptr = manifest_fn();
if manifest_ptr.is_null() {
return Err(DriverLoadError::NullManifest);
}
let manifest: &'static DriverManifest = &*manifest_ptr;
// Step 4: ABI version check.
if manifest.abi_version != HELIX_HAL_ABI_VERSION {
return Err(DriverLoadError::AbiVersionMismatch {
driver_abi: manifest.abi_version,
loader_abi: HELIX_HAL_ABI_VERSION,
});
}
// Step 5: Create the driver instance.
let config_cstr = std::ffi::CString::new(config_json)
.map_err(|_| DriverLoadError::InvalidConfig("null byte in config JSON".into()))?;
let instance_ptr = (manifest.create_instance)(
config_cstr.as_ptr(),
config_json.len(),
);
if instance_ptr.is_null() {
return Err(DriverLoadError::InstanceCreationFailed);
}
// Safety: `instance_ptr` was returned by `create_instance` which boxes a
// `dyn Instrument` and returns `Box::into_raw`. We re-box it here.
let instance: Box<dyn Instrument> = Box::from_raw(instance_ptr);
Ok(LoadedDriver {
instance: Some(instance),
library,
library_path: path.to_path_buf(),
manifest,
library_hash,
})
}
pub fn instrument(&self) -> &dyn Instrument {
self.instance.as_deref().expect("instance already dropped")
}
pub fn instrument_mut(&mut self) -> &mut dyn Instrument {
self.instance.as_deref_mut().expect("instance already dropped")
}
}
impl Drop for LoadedDriver {
fn drop(&mut self) {
// CRITICAL: Drop the instance BEFORE the library.
// The instance's Drop impl calls into library code.
// If the library is dropped first, the Drop call is a use-after-free.
if let Some(instance) = self.instance.take() {
// Call the library's destroy function to ensure proper cleanup
// on the library's allocator side.
let raw = Box::into_raw(instance);
unsafe { (self.manifest.destroy_instance)(raw) };
}
// Now the library can be dropped safely.
// `self.library` drops here.
}
}
#[derive(Debug, thiserror::Error)]
pub enum DriverLoadError {
#[error("Driver library not in registry: {0}")]
NotInRegistry(std::path::PathBuf),
#[error("Hash mismatch for {path}: expected {expected:?}, got {actual:?}")]
HashMismatch { path: std::path::PathBuf, expected: [u8; 32], actual: [u8; 32] },
#[error("Library load failed: {0}")]
LibraryLoad(String),
#[error("helix_driver_manifest symbol not found in library")]
ManifestSymbolNotFound,
#[error("helix_driver_manifest returned null")]
NullManifest,
#[error("ABI version mismatch: driver={driver_abi}, loader={loader_abi}")]
AbiVersionMismatch { driver_abi: u32, loader_abi: u32 },
#[error("Instance creation returned null")]
InstanceCreationFailed,
#[error("IO error: {0}")]
IoError(String),
#[error("Invalid config: {0}")]
InvalidConfig(String),
}
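The drop-order invariant of `LoadedDriver` can be exercised without loading a real library. In this sketch, `FakeLibrary` and `FakeInstance` are hypothetical stand-ins that record when they are dropped. `Driver` mirrors the `Option`-plus-explicit-`Drop` structure above, without the `destroy_instance` FFI call.

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Log = Rc<RefCell<Vec<&'static str>>>;

/// Stand-in for `libloading::Library`: records when it is "unloaded".
struct FakeLibrary {
    log: Log,
}
impl Drop for FakeLibrary {
    fn drop(&mut self) {
        self.log.borrow_mut().push("library unloaded");
    }
}

/// Stand-in for the driver instance, whose Drop would run library code.
struct FakeInstance {
    log: Log,
}
impl Drop for FakeInstance {
    fn drop(&mut self) {
        self.log.borrow_mut().push("instance destroyed");
    }
}

/// Same shape as `LoadedDriver`: instance in an Option, library held for its Drop.
#[allow(dead_code)]
struct Driver {
    instance: Option<FakeInstance>,
    library: FakeLibrary,
}
impl Drop for Driver {
    fn drop(&mut self) {
        // Destroy the instance explicitly; the real loader calls
        // `destroy_instance` here. Fields are dropped after this returns.
        drop(self.instance.take());
    }
}

pub fn drop_order() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let driver = Driver {
        instance: Some(FakeInstance { log: Rc::clone(&log) }),
        library: FakeLibrary { log: Rc::clone(&log) },
    };
    drop(driver);
    let order = log.borrow().clone();
    order
}
```

Rust drops struct fields in declaration order after the `Drop::drop` body returns, so declaring `instance` before `library` would already produce this order. The explicit `take()` keeps the invariant independent of field order, and it is also the point where the real loader hands the instance back to `destroy_instance`.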
6.3 Driver Lifecycle State Machine
┌─────────────────────────────┐
│ Unregistered │
│ (library file on disk, │
│ not loaded by the runtime) │
└──────────────┬──────────────┘
│ helix driver register
│ (hash verified, added to registry)
▼
┌─────────────────────────────┐
│ Registered │
│ (in driver registry, │
│ library not yet loaded) │
└──────────────┬──────────────┘
│ HAL supervisor: load
│ (libloading::Library::new + ABI check)
▼
┌─────────────────────────────┐
│ Loaded │
│ (instance created, │
│ not yet connected) │
└──────────────┬──────────────┘
│ instrument.connect().await
▼
┌─────────────────────────────┐
┌─────── ►│ Active │◄──────────────────┐
│ │ (connected, streaming) │ │
│ └──────────────┬──────────────┘ │
│ InstrumentError │ │
│ │ │
│ ┌─────────┴──────────┐ │
│ │ reconnect_policy │ │
│ └──┬──────────────┬──┘ │
│ │ │ │
│ max_retries │ │ within_retries │
│ exceeded ▼ ▼ │
│ ┌──────────────────┐ ┌───────────────┐ │
│ │ FaultIsolated │ │ Reconnecting │ ──────────────►│
│ │ (loaded, not │ │ (backoff │ connected │
│ │ connected) │ │ timer active)│ │
│ └──────────────────┘ └───────────────┘ │
│ │ │
│ │ helix driver reload │
│ ▼ │
│ ┌──────────────────┐ │
└──│ Reloading │ ─────────────────────────────────────┘
│ (new .so loaded,│ reload success
│ old instance │
│ being drained) │
└──────────────────┘
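The lifecycle diagram can be encoded as a transition function that lists every permitted transition and rejects the rest. This sketch uses the hypothetical names `DriverState`, `DriverEvent`, and `next`. Two of its transitions are assumptions rather than direct readings of the diagram. First, an active driver may enter `Reloading` directly, since §6.4 reloads a streaming driver. Second, a failed reconnection attempt with retries left stays in `Reconnecting`.

```rust
/// States from the Section 6.3 diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriverState {
    Unregistered,
    Registered,
    Loaded,
    Active,
    Reconnecting,
    FaultIsolated,
    Reloading,
}

/// Events that drive transitions (illustrative names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriverEvent {
    Register,                      // `helix driver register`, hash verified
    Load,                          // Library::new + ABI check + create_instance
    Connected,                     // connect().await or a reconnect succeeded
    Error { retries_left: bool },  // reconnect_policy decision
    Reload,                        // `helix driver reload`
    ReloadSucceeded,
}

/// Next state, or `None` if `event` is not permitted in `state`.
pub fn next(state: DriverState, event: DriverEvent) -> Option<DriverState> {
    match (state, event) {
        (DriverState::Unregistered, DriverEvent::Register) => Some(DriverState::Registered),
        (DriverState::Registered, DriverEvent::Load) => Some(DriverState::Loaded),
        (DriverState::Loaded, DriverEvent::Connected) => Some(DriverState::Active),
        (DriverState::Active | DriverState::Reconnecting, DriverEvent::Error { retries_left: true }) => {
            Some(DriverState::Reconnecting)
        }
        (DriverState::Active | DriverState::Reconnecting, DriverEvent::Error { retries_left: false }) => {
            Some(DriverState::FaultIsolated)
        }
        (DriverState::Reconnecting, DriverEvent::Connected) => Some(DriverState::Active),
        (DriverState::FaultIsolated | DriverState::Active, DriverEvent::Reload) => Some(DriverState::Reloading),
        (DriverState::Reloading, DriverEvent::ReloadSucceeded) => Some(DriverState::Active),
        _ => None,
    }
}
```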
6.4 Zero-Downtime Reload Protocol
The hot-reload protocol replaces a driver with a new version (for a bug fix or protocol update) without interrupting other instruments on the same HAL supervisor. Before the old driver is stopped, frames already queued in its channel are given up to 2 seconds to drain.
// helix-hal/src/supervisor.rs
/// Execute a zero-downtime driver reload.
///
/// This is the implementation of `helix driver reload --instrument <id>`.
///
/// # The Protocol
///
/// 1. Load the new driver library (hash check, ABI check, instance creation).
/// The new driver is in `Loaded` state — not connected.
///
/// 2. Send a `BackpressureSignal::Dropping` to the old driver to stop
/// new frame production. Wait up to 2 seconds for the channel to drain
/// (via `channel.is_empty()` polling).
///
/// 3. Send the stop signal to the old driver's `stream_telemetry()` task.
/// Wait for the task to return (up to 5 seconds).
///
/// 4. Call `old_driver.disconnect().await`.
///
/// 5. Drop the old `LoadedDriver`. The `Drop` impl calls `destroy_instance`
/// and then drops the `Library` (in that order — see §6.2).
///
/// 6. Connect the new driver: `new_driver.connect().await`.
///
/// 7. Start the new driver's `stream_telemetry()` task.
///
/// Total downtime (time between last frame from old driver and first frame
/// from new driver): typically 200–800ms, bounded by the instrument's
/// reconnect latency.
pub async fn hot_reload_driver(
&mut self,
instrument_id: &InstrumentId,
new_library_path: &Path,
) -> Result<(), DriverReloadError> {
let config_json = self.get_instrument_config(instrument_id)?;
// Phase 1: Load new driver (does not connect or start streaming)
let new_driver = unsafe {
LoadedDriver::load(new_library_path, &config_json, &self.driver_registry)
.map_err(DriverReloadError::LoadFailed)?
};
// Phase 2: Drain the old driver's channel
tracing::info!("Hot-reload: draining channel for {instrument_id}");
self.signal_backpressure(instrument_id, BackpressureSignal::Dropping { frames_dropped_since_last_signal: 0 });
let drain_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
while !self.is_channel_empty(instrument_id) {
if tokio::time::Instant::now() > drain_deadline {
tracing::warn!("Hot-reload: channel drain timeout for {instrument_id}, proceeding");
break;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// Phase 3: Stop old driver
tracing::info!("Hot-reload: stopping old driver for {instrument_id}");
self.stop_streaming(instrument_id).await?;
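// Phase 4: Record the old driver's version for the audit ledger, then
// disconnect it. The version must be read before Phase 5 drops the driver.
let old_driver_version = {
let old_driver = self.get_driver_mut(instrument_id)?;
let version = old_driver.instrument_mut().driver_info().driver_version;
old_driver.instrument_mut().disconnect().await
.map_err(DriverReloadError::DisconnectFailed)?;
version
};
// Phase 5: Drop old driver (library unloaded here; safe because all
// in-flight tasks have completed in phases 3 and 4)
self.drop_driver(instrument_id);
// Phase 6: Install and connect new driver
tracing::info!("Hot-reload: connecting new driver for {instrument_id}");
self.install_driver(instrument_id, new_driver);
// `install_driver` took ownership of `new_driver`, so read its version
// back through the supervisor.
let new_driver_version = self.get_driver_mut(instrument_id)?
.instrument_mut().driver_info().driver_version;
self.connect_driver(instrument_id).await
.map_err(DriverReloadError::ReconnectFailed)?;
// Phase 7: Start streaming
self.start_streaming(instrument_id).await?;
// Log the reload event to the audit ledger. A failed send is logged rather
// than silently discarded, because the ledger is the compliance record of the change.
if let Err(e) = self.audit_sink.send(AuditEvent::DriverReloaded {
instrument_id: instrument_id.clone(),
old_driver_version,
new_driver_version,
reload_timestamp_ns: crate::time::monotonic_ns(),
}).await {
tracing::error!("Hot-reload: failed to record audit event for {instrument_id}: {e}");
}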
tracing::info!("Hot-reload: complete for {instrument_id}");
Ok(())
}
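Phase 2 is a bounded poll. It waits for the channel to empty but never longer than the drain budget. Below is a synchronous, std-only sketch of the same pattern. `drain_with_deadline` is a hypothetical helper. The supervisor itself awaits `tokio::time::sleep` instead of blocking a thread.

```rust
use std::time::{Duration, Instant};

/// Polls `is_empty` every `poll` until it returns true or `budget` elapses.
/// Returns `true` if the channel drained and `false` on timeout. On timeout the
/// caller proceeds anyway, as Phase 2 does after logging a warning.
pub fn drain_with_deadline(
    mut is_empty: impl FnMut() -> bool,
    budget: Duration,
    poll: Duration,
) -> bool {
    let deadline = Instant::now() + budget;
    while !is_empty() {
        if Instant::now() >= deadline {
            return false; // budget exhausted; frames may remain queued
        }
        std::thread::sleep(poll);
    }
    true
}
```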
6.5 Safety Invariants for Shared Libraries
These invariants are enforced by the loader and must be respected by driver authors:
| ID | Invariant | Enforcement |
|---|---|---|
| SL-01 | A Box<dyn Instrument> is NEVER used after the Library that backs it is dropped. | LoadedDriver::drop() drops instance before library |
| SL-02 | The DriverManifest pointer is never dereferenced after the library is unloaded. | Manifest is accessed only through LoadedDriver which owns the library |
| SL-03 | Driver code never calls back into HELIX core via raw function pointers stored at load time. | All core callbacks go through channel sends (owned data, no raw pointers) |
| SL-04 | Driver memory allocations use the Rust global allocator (jemalloc in HELIX builds). | No custom allocators in drivers; enforced by driver certification test suite |
| SL-05 | Driver libraries are stripped of all symbols except helix_driver_manifest. | Build pipeline linker flags: --strip-all --export-dynamic-symbol=helix_driver_manifest |
| SL-06 | Driver libraries are built as position-independent code and linked with -z relro on Linux, and signed with the hardened runtime on macOS. | CI build flags enforced by the driver signing tool |
| SL-07 | A library with an ABI version mismatch is never instantiated, even if the hash check passes. | Checked in LoadedDriver::load() before create_instance is called |
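SL-01 depends on drop order. Rust drops struct fields in declaration order, so declaring the instance before the library already gives the right order. An explicit `Drop` impl makes the order visible and keeps it correct even if the fields are later reordered. The sketch below uses mock types that log into a shared vector. `MockInstance`, `MockLibrary`, and this `LoadedDriver` shape are hypothetical stand-ins, not the helix-hal types. The real library handle would be a `libloading::Library`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

type DropLog = Rc<RefCell<Vec<&'static str>>>;

/// Stands in for the driver instance returned by `create_instance`.
struct MockInstance { log: DropLog }
impl Drop for MockInstance {
    fn drop(&mut self) { self.log.borrow_mut().push("destroy_instance"); }
}

/// Stands in for the loaded library handle.
struct MockLibrary { log: DropLog }
impl Drop for MockLibrary {
    fn drop(&mut self) { self.log.borrow_mut().push("unload_library"); }
}

/// Hypothetical loader-owned handle; both fields are Options so Drop can take them.
struct LoadedDriver {
    instance: Option<MockInstance>,
    library: Option<MockLibrary>,
}

impl Drop for LoadedDriver {
    fn drop(&mut self) {
        // SL-01: the instance's code lives in the library, so tear the
        // instance down while the library is still mapped.
        drop(self.instance.take());
        drop(self.library.take());
    }
}
```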
7. Reference Driver Implementations
7.1 Sartorius Biostat Driver Skeleton
// drivers/sartorius-biostat/src/lib.rs
use async_trait::async_trait;
use helix_hal::{
export_driver, Instrument, TelemetrySource, Commandable, Calibratable,
InstrumentId, InstrumentCapabilities, InstrumentStatus, InstrumentError,
ConnectionConfig, HealthReport, DriverInfo, TelemetryFrame, StreamConfig,
StreamStopReason, StreamingStats, ChannelDescriptor, InstrumentCommand,
CommandResult, CommandAuditRecord, CalibrationRecord, CalibrationPoint,
CalibrationStatus, CalibrationVerification,
};
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
/// Configuration loaded from helix.toml for this instrument instance.
#[derive(serde::Deserialize, Debug, Clone)]
pub struct SartoriusBiostatConfig {
pub host: String,
pub port: u16, // Default: 4840 (OPC-UA) or 10001 (DCU proprietary)
pub protocol: SartoriusProtocol,
pub poll_interval_ms: u64, // Default: 1000 (1 Hz — Sartorius DCU native rate)
pub username: Option<String>,
pub password_env_var: Option<String>, // Read password from env var, never config file
}
#[derive(serde::Deserialize, Debug, Clone, Copy)]
pub enum SartoriusProtocol {
OpcUa,
DcuProprietary,
ModbusTcp,
}
pub struct SartoriusBiostatDriver {
config: SartoriusBiostatConfig,
instrument_id: InstrumentId,
display_name: String,
status: InstrumentStatus,
// Connection handle — None when disconnected.
// Arc<Mutex> because the driver instance is split across
// the `stream_telemetry` task and the `dispatch_command` path.
connection: Option<Arc<Mutex<SartoriusConnection>>>,
streaming_stats: StreamingStats,
sequence_counters: [u64; 8], // Per-channel monotonic sequence numbers
}
impl SartoriusBiostatDriver {
pub fn new(config: SartoriusBiostatConfig) -> Self {
let id = InstrumentId::from(
format!("sartorius:biostat-b:{}", &config.host)
);
let display_name = format!("Sartorius Biostat B @ {}:{}", config.host, config.port);
SartoriusBiostatDriver {
config,
instrument_id: id,
display_name,
status: InstrumentStatus::Uninitialized,
connection: None,
streaming_stats: StreamingStats::default(),
sequence_counters: [0; 8],
}
}
/// Translate a raw Sartorius DCU measurement packet into a TelemetryFrame.
fn translate_dcu_packet(
&mut self,
packet: &DcuMeasurementPacket,
channel_id: u16,
) -> TelemetryFrame {
self.sequence_counters[channel_id as usize] += 1;
let seq = self.sequence_counters[channel_id as usize];
let mut frame = TelemetryFrame {
timestamp_ns: helix_hal::time::monotonic_ns(),
wall_timestamp_ns: helix_hal::time::wall_clock_tai_ns(),
instrument_id: self.instrument_id.as_u64(),
channel_id,
quality_flags: packet.quality_to_opcua_code(),
unit_code: packet.unit_to_helix_code(),
value: packet.value,
sequence: seq,
hw_metadata: packet.raw_adc_value as u64,
crc32c: 0, // Computed below
active_setpoint: packet.setpoint,
dt_predicted: 0.0, // Filled by Digital Twin engine, not the driver
dt_deviation_sigma: 0.0,
alarm_bitmap: packet.alarm_flags_to_helix_bitmap(),
spc_signals: 0, // Computed by core SPC engine, not the driver
_pad0: [0; 4],
_pad1: [0; 40],
};
// Compute CRC-32C over bytes 0..56
frame.crc32c = helix_hal::crc::compute_frame_crc(&frame);
frame
}
}
#[async_trait]
impl Instrument for SartoriusBiostatDriver {
fn instrument_id(&self) -> &InstrumentId { &self.instrument_id }
fn display_name(&self) -> &str { &self.display_name }
fn capabilities(&self) -> InstrumentCapabilities {
InstrumentCapabilities {
streams_telemetry: true,
accepts_commands: true,
supports_calibration: true,
supports_simulation_mode: true,
has_onboard_logging: true, // Sartorius DCU has internal data logging
}
}
async fn connect(&mut self, _config: &ConnectionConfig) -> Result<(), InstrumentError> {
self.status = InstrumentStatus::Connecting;
let conn = match self.config.protocol {
SartoriusProtocol::OpcUa => {
// Delegate to the blocking OPC-UA SDK via spawn_blocking. The closure runs on
// a blocking thread and cannot borrow `self`, so clone what it needs first.
let host = self.config.host.clone();
let port = self.config.port;
let instrument_id = self.instrument_id.to_string();
helix_hal::call_blocking_sdk(move || {
SartoriusConnection::connect_opcua(&host, port)
.map_err(|e| InstrumentError::ConnectionRefused {
instrument_id,
endpoint: format!("{host}:{port}"),
reason: e.to_string(),
vendor_error_code: e.opcua_status_code(),
})
}).await?
}
SartoriusProtocol::DcuProprietary => {
// Async TCP connection — no spawn_blocking needed
SartoriusConnection::connect_dcu(&self.config.host, self.config.port)
.await
.map_err(|e| InstrumentError::ConnectionRefused {
instrument_id: self.instrument_id.to_string(),
endpoint: format!("{}:{}", self.config.host, self.config.port),
reason: e.to_string(),
vendor_error_code: None,
})?
}
SartoriusProtocol::ModbusTcp => {
SartoriusConnection::connect_modbus(&self.config.host, self.config.port)
.await
.map_err(|e| InstrumentError::ConnectionRefused {
instrument_id: self.instrument_id.to_string(),
endpoint: format!("{}:{}", self.config.host, self.config.port),
reason: e.to_string(),
vendor_error_code: None,
})?
}
};
self.connection = Some(Arc::new(Mutex::new(conn)));
self.status = InstrumentStatus::Connected;
Ok(())
}
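async fn disconnect(&mut self) -> Result<(), InstrumentError> {
self.status = InstrumentStatus::Disconnecting;
if let Some(conn) = self.connection.take() {
let instrument_id = self.instrument_id.to_string();
let last_frame_sequence = self.streaming_stats.frames_sent;
// Best-effort close, bounded to 5 s. The outcome is deliberately ignored:
// the contract requires disconnect() to always succeed (§8), and the
// connection handle has already been released above.
let _ = tokio::time::timeout(
std::time::Duration::from_secs(5),
helix_hal::call_blocking_sdk(move || {
conn.lock().unwrap().close()
.map_err(|e| InstrumentError::ConnectionLost {
instrument_id,
reason: format!("close() failed during disconnect: {e}"),
last_frame_sequence,
was_graceful: false,
})
})
).await;
}
self.status = InstrumentStatus::Disconnected;
Ok(())
}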
fn status(&self) -> InstrumentStatus { self.status }
async fn health_check(&self) -> Result<HealthReport, InstrumentError> {
let conn = self.connection.as_ref()
.ok_or_else(|| InstrumentError::ConnectionLost {
instrument_id: self.instrument_id.to_string(),
reason: "health_check called on disconnected driver".into(),
last_frame_sequence: self.streaming_stats.frames_sent,
was_graceful: false,
})?
.clone();
tokio::time::timeout(
std::time::Duration::from_millis(500),
helix_hal::call_blocking_sdk(move || {
conn.lock().unwrap().ping()
})
).await
.map_err(|_| InstrumentError::Timeout {
instrument_id: self.instrument_id.to_string(),
operation: "health_check".into(),
budget_ms: 500,
})?
.map_err(|e| InstrumentError::ConnectionLost {
instrument_id: self.instrument_id.to_string(),
reason: e.to_string(),
last_frame_sequence: self.streaming_stats.frames_sent,
was_graceful: false,
})
}
fn driver_info(&self) -> DriverInfo {
DriverInfo {
driver_name: env!("CARGO_PKG_NAME").to_string(),
driver_version: env!("CARGO_PKG_VERSION").to_string(),
driver_author: "HELIX OS Systems Engineering".to_string(),
supported_protocols: vec![
"Sartorius-DCU-Proprietary".into(),
"OPC-UA-1.04".into(),
"Modbus-TCP".into(),
],
min_firmware_version: "3.2.0".to_string(),
max_firmware_version: None,
}
}
}
#[async_trait]
impl TelemetrySource for SartoriusBiostatDriver {
fn channels(&self) -> Vec<ChannelDescriptor> {
vec![
ChannelDescriptor {
id: 0, name: "dissolved_oxygen".into(), unit: "mg/L".into(),
nominal_rate_hz: 1.0,
range: helix_hal::ChannelRange { min: 0.0, max: 20.0 },
resolution: 0.01, is_writable: false,
},
ChannelDescriptor {
id: 1, name: "ph".into(), unit: "[pH]".into(),
nominal_rate_hz: 1.0,
range: helix_hal::ChannelRange { min: 2.0, max: 12.0 },
resolution: 0.001, is_writable: true, // pH setpoint is controllable
},
ChannelDescriptor {
id: 2, name: "temperature".into(), unit: "Cel".into(),
nominal_rate_hz: 1.0,
range: helix_hal::ChannelRange { min: 20.0, max: 50.0 },
resolution: 0.01, is_writable: true,
},
ChannelDescriptor {
id: 3, name: "agitation".into(), unit: "RPM".into(),
nominal_rate_hz: 2.0,
range: helix_hal::ChannelRange { min: 0.0, max: 1200.0 },
resolution: 1.0, is_writable: true,
},
]
}
async fn stream_telemetry(
&mut self,
tx: mpsc::Sender<TelemetryFrame>,
config: StreamConfig,
mut stop_signal: tokio::sync::oneshot::Receiver<()>,
) -> Result<StreamStopReason, InstrumentError> {
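let mut interval = tokio::time::interval(
std::time::Duration::from_millis(self.config.poll_interval_ms)
);
// If a blocking SDK read overruns the poll period, wait a full period before
// the next read instead of firing back-to-back catch-up ticks (the default Burst).
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);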
self.status = InstrumentStatus::Streaming;
loop {
tokio::select! {
biased; // Check stop signal first — ensures clean shutdown
_ = &mut stop_signal => {
self.status = InstrumentStatus::Connected;
return Ok(StreamStopReason::Requested);
}
_ = interval.tick() => {
let conn = match &self.connection {
Some(c) => c.clone(),
None => return Err(InstrumentError::ConnectionLost {
instrument_id: self.instrument_id.to_string(),
reason: "connection handle gone during streaming".into(),
last_frame_sequence: self.streaming_stats.frames_sent,
was_graceful: false,
}),
};
// Read all channels in one blocking call
let packets = helix_hal::call_blocking_sdk(move || {
conn.lock().unwrap().read_all_channels()
}).await?;
for (channel_id, packet) in packets.iter().enumerate() {
let frame = self.translate_dcu_packet(packet, channel_id as u16);
match tx.try_send(frame) {
Ok(()) => {
self.streaming_stats.frames_sent += 1;
}
Err(mpsc::error::TrySendError::Full(_)) => {
// Channel full: this frame is not delivered. The HAL supervisor
// enforces the configured drop policy (via force_drop_oldest);
// the driver only counts the loss.
self.streaming_stats.frames_dropped_backpressure += 1;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
// Consumer side closed — this is a supervisor shutdown
self.status = InstrumentStatus::Connected;
return Ok(StreamStopReason::Requested);
}
}
}
}
}
}
}
fn streaming_stats(&self) -> StreamingStats { self.streaming_stats }
}
// Export the driver manifest using the helper macro (§2.5)
export_driver!(SartoriusBiostatDriver, "sartorius-biostat-b,sartorius-biostat-q-plus");
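The non-blocking send in `stream_telemetry()` prevents a slow consumer from stalling the poll loop. When the channel is full, the frame is dropped and counted. When the channel is closed, streaming ends. The standard library's bounded `std::sync::mpsc::sync_channel` shows the same pattern. Its `try_send` distinguishes `Full` from `Disconnected`, much as tokio's `TrySendError` distinguishes `Full` from `Closed`. `Frame`, `Stats`, `Outcome`, and `publish` are simplified hypothetical stand-ins, not the helix-hal types. As in the driver above, the sequence number is assigned before the send, so a dropped frame leaves a visible gap in that channel's sequence.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug)]
pub struct Frame { pub channel_id: u16, pub sequence: u64, pub value: f64 }

#[derive(Debug, Default)]
pub struct Stats { pub frames_sent: u64, pub frames_dropped_backpressure: u64 }

pub enum Outcome { Continue, ConsumerClosed }

/// Assigns the next per-channel sequence number, then sends without blocking.
/// `seq` must have a slot for every channel_id (8 in the Sartorius driver).
pub fn publish(
    tx: &SyncSender<Frame>,
    seq: &mut [u64],
    stats: &mut Stats,
    channel_id: u16,
    value: f64,
) -> Outcome {
    seq[channel_id as usize] += 1;
    let frame = Frame { channel_id, sequence: seq[channel_id as usize], value };
    match tx.try_send(frame) {
        Ok(()) => { stats.frames_sent += 1; Outcome::Continue }
        // Full: count the drop; policy enforcement is the supervisor's job.
        Err(TrySendError::Full(_)) => { stats.frames_dropped_backpressure += 1; Outcome::Continue }
        // Disconnected: the consumer is gone, so streaming should stop.
        Err(TrySendError::Disconnected(_)) => Outcome::ConsumerClosed,
    }
}
```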
7.2 Illumina DRAGEN Driver Skeleton
// drivers/illumina-dragen/src/lib.rs
//
// The DRAGEN platform is read-only (no setpoints) and produces burst data
// during sequencing runs. It uses the Backpressure drop policy.
use async_trait::async_trait;
use helix_hal::{export_driver, Instrument, TelemetrySource, InstrumentCapabilities};
pub struct IlluminaDragenDriver {
// DRAGEN communicates via PCIe BAR registers mapped to /dev/dragen0
// or via the Illumina SDK over localhost gRPC.
config: DragenConfig,
// ...
}
// Illumina is a read-only instrument: it does NOT implement Commandable or Calibratable,
// and capabilities() reports accepts_commands: false accordingly.
// When a user attempts to send a command to an Illumina instrument, the HAL
// supervisor rejects it based on capabilities(). There is no dispatch_command()
// to call, because the driver does not implement Commandable.
#[async_trait]
impl Instrument for IlluminaDragenDriver {
fn capabilities(&self) -> InstrumentCapabilities {
InstrumentCapabilities {
streams_telemetry: true,
accepts_commands: false, // Read-only
supports_calibration: false,
supports_simulation_mode: true,
has_onboard_logging: true,
}
}
// ... other Instrument methods
}
// Backpressure drop policy: the DRAGEN SDK has its own buffer;
// if we can't consume fast enough, we tell the SDK to pause,
// rather than dropping data on the HELIX side.
// Configured in helix.toml:
// [instruments.illumina-novaseq-x-01]
// drop_policy = "Backpressure"
export_driver!(IlluminaDragenDriver, "illumina-novaseq-x,illumina-nextseq-2000");
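According to the comment above, the supervisor rejects commands to read-only instruments before any driver code runs. Below is a minimal sketch of that gate, assuming the supervisor decides from the `InstrumentCapabilities` flags alone. `Capabilities` mirrors three of the fields shown above. `Operation`, `Rejected`, and `check_allowed` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy)]
pub struct Capabilities {
    pub streams_telemetry: bool,
    pub accepts_commands: bool,
    pub supports_calibration: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation { StartStream, SendCommand, Calibrate }

#[derive(Debug, PartialEq, Eq)]
pub struct Rejected { pub operation: Operation, pub reason: &'static str }

/// Supervisor-side gate. Because it consults only capabilities, a driver that
/// does not implement Commandable is never asked to dispatch a command.
pub fn check_allowed(caps: Capabilities, op: Operation) -> Result<(), Rejected> {
    let (allowed, reason) = match op {
        Operation::StartStream => (caps.streams_telemetry, "instrument does not stream telemetry"),
        Operation::SendCommand => (caps.accepts_commands, "instrument is read-only"),
        Operation::Calibrate => (caps.supports_calibration, "instrument does not support calibration"),
    };
    if allowed { Ok(()) } else { Err(Rejected { operation: op, reason }) }
}
```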
7.3 Generic OPC-UA Driver
// drivers/opcua-generic/src/lib.rs
//
// A site-configurable OPC-UA client. Any instrument that speaks OPC-UA 1.04
// can be integrated by providing a NodeID mapping in the config, without
// writing a custom driver.
use async_trait::async_trait;
use helix_hal::{export_driver, Instrument, TelemetrySource, Commandable,
InstrumentCapabilities, InstrumentError, InstrumentCommand,
CommandResult, CommandAuditRecord, ChannelDescriptor};
#[derive(serde::Deserialize)]
pub struct OpcUaGenericConfig {
pub endpoint_url: String, // "opc.tcp://bioreactor-01.local:4840"
pub security_mode: OpcUaSecurityMode,
pub channel_map: Vec<OpcUaChannelMapping>,
}
#[derive(serde::Deserialize)]
pub struct OpcUaChannelMapping {
pub channel_id: u16,
pub node_id: String, // "ns=2;s=Bioreactor.DO.PV"
pub display_name: String,
pub unit: String,
pub is_writable: bool,
pub setpoint_node_id: Option<String>, // "ns=2;s=Bioreactor.DO.SP"
}
pub struct OpcUaGenericDriver {
config: OpcUaGenericConfig,
// ...
}
// dispatch_command translates InstrumentCommand::SetSetpoint into an OPC-UA
// Write service call to the configured setpoint_node_id.
// All other command types return OperationNotSupported.
export_driver!(OpcUaGenericDriver, "opcua-generic");
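The routing rule in the final comment sends SetSetpoint to `setpoint_node_id` and treats every other command as unsupported. That rule reduces to a lookup over `channel_map`. Below is a std-only sketch, assuming a channel must have both `is_writable = true` and a configured `setpoint_node_id` to accept a setpoint. `Mapping`, `WriteTargetError`, and `resolve_setpoint_node` are hypothetical, and the OPC-UA Write call itself is out of scope.

```rust
/// Subset of OpcUaChannelMapping relevant to setpoint writes.
pub struct Mapping {
    pub channel_id: u16,
    pub is_writable: bool,
    pub setpoint_node_id: Option<String>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WriteTargetError {
    UnknownChannel(u16),
    NotWritable(u16),
    NoSetpointNode(u16),
}

/// Returns the NodeId string that a SetSetpoint on `channel_id` should be written to.
pub fn resolve_setpoint_node(map: &[Mapping], channel_id: u16) -> Result<&str, WriteTargetError> {
    let m = map
        .iter()
        .find(|m| m.channel_id == channel_id)
        .ok_or(WriteTargetError::UnknownChannel(channel_id))?;
    if !m.is_writable {
        return Err(WriteTargetError::NotWritable(channel_id));
    }
    m.setpoint_node_id
        .as_deref()
        .ok_or(WriteTargetError::NoSetpointNode(channel_id))
}
```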
8. Driver Validation & Certification
Before a driver can be installed in a HELIX OS production (GMP) environment, it must pass the driver certification test suite. The suite is part of the helix-hal-testkit crate and is run as part of the driver's CI pipeline.
# Run the full driver certification suite against a driver .so.
# --instrument-simulator starts a software simulator of the instrument.
# (A comment after a trailing backslash would break the line continuation.)
helix-hal-testkit certify \
--driver ./target/release/libsartorius_biostat.so \
--config tests/sartorius-test-config.json \
--instrument-simulator sartorius-biostat-sim \
--report ./certification-report.pdf
The certification suite validates:
| Test Category | What Is Tested |
|---|---|
| ABI Conformance | helix_driver_manifest is present; ABI version is correct; create_instance / destroy_instance are symmetric (no leaks, confirmed by Valgrind and ASAN) |
| Trait Contract | All required methods return consistent values; connect() is idempotent; disconnect() always succeeds |
| Telemetry Integrity | Every TelemetryFrame has correct CRC-32C; sequence is monotonic; no field overflows |
| Error Handling | All InstrumentError variants are exercised; recovery_action() returns correct values; requires_audit_entry() is correct |
| Backpressure | Driver does not block when channel is full (DropOldest/DropNewest policies); driver yields correctly under backpressure |
| Async Safety | No executor blocking (enforced by tokio-test with custom waker that panics on missing yields) |
| Cancellation Safety | All async methods are safe to cancel; no resource leaks on cancellation (confirmed by LSAN) |
| Memory Safety | ASAN/MSAN/LSAN clean; no use-after-free on LoadedDriver::drop(); instance/library drop order is correct |
| Hot-Reload | Driver can be loaded, connected, streamed, stopped, disconnected, and dropped; library can then be unloaded without crash |
| Calibration Compliance | CalibrationRecord.r_squared is computed correctly; calibration failure produces mandatory deviation record |
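Two of the Telemetry Integrity checks are simple enough to sketch directly. The code below has two parts. The first is a bitwise CRC-32C (Castagnoli: reflected polynomial 0x82F63B78, with initial value and final XOR both 0xFFFFFFFF). The second is a check that sequence numbers strictly increase on each channel. This is illustrative only. The exact byte range covered by the frame CRC is defined by `helix_hal::crc::compute_frame_crc`, and production code would use a table-driven or hardware-accelerated CRC. `crc32c` and `sequences_monotonic` are hypothetical helper names.

```rust
use std::collections::HashMap;

/// Bitwise CRC-32C (Castagnoli), reflected form.
pub fn crc32c(data: &[u8]) -> u32 {
    let mut crc: u32 = 0xFFFF_FFFF;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            // mask is all ones if the low bit is set, else zero.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0x82F6_3B78 & mask);
        }
    }
    !crc
}

/// True if each channel's sequence numbers strictly increase.
/// Gaps are allowed: they correspond to frames dropped under backpressure.
pub fn sequences_monotonic(frames: &[(u16, u64)]) -> bool {
    let mut last: HashMap<u16, u64> = HashMap::new();
    for &(channel, seq) in frames {
        if let Some(&prev) = last.get(&channel) {
            if seq <= prev {
                return false;
            }
        }
        last.insert(channel, seq);
    }
    true
}
```

The standard CRC-32C check value is `crc32c(b"123456789") == 0xE3069283`.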
9. Appendix A — Full Trait Reference
| Trait | Required? | Condition | Capabilities Flag |
|---|---|---|---|
| Instrument | Yes | All drivers | N/A (base trait) |
| TelemetrySource | Conditional | If the instrument produces measurements | streams_telemetry = true |
| Commandable | Conditional | If the instrument accepts commands | accepts_commands = true |
| Calibratable | Conditional | If the instrument supports calibration | supports_calibration = true |
| DriverManifest | Yes | All drivers (via export_driver!) | N/A (loader entry point) |
10. Appendix B — Driver Manifest Schema
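Driver bindings are declared in helix.toml with one [instruments.<name>] table per instrument. That table names the driver library and the supervisor-level settings (auto-connect, reconnect retries, drop policy, buffering). The nested [instruments.<name>.config] table holds the driver-specific settings, which are deserialized into the driver's own configuration type (for example, SartoriusBiostatConfig in §7.1):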
# /etc/helix/helix.toml
[instruments.bioreactor-brx-07]
driver_library = "/opt/helix/drivers/libsartorius_biostat.so"
display_name = "Sartorius Biostat B — BRX-07 (Upstream Suite A)"
auto_connect = true
reconnect_retries = 10
drop_policy = "DropOldest"
backpressure_buffer_seconds = 5.0
channel_capacity_override = 0 # 0 = use calculated default
[instruments.bioreactor-brx-07.config]
host = "brx-07.rtpbio.local"
port = 4840
protocol = "OpcUa"
poll_interval_ms = 1000
username = "helix-service"
password_env_var = "HELIX_BRX07_PASSWORD"
[instruments.novaseq-x-01]
driver_library = "/opt/helix/drivers/libillumina_dragen.so"
display_name = "Illumina NovaSeq X — SEQ-01 (Genomics Lab)"
auto_connect = true
reconnect_retries = 5
drop_policy = "Backpressure"
[instruments.novaseq-x-01.config]
endpoint_url = "grpc://novaseq-x-01.rtpbio.local:7777"
run_monitor_poll_ms = 5000
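The `password_env_var` key keeps the secret itself out of helix.toml. The config names an environment variable, and the driver reads that variable when it connects. Below is a minimal sketch of the lookup, assuming that a named-but-unset variable should fail loudly rather than fall back to connecting without a password. `resolve_password` and `SecretError` are hypothetical names. The lookup is passed in as a closure so the function can be tested without mutating the process environment.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum SecretError {
    /// The config names a variable that is not set (or is not valid Unicode).
    MissingEnvVar(String),
}

/// Resolves the connection password. `None` in the config means no password is configured.
pub fn resolve_password(
    password_env_var: Option<&str>,
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<Option<String>, SecretError> {
    match password_env_var {
        None => Ok(None),
        Some(var) => lookup(var)
            .map(Some)
            .ok_or_else(|| SecretError::MissingEnvVar(var.to_string())),
    }
}

// In production the lookup would read the real environment, e.g.:
// resolve_password(cfg.password_env_var.as_deref(), |k| std::env::var(k).ok())
```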
End of HELIX-HAL-0001.
This document defines the complete binary and behavioral contract for HELIX OS instrument drivers. Drivers that pass the certification suite defined in §8 are eligible for inclusion in the HELIX Driver Registry and deployment in GMP-validated environments.
Revision history: git log --follow docs/hal/HELIX-HAL-0001.md