voyant_api
Voyant API - Python bindings for Voyant point cloud data and services.
1"""Voyant API - Python bindings for Voyant point cloud data and services.""" 2 3# Import the native extension module first (this loads the Rust code) 4from .voyant_api import VoyantFrame 5from .voyant_api import VoyantPlayback 6from .voyant_api import VoyantClient 7from .voyant_api import VoyantRecorder 8from .voyant_api import RecordStatus 9from .voyant_api import init_voyant_logging 10from .voyant_api import create_default_frame 11from .voyant_api import create_test_frame 12from .voyant_api import CarbonClient 13from .voyant_api import CarbonConfig 14# from .voyant_api import protos, playback, client 15 16# Utility submodules are not imported here to avoid importing heavier 17# dependencies (such as pandas and pypcd4) unless they are actually needed. 18# Import explicitly as needed: 19# from voyant_api import pandas_utils 20# from voyant_api import pcd_utils 21 22__version__ = "0.8.2" 23__all__ = [ 24 "CarbonClient", 25 "CarbonConfig", 26 "VoyantFrame", 27 "VoyantPlayback", 28 "VoyantClient", 29 "VoyantRecorder", 30 "RecordStatus", 31 "init_voyant_logging", 32 "create_default_frame", 33 "create_test_frame", 34 # "protos", 35 # "playback", 36 # "client", 37]
Python client for receiving frames from a Carbon LiDAR sensor.
Example:
config = CarbonConfig() config.set_bind_addr("0.0.0.0:5678") config.set_group_addr("224.0.0.0") config.set_interface_addr("192.168.1.100") client = CarbonClient(config) client.start() while client.is_running(): frame = client.try_receive_frame() if frame: process(frame) client.stop()
Create a CarbonClient from a CarbonConfig. Does not start receiving — call start().
Stop receiving and processing data. Safe to call multiple times. The client can be restarted by calling start() again.
Try to get the latest assembled frame if one is available.
Returns:
VoyantFrame if a new frame is available, None otherwise. Logs a warning if frames were dropped since the last call.
Configuration for the Carbon LiDAR pipeline.
Construct either from a JSON file or with defaults and setters.
Create a CarbonConfig with default values.
Example:
config = CarbonConfig() config.set_bind_addr("0.0.0.0:5678") config.set_group_addr("224.0.0.0") config.set_interface_addr("192.168.1.100") config.set_range_max(50.0) config.set_pfa(1e-4) client = CarbonClient(config)
Load a CarbonConfig from a JSON file. Missing fields default to their standard values.
Example:
config = CarbonConfig.from_json("config.json") client = CarbonClient(config)
Set the UDP bind address including port (e.g. "0.0.0.0:5678")
Set the network interface address (e.g. "0.0.0.0" for default)
Use timestamps from received packets instead of system time on receipt.
Maximum number of peaks messages per batch before flushing to the pipeline.
Size of the pre-allocated UDP receive buffer in bytes.
Channel capacity for batched messages from receiver to pipeline.
Probability of False Alarm for optional SNR thresholding. Recommended range: [1e-6, 1e-3].
Keep invalid points (drop_reason != Success) in the assembled point cloud. Default: false (invalid points are filtered out).
Range threshold for spatial outlier filter in meters.
Doppler threshold for spatial outlier filter in m/s.
Minimum number of neighbors for a point to pass the spatial outlier filter.
Range threshold for elevation interpolation in meters.
Doppler threshold for elevation interpolation in m/s.
Missing elevation indices to interpolate (must be even: 0, 2, 4, … 254). Setting this enables elevation interpolation. Default: disabled (empty list).
Enable multicast mirroring of raw peaks for external logging.
Multicast group address for peaks mirroring (e.g. "224.0.0.1:5555").
Bind address for the peaks mirror sender (e.g. "127.0.0.1:0").
Python-exposed frame containing point cloud data
Get XYZ coordinates as (N, 3) array
Get XYZ coordinates as (N, 3) array, filtering out invalid points If invalid points are already filtered out, this return is identical to: xyz()
Get XYZ + radial velocity as (N, 4) array
Get XYZ + radial velocity as (N, 4) array, filtering out invalid points If invalid points are already filtered out, this return is identical to: xyzv()
Get spherical coordinates as (N, 3) array [range, azimuth, elevation]
Arguments:
- degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
Get spherical coordinates as (N, 3) array, filtering out invalid points
Arguments:
- degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
Get spherical coordinates + velocity as (N, 4) array [range, azimuth, elevation, radial_vel]
Arguments:
- degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
Get spherical coordinates + velocity as (N, 4) array, filtering out invalid points
Arguments:
- degrees (bool): If True, angles in degrees; if False, angles in radians. Default: False (radians)
Get points as a 2D array (N, 7) with columns: [x, y, z, radial_vel, snr_linear, nanosecs_since_frame, drop_reason]
Get all point data as (N, 7) array, filtering out invalid points If invalid points are already filtered out, this return is identical to: points()
Get points as a 2D array (N, 11) with columns: [x, y, z, radial_vel, snr_linear, nanosecs_since_frame, drop_reason, calibrated_reflectance, noise_mean_estimate, min_ramp_snr, point_index]
Get all point data as (N, 11) array, filtering out invalid points If invalid points are already filtered out, this return is identical to: points_extended()
Get points as a 2D array (N, 5) with columns: [azimuth_idx, elevation_idx, range, doppler, snr]
This is designed for internal systems test team usage
Python wrapper for VoyantPlayback Provides simple frame-by-frame playback control
Create a new VoyantPlayback instance
Arguments:
- rate: Playback rate (1.0 = real-time, None = as fast as possible, default: None)
- loopback: Whether to loop when reaching the end of file (default: false)
- filter_points: Whether to filter out invalid points during playback (default: true)
Python wrapper for PointsClient Provides simple interface for receiving frames over UDP
Create a new multicast client
Arguments:
- bind_addr: Local socket address to bind to (e.g., "0.0.0.0:4444")
- group_addr: Multicast group address (e.g., "224.0.0.0")
- interface_addr: Interface address (e.g., "127.0.0.1")
- filter_points: Whether to filter out invalid points (default: true)
- use_msg_stamps: Whether to use timestamps from received point groups [false = use system time] (default: false)
- track_timing: Whether to collect statistics on packet timing and latency (default: false)
- disable_spn: Whether to disable Single Point Noise filtering (default: false)
Warning:
Do NOT set disable_spn=True with Carbon Dev Kit (Meadowlark) units!
Python wrapper for VoyantRecorder Provides interface for recording frames to disk with automatic file splitting
Create a new VoyantRecorder instance
Arguments:
- output_path: Base path for output recording file(s)
- timestamp_filename: Whether to add timestamp to filename (default: true)
- frames_per_file: Maximum frames per file before splitting (None = no limit)
- duration_per_file: Maximum seconds per file before splitting (None = no limit)
- size_per_file_mb: Maximum MB per file before splitting (None = no limit)
- max_total_frames: Maximum total frames across all files (None = no limit)
- max_total_duration: Maximum total seconds across all files (None = no limit)
- max_total_size_mb: Maximum total MB across all files (None = no limit)
- buffer_size_mb: Frame buffer size in MB for writing (default: 4, advanced users only)
Record a frame to disk
Arguments:
- frame: VoyantFrame to record
Returns:
RecordStatus indicating the result:
- RecordStatus.OK: Frame recorded successfully, continue recording
- RecordStatus.SPLIT: Frame recorded, but file was split due to limits
- RecordStatus.STOP: Recording limit reached, should stop recording
Simple result status for recording operations
Initialize Voyant logging system from Python
This function initializes the Rust logging system for use in Python applications. It sets up logging to stderr with INFO level as default, but respects the RUST_LOG environment variable for custom configuration.
The logging system supports multiple levels:
- TRACE: Very detailed diagnostic information for fine-grained debugging
- DEBUG: Detailed diagnostic information
- INFO: General informational messages (default)
- WARN: Warning messages for potentially harmful situations
- ERROR: Error messages for failures
Note: This function should be called only once, typically during application startup. Calling it multiple times will cause the program to panic.
Environment Variables: Set RUST_LOG environment variable to control logging:
RUST_LOG=trace- Enable trace level logging (most verbose)RUST_LOG=debug- Enable debug level loggingRUST_LOG=info- Enable info level logging (default)RUST_LOG=warn- Enable warning level logging onlyRUST_LOG=error- Enable error level logging onlyRUST_LOG=off- Disable all logging
Raises:
- RuntimeError: If logging has already been initialized
Create a default, empty VoyantFrame for Python
This function creates a minimal frame with all default values. Useful for testing serialization/deserialization or as a starting point for manual frame construction.
Returns:
PyVoyantFrame: An empty frame with default header and no points
Create a test VoyantFrame with synthetic point cloud data arranged in a curved wall pattern
This function generates a complete frame with realistic point cloud data for testing purposes. The points are arranged in a curved wall pattern similar to what a real LiDAR sensor would produce when scanning a wall at a fixed distance. Each point includes proper timing information, with timestamps incrementing every 8 points to simulate the sensor's parallel channel acquisition.
Arguments:
- timestamp_seconds (Optional[int]): Unix timestamp seconds for the frame header. Defaults to 0. Use actual timestamps for time-based testing.
- timestamp_nanoseconds (Optional[int]): Nanoseconds portion of the timestamp (0-999999999). Defaults to 0. Combined with timestamp_seconds for precise frame timing.
- frame_index (Optional[int]): Sequential frame number for tracking frame order. Defaults to 0. Should increment for each frame in a sequence.
- elevations (Optional[int]): Number of elevation scan lines in the point cloud. Defaults to 32. Higher values create denser vertical resolution.
- azimuths (Optional[int]): Number of azimuth points per elevation line. Defaults to 720. Higher values create denser horizontal resolution.
- range (Optional[float]): Distance to the wall in meters. Defaults to 10.0. All points will be at approximately this distance.
- drop_center (Optional[bool]): Whether to mark center region points as invalid. Defaults to False. When True, simulates obstruction or invalid returns in FOV center.
Returns:
PyVoyantFrame: A complete frame with header, config, and point cloud data. The frame will contain elevations * azimuths total points arranged in a grid pattern.
Notes:
- Point timestamps increment by 16384 nanoseconds every 8 points to simulate the sensor's 8-channel parallel acquisition pattern
- The generated header includes device ID "SIM-001" to indicate synthetic data
- All version fields (proto, API, firmware, HDL) are populated with test values
- Points are arranged in elevation-first scan order (all elevations at first azimuth, then move to next azimuth)
- When drop_center is True, approximately 25% of the center FOV will have invalid points