# Source code for nistreamer.utils

"""Miscellaneous helper tools."""

from ._nistreamer import connect_terms as _connect_terms
from ._nistreamer import disconnect_terms as _disconnect_terms
from ._nistreamer import reset_dev as _reset_dev
import numpy as np
from typing import Union, Optional
# Plotly is an optional dependency: only the plotting helpers below need it.
# `PLOTLY_INSTALLED` records whether the import succeeded so that those
# helpers can fail with a clear message instead of a NameError.
PLOTLY_INSTALLED = False
try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
except ImportError:
    # Streaming still works without Plotly - just tell the user what is missing.
    print(
        'Warning! Plotly package is not installed. You can still use the streamer, '
        'but plotting functionality will not be available.\n'
        'To install, run `pip install plotly` in your Python environment'
    )
else:
    # Both imports went through - plotting is available.
    PLOTLY_INSTALLED = True


# region NI DAQmx functions
def connect_terms(src: str, dest: str):
    """Create a static connection between two terminals.

    *Hint:* the list of available terminals and signals, as well as the
    permitted routes for each card, can be found in the NI MAX app. Click
    the specific card entry in the device tree on the left, then open the
    "Device Routes" tab at the bottom of the window.

    **CAUTION!** Static connections are independent of any NI tasks and
    persist until they are explicitly undone, the involved cards are reset,
    or the full system is power-cycled. If left behind, such static
    connections can lead to physical line double-driving, very confusing
    sync issues, and even hardware damage.

    Args:
        src: full source terminal or signal name
        dest: full destination terminal name

    Raises:
        ValueError: if any terminal name is invalid or if the connection
            cannot be established.

    Examples:
        >>> connect_terms(src='/Dev1/PFI0', dest='/Dev1/PXI_Trig0')
        >>> connect_terms(src='/Dev2/10MHzRefClock', dest='/Dev2/PFI0')

    See Also:
        Use :meth:`disconnect_terms` or :meth:`reset_dev` to undo static
        connections.
    """
    # Thin pass-through to the compiled backend implementation.
    return _connect_terms(src=src, dest=dest)
def disconnect_terms(src: str, dest: str):
    """Undo a static connection between two terminals.

    Args:
        src: full source terminal or signal name
        dest: full destination terminal name

    Raises:
        ValueError: if any terminal name is invalid.
    """
    # Thin pass-through to the compiled backend implementation.
    return _disconnect_terms(src=src, dest=dest)
def share_10mhz_ref(dev: str, term: str):
    """Statically export the 10 MHz reference clock signal of a device.

    The device's ``10MHzRefClock`` signal is statically routed to terminal
    ``term`` of the same device.

    **CAUTION!** Static connections are independent of any NI tasks and
    persist until they are explicitly undone, the involved cards are reset,
    or the full system is power-cycled. If left behind, such static
    connections can lead to physical line double-driving, very confusing
    sync issues, and even hardware damage.

    Args:
        dev: device name
        term: terminal name

    Raises:
        ValueError: if parameters are invalid

    Examples:
        >>> share_10mhz_ref(dev='Dev1', term='PFI0')

    See Also:
        Consider using a safer way of 10 MHz reference export by setting the
        :meth:`~nistreamer.streamer.NIStreamer.ref_clk_provider` property.
        This will automatically undo the export when the run is finished.

        If still choosing the manual approach, use :meth:`unshare_10mhz_ref`
        or :meth:`reset_dev` to undo this export afterwards.
    """
    # Both ends of the route live on the same device.
    ref_clk_signal = f'/{dev}/10MHzRefClock'
    export_terminal = f'/{dev}/{term}'
    connect_terms(src=ref_clk_signal, dest=export_terminal)
def unshare_10mhz_ref(dev: str, term: str):
    """Undo a static export of the 10 MHz reference clock signal.

    Removes the static route from the device's ``10MHzRefClock`` signal to
    terminal ``term`` of the same device.

    Args:
        dev: device name
        term: terminal name

    Raises:
        ValueError: if parameters are invalid
    """
    # Same route that `share_10mhz_ref` builds for these arguments.
    ref_clk_signal = f'/{dev}/10MHzRefClock'
    export_terminal = f'/{dev}/{term}'
    disconnect_terms(src=ref_clk_signal, dest=export_terminal)
def reset_dev(name: str):
    """Perform a hardware reset of a device.

    Args:
        name: device name as shown in NI MAX
    """
    # Thin pass-through to the compiled backend implementation.
    return _reset_dev(name=name)
# endregion

# region iplot
class RendOption:
    """Enum-like collection of select Plotly renderer options.

    Every attribute is a plain string that can be passed as a renderer name.
    See `Plotly docs <https://plotly.com/python/renderers/>`_ for the full
    list of available renderers.
    """

    # Interactive renderers
    browser = 'browser'
    notebook = 'notebook'

    # Static image renderers
    svg = 'svg'
    png = 'png'
    jpeg = 'jpeg'
def _resolve_time_window(start_time, end_time, total_run_time):
    """Fill in missing plot window bounds and validate them.

    Args:
        start_time: requested window start, or ``None`` to use zero time
        end_time: requested window end, or ``None`` to use ``total_run_time``
        total_run_time: upper limit that neither bound may exceed

    Returns:
        ``(start_time, end_time)`` tuple with ``None`` values replaced.

    Raises:
        ValueError: if either bound exceeds ``total_run_time`` or if the
            start lies after the end.
    """
    if start_time is None:
        start_time = 0.0
    elif start_time > total_run_time:
        raise ValueError(f"Requsted start_time={start_time} exceeds total run time {total_run_time}")

    if end_time is None:
        end_time = total_run_time
    elif end_time > total_run_time:
        raise ValueError(f"Requsted end_time={end_time} exceeds total run time {total_run_time}")

    if start_time > end_time:
        raise ValueError(f"Requested start_time={start_time} exceeds end_time={end_time}")

    return start_time, end_time


def iplot(chan_list,
          start_time: Union[float, None] = None,
          end_time: Union[float, None] = None,
          nsamps: Optional[int] = 1000,
          renderer: Optional[str] = 'browser',
          row_height: Union[float, None] = None):
    """Plot signals for a list of channels.

    Values are computed for a grid of ``nsamps`` time points uniformly
    distributed over the closed interval ``[start_time, end_time]``. Each
    channel is drawn in its own sub-plot row; all rows share one X-axis range.

    Note that the sequence has to be freshly compiled to plot.

    Args:
        chan_list: list of channel proxy instances to plot
            (trace order will follow the list order)
        start_time: window start time. If ``None``, zero time is used
        end_time: window end time. If ``None``, compiled sequence end time
            is used
        nsamps: number of time points to evaluate for each channel. The value
            is converted with ``int()`` before use, so integral floats such
            as ``1e3`` are accepted
        renderer: Plotly renderer to use. A few options are collected in
            :class:`RendOption`
        row_height: channel sub-plot height. If ``None``, Plotly's automatic
            figure height is used for up to 4 channels, and a fixed height of
            200 per row is used for more channels

    Raises:
        ImportError: if ``plotly`` is not installed
        ValueError: if sequence is not freshly compiled or any parameters
            are invalid

    Notes:
        You may need to select a sufficiently large ``nsamps`` and a
        sufficiently narrow time window to see the true waveform shape that
        the actual stream would produce when sampling at the hardware clock
        rate. Otherwise, very narrow pulses may be missed and periodic
        waveforms may appear distorted due to undersampling.
    """
    if not PLOTLY_INSTALLED:
        raise ImportError('Plotly package is not installed. Run `pip install plotly` to get it.')

    # Sanity checks:
    if len(chan_list) == 0:
        raise ValueError("Channel list is empty")
    chan_num = len(chan_list)

    # FixMe: this is a dirty hack.
    #  Consider making this function a method of NIStreamer class
    #  to get clear access to streamer_wrap
    streamer_wrap = chan_list[0]._streamer
    streamer_wrap.validate_compile_cache()

    # Process window start/end times
    start_time, end_time = _resolve_time_window(
        start_time=start_time,
        end_time=end_time,
        total_run_time=streamer_wrap.shortest_dev_run_time(),
    )

    # Coerce the sample count BEFORE it is used anywhere: `np.linspace` rejects
    # non-integer counts, and the time grid must have exactly as many points
    # as each channel's computed signal.
    nsamps = int(nsamps)
    t_arr = np.linspace(start_time, end_time, nsamps)

    fig = make_subplots(
        rows=chan_num,
        cols=1,
        x_title='Time [s]',
        # `shared_xaxes=True` is deliberately not used: it locks X-axes,
        # but also hides X-axis ticks for all plots except the bottom one
    )
    # Using this option locks X-axes and also leaves ticks
    fig.update_xaxes(matches='x')

    height_scale = 1.1  # total figure height per unit of row height
    auto_row_height = 200  # fixed row height used when there are many channels
    many_chan_threshold = 4

    if row_height is not None:
        fig.update_layout(height=height_scale * row_height * chan_num)
    elif chan_num > many_chan_threshold:
        # Row height is not provided and there are many channels:
        # switch off auto-height and set a fixed row height, to make
        # the frame extend downwards as much as needed.
        fig.update_layout(height=height_scale * auto_row_height * chan_num)
    # Otherwise: keep auto-height and fit everything into the standard frame height.

    # Plotly sub-plot rows are 1-based
    for row_idx, chan in enumerate(chan_list, start=1):
        signal_arr = chan.calc_signal(start_time=start_time, end_time=end_time, nsamps=nsamps)
        fig.add_trace(
            go.Scatter(x=t_arr, y=signal_arr, name=chan.nickname),
            row=row_idx, col=1
        )

    fig.show(renderer=renderer)
# endregion