diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5691af9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +venv/** +venv +dist/** +dist +fn_gen.egg-info/** +fn_gen.egg-info/ +activate diff --git a/DG2000_ProgrammingGuide_EN.pdf b/DG2000_ProgrammingGuide_EN.pdf new file mode 100644 index 0000000..45fff33 Binary files /dev/null and b/DG2000_ProgrammingGuide_EN.pdf differ diff --git a/DG2000_UserGuide_EN.pdf b/DG2000_UserGuide_EN.pdf new file mode 100644 index 0000000..e119727 Binary files /dev/null and b/DG2000_UserGuide_EN.pdf differ diff --git a/fn_gen/__init__.py b/fn_gen/__init__.py new file mode 100644 index 0000000..ba9bc03 --- /dev/null +++ b/fn_gen/__init__.py @@ -0,0 +1,3 @@ +from .dg2052 import DG2052 +from .enums import * +from .errors import * diff --git a/fn_gen/__pycache__/common.cpython-311.pyc b/fn_gen/__pycache__/common.cpython-311.pyc new file mode 100644 index 0000000..3845f8c Binary files /dev/null and b/fn_gen/__pycache__/common.cpython-311.pyc differ diff --git a/fn_gen/__pycache__/enums.cpython-311.pyc b/fn_gen/__pycache__/enums.cpython-311.pyc new file mode 100644 index 0000000..08a7ea6 Binary files /dev/null and b/fn_gen/__pycache__/enums.cpython-311.pyc differ diff --git a/fn_gen/__pycache__/errors.cpython-311.pyc b/fn_gen/__pycache__/errors.cpython-311.pyc new file mode 100644 index 0000000..ac0c310 Binary files /dev/null and b/fn_gen/__pycache__/errors.cpython-311.pyc differ diff --git a/fn_gen/common.py b/fn_gen/common.py new file mode 100644 index 0000000..1b729c0 --- /dev/null +++ b/fn_gen/common.py @@ -0,0 +1,5 @@ +from .errors import ValueOutOfBoundsError + +def check_bounds(bounds: tuple[float, float] | tuple[int, int], value: float | int): + if value < bounds[0] or value > bounds[1]: + raise ValueOutOfBoundsError(bounds, value) diff --git a/fn_gen/constants/dg2052/__init__.py b/fn_gen/constants/dg2052/__init__.py new file mode 100644 index 0000000..e2978db --- /dev/null +++ b/fn_gen/constants/dg2052/__init__.py @@ -0,0 +1,10 @@ +SIN_RANGE = (1e-6, 50e6) +SQU_RANGE = (1e-6, 15e6) +RAMP_RANGE = (1e-6, 1.5e6) +PULSE_RANGE = (1e-6, 20e6) +HARM_RANGE = (1e-6, 20e6) +NOISE_BANDWIDTH = 100e6 +USER_RANGE = (1e-6, 15e6) +DUALT_RANGE = (1e-6, 20e6) +PRBS_RANGE = (2e3, 40e6) +SEQ_RANGE = (2e3, 60e6) diff --git a/fn_gen/constants/dg2052/__pycache__/__init__.cpython-311.pyc b/fn_gen/constants/dg2052/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..3d5fa00 Binary files /dev/null and b/fn_gen/constants/dg2052/__pycache__/__init__.cpython-311.pyc differ diff --git a/fn_gen/dg2052.py b/fn_gen/dg2052.py new file mode 100644 index 0000000..9a1584c --- /dev/null +++ b/fn_gen/dg2052.py @@ -0,0 +1,191 @@ +from re import DEBUG +import time +import logging +import pyvisa +from .errors import * +from .enums import * +from .common import * +from .constants.dg2052 import * + +class DG2052( pyvisa.resources.MessageBasedResource ): + comm: CommMethod + rm: pyvisa.ResourceManager + port: str + def __init__( self, port: str ): + if "TCPIP" in port: + logging.debug("(PROG) detected TCPIP port") + self.comm = CommMethod.LAN + elif "USB" in port: + logging.debug("(PROG) detected USB port") + self.comm = CommMethod.USB + else: + raise UndefinedCommunicationMethodError(port) + rm = pyvisa.ResourceManager() + self.rm = rm + self.port = port + super().__init__(rm, port) + logging.debug("(PROG) created dg2052 instance") + self.open() + logging.debug("(PROG) connected to dg2052 device") + + def whoami( self ) -> str: + match(self.comm): + case CommMethod.LAN: + logging.debug("(PROG) communication method: LAN") + manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) + # ipaddr = self.system.communicate.lan.ipaddress().strip() + ipaddr = self.query(":SYST:COMM:LAN:IPAD?").strip() + # mac = self.system.communicate.lan.mac().strip() + mac = self.query(":SYST:COMM:LAN:MAC?").strip() + return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}" + case CommMethod.USB: + logging.debug("(PROG) communication method USB") + manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) + # info = self.system.communicate.usb.information().strip() + info = self.query(":SYST:COMM:USB:INF?").strip() + return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tINFORMATION: {info}" + case _: + raise UndefinedCommunicationMethodError(self.port) + + def set_output(self, channel: OutputChannel, state: bool): + if state: + logging.debug( f"(PROG) :OUTP{channel.value} ON" ) + self.write( f':OUTP{channel.value} ON' ) + else: + logging.debug( f"(PROG) :OUTP{channel.value} OFF" ) + self.write( f':OUTP{channel.value} OFF' ) + + def toggle_output(self, channel: OutputChannel): + state = self.query( f':OUT{channel.value}?' ).strip() + logging.debug(f"(PROG) output {channel.value} state: {state}") + match(state): + case "ON": + self.set_output(channel, False) + case "OFF": + self.set_output(channel, True) + case _: + raise UndefinedValueError(state, "ON or OFF") + + def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]: + low: float = float(self.query( f':OUTP{channel.value}:VOLL:LOW?' )) + high: float = float(self.query( f':OUTP{channel.value}:VOLL:HIGH?' )) + logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}") + return low, high + + def get_output_impedance(self, channel: OutputChannel) -> float: + impedance = float(self.query( f':OUTP{channel.value}:IMP?' )) + logging.debug(f"(PROG) output {channel.value} impedance: {impedance}") + return impedance + + def get_output_load(self, channel: OutputChannel) -> float: + load = float(self.query( f':OUTP{channel.value}:LOAD?' )) + logging.debug(f"(PROG) output {channel.value} load: {load}") + return load + + def get_output_signal(self, channel: OutputChannel) -> str: + signal = self.query( f':SOUR{channel.value}:APPL?' ).strip() + logging.debug(f"(PROG) output {channel.value} signal: {signal}") + return signal + + def get_output_state(self, channel: OutputChannel) -> str: + state = self.query( f':OUTP{channel.value}?' ).strip() + logging.debug(f"(PROG) output {channel.value} state: {state}") + return state + + def set_dc(self, channel: OutputChannel, offset: float): + logging.debug(f"(PROG) set dc signal with offset: {offset}") + self.write( f':SOUR{channel.value}:APPL:DC 1,1,{offset}' ) + + def set_sine_wave(self, channel: OutputChannel, freq: float = 1e3, amp: float = 5.0, offset: float = 0.0, phase: int = 0): + if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]: + raise ValueOutOfBoundsError(SIN_RANGE, freq) + if phase < 0 and phase > 360: + raise ValueOutOfBoundsError((0, 360), phase) + logging.debug(f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") + self.write( f':SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}' ) + + def set_square_wave( + self, + channel: OutputChannel, # Sets the output channel of the ramp function + freq: float = 1e3, # Sets the frequency + amp: float = 5.0, # Sets the amplitude + offset: float = 0.0, # Sets the amplitude offset + phase: int = 0 # Sets the phase shift + ): + check_bounds(SQU_RANGE, freq) + check_bounds((0, 360), phase) + logging.debug(f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") + self.write( f':SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}' ) + + def set_ramp( + self, + channel: OutputChannel, # Sets the output channel of the ramp function + freq: float = 1e3, # Sets the frequency + amp: float = 5, # Sets the amplitude + offset: float = 0, # Sets the amplitude offset + phase: int = 0 # Sets the phase shift + ): + check_bounds(RAMP_RANGE, freq) + check_bounds((0, 360), phase) + logging.debug(f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") + self.write( f':SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}' ) + + def set_sweep( + self, + channel: OutputChannel, # Sets the output channel of the sweep function + signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped + htime_start: float = 0, # Sets the start hold time of the sweep function + htime_stop: float = 0, # Sets the stop hold time of the sweep function + rtime: float = 0, # Sets the return time of the sweep function + time: float = 1, # Sets the sweep time + spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type + step: int = 2, # Sets the number of steps of the sweep function + trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only) + trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL # Sets the sweep trigger source + ): + time_bounds: tuple[float, float] = (0, 500) + command_header = f":SOUR{channel.value}:SWE" + check_bounds(time_bounds, htime_start) + check_bounds(time_bounds, htime_stop) + check_bounds(time_bounds, rtime) + check_bounds((2, 1024), step) + check_bounds((1e-3, 599.0), time) + self.write( f"{command_header}:SPAC {spacing}" ) + self.write( f"{command_header}:STEP {step}" ) + match trigger_source: + case SweepTriggerSource.INTERNAL: + self.write( f"{command_header}:TRIG:SOUR INT" ) + self.write( f"{command_header}:HTIM:STAR {htime_start}" ) + self.write( f"{command_header}:HTIM {htime_stop}" ) + self.write( f"{command_header}:RTIM {rtime}" ) + self.write( f"{command_header}:TIME {time}" ) + case SweepTriggerSource.EXTERNAL: + self.write( f"{command_header}:TRIG:SOUR EXT" ) + if trigger_slope == SweepTriggerSlope.POSITIVE: + self.write( f"{command_header}:TRIG:SLOP POS" ) + elif trigger_slope == SweepTriggerSlope.NEGATIVE: + self.write( f"{command_header}:TRIG:SLOP NEG" ) + else: + UndefinedValueError(trigger_slope, "SweepTriggerSlope.Positive or SweepTriggerSlope.Negative") + case SweepTriggerSource.MANUAL: + self.write( f"{command_header}:TRIG:SOUR MAN" ) + case _: + UndefinedValueError(trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]") + self.write( f"{command_header}:STAT ON" ) + + def trigger_sweep(self, channel: OutputChannel): + self.write( f":SOUR{channel.value}:SWE:TRIG:IMM" ) + + # def set_pulse(self, channel: OutputChannel, duty_cycle: float, transition_leading: float, transition_trailing: float, pulse_width: float): + # transition_bounds = (8e-9, 0.625*pulse_width) + # duty_cycle_bounds = (0.001, 99.999) + # pulse_width_bounds = (16e-9, 999.999e3) + # command_header = f":SOUR{channel.value}:PULS" + # check_bounds(duty_cycle_bounds, duty_cycle) + # check_bounds(transition_bounds, transition_leading) + # check_bounds(transition_bounds, transition_trailing) + # check_bounds(pulse_width_bounds, pulse_width) + # self.write( f"{command_header}:WIDT {pulse_width}" ) + # self.write( f"{command_header}:DCYC {duty_cycle}" ) + # self.write( f"{command_header}:TRAN:LEAD {transition_leading}" ) + # self.write( f"{command_header}:TRAN:TRA {transition_trailing}" ) diff --git a/fn_gen/enums/__init__.py b/fn_gen/enums/__init__.py new file mode 100644 index 0000000..9fbef5e --- /dev/null +++ b/fn_gen/enums/__init__.py @@ -0,0 +1,6 @@ +from .comm_method import CommMethod +from .output_channel import OutputChannel +from .sweep_spacing import SweepSpacing +from .sweep_trigger_slope import SweepTriggerSlope +from .sweep_trigger_source import SweepTriggerSource +from .sweep_signal_type import SweepSignalType diff --git a/fn_gen/enums/comm_method.py b/fn_gen/enums/comm_method.py new file mode 100644 index 0000000..1444a9f --- /dev/null +++ b/fn_gen/enums/comm_method.py @@ -0,0 +1,5 @@ +from enum import Enum + +class CommMethod(Enum): + USB = 0 + LAN = 1 diff --git a/fn_gen/enums/output_channel.py b/fn_gen/enums/output_channel.py new file mode 100644 index 0000000..395f75b --- /dev/null +++ b/fn_gen/enums/output_channel.py @@ -0,0 +1,5 @@ +from enum import Enum + +class OutputChannel(Enum): + ONE = 1 + TWO = 2 diff --git a/fn_gen/enums/sweep_signal_type.py b/fn_gen/enums/sweep_signal_type.py new file mode 100644 index 0000000..3d76007 --- /dev/null +++ b/fn_gen/enums/sweep_signal_type.py @@ -0,0 +1,6 @@ +from enum import Enum + +class SweepSignalType(Enum): + SINE = 1 + SQUARE = 2 + RAMP = 3 diff --git a/fn_gen/enums/sweep_spacing.py b/fn_gen/enums/sweep_spacing.py new file mode 100644 index 0000000..e98cc56 --- /dev/null +++ b/fn_gen/enums/sweep_spacing.py @@ -0,0 +1,6 @@ +from enum import Enum + +class SweepSpacing(Enum): + LIN = 1 + LOG = 2 + STEP = 3 diff --git a/fn_gen/enums/sweep_trigger_slope.py b/fn_gen/enums/sweep_trigger_slope.py new file mode 100644 index 0000000..9265423 --- /dev/null +++ b/fn_gen/enums/sweep_trigger_slope.py @@ -0,0 +1,5 @@ +from enum import Enum + +class SweepTriggerSlope(Enum): + POSITIVE = 1 + NEGATIVE = 2 diff --git a/fn_gen/enums/sweep_trigger_source.py b/fn_gen/enums/sweep_trigger_source.py new file mode 100644 index 0000000..65109bf --- /dev/null +++ b/fn_gen/enums/sweep_trigger_source.py @@ -0,0 +1,6 @@ +from enum import Enum + +class SweepTriggerSource(Enum): + INTERNAL = 1 + EXTERNAL = 2 + MANUAL = 3 diff --git a/fn_gen/errors/__init__.py b/fn_gen/errors/__init__.py new file mode 100644 index 0000000..3b9a603 --- /dev/null +++ b/fn_gen/errors/__init__.py @@ -0,0 +1,3 @@ +from .undefined_communication_method_error import UndefinedCommunicationMethodError +from .value_out_of_bounds_error import ValueOutOfBoundsError +from .undefined_value_error import UndefinedValueError diff --git a/fn_gen/errors/undefined_communication_method_error.py b/fn_gen/errors/undefined_communication_method_error.py new file mode 100644 index 0000000..eacdc19 --- /dev/null +++ b/fn_gen/errors/undefined_communication_method_error.py @@ -0,0 +1,5 @@ +class UndefinedCommunicationMethodError(Exception): + def __init__(self, port: str): + method = port.split( "::" )[0] + super().__init__( f"ERROR: Undefined Communication Exception, Method \"{method}\" is not recognized in Port \"{port}\"." ) + diff --git a/fn_gen/errors/undefined_value_error.py b/fn_gen/errors/undefined_value_error.py new file mode 100644 index 0000000..f7b3af4 --- /dev/null +++ b/fn_gen/errors/undefined_value_error.py @@ -0,0 +1,4 @@ +class UndefinedValueError(Exception): + def __init__(self, value: str, expected: str): + super().__init__( f"ERROR: Undefined Value, expected: {expected}, value: {value}." ) + diff --git a/fn_gen/errors/value_out_of_bounds_error.py b/fn_gen/errors/value_out_of_bounds_error.py new file mode 100644 index 0000000..7093704 --- /dev/null +++ b/fn_gen/errors/value_out_of_bounds_error.py @@ -0,0 +1,4 @@ +class ValueOutOfBoundsError(Exception): + def __init__(self, bounds: tuple[float, float] | tuple[int, int], value: float | int): + super().__init__( f"ERROR: Value out of expected bounds, Min: {bounds[0]}, Max: {bounds[1]}, Value: {value}." ) + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..16095ea --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[project] +name = "fn_gen" +readme = "README.md" +description = "A library for usage with SCPI compliant function generators (for now the DG2000 series from rigol)" +requires-python = ">=3.11" +version = "0.0.1" +dependencies = [ + "easy-scpi==0.1.4", + "ifaddr==0.2.0", + "pip==23.2.1", + "psutil==5.9.7", + "PyVISA==1.14.1", + "PyVISA-py==0.7.1", + "setuptools==65.5.0", + "typing_extensions==4.9.0", + "zeroconf==0.131.0" +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2f1215e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +easy-scpi==0.1.4 +ifaddr==0.2.0 +pip==23.2.1 +psutil==5.9.7 +PyVISA==1.14.1 +PyVISA-py==0.7.1 +setuptools==65.5.0 +typing_extensions==4.9.0 +zeroconf==0.131.0