Source code for nistreamer.streamer

"""Streamer module.

Contains the main ``NIStreamer`` class, as well as ``StreamHandle`` inner class
for stream control in ``with`` context.
"""

from ._nistreamer import StreamerWrap as _StreamerWrap
from .card import BaseCardProxy, AOCardProxy, DOCardProxy
from typing import Optional, Literal, Union, Tuple, Type


[docs] class NIStreamer: """Represents the whole streamer """ def __init__(self): """Creates a new empty instance.""" self._streamer = _StreamerWrap() self._ao_cards = dict() self._do_cards = dict() def __getitem__(self, item): if item in self._ao_cards.keys(): return self._ao_cards[item] elif item in self._do_cards.keys(): return self._do_cards[item] else: raise KeyError(f'There is no card with max_name "{item}" registered') def __repr__(self): return ( f'NIStreamer instance\n' f'\n' f'AO cards: {list(self._ao_cards.keys())}\n' f'DO cards: {list(self._do_cards.keys())}\n' f'\n' f'Hardware settings:\n' f'\tCalc/write chunk size: {self.chunksize_ms} ms\n' f'\t 10MHz ref provider: {self.ref_clk_provider}\n' f'\t Starts-last card: {self.starts_last}' ) def _add_card(self, card_type: Literal['AO', 'DO'], max_name: str, samp_rate: float, proxy_class: Type[BaseCardProxy], nickname: Optional[str] = None) -> BaseCardProxy: """Base for ``add_card`` methods.""" if card_type == 'AO': streamer_method = _StreamerWrap.add_ao_dev target_dict = self._ao_cards elif card_type == 'DO': streamer_method = _StreamerWrap.add_do_dev target_dict = self._do_cards else: raise ValueError(f'Invalid card type "{card_type}". Valid type strings are "AO" and "DO"') # Call to NIStreamer struct in the compiled Rust backend streamer_method( self._streamer, name=max_name, samp_rate=samp_rate ) # Proxy object proxy = proxy_class( _streamer=self._streamer, max_name=max_name, nickname=nickname ) target_dict[max_name] = proxy return proxy
[docs] def add_ao_card(self, max_name: str, samp_rate: float, nickname: Optional[str] = None, proxy_class: Optional[Type[BaseCardProxy]] = AOCardProxy): """Add an analog output card. Args: max_name: name of the card as shown in NI MAX samp_rate: sample rate in Hz nickname: human-readable name (e.g. "Fast AO card"; used for visualizations) proxy_class: custom subclass of ``BaseCardProxy`` to use for the device proxy. Returns: ``proxy_class`` instance representing this card. Raises: KeyError: if a card with the same name already exists. """ return self._add_card( card_type='AO', max_name=max_name, samp_rate=samp_rate, nickname=nickname, proxy_class=proxy_class )
[docs] def add_do_card(self, max_name: str, samp_rate: float, nickname: Optional[str] = None, proxy_class: Optional[Type[BaseCardProxy]] = DOCardProxy): """Add a digital output card. Args: max_name: name of the card as shown in NI MAX samp_rate: sample rate in Hz nickname: human-readable name (e.g. "Fast DO card"; used for visualizations) proxy_class: custom subclass of ``BaseCardProxy`` to use for the device proxy. Returns: ``proxy_class`` instance representing this card. Raises: KeyError: if a card with the same name already exists. """ return self._add_card( card_type='DO', max_name=max_name, samp_rate=samp_rate, nickname=nickname, proxy_class=proxy_class )
@property def chunksize_ms(self) -> float: """Streaming chunk size in milliseconds. Chunk size is the main unit of streaming - all signal samples within a single chunk are computed, stored in memory, and transferred to the hardware in one operation together, at the same time as the previous chunk is playing. Optimal chunk size depends on the tradeoff. Larger size reduces the risk of buffer underflow at the cost of increased overhead - memory allocation and the initial chunk compute will take longer. The default value is 150 ms. Notes: If chunk size exceeds the total sequence duration, no streaming actually happens - all samples are computed before generation starts. This could be used to eliminate the risk of underflows if sequence is sufficiently short. However, in-stream looping feature will be disabled, and trying to pre-sample a long waveform can overfill RAM. """ return self._streamer.get_chunksize_ms() @chunksize_ms.setter def chunksize_ms(self, val: float): self._streamer.set_chunksize_ms(val=val) @property def starts_last(self) -> Union[str, None]: """Specifies which card starts last. Format: * ``dev_name: str`` - this card waits for all others to start first; * ``None`` (default) - each threads starts its task independently. Typically, this is needed when start trigger or shared sample clock are used for hardware synchronisation. The card-provider of the signal should call ``ni_task.start()`` last, otherwise some "consumer" cards could start later and miss the signal. """ return self._streamer.get_starts_last() @starts_last.setter def starts_last(self, name: Union[str, None]): self._streamer.set_starts_last(name=name) @property def ref_clk_provider(self) -> Union[Tuple[str, str], None]: """Specifies which card exports its 10 MHz reference clock signal during the run. Format: * ``(card_name: str, term_name: str)`` - card `card_name` exports to terminal `term_name` * ``None`` - no card exports Notes: (1) NIStreamer uses *run-based static* reference clock export - signal is statically exported during ``init_stream()`` and then automatically undone during ``close_stream()`` calls. This export is *static* instead of being tied to NI tasks. As a result, any card supporting 10MHz export can serve as the provider. It does not have to be active (get some instructions) or even be registered in the streamer. (2) If this mechanism is not sufficient, you can manually do a static export from any card by calling :meth:`~nistreamer.utils.share_10mhz_ref()`. However, such export will not be undone automatically - you should manually call :meth:`~nistreamer.utils.unshare_10mhz_ref()` or :meth:`~nistreamer.utils.reset_dev()`. Forgetting to undo manual export is very easy and dangerous - the exporter card will continue silently feeding the signal until the system is power-cycled. That can lead to physical double-driving and very confusing mis-triggering or mis-clocking errors. So only choose the manual export over the automatic method (1) if absolutely necessary. """ return self._streamer.get_ref_clk_provider() @ref_clk_provider.setter def ref_clk_provider(self, dev_and_term: Union[Tuple[str, str], None]): self._streamer.set_ref_clk_provider(provider=dev_and_term)
[docs] def got_instructions(self) -> bool: """Returns ``True`` if there are some instructions in the edit cache, otherwise ``False``.""" return self._streamer.got_instructions()
[docs] def last_instr_end_time(self) -> Union[float, None]: """Returns the last instruction end time or ``None`` if the edit cache is empty.""" return self._streamer.last_instr_end_time()
[docs] def compile(self, stop_time: Optional[float] = None) -> float: """Compiles the full pulse sequence from instructions in the current edit cache. Args: stop_time: If ``None`` (default), the compiled sequence stops at the last instruction end. Specifying a later stop time extends the sequence duration. Returns: The actual compiled stop time. Raises: ValueError: if provided ``stop_time`` is below the last instruction end. Notes: The actual stop times may vary between cards due to clock grid mismatch and extra ticks on the final closing edges. The returned value is the shortest run time across all cards. If explicit ``stop_time`` is provided, the additional time at the sequence end will be filled according to the usual rules: - Constant values after finite-duration instructions. - Continued waveforms after "go-this" instructions. """ return self._streamer.compile(stop_time=stop_time)
[docs] def validate_compile_cache(self): """Verifies that compile cache is up-to-date with the current edit cache. Returns: ``None`` if compile cache is up-to-date with the current edit cache. Raises: ValueError: If compile cache does not match edit cache (typical reason - users forgot to re-compile after adding more instructions). """ self._streamer.validate_compile_cache()
[docs] def init_stream(self): """Context-based stream initialization. This function should only be used in the ``with`` context. The returned object is a :class:`StreamHandle` instance providing full stream control. Examples: >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> with strmr.init_stream() as stream_handle: >>> stream_handle.launch(instream_reps=10) >>> # Do some other logic while waiting ... >>> stream_handle.wait_until_finished() Raises: ValueError: if stream initialization fails due to invalid settings. See Also: :class:`StreamHandle` for more details about stream controls; :meth:`run` - a more basic way of launching stream. """ return self._ContextManager(streamer=self._streamer)
[docs] def run(self, nreps: Optional[int] = 1): """Runs pulse sequence generation. This method will block and only return after all ``nreps`` iterations have been generated. But it can be interrupted with ``KeyboardInterrupt`` - generation will stop between repetitions. Args: nreps: the number of times to play the sequence. Plays once by default. Raises: ValueError: if stream initialization fails due to invalid settings. RuntimeError: if generation fails during streaming (underflow, timeout, overvoltage). Notes: This method implements basic repeating by stopping and re-starting the stream every time. As a result, there is a fluctuating time gap between subsequent repetitions. Repeating with no gap and rigid timing between iterations is possible with so-called *in-stream looping*. It is only accessible through the context-based interface. See :meth:`init_stream` and :class:`StreamHandle` for more details. See Also: :meth:`init_stream` and :class:`StreamHandle` for full stream control. """ with self.init_stream() as stream_handle: for _ in range(nreps): stream_handle.launch(instream_reps=1) stream_handle.wait_until_finished()
[docs] def close_stream(self): """Closes the stream. You typically do not need to use this method since context manager will automatically close the stream whenever exiting the context. There is a small chance that a rapid succession of ``KeyboardInterrupt`` signals disrupts ``__exit__()`` logic and prevents automatic stream shutdown. But even then, the next call to ``init`` will automatically close it. So this method is mostly exposed for completeness. """ return self._streamer.close_stream()
[docs] def add_reset_instr(self, reset_time: Optional[float] = None): """Helper method - adds a "go-reset-value" instruction on each channel. Args: reset_time: the time at which to add the reset instruction (the same for all channels). If ``None``, the latest last instruction end time from across all channels is used. Raises: ValueError: if requested ``reset_time`` is below the last instruction end time. """ self._streamer.add_reset_instr(reset_time=reset_time)
[docs] def clear_edit_cache(self): """Discards all instructions added so far.""" self._streamer.clear_edit_cache()
[docs] def reset_all(self): """Performs hardware reset on all cards that have been added to the streamer.""" self._streamer.reset_all()
class _ContextManager: """Stream ``with`` context manager. Using an explicit context manager class instead of ``contextlib.contextmanager`` decorator because it results in a simpler traceback from exceptions during ``__enter__()``. This is important since users will see an exception any time ``init_stream()`` fails due to invalid user settings. """ def __init__(self, streamer: _StreamerWrap): self._streamer = streamer def __enter__(self): # The actual stream initialization call to the back-end: self._streamer.init_stream() # If the above call returned without exceptions, stream has been successfully initialized. # Return the handle for use in the `with` context body: return NIStreamer.StreamHandle(streamer=self._streamer) def __exit__(self, *args, **kwargs): self._streamer.close_stream() class StreamHandle: """Handle providing full stream control within ``with`` context. The handle is obtained by initializing stream context (see :meth:`~nistreamer.streamer.NIStreamer.init_stream`): >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> >>> # This is how you initialize stream context: >>> with strmr.init_stream() as stream_handle: >>> # ... This is context body ... >>> >>> # Use handle to control stream: >>> stream_handle.launch(instream_reps=10) >>> >>> # Always wait for stream to finish: >>> stream_handle.wait_until_finished() >>> >>> # Stream will be closed automatically when leaving context """ def __init__(self, streamer: _StreamerWrap): self._streamer = streamer def launch(self, instream_reps=1): """Launch sequence generation. This call is *non-blocking* - returns immediately after generation starts. You must always call :meth:`~nistreamer.streamer.NIStreamer.StreamHandle.wait_until_finished` after launching to ensure generation is complete before making any further stream actions. Args: instream_reps: number of in-stream repetitions. Plays once by default. Raises: RuntimeError: if attempting to launch stream while it is already running. Notes: *In-stream looping* is very different from basic *repeating by re-launching* (see :meth:`~nistreamer.streamer.NIStreamer.run`) - all iterations happen as a single continuous stream without any gaps (but it is still interruptible between repetitions). The limitation is that in-stream looping requires at least a minimal sequence duration of :meth:`~nistreamer.streamer.NIStreamer.chunksize_ms`. Shorter sequences can still be played with ``instream_reps=1`` only or be repeated by re-launching. Alternatively, one can concatenate several repetitions into a single sequence to reach sufficient duration for in-stream looping to work. See Also: You must always call :meth:`~nistreamer.streamer.NIStreamer.StreamHandle.wait_until_finished` after launching stream. """ self._streamer.launch(instream_reps=instream_reps) def wait_until_finished(self, timeout: Optional[float] = None) -> Union[bool, None]: """Block to wait until generation is finished. Args: timeout: wait time limit in seconds. ``None`` - wait indefinitely (default). Returns: In timed mode, ``Ture`` means generation finished, ``False`` means still running when timeout elapsed. Basic mode returns ``None``. Raises: RuntimeError: if there is any stream error (underflow, overvoltage, sync signal loss) Notes: There are two modes depending on ``timeout`` value: * Basic (``timeout=None``, default). Blocks and waits indefinitely until run is finished. Can be interrupted with ``KeyboardInterrupt``. Returns ``None`` when finished. * Timed (``timeout: float``). Blocks and returns ``True`` if run is finished or ``False`` if timeout elapses before. Examples: >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> >>> # Basic wait: >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=10) >>> handle.wait_until_finished() >>> >>> # Timed wait - using to implement live progress printing: >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=10) >>> while True: >>> finished = handle.wait_until_finished(timeout=1) >>> print(handle.reps_written_count()) >>> if finished: >>> break >>> """ if timeout is None: while True: if self._streamer.wait_until_finished(timeout=1.0): break else: return self._streamer.wait_until_finished(timeout=timeout) def request_stop(self): """Request to stop in-stream loop without completing all repetitions. Streamer will complete the current repetition in progress and then stop. You should call :meth:`~nistreamer.streamer.NIStreamer.StreamHandle.wait_until_finished` after requesting stop to wait until the in-progress iteration is finished. Notes: Practically, you *do not* need to use this function in most cases - simply leaving ``with`` context will automatically request stop and wait until stream finishes before returning: >>> import time >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> >>> # This will stop in-stream loop as well: >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=1000000) >>> time.sleep(10) >>> >>> # This will stop the loop whenever `KeyboardInterrupt` is emitted: >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=1000000) >>> handle.wait_until_finished() This function is only exposed for advanced cases where you need to break out of in-stream loop (e.g. due to some external condition) and then launch the stream again without incurring re-init overhead: >>> import time >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=1000000) >>> time.sleep(10) >>> >>> # Now have to break the loop due to some condition: >>> handle.request_stop() >>> handle.wait_until_finished() >>> >>> # Then launch again: >>> handle.launch(instream_reps=10) >>> handle.wait_until_finished() >>> """ self._streamer.request_stop() def reps_written_count(self) -> int: """Number of fully computed and transferred in-stream repetitions so far. Notes: This value DOES NOT show the current play position and cannot serve as a reliable sync mechanism. Samples are computed and written to cards before they are actually played. So this value - the number of fully *written* repetitions - can be greater than the actual number of fully *played* cycles. The two can differ significantly, especially for very short sequences. Instead, this value is meant to be a coarse progress indicator for long-running in-stream loops (see example below). Examples: >>> strmr = NIStreamer() >>> # ... add cards/channels, add instructions, compile ... >>> >>> # Timed wait + reps_written_count to implement live progress printing: >>> with strmr.init_stream() as handle: >>> handle.launch(instream_reps=1000) >>> while True: >>> finished = handle.wait_until_finished(timeout=1) >>> print(handle.reps_written_count()) >>> if finished: >>> break >>> """ return self._streamer.reps_written_count()