From 40d6907dbb9cdb57aebed25dd3f8b9ef0458582c Mon Sep 17 00:00:00 2001 From: Ali Amr Ali Taha Elnwegy Date: Mon, 22 Jan 2024 11:09:01 +0100 Subject: [PATCH 1/3] Added PDM integration --- .gitignore | 6 ++++++ pyproject.toml | 39 +++++++++++++++++++++++---------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 21f0008..cefa04c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,9 @@ activate **/__pycache__ **/build **/build/** +pdm.lock +.pdm-python +rigol_dg2052_python_library/ +rigol_dg2052_python_library/** +rigol_dg2052_python_library.egg-info/ +rigol_dg2052_python_library.egg-info/** diff --git a/pyproject.toml b/pyproject.toml index c416729..521b441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,24 +1,31 @@ -[build-system] -requires = ["setuptools"] -build-backend = "setuptools.build_meta" - [project] -name = "fn_gen" +name = "rigol-dg2052-python-library" readme = "README.md" description = "A library for usage with SCPI compliant function generators (for now the DG2000 series from rigol)" -requires-python = ">=3.11" +requires-python = "==3.11.*" version = "0.0.1" dependencies = [ - "easy-scpi==0.1.4", - "ifaddr==0.2.0", - "pip==23.2.1", - "psutil==5.9.7", - "PyVISA==1.14.1", - "PyVISA-py==0.7.1", - "setuptools==65.5.0", - "typing_extensions==4.9.0", - "zeroconf==0.131.0" + "easy-scpi==0.1.4", + "ifaddr==0.2.0", + "pip==23.2.1", + "psutil==5.9.7", + "PyVISA==1.14.1", + "PyVISA-py==0.7.1", + "setuptools==65.5.0", + "typing_extensions==4.9.0", + "zeroconf==0.131.0", + "typing-extensions==4.9.0", ] +authors = [ + {name = "Ali Nwegy", email = "ali.el-nwegy@tuhh.de"}, +] +license = {text = "MIT"} [tool.setuptools] -packages = ["fn_gen"] +packages = [ + "fn_gen", + "rigol_dg2052_python_library", +] + +[tool.pdm] +distribution = true From 718015c7d4864bb05bb1ff29da520d78da722c9e Mon Sep 17 00:00:00 2001 From: Ali Amr Ali Taha Elnwegy Date: Mon, 22 Jan 2024 11:14:55 +0100 Subject: [PATCH 2/3] fixed project name in 'pyproject.toml' --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 521b441..33e3ed2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "rigol-dg2052-python-library" +name = "fn_gen" readme = "README.md" description = "A library for usage with SCPI compliant function generators (for now the DG2000 series from rigol)" requires-python = "==3.11.*" @@ -24,7 +24,6 @@ license = {text = "MIT"} [tool.setuptools] packages = [ "fn_gen", - "rigol_dg2052_python_library", ] [tool.pdm] From ee5d585f01664255688748582a87f41dbc4af185 Mon Sep 17 00:00:00 2001 From: Ali Amr Ali Taha Elnwegy Date: Wed, 24 Jan 2024 15:48:07 +0100 Subject: [PATCH 3/3] fixed project sturcture, fixes imports --- fn_gen/dg2052.py | 198 ------------ pyproject.toml | 7 +- {fn_gen => src/fn_gen}/__init__.py | 0 {fn_gen => src/fn_gen}/common.py | 0 {fn_gen => src/fn_gen}/constants/dg2052.py | 0 src/fn_gen/dg2052.py | 295 ++++++++++++++++++ {fn_gen => src/fn_gen}/enums/__init__.py | 0 {fn_gen => src/fn_gen}/enums/comm_method.py | 0 .../fn_gen}/enums/output_channel.py | 0 .../fn_gen}/enums/sweep_signal_type.py | 0 {fn_gen => src/fn_gen}/enums/sweep_spacing.py | 0 .../fn_gen}/enums/sweep_trigger_slope.py | 0 .../fn_gen}/enums/sweep_trigger_source.py | 0 {fn_gen => src/fn_gen}/errors/__init__.py | 0 .../undefined_communication_method_error.py | 0 .../fn_gen}/errors/undefined_value_error.py | 0 .../errors/value_out_of_bounds_error.py | 0 17 files changed, 296 insertions(+), 204 deletions(-) delete mode 100644 fn_gen/dg2052.py rename {fn_gen => src/fn_gen}/__init__.py (100%) rename {fn_gen => src/fn_gen}/common.py (100%) rename {fn_gen => src/fn_gen}/constants/dg2052.py (100%) create mode 100644 src/fn_gen/dg2052.py rename {fn_gen => src/fn_gen}/enums/__init__.py (100%) rename {fn_gen => src/fn_gen}/enums/comm_method.py (100%) rename {fn_gen => src/fn_gen}/enums/output_channel.py (100%) rename {fn_gen => src/fn_gen}/enums/sweep_signal_type.py (100%) rename {fn_gen => src/fn_gen}/enums/sweep_spacing.py (100%) rename {fn_gen => src/fn_gen}/enums/sweep_trigger_slope.py (100%) rename {fn_gen => src/fn_gen}/enums/sweep_trigger_source.py (100%) rename {fn_gen => src/fn_gen}/errors/__init__.py (100%) rename {fn_gen => src/fn_gen}/errors/undefined_communication_method_error.py (100%) rename {fn_gen => src/fn_gen}/errors/undefined_value_error.py (100%) rename {fn_gen => src/fn_gen}/errors/value_out_of_bounds_error.py (100%) diff --git a/fn_gen/dg2052.py b/fn_gen/dg2052.py deleted file mode 100644 index f2a5c26..0000000 --- a/fn_gen/dg2052.py +++ /dev/null @@ -1,198 +0,0 @@ -from re import DEBUG -import time -import logging -import pyvisa -from .errors import * -from .enums import * -from .common import * -from .constants.dg2052 import * - -class DG2052( pyvisa.resources.MessageBasedResource ): - comm: CommMethod - rm: pyvisa.ResourceManager - port: str - def __init__( self, port: str ): - if "TCPIP" in port: - logging.debug("(PROG) detected TCPIP port") - self.comm = CommMethod.LAN - elif "USB" in port: - logging.debug("(PROG) detected USB port") - self.comm = CommMethod.USB - else: - raise UndefinedCommunicationMethodError(port) - rm = pyvisa.ResourceManager() - self.rm = rm - self.port = port - super().__init__(rm, port) - logging.debug("(PROG) created dg2052 instance") - self.open() - logging.debug("(PROG) connected to dg2052 device") - - def whoami( self ) -> str: - match(self.comm): - case CommMethod.LAN: - logging.debug("(PROG) communication method: LAN") - manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) - # ipaddr = self.system.communicate.lan.ipaddress().strip() - ipaddr = self.query(":SYST:COMM:LAN:IPAD?").strip() - # mac = self.system.communicate.lan.mac().strip() - mac = self.query(":SYST:COMM:LAN:MAC?").strip() - return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}" - case CommMethod.USB: - logging.debug("(PROG) communication method USB") - manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) - # info = self.system.communicate.usb.information().strip() - info = self.query(":SYST:COMM:USB:INF?").strip() - return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tINFORMATION: {info}" - case _: - raise UndefinedCommunicationMethodError(self.port) - - def set_output(self, channel: OutputChannel, state: bool): - if state: - logging.debug( f"(PROG) :OUTP{channel.value} ON" ) - self.write( f':OUTP{channel.value} ON' ) - else: - logging.debug( f"(PROG) :OUTP{channel.value} OFF" ) - self.write( f':OUTP{channel.value} OFF' ) - - def toggle_output(self, channel: OutputChannel): - state = self.query( f':OUT{channel.value}?' ).strip() - logging.debug(f"(PROG) output {channel.value} state: {state}") - match(state): - case "ON": - self.set_output(channel, False) - case "OFF": - self.set_output(channel, True) - case _: - raise UndefinedValueError(state, "ON or OFF") - - def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]: - low: float = float(self.query( f':OUTP{channel.value}:VOLL:LOW?' )) - high: float = float(self.query( f':OUTP{channel.value}:VOLL:HIGH?' )) - logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}") - return low, high - - def get_output_impedance(self, channel: OutputChannel) -> float: - impedance = float(self.query( f':OUTP{channel.value}:IMP?' )) - logging.debug(f"(PROG) output {channel.value} impedance: {impedance}") - return impedance - - def get_output_load(self, channel: OutputChannel) -> float: - load = float(self.query( f':OUTP{channel.value}:LOAD?' )) - logging.debug(f"(PROG) output {channel.value} load: {load}") - return load - - def get_output_signal(self, channel: OutputChannel) -> str: - signal = self.query( f':SOUR{channel.value}:APPL?' ).strip() - logging.debug(f"(PROG) output {channel.value} signal: {signal}") - return signal - - def get_output_state(self, channel: OutputChannel) -> str: - state = self.query( f':OUTP{channel.value}?' ).strip() - logging.debug(f"(PROG) output {channel.value} state: {state}") - return state - - def set_dc(self, channel: OutputChannel, offset: float): - logging.debug(f"(PROG) set dc signal with offset: {offset}") - self.write( f':SOUR{channel.value}:APPL:DC 1,1,{offset}' ) - - def set_sine_wave(self, channel: OutputChannel, freq: float = 1e3, amp: float = 5.0, offset: float = 0.0, phase: int = 0): - if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]: - raise ValueOutOfBoundsError(SIN_RANGE, freq) - if phase < 0 and phase > 360: - raise ValueOutOfBoundsError((0, 360), phase) - logging.debug(f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") - self.write( f':SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}' ) - - def set_square_wave( - self, - channel: OutputChannel, # Sets the output channel of the ramp function - freq: float = 1e3, # Sets the frequency - amp: float = 5.0, # Sets the amplitude - offset: float = 0.0, # Sets the amplitude offset - phase: int = 0 # Sets the phase shift - ): - check_bounds(SQU_RANGE, freq) - check_bounds((0, 360), phase) - logging.debug(f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") - self.write( f':SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}' ) - - def set_ramp( - self, - channel: OutputChannel, # Sets the output channel of the ramp function - freq: float = 1e3, # Sets the frequency - amp: float = 5, # Sets the amplitude - offset: float = 0, # Sets the amplitude offset - phase: int = 0 # Sets the phase shift - ): - check_bounds(RAMP_RANGE, freq) - check_bounds((0, 360), phase) - logging.debug(f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") - self.write( f':SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}' ) - - def set_sweep( - self, - channel: OutputChannel, # Sets the output channel of the sweep function - amp: float = 5, # Sets the amplitude of the sweeped signal - offset: float = 0, # Sets the offset voltage of the sweeped signal - phase: int = 0, # Sets the phase shift of the sweeped signal - signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped - htime_start: float = 0, # Sets the start hold time of the sweep function - htime_stop: float = 0, # Sets the stop hold time of the sweep function - freq_start: float = 100, # Sets the sweep starting frequency - freq_stop: float = 1e3, # Sets the sweep stopping frequency - marker: bool = False, # Enables/Disables setting the marker frequency manually - freq_marker: float = 550, # Sets the marker frequency at whic the Sync signal changes from high to low - rtime: float = 0, # Sets the return time of the sweep function - time: float = 1, # Sets the sweep time - spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type - step: int = 2, # Sets the number of steps of the sweep function - trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only) - trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL # Sets the sweep trigger source - ): - time_bounds: tuple[float, float] = (0, 500) - command_header = f":SOUR{channel.value}:SWE" - check_bounds(time_bounds, htime_start) - check_bounds(time_bounds, htime_stop) - check_bounds(time_bounds, rtime) - check_bounds((2, 1024), step) - check_bounds((1e-3, 599.0), time) - match signal_type: - case SweepSignalType.SINE: - self.set_sine_wave(channel, amp=amp, offset=offset, phase=phase) - case SweepSignalType.SQUARE: - self.set_square_wave(channel, amp=amp, offset=offset, phase=phase) - case SweepSignalType.RAMP: - self.set_ramp(channel, amp=amp, offset=offset, phase=phase) - self.write( f":SOUR:FREQ:STAR {freq_start}" ) - self.write( f":SOUR:FREQ:STOP {freq_stop}" ) - if marker: - self.write( f":SOUR:MARK ON" ) - self.write( f":SOUR:MARK:FREQ {freq_marker}" ) - else: - self.write( f":SOUR:MARK OFF" ) - self.write( f"{command_header}:SPAC {spacing}" ) - self.write( f"{command_header}:STEP {step}" ) - match trigger_source: - case SweepTriggerSource.INTERNAL: - self.write( f"{command_header}:TRIG:SOUR INT" ) - self.write( f"{command_header}:HTIM:STAR {htime_start}" ) - self.write( f"{command_header}:HTIM {htime_stop}" ) - self.write( f"{command_header}:RTIM {rtime}" ) - self.write( f"{command_header}:TIME {time}" ) - case SweepTriggerSource.EXTERNAL: - self.write( f"{command_header}:TRIG:SOUR EXT" ) - if trigger_slope == SweepTriggerSlope.POSITIVE: - self.write( f"{command_header}:TRIG:SLOP POS" ) - elif trigger_slope == SweepTriggerSlope.NEGATIVE: - self.write( f"{command_header}:TRIG:SLOP NEG" ) - else: - UndefinedValueError(trigger_slope, "SweepTriggerSlope.Positive or SweepTriggerSlope.Negative") - case SweepTriggerSource.MANUAL: - self.write( f"{command_header}:TRIG:SOUR MAN" ) - case _: - UndefinedValueError(trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]") - self.write( f"{command_header}:STAT ON" ) - - def trigger_sweep(self, channel: OutputChannel): - self.write( f":SOUR{channel.value}:SWE:TRIG:IMM" ) diff --git a/pyproject.toml b/pyproject.toml index 33e3ed2..1185b91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "fn_gen" readme = "README.md" description = "A library for usage with SCPI compliant function generators (for now the DG2000 series from rigol)" requires-python = "==3.11.*" -version = "0.0.1" +version = "0.0.2" dependencies = [ "easy-scpi==0.1.4", "ifaddr==0.2.0", @@ -21,10 +21,5 @@ authors = [ ] license = {text = "MIT"} -[tool.setuptools] -packages = [ - "fn_gen", -] - [tool.pdm] distribution = true diff --git a/fn_gen/__init__.py b/src/fn_gen/__init__.py similarity index 100% rename from fn_gen/__init__.py rename to src/fn_gen/__init__.py diff --git a/fn_gen/common.py b/src/fn_gen/common.py similarity index 100% rename from fn_gen/common.py rename to src/fn_gen/common.py diff --git a/fn_gen/constants/dg2052.py b/src/fn_gen/constants/dg2052.py similarity index 100% rename from fn_gen/constants/dg2052.py rename to src/fn_gen/constants/dg2052.py diff --git a/src/fn_gen/dg2052.py b/src/fn_gen/dg2052.py new file mode 100644 index 0000000..f87f9fa --- /dev/null +++ b/src/fn_gen/dg2052.py @@ -0,0 +1,295 @@ +### PLEASE DO NOT MIND THE FORMATTING, IT IS DONE AUTOMATICALLY BY 'BLACK' THE PYTHON FORMATTER + +from re import DEBUG +import time +import logging +import pyvisa +from .errors import * +from .enums import * +from .common import * +from .constants.dg2052 import * + + +class DG2052(pyvisa.resources.MessageBasedResource): + """ + This is an object representing the Rigol DG2052 function generator. This object uses the SCPI protocol for communicating with the Rigol DG2052 function generator. + + Parameters + ---------- + port : str + The SCPI port describing the device, consists of a communication method and device port followed by the "::INSTR" keyword. + communication method: can be either USB or TCPIP (other communication methods are not supported for this device) + device port: either COMM4 or /dev/USB0 for USB in windows and posix systems respectively or the IP Address for TCPIP + format: "::::INSTR" + example: "TCPI::192.168.1.11::INSTR" or "USB::COMM4::INSTR" + + Returns + ------- + DG2052(pyvisa.resources.MessageBasedResource) + The object representing the instrument + + Raises + ------ + UndefinedCommunicationMethodError + when the communication method is not a USB or TCPIP in the port string + """ + + comm: CommMethod # The communication method used (either TCPIP or USB) + rm: pyvisa.ResourceManager # The resource manager object for pyvisa (for future use) + port: str # The str used for the port + + def __init__(self, port: str): # Class initialization method + if "TCPIP" in port: # Check if port starts with TCPIP + logging.debug("(PROG) detected TCPIP port") + self.comm = CommMethod.LAN # Set comm to LAN + elif "USB" in port: # Check if port starts with USB + logging.debug("(PROG) detected USB port") + self.comm = CommMethod.USB # Set comm to USB + else: # Rause Undefined Communication Method Error + raise UndefinedCommunicationMethodError(port) + rm = pyvisa.ResourceManager() # Create a pyvisa.ResourceManager object + self.rm = rm # Save that object as rm + self.port = port # Save the port string as port + super().__init__(rm, port) # create tne instrument object + logging.debug("(PROG) created dg2052 instance") + self.open() # connect to the instrument object (for ease of use) + logging.debug("(PROG) connected to dg2052 device") + + def whoami(self) -> str: + """ + shows the identification of the connected instrument + + Returns + ------- + str + The identification of the connected instrument + """ + match ( + self.comm + ): # Return an Identification string depending on the communication method + # Here a match case is used to make it easy to extend the communication methods to other methods + case CommMethod.LAN: # if the communication method is LAN + logging.debug("(PROG) communication method: LAN") + ( + manufacturer, + model, + serial, + software_ver, + ) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query + self.query("*IDN?").strip().split(",") + ) + ipaddr = self.query( + ":SYST:COMM:LAN:IPAD?" + ).strip() # Get the IPAddress of the device + mac = self.query( + ":SYST:COMM:LAN:MAC?" + ).strip() # Get the MAC address of the device + out = ( + f"{manufacturer} {model}:\n\tSerial Nr.:" + + f" {serial}\n\tSoftware Ver.:" + + f" {software_ver}\n\tPort:" + + f" {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}" + ) + return out # return the formatted string + case CommMethod.USB: # if the communication method is USB + logging.debug("(PROG) communication method USB") + ( + manufacturer, + model, + serial, + software_ver, + ) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query + self.query("*IDN?").strip().split(",") + ) + # info = self.system.communicate.usb.information().strip() + info = self.query( + ":SYST:COMM:USB:INF?" + ).strip() # Get the USB info of the device + out = ( + f"{manufacturer} {model}:\n\tSerial Nr.:" + + f" {serial}\n\tSoftware Ver.:" + + f" {software_ver}\n\tPort:" + + f" {self.port}\n\tINFORMATION: {info}" + ) + return out # return the formatted string + case _: # default case raise Undefined Communication Method Error + raise UndefinedCommunicationMethodError(self.port) + + def set_output(self, channel: OutputChannel, state: bool): + """ + Sets the output channel ON or OFF + + Parameters + ---------- + channel : OutpuChannel + The output channel of the device (either OutputChannel.ONE or OutputChannel.TWO) + + state : bool + The state of the output channel + """ + if state: + logging.debug(f"(PROG) :OUTP{channel.value} ON") + self.write(f":OUTP{channel.value} ON") + else: + logging.debug(f"(PROG) :OUTP{channel.value} OFF") + self.write(f":OUTP{channel.value} OFF") + + def toggle_output(self, channel: OutputChannel): + state = self.query(f":OUT{channel.value}?").strip() + logging.debug(f"(PROG) output {channel.value} state: {state}") + match (state): + case "ON": + self.set_output(channel, False) + case "OFF": + self.set_output(channel, True) + case _: + raise UndefinedValueError(state, "ON or OFF") + + def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]: + low: float = float(self.query(f":OUTP{channel.value}:VOLL:LOW?")) + high: float = float(self.query(f":OUTP{channel.value}:VOLL:HIGH?")) + logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}") + return low, high + + def get_output_impedance(self, channel: OutputChannel) -> float: + impedance = float(self.query(f":OUTP{channel.value}:IMP?")) + logging.debug(f"(PROG) output {channel.value} impedance: {impedance}") + return impedance + + def get_output_load(self, channel: OutputChannel) -> float: + load = float(self.query(f":OUTP{channel.value}:LOAD?")) + logging.debug(f"(PROG) output {channel.value} load: {load}") + return load + + def get_output_signal(self, channel: OutputChannel) -> str: + signal = self.query(f":SOUR{channel.value}:APPL?").strip() + logging.debug(f"(PROG) output {channel.value} signal: {signal}") + return signal + + def get_output_state(self, channel: OutputChannel) -> str: + state = self.query(f":OUTP{channel.value}?").strip() + logging.debug(f"(PROG) output {channel.value} state: {state}") + return state + + def set_dc(self, channel: OutputChannel, offset: float): + logging.debug(f"(PROG) set dc signal with offset: {offset}") + self.write(f":SOUR{channel.value}:APPL:DC 1,1,{offset}") + + def set_sine_wave( + self, + channel: OutputChannel, + freq: float = 1e3, + amp: float = 5.0, + offset: float = 0.0, + phase: int = 0, + ): + if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]: + raise ValueOutOfBoundsError(SIN_RANGE, freq) + if phase < 0 and phase > 360: + raise ValueOutOfBoundsError((0, 360), phase) + logging.debug( + f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}" + ) + self.write(f":SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}") + + def set_square_wave( + self, + channel: OutputChannel, # Sets the output channel of the ramp function + freq: float = 1e3, # Sets the frequency + amp: float = 5.0, # Sets the amplitude + offset: float = 0.0, # Sets the amplitude offset + phase: int = 0, # Sets the phase shift + ): + check_bounds(SQU_RANGE, freq) + check_bounds((0, 360), phase) + logging.debug( + f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}" + ) + self.write(f":SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}") + + def set_ramp( + self, + channel: OutputChannel, # Sets the output channel of the ramp function + freq: float = 1e3, # Sets the frequency + amp: float = 5, # Sets the amplitude + offset: float = 0, # Sets the amplitude offset + phase: int = 0, # Sets the phase shift + ): + check_bounds(RAMP_RANGE, freq) + check_bounds((0, 360), phase) + logging.debug( + f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}" + ) + self.write(f":SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}") + + def set_sweep( + self, + channel: OutputChannel, # Sets the output channel of the sweep function + amp: float = 5, # Sets the amplitude of the sweeped signal + offset: float = 0, # Sets the offset voltage of the sweeped signal + phase: int = 0, # Sets the phase shift of the sweeped signal + signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped + htime_start: float = 0, # Sets the start hold time of the sweep function + htime_stop: float = 0, # Sets the stop hold time of the sweep function + freq_start: float = 100, # Sets the sweep starting frequency + freq_stop: float = 1e3, # Sets the sweep stopping frequency + marker: bool = False, # Enables/Disables setting the marker frequency manually + freq_marker: float = 550, # Sets the marker frequency at whic the Sync signal changes from high to low + rtime: float = 0, # Sets the return time of the sweep function + time: float = 1, # Sets the sweep time + spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type + step: int = 2, # Sets the number of steps of the sweep function + trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only) + trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL, # Sets the sweep trigger source + ): + time_bounds: tuple[float, float] = (0, 500) + command_header = f":SOUR{channel.value}:SWE" + check_bounds(time_bounds, htime_start) + check_bounds(time_bounds, htime_stop) + check_bounds(time_bounds, rtime) + check_bounds((2, 1024), step) + check_bounds((1e-3, 599.0), time) + match signal_type: + case SweepSignalType.SINE: + self.set_sine_wave(channel, amp=amp, offset=offset, phase=phase) + case SweepSignalType.SQUARE: + self.set_square_wave(channel, amp=amp, offset=offset, phase=phase) + case SweepSignalType.RAMP: + self.set_ramp(channel, amp=amp, offset=offset, phase=phase) + self.write(f":SOUR:FREQ:STAR {freq_start}") + self.write(f":SOUR:FREQ:STOP {freq_stop}") + if marker: + self.write(f":SOUR:MARK ON") + self.write(f":SOUR:MARK:FREQ {freq_marker}") + else: + self.write(f":SOUR:MARK OFF") + self.write(f"{command_header}:SPAC {spacing}") + self.write(f"{command_header}:STEP {step}") + match trigger_source: + case SweepTriggerSource.INTERNAL: + self.write(f"{command_header}:TRIG:SOUR INT") + self.write(f"{command_header}:HTIM:STAR {htime_start}") + self.write(f"{command_header}:HTIM {htime_stop}") + self.write(f"{command_header}:RTIM {rtime}") + self.write(f"{command_header}:TIME {time}") + case SweepTriggerSource.EXTERNAL: + self.write(f"{command_header}:TRIG:SOUR EXT") + if trigger_slope == SweepTriggerSlope.POSITIVE: + self.write(f"{command_header}:TRIG:SLOP POS") + elif trigger_slope == SweepTriggerSlope.NEGATIVE: + self.write(f"{command_header}:TRIG:SLOP NEG") + else: + UndefinedValueError( + trigger_slope, + "SweepTriggerSlope.Positive or SweepTriggerSlope.Negative", + ) + case SweepTriggerSource.MANUAL: + self.write(f"{command_header}:TRIG:SOUR MAN") + case _: + UndefinedValueError( + trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]" + ) + self.write(f"{command_header}:STAT ON") + + def trigger_sweep(self, channel: OutputChannel): + self.write(f":SOUR{channel.value}:SWE:TRIG:IMM") diff --git a/fn_gen/enums/__init__.py b/src/fn_gen/enums/__init__.py similarity index 100% rename from fn_gen/enums/__init__.py rename to src/fn_gen/enums/__init__.py diff --git a/fn_gen/enums/comm_method.py b/src/fn_gen/enums/comm_method.py similarity index 100% rename from fn_gen/enums/comm_method.py rename to src/fn_gen/enums/comm_method.py diff --git a/fn_gen/enums/output_channel.py b/src/fn_gen/enums/output_channel.py similarity index 100% rename from fn_gen/enums/output_channel.py rename to src/fn_gen/enums/output_channel.py diff --git a/fn_gen/enums/sweep_signal_type.py b/src/fn_gen/enums/sweep_signal_type.py similarity index 100% rename from fn_gen/enums/sweep_signal_type.py rename to src/fn_gen/enums/sweep_signal_type.py diff --git a/fn_gen/enums/sweep_spacing.py b/src/fn_gen/enums/sweep_spacing.py similarity index 100% rename from fn_gen/enums/sweep_spacing.py rename to src/fn_gen/enums/sweep_spacing.py diff --git a/fn_gen/enums/sweep_trigger_slope.py b/src/fn_gen/enums/sweep_trigger_slope.py similarity index 100% rename from fn_gen/enums/sweep_trigger_slope.py rename to src/fn_gen/enums/sweep_trigger_slope.py diff --git a/fn_gen/enums/sweep_trigger_source.py b/src/fn_gen/enums/sweep_trigger_source.py similarity index 100% rename from fn_gen/enums/sweep_trigger_source.py rename to src/fn_gen/enums/sweep_trigger_source.py diff --git a/fn_gen/errors/__init__.py b/src/fn_gen/errors/__init__.py similarity index 100% rename from fn_gen/errors/__init__.py rename to src/fn_gen/errors/__init__.py diff --git a/fn_gen/errors/undefined_communication_method_error.py b/src/fn_gen/errors/undefined_communication_method_error.py similarity index 100% rename from fn_gen/errors/undefined_communication_method_error.py rename to src/fn_gen/errors/undefined_communication_method_error.py diff --git a/fn_gen/errors/undefined_value_error.py b/src/fn_gen/errors/undefined_value_error.py similarity index 100% rename from fn_gen/errors/undefined_value_error.py rename to src/fn_gen/errors/undefined_value_error.py diff --git a/fn_gen/errors/value_out_of_bounds_error.py b/src/fn_gen/errors/value_out_of_bounds_error.py similarity index 100% rename from fn_gen/errors/value_out_of_bounds_error.py rename to src/fn_gen/errors/value_out_of_bounds_error.py