voyant_api

Voyant API - Python bindings for Voyant point cloud data and services.

 1"""Voyant API - Python bindings for Voyant point cloud data and services."""
 2
 3# Import the native extension module first (this loads the Rust code)
 4from .voyant_api import VoyantFrame
 5from .voyant_api import VoyantPlayback
 6from .voyant_api import VoyantClient
 7from .voyant_api import VoyantRecorder
 8from .voyant_api import RecordStatus
 9from .voyant_api import init_voyant_logging
10from .voyant_api import create_default_frame
11from .voyant_api import create_test_frame
12from .voyant_api import CarbonClient
13from .voyant_api import CarbonConfig
14# from .voyant_api import protos, playback, client
15
16# Utility submodules are not imported here to avoid importing heavier
17# dependencies (such as pandas and pypcd4) unless they are actually needed.
18# Import explicitly as needed:
19#   from voyant_api import pandas_utils
20#   from voyant_api import pcd_utils
21
22__version__ = "0.8.2"
23__all__ = [
24    "CarbonClient",
25    "CarbonConfig",
26    "VoyantFrame",
27    "VoyantPlayback",
28    "VoyantClient",
29    "VoyantRecorder",
30    "RecordStatus",
31    "init_voyant_logging",
32    "create_default_frame",
33    "create_test_frame",
34    # "protos",
35    # "playback",
36    # "client",
37]
class CarbonClient:

Python client for receiving frames from a Carbon LiDAR sensor.

Example:

    config = CarbonConfig()
    config.set_bind_addr("0.0.0.0:5678")
    config.set_group_addr("224.0.0.0")
    config.set_interface_addr("192.168.1.100")
    client = CarbonClient(config)
    client.start()
    while client.is_running():
        frame = client.try_receive_frame()
        if frame:
            process(frame)
    client.stop()

CarbonClient(config: CarbonConfig)

Create a CarbonClient from a CarbonConfig. Does not start receiving — call start().

def start(self) -> None:

Start receiving and processing data.

def stop(self) -> None:

Stop receiving and processing data. Safe to call multiple times. The client can be restarted by calling start() again.

def try_receive_frame(self) -> Optional[VoyantFrame]:

Try to get the latest assembled frame if one is available.

Returns:

VoyantFrame if a new frame is available, None otherwise. Logs a warning if frames were dropped since the last call.

def is_running(self) -> bool:

Returns True if the client is running and no OS shutdown has been requested.

def wait_for_shutdown(self) -> None:

Block the calling thread until an OS shutdown signal is received or stop() is called.

class CarbonConfig:

Configuration for the Carbon LiDAR pipeline.

Construct either from a JSON file or with defaults and setters.

CarbonConfig()

Create a CarbonConfig with default values.

Example:

    config = CarbonConfig()
    config.set_bind_addr("0.0.0.0:5678")
    config.set_group_addr("224.0.0.0")
    config.set_interface_addr("192.168.1.100")
    config.set_range_max(50.0)
    config.set_pfa(1e-4)
    client = CarbonClient(config)

def from_json(path: str) -> CarbonConfig:

Load a CarbonConfig from a JSON file. Missing fields default to their standard values.

Example:

    config = CarbonConfig.from_json("config.json")
    client = CarbonClient(config)

def set_bind_addr(self, v: str) -> None:

Set the UDP bind address including port (e.g. "0.0.0.0:5678")

def set_group_addr(self, v: str) -> None:

Set the multicast group address (e.g. "224.0.0.0")

def set_interface_addr(self, v: str) -> None:

Set the network interface address (e.g. "0.0.0.0" for default)

def set_use_msg_timestamp(self, v: bool) -> None:

Use timestamps from received packets instead of system time on receipt.

def set_batch_size(self, v: int) -> None:

Maximum number of peaks messages per batch before flushing to the pipeline.

def set_recv_buffer_size(self, v: int) -> None:

Size of the pre-allocated UDP receive buffer in bytes.

def set_receiver_channel_capacity(self, v: int) -> None:

Channel capacity for batched messages from receiver to pipeline.

def set_pfa(self, v: float) -> None:

Probability of False Alarm for optional SNR thresholding. Recommended range: [1e-6, 1e-3].

def set_vel_corr_factor(self, v: float) -> None:

Velocity correction multiplier.

def set_bandwidth_hz(self, v: float) -> None:

Optionally override the chirp sweep bandwidth in Hz.

def set_elevation_fov_deg(self, v: float) -> None:

Elevation field of view in degrees.

def set_keep_invalid_points(self, v: bool) -> None:

Keep invalid points (drop_reason != Success) in the assembled point cloud. Default: False (invalid points are filtered out).

def set_range_min(self, v: float) -> None:

Minimum range filter in meters.

def set_range_max(self, v: float) -> None:

Maximum range filter in meters.

def set_doppler_min(self, v: float) -> None:

Minimum Doppler filter in m/s.

def set_doppler_max(self, v: float) -> None:

Maximum Doppler filter in m/s.

def set_azimuth_deg_min(self, v: float) -> None:

Minimum azimuth filter in degrees.

def set_azimuth_deg_max(self, v: float) -> None:

Maximum azimuth filter in degrees.

def set_elevation_deg_min(self, v: float) -> None:

Minimum elevation filter in degrees.

def set_elevation_deg_max(self, v: float) -> None:

Maximum elevation filter in degrees.

def set_spatial_range_threshold(self, v: float) -> None:

Range threshold for spatial outlier filter in meters.

def set_spatial_doppler_threshold(self, v: float) -> None:

Doppler threshold for spatial outlier filter in m/s.

def set_spatial_min_neighbors(self, v: int) -> None:

Minimum number of neighbors for a point to pass the spatial outlier filter.

def set_interp_range_threshold(self, v: float) -> None:

Range threshold for elevation interpolation in meters.

def set_interp_doppler_threshold(self, v: float) -> None:

Doppler threshold for elevation interpolation in m/s.

def set_missing_elevations(self, v: Sequence[int]) -> None:

Missing elevation indices to interpolate (must be even: 0, 2, 4, … 254). Setting this enables elevation interpolation. Default: disabled (empty list).

def set_mirror_peaks(self, v: bool) -> None:

Enable multicast mirroring of raw peaks for external logging.

def set_mirror_group_addr(self, v: str) -> None:

Multicast group address for peaks mirroring (e.g. "224.0.0.1:5555").

def set_mirror_bind_addr(self, v: str) -> None:

Bind address for the peaks mirror sender (e.g. "127.0.0.1:0").

def set_eval_mode(self, v: bool) -> None:

Enable timing and drop statistics evaluation with generated reports.

def set_report_dir(self, v: str) -> None:

Directory for evaluation report files. Only used when eval mode is enabled. Default: "./reports"

class VoyantFrame:

Python-exposed frame containing point cloud data

def xyz_columns() -> list[str]:

Get column names for the xyz() array

def xyzv_columns() -> list[str]:

Get column names for the xyzv() array

def spherical_columns() -> list[str]:

Get column names for the spherical() array

def spherical_v_columns() -> list[str]:

Get column names for the spherical_v() array

def points_columns() -> list[str]:

Get column names for the points() array

def points_extended_columns() -> list[str]:

Get column names for the points_extended() array

def special_test_columns() -> list[str]:

Get column names for the special_test() array

def xyz(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get XYZ coordinates as (N, 3) array

def valid_xyz(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get XYZ coordinates as an (N, 3) array, filtering out invalid points. If invalid points have already been filtered out, the result is identical to xyz().

def xyzv(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get XYZ + radial velocity as (N, 4) array

def valid_xyzv(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get XYZ + radial velocity as an (N, 4) array, filtering out invalid points. If invalid points have already been filtered out, the result is identical to xyzv().

def spherical( self, degrees: Optional[bool] = None) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get spherical coordinates as (N, 3) array [range, azimuth, elevation]

Arguments:
  • degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
def valid_spherical( self, degrees: Optional[bool] = None) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get spherical coordinates as (N, 3) array, filtering out invalid points

Arguments:
  • degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
def spherical_v( self, degrees: Optional[bool] = None) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get spherical coordinates + velocity as (N, 4) array [range, azimuth, elevation, radial_vel]

Arguments:
  • degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
def valid_spherical_v( self, degrees: Optional[bool] = None) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get spherical coordinates + velocity as (N, 4) array, filtering out invalid points

Arguments:
  • degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
def points(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get points as a 2D array (N, 7) with columns: [x, y, z, radial_vel, snr_linear, nanosecs_since_frame, drop_reason]

def valid_points(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get all point data as an (N, 7) array, filtering out invalid points. If invalid points have already been filtered out, the result is identical to points().

def points_extended(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get points as a 2D array (N, 11) with columns: [x, y, z, radial_vel, snr_linear, nanosecs_since_frame, drop_reason, calibrated_reflectance, noise_mean_estimate, min_ramp_snr, point_index]

def valid_points_extended(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get all point data as an (N, 11) array, filtering out invalid points. If invalid points have already been filtered out, the result is identical to points_extended().

def special_test(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get points as a 2D array (N, 5) with columns: [azimuth_idx, elevation_idx, range, doppler, snr]

This is designed for internal systems test team usage

def valid_special_test(self) -> numpy.ndarray[tuple[int, ...], numpy.dtype[numpy.float32]]:

Get special test data as an (N, 5) array, filtering out invalid points. If invalid points have already been filtered out, the result is identical to special_test().

def valid_mask(self) -> list[bool]:

Get boolean mask of valid points

def describe(self) -> None:

Print detailed frame information (like the Rust print_info)

frame_index: int
n_points: int
device_id: str
n_valid_points: int
timestamp: float
class VoyantPlayback:

Python wrapper for VoyantPlayback. Provides simple frame-by-frame playback control.

VoyantPlayback( rate: Optional[float] = None, loopback: Optional[bool] = None, filter_points: Optional[bool] = None)

Create a new VoyantPlayback instance

Arguments:
  • rate: Playback rate (1.0 = real-time, None = as fast as possible, default: None)
  • loopback: Whether to loop when reaching the end of file (default: False)
  • filter_points: Whether to filter out invalid points during playback (default: True)
def open(self, file_path: str) -> None:

Open a file for playback

Arguments:
  • file_path: Path to the recording file
def reset(self) -> None:

Reset playback to the beginning of the file

loopback: bool

Get loopback setting

rate: Optional[float]

Get playback rate

is_open: bool

Check if a file is currently open

frame_timestamp: float

Get the current frame timestamp in seconds

frame_index: int

Get the current frame index

class VoyantClient:

Python wrapper for PointsClient. Provides a simple interface for receiving frames over UDP.

VoyantClient( bind_addr: str, group_addr: str, interface_addr: str, filter_points: Optional[bool] = None, use_msg_stamps: Optional[bool] = None, track_timing: Optional[bool] = None, disable_spn: Optional[bool] = None)

Create a new multicast client

Arguments:
  • bind_addr: Local socket address to bind to (e.g., "0.0.0.0:4444")
  • group_addr: Multicast group address (e.g., "224.0.0.0")
  • interface_addr: Interface address (e.g., "127.0.0.1")
  • filter_points: Whether to filter out invalid points (default: True)
  • use_msg_stamps: Whether to use timestamps from received point groups [False = use system time] (default: False)
  • track_timing: Whether to collect statistics on packet timing and latency (default: False)
  • disable_spn: Whether to disable Single Point Noise filtering (default: False)
Warning:

Do NOT set disable_spn=True with Carbon Dev Kit (Meadowlark) units!

def try_receive_frame(self) -> Optional[VoyantFrame]:

Try to get the latest frame if available

Returns:

VoyantFrame if new data is available, None otherwise

class VoyantRecorder:

Python wrapper for VoyantRecorder. Provides an interface for recording frames to disk with automatic file splitting.

VoyantRecorder( output_path: str, timestamp_filename: Optional[bool] = None, frames_per_file: Optional[int] = None, duration_per_file: Optional[int] = None, size_per_file_mb: Optional[int] = None, max_total_frames: Optional[int] = None, max_total_duration: Optional[int] = None, max_total_size_mb: Optional[int] = None, buffer_size_mb: Optional[int] = None)

Create a new VoyantRecorder instance

Arguments:
  • output_path: Base path for output recording file(s)
  • timestamp_filename: Whether to add a timestamp to the filename (default: True)
  • frames_per_file: Maximum frames per file before splitting (None = no limit)
  • duration_per_file: Maximum seconds per file before splitting (None = no limit)
  • size_per_file_mb: Maximum MB per file before splitting (None = no limit)
  • max_total_frames: Maximum total frames across all files (None = no limit)
  • max_total_duration: Maximum total seconds across all files (None = no limit)
  • max_total_size_mb: Maximum total MB across all files (None = no limit)
  • buffer_size_mb: Frame buffer size in MB for writing (default: 4, advanced users only)
def record_frame(self, frame: VoyantFrame) -> RecordStatus:

Record a frame to disk

Arguments:
  • frame: VoyantFrame to record
Returns:

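RecordStatus indicating the result: the frame was recorded successfully and recording can continue; the frame was recorded but the file was split due to reaching limits; or a recording limit was reached and recording should stop.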

def finalize(self) -> None:

Finalize the recording and close all files

This should be called when recording is complete to ensure all data is written and files are properly closed.

frames_recorded: int

Get the total number of frames recorded across all files

split_count: int

Get the number of files created (split count)

is_active: bool

Check if the recorder is still active (not finalized)

class RecordStatus:

Simple result status for recording operations

Frame recorded successfully, continue recording

Frame recorded, but file was split due to reaching limits

Recording limit reached, should stop recording

def init_voyant_logging() -> None:

Initialize Voyant logging system from Python

This function initializes the Rust logging system for use in Python applications. It sets up logging to stderr with INFO level as default, but respects the RUST_LOG environment variable for custom configuration.

The logging system supports multiple levels:

  • TRACE: Very detailed diagnostic information for fine-grained debugging
  • DEBUG: Detailed diagnostic information
  • INFO: General informational messages (default)
  • WARN: Warning messages for potentially harmful situations
  • ERROR: Error messages for failures

Note: This function should be called only once, typically during application startup. Calling it multiple times will cause the program to panic.

Environment Variables:

Set the RUST_LOG environment variable to control logging:

  • RUST_LOG=trace - Enable trace level logging (most verbose)
  • RUST_LOG=debug - Enable debug level logging
  • RUST_LOG=info - Enable info level logging (default)
  • RUST_LOG=warn - Enable warning level logging only
  • RUST_LOG=error - Enable error level logging only
  • RUST_LOG=off - Disable all logging
Raises:
  • RuntimeError: If logging has already been initialized
def create_default_frame() -> VoyantFrame:

Create a default, empty VoyantFrame for Python

This function creates a minimal frame with all default values. Useful for testing serialization/deserialization or as a starting point for manual frame construction.

Returns:

VoyantFrame: An empty frame with default header and no points

def create_test_frame( timestamp_seconds: Optional[int], timestamp_nanoseconds: Optional[int], frame_index: Optional[int], elevations: Optional[int], azimuths: Optional[int], range: Optional[float], drop_center: Optional[bool]) -> VoyantFrame:

Create a test VoyantFrame with synthetic point cloud data arranged in a curved wall pattern

This function generates a complete frame with realistic point cloud data for testing purposes. The points are arranged in a curved wall pattern similar to what a real LiDAR sensor would produce when scanning a wall at a fixed distance. Each point includes proper timing information, with timestamps incrementing every 8 points to simulate the sensor's parallel channel acquisition.

Arguments:
  • timestamp_seconds (Optional[int]): Unix timestamp seconds for the frame header. Defaults to 0. Use actual timestamps for time-based testing.
  • timestamp_nanoseconds (Optional[int]): Nanoseconds portion of the timestamp (0-999999999). Defaults to 0. Combined with timestamp_seconds for precise frame timing.
  • frame_index (Optional[int]): Sequential frame number for tracking frame order. Defaults to 0. Should increment for each frame in a sequence.
  • elevations (Optional[int]): Number of elevation scan lines in the point cloud. Defaults to 32. Higher values create denser vertical resolution.
  • azimuths (Optional[int]): Number of azimuth points per elevation line. Defaults to 720. Higher values create denser horizontal resolution.
  • range (Optional[float]): Distance to the wall in meters. Defaults to 10.0. All points will be at approximately this distance.
  • drop_center (Optional[bool]): Whether to mark center region points as invalid. Defaults to False. When True, simulates obstruction or invalid returns in FOV center.
Returns:

VoyantFrame: A complete frame with header, config, and point cloud data. The frame will contain elevations * azimuths total points arranged in a grid pattern.

Notes:
  • Point timestamps increment by 16384 nanoseconds every 8 points to simulate the sensor's 8-channel parallel acquisition pattern
  • The generated header includes device ID "SIM-001" to indicate synthetic data
  • All version fields (proto, API, firmware, HDL) are populated with test values
  • Points are arranged in elevation-first scan order (all elevations at first azimuth, then move to next azimuth)
  • When drop_center is True, approximately 25% of the center FOV will have invalid points