Better documentation + examples

This commit is contained in:
2024-02-28 12:04:31 +01:00
parent a3b100b419
commit 2ee2b10c40
67 changed files with 17311 additions and 37 deletions

3
src/fn_gen/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .dg2052 import DG2052
from .enums import *
from .errors import *

5
src/fn_gen/common.py Normal file
View File

@@ -0,0 +1,5 @@
from .errors import ValueOutOfBoundsError
def check_bounds(bounds: tuple[float, float] | tuple[int, int], value: float | int):
if value < bounds[0] or value > bounds[1]:
raise ValueOutOfBoundsError(bounds, value)

View File

@@ -0,0 +1,10 @@
SIN_RANGE = (1e-6, 50e6)
SQU_RANGE = (1e-6, 15e6)
RAMP_RANGE = (1e-6, 1.5e6)
PULSE_RANGE = (1e-6, 20e6)
HARM_RANGE = (1e-6, 20e6)
NOISE_BANDWIDTH = 100e6
USER_RANGE = (1e-6, 15e6)
DUALT_RANGE = (1e-6, 20e6)
PRBS_RANGE = (2e3, 40e6)
SEQ_RANGE = (2e3, 60e6)

336
src/fn_gen/dg2052.py Normal file
View File

@@ -0,0 +1,336 @@
### PLEASE DO NOT MIND THE FORMATTING, IT IS DONE AUTOMATICALLY BY 'BLACK' THE PYTHON FORMATTER
import logging
from typing import Literal
import pyvisa
from pyvisa import ResourceManager
from .common import check_bounds
from .constants.dg2052 import (SIN_RANGE, SQU_RANGE, RAMP_RANGE)
from .enums import CommMethod, SweepSpacing, SweepTriggerSlope, SweepTriggerSource, SweepSignalType
from .errors import UndefinedValueError, UndefinedCommunicationMethodError, ValueOutOfBoundsError
class DG2052(pyvisa.resources.MessageBasedResource):
"""
This is an object representing the Rigol DG2052 function generator. This object uses the SCPI protocol for communicating with the Rigol DG2052 function generator.
Attributes
----------
port : str
The SCPI port describing the device, consists of a communication method and device port followed by the "::INSTR" keyword.
communication method: can be either USB or TCPIP (other communication methods are not supported for this device)
device port: either COMM4 or /dev/USB0 for USB in windows and posix systems respectively or the IP Address for TCPIP
format: "<communication method>::<device port>::INSTR"
example: "TCPI::192.168.1.11::INSTR" or "USB::COMM4::INSTR"
rm : ResourceManager
The resource manager object for pyvisa (for future use)
comm : CommMethod
The communication method used (either TCPIP or USB)
"""
comm: CommMethod # The communication method used (either TCPIP or USB)
rm: ResourceManager # The resource manager object for pyvisa (for future use)
port: str # The str used for the port
def __init__(self, port: str): # Class initialization method
"""
Initializes the DG2052 object
Parameters
--------
port : str
The SCPI port describing the device, consists of a communication method and device port followed by the "::INSTR" keyword.
communication method: can be either `USB` or `TCPIP` (other communication methods are not supported for this device)
device port: either `COMM4` or `/dev/USB0` for USB in windows and posix systems respectively or the IP Address for TCPIP
format: "`communication method`::`device port`::INSTR"
example: "`TCPI`::`192.168.1.11`::INSTR" or "`USB`::`COMM4`::INSTR"
"""
if "TCPIP" in port: # Check if port starts with TCPIP
logging.debug("(PROG) detected TCPIP port")
self.comm = CommMethod.LAN # Set comm to LAN
elif "USB" in port: # Check if port starts with USB
logging.debug("(PROG) detected USB port")
self.comm = CommMethod.USB # Set comm to USB
else: # Rause Undefined Communication Method Error
raise UndefinedCommunicationMethodError(port)
rm = ResourceManager() # Create a pyvisa.ResourceManager object
self.rm = rm # Save that object as rm
self.port = port # Save the port string as port
super().__init__(rm, port) # create tne instrument object
logging.debug("(PROG) created dg2052 instance")
self.open() # connect to the instrument object (for ease of use)
logging.debug("(PROG) connected to dg2052 device")
def whoami(self) -> str:
"""
shows the identification of the connected instrument
Returns
-------
str
The identification of the connected instrument
"""
match (
self.comm
): # Return an Identification string depending on the communication method
# Here a match case is used to make it easy to extend the communication methods to other methods
case CommMethod.LAN: # if the communication method is LAN
logging.debug("(PROG) communication method: LAN")
(
manufacturer,
model,
serial,
software_ver,
) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query
self.query("*IDN?").strip().split(",")
)
ipaddr = self.query(
":SYST:COMM:LAN:IPAD?"
).strip() # Get the IPAddress of the device
mac = self.query(
":SYST:COMM:LAN:MAC?"
).strip() # Get the MAC address of the device
out = (
f"{manufacturer} {model}:\n\tSerial Nr.:"
+ f" {serial}\n\tSoftware Ver.:"
+ f" {software_ver}\n\tPort:"
+ f" {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}"
)
return out # return the formatted string
case CommMethod.USB: # if the communication method is USB
logging.debug("(PROG) communication method USB")
(
manufacturer,
model,
serial,
software_ver,
) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query
self.query("*IDN?").strip().split(",")
)
# info = self.system.communicate.usb.information().strip()
info = self.query(
":SYST:COMM:USB:INF?"
).strip() # Get the USB info of the device
out = (
f"{manufacturer} {model}:\n\tSerial Nr.:"
+ f" {serial}\n\tSoftware Ver.:"
+ f" {software_ver}\n\tPort:"
+ f" {self.port}\n\tINFORMATION: {info}"
)
return out # return the formatted string
case _: # default case raise Undefined Communication Method Error
raise UndefinedCommunicationMethodError(self.port)
def set_output(self, channel: Literal[1, 2], state: bool):
"""
Sets the output channel ON or OFF
Parameters
----------
channel : Literal[1, 2]
The output channel of the device (either 1 or 2)
state : bool
The state of the output channel
"""
if state:
logging.debug(f"(PROG) :OUTP{channel} ON")
self.write(f":OUTP{channel} ON")
else:
logging.debug(f"(PROG) :OUTP{channel} OFF")
self.write(f":OUTP{channel} OFF")
def toggle_output(self, channel: Literal[1, 2]):
state = self.query(f":OUT{channel}?").strip()
logging.debug(f"(PROG) output {channel} state: {state}")
match (state):
case "ON":
self.set_output(channel, False)
case "OFF":
self.set_output(channel, True)
case _:
raise UndefinedValueError(state, "ON or OFF")
def get_output_volt_limits(self, channel: Literal[1, 2]) -> tuple[float, float]:
"""
Gets the output volt limits of the specified channel
Parameters
-------
channel: Literal[1, 2]
The output channel of the device (either 1 or 2)
Returns
-------
tuple[float, float]
The low voltage limit and the hight voltage limit
"""
low: float = float(self.query(f":OUTP{channel}:VOLL:LOW?"))
high: float = float(self.query(f":OUTP{channel}:VOLL:HIGH?"))
logging.debug(f"(PROG) output {channel} limits: {low}, {high}")
return low, high
def get_output_impedance(self, channel: Literal[1, 2]) -> float:
impedance = float(self.query(f":OUTP{channel}:IMP?"))
logging.debug(f"(PROG) output {channel} impedance: {impedance}")
return impedance
def get_output_load(self, channel: Literal[1, 2]) -> float:
load = float(self.query(f":OUTP{channel}:LOAD?"))
logging.debug(f"(PROG) output {channel} load: {load}")
return load
def get_output_signal(self, channel: Literal[1, 2]) -> str:
signal = self.query(f":SOUR{channel}:APPL?").strip()
logging.debug(f"(PROG) output {channel} signal: {signal}")
return signal
def get_output_state(self, channel: Literal[1, 2]) -> str:
state = self.query(f":OUTP{channel}?").strip()
logging.debug(f"(PROG) output {channel} state: {state}")
return state
def is_output_on(self, channel: Literal[1, 2]) -> bool:
channel_state = self.get_output_state(channel)
match channel_state:
case "ON":
return True
case "OFF":
return False
case _:
raise UndefinedValueError(channel_state, "ON or OFF")
def set_dc(self, channel: Literal[1, 2], offset: float):
logging.debug(f"(PROG) set dc signal with offset: {offset}")
self.write(f":SOUR{channel}:APPL:DC 1,1,{offset}")
def set_sine_wave(
self,
channel: Literal[1, 2],
freq: float = 1e3,
amp: float = 5.0,
offset: float = 0.0,
phase: int = 0,
):
if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]:
raise ValueOutOfBoundsError(SIN_RANGE, freq)
if phase < 0 and phase > 360:
raise ValueOutOfBoundsError((0, 360), phase)
logging.debug(
f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel}:APPL:SIN {freq},{amp},{offset},{phase}")
def set_frequency(self, channel: Literal[1, 2], freq):
logging.debug(
f"(PROG) set wave frequency to {freq} Hz."
)
self.write(f":SOUR:FREQ {freq}")
def set_square_wave(
self,
channel: Literal[1, 2], # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5.0, # Sets the amplitude
offset: float = 0.0, # Sets the amplitude offset
phase: int = 0, # Sets the phase shift
):
check_bounds(SQU_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(
f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel}:APPL:SQU {freq},{amp},{offset},{phase}")
def set_ramp(
self,
channel: Literal[1, 2], # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5, # Sets the amplitude
offset: float = 0, # Sets the amplitude offset
phase: int = 0, # Sets the phase shift
):
check_bounds(RAMP_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(
f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel}:APPL:RAMP {freq},{amp},{offset},{phase}")
def set_sweep(
self,
channel: Literal[1, 2], # Sets the output channel of the sweep function
amp: float = 5, # Sets the amplitude of the sweeped signal
offset: float = 0, # Sets the offset voltage of the sweeped signal
phase: int = 0, # Sets the phase shift of the sweeped signal
signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped
htime_start: float = 0, # Sets the start hold time of the sweep function
htime_stop: float = 0, # Sets the stop hold time of the sweep function
freq_start: float = 100, # Sets the sweep starting frequency
freq_stop: float = 1e3, # Sets the sweep stopping frequency
marker: bool = False, # Enables/Disables setting the marker frequency manually
freq_marker: float = 550, # Sets the marker frequency at whic the Sync signal changes from high to low
rtime: float = 0, # Sets the return time of the sweep function
time: float = 1, # Sets the sweep time
spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type
step: int = 2, # Sets the number of steps of the sweep function
trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only)
trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL, # Sets the sweep trigger source
):
time_bounds: tuple[float, float] = (0, 500)
command_header = f":SOUR{channel}:SWE"
check_bounds(time_bounds, htime_start)
check_bounds(time_bounds, htime_stop)
check_bounds(time_bounds, rtime)
check_bounds((2, 1024), step)
check_bounds((1e-3, 599.0), time)
match signal_type:
case SweepSignalType.SINE:
self.set_sine_wave(channel, amp=amp, offset=offset, phase=phase)
case SweepSignalType.SQUARE:
self.set_square_wave(channel, amp=amp, offset=offset, phase=phase)
case SweepSignalType.RAMP:
self.set_ramp(channel, amp=amp, offset=offset, phase=phase)
self.write(f":SOUR:FREQ:STAR {freq_start}")
self.write(f":SOUR:FREQ:STOP {freq_stop}")
if marker:
self.write(":SOUR:MARK ON")
self.write(f":SOUR:MARK:FREQ {freq_marker}")
else:
self.write(":SOUR:MARK OFF")
self.write(f"{command_header}:SPAC {spacing}")
self.write(f"{command_header}:STEP {step}")
match trigger_source:
case SweepTriggerSource.INTERNAL:
self.write(f"{command_header}:TRIG:SOUR INT")
self.write(f"{command_header}:HTIM:STAR {htime_start}")
self.write(f"{command_header}:HTIM {htime_stop}")
self.write(f"{command_header}:RTIM {rtime}")
self.write(f"{command_header}:TIME {time}")
case SweepTriggerSource.EXTERNAL:
self.write(f"{command_header}:TRIG:SOUR EXT")
if trigger_slope == SweepTriggerSlope.POSITIVE:
self.write(f"{command_header}:TRIG:SLOP POS")
elif trigger_slope == SweepTriggerSlope.NEGATIVE:
self.write(f"{command_header}:TRIG:SLOP NEG")
else:
UndefinedValueError(
trigger_slope,
"SweepTriggerSlope.Positive or SweepTriggerSlope.Negative",
)
case SweepTriggerSource.MANUAL:
self.write(f"{command_header}:TRIG:SOUR MAN")
case _:
UndefinedValueError(
trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]"
)
self.write(f"{command_header}:STAT ON")
def trigger_sweep(self, channel: Literal[1, 2]):
self.write(f":SOUR{channel}:SWE:TRIG:IMM")

View File

@@ -0,0 +1,5 @@
from .comm_method import CommMethod
from .sweep_spacing import SweepSpacing
from .sweep_trigger_slope import SweepTriggerSlope
from .sweep_trigger_source import SweepTriggerSource
from .sweep_signal_type import SweepSignalType

View File

@@ -0,0 +1,11 @@
from enum import Enum
class CommMethod(Enum):
"""The communication method accepted by the function generator
"""
USB = 0
"""For USB devices
"""
LAN = 1
"""For LAN devices
"""

View File

@@ -0,0 +1,14 @@
from enum import Enum
class SweepSignalType(Enum):
"""The type of signal used in the sweep function
"""
SINE = 1
"""The Sine function sweep
"""
SQUARE = 2
"""The Square function sweep
"""
RAMP = 3
"""The Ramp function sweep
"""

View File

@@ -0,0 +1,14 @@
from enum import Enum
class SweepSpacing(Enum):
"""The spacing used in the sweep function
"""
LIN = 1
"""A linear sweep
"""
LOG = 2
"""A logarithmic sweep
"""
STEP = 3
"""A stepwise sweep
"""

View File

@@ -0,0 +1,11 @@
from enum import Enum
class SweepTriggerSlope(Enum):
"""The type of slope used for the external sweep trigger
"""
POSITIVE = 1
"""This option triggers the sweep on the positive edge
"""
NEGATIVE = 2
"""Ths option triggers the sweep on the negative edge
"""

View File

@@ -0,0 +1,10 @@
from enum import Enum
class SweepTriggerSource(Enum):
"""The Source of the trigger for the sweep"""
INTERNAL = 1
"""An Internal trigger signal, defined by htime_start, htime_stop"""
EXTERNAL = 2
"""An external trigger signal"""
MANUAL = 3
"""A manual trigger set in code by the trigger function"""

View File

@@ -0,0 +1,3 @@
from .undefined_communication_method_error import UndefinedCommunicationMethodError
from .value_out_of_bounds_error import ValueOutOfBoundsError
from .undefined_value_error import UndefinedValueError

View File

@@ -0,0 +1,5 @@
class UndefinedCommunicationMethodError(Exception):
def __init__(self, port: str):
method = port.split( "::" )[0]
super().__init__( f"ERROR: Undefined Communication Exception, Method \"{method}\" is not recognized in Port \"{port}\"." )

View File

@@ -0,0 +1,4 @@
class UndefinedValueError(Exception):
def __init__(self, value: str, expected: str):
super().__init__( f"ERROR: Undefined Value, expected: {expected}, value: {value}." )

View File

@@ -0,0 +1,4 @@
class ValueOutOfBoundsError(Exception):
def __init__(self, bounds: tuple[float, float] | tuple[int, int], value: float | int):
super().__init__( f"ERROR: Value out of expected bounds, Min: {bounds[0]}, Max: {bounds[1]}, Value: {value}." )