fixed project sturcture, fixes imports

This commit is contained in:
2024-01-24 15:48:07 +01:00
parent 718015c7d4
commit ee5d585f01
17 changed files with 296 additions and 204 deletions

3
src/fn_gen/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .dg2052 import DG2052
from .enums import *
from .errors import *

5
src/fn_gen/common.py Normal file
View File

@@ -0,0 +1,5 @@
from .errors import ValueOutOfBoundsError
def check_bounds(bounds: tuple[float, float] | tuple[int, int], value: float | int):
if value < bounds[0] or value > bounds[1]:
raise ValueOutOfBoundsError(bounds, value)

View File

@@ -0,0 +1,11 @@
SIN_RANGE = (1e-6, 50e6)
SQU_RANGE = (1e-6, 15e6)
RAMP_RANGE = (1e-6, 1.5e6)
PULSE_RANGE = (1e-6, 20e6)
HARM_RANGE = (1e-6, 20e6)
NOISE_BANDWIDTH = 100e6
USER_RANGE = (1e-6, 15e6)
DUALT_RANGE = (1e-6, 20e6)
PRBS_RANGE = (2e3, 40e6)
SEQ_RANGE = (2e3, 60e6)

295
src/fn_gen/dg2052.py Normal file
View File

@@ -0,0 +1,295 @@
### PLEASE DO NOT MIND THE FORMATTING, IT IS DONE AUTOMATICALLY BY 'BLACK' THE PYTHON FORMATTER
from re import DEBUG
import time
import logging
import pyvisa
from .errors import *
from .enums import *
from .common import *
from .constants.dg2052 import *
class DG2052(pyvisa.resources.MessageBasedResource):
"""
This is an object representing the Rigol DG2052 function generator. This object uses the SCPI protocol for communicating with the Rigol DG2052 function generator.
Parameters
----------
port : str
The SCPI port describing the device, consists of a communication method and device port followed by the "::INSTR" keyword.
communication method: can be either USB or TCPIP (other communication methods are not supported for this device)
device port: either COMM4 or /dev/USB0 for USB in windows and posix systems respectively or the IP Address for TCPIP
format: "<communication method>::<device port>::INSTR"
example: "TCPI::192.168.1.11::INSTR" or "USB::COMM4::INSTR"
Returns
-------
DG2052(pyvisa.resources.MessageBasedResource)
The object representing the instrument
Raises
------
UndefinedCommunicationMethodError
when the communication method is not a USB or TCPIP in the port string
"""
comm: CommMethod # The communication method used (either TCPIP or USB)
rm: pyvisa.ResourceManager # The resource manager object for pyvisa (for future use)
port: str # The str used for the port
def __init__(self, port: str): # Class initialization method
if "TCPIP" in port: # Check if port starts with TCPIP
logging.debug("(PROG) detected TCPIP port")
self.comm = CommMethod.LAN # Set comm to LAN
elif "USB" in port: # Check if port starts with USB
logging.debug("(PROG) detected USB port")
self.comm = CommMethod.USB # Set comm to USB
else: # Rause Undefined Communication Method Error
raise UndefinedCommunicationMethodError(port)
rm = pyvisa.ResourceManager() # Create a pyvisa.ResourceManager object
self.rm = rm # Save that object as rm
self.port = port # Save the port string as port
super().__init__(rm, port) # create tne instrument object
logging.debug("(PROG) created dg2052 instance")
self.open() # connect to the instrument object (for ease of use)
logging.debug("(PROG) connected to dg2052 device")
def whoami(self) -> str:
"""
shows the identification of the connected instrument
Returns
-------
str
The identification of the connected instrument
"""
match (
self.comm
): # Return an Identification string depending on the communication method
# Here a match case is used to make it easy to extend the communication methods to other methods
case CommMethod.LAN: # if the communication method is LAN
logging.debug("(PROG) communication method: LAN")
(
manufacturer,
model,
serial,
software_ver,
) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query
self.query("*IDN?").strip().split(",")
)
ipaddr = self.query(
":SYST:COMM:LAN:IPAD?"
).strip() # Get the IPAddress of the device
mac = self.query(
":SYST:COMM:LAN:MAC?"
).strip() # Get the MAC address of the device
out = (
f"{manufacturer} {model}:\n\tSerial Nr.:"
+ f" {serial}\n\tSoftware Ver.:"
+ f" {software_ver}\n\tPort:"
+ f" {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}"
)
return out # return the formatted string
case CommMethod.USB: # if the communication method is USB
logging.debug("(PROG) communication method USB")
(
manufacturer,
model,
serial,
software_ver,
) = tuple( # Acquire the data for the manufacturer, model, serial and software version from the '*IDN?' SCPI query
self.query("*IDN?").strip().split(",")
)
# info = self.system.communicate.usb.information().strip()
info = self.query(
":SYST:COMM:USB:INF?"
).strip() # Get the USB info of the device
out = (
f"{manufacturer} {model}:\n\tSerial Nr.:"
+ f" {serial}\n\tSoftware Ver.:"
+ f" {software_ver}\n\tPort:"
+ f" {self.port}\n\tINFORMATION: {info}"
)
return out # return the formatted string
case _: # default case raise Undefined Communication Method Error
raise UndefinedCommunicationMethodError(self.port)
def set_output(self, channel: OutputChannel, state: bool):
"""
Sets the output channel ON or OFF
Parameters
----------
channel : OutpuChannel
The output channel of the device (either OutputChannel.ONE or OutputChannel.TWO)
state : bool
The state of the output channel
"""
if state:
logging.debug(f"(PROG) :OUTP{channel.value} ON")
self.write(f":OUTP{channel.value} ON")
else:
logging.debug(f"(PROG) :OUTP{channel.value} OFF")
self.write(f":OUTP{channel.value} OFF")
def toggle_output(self, channel: OutputChannel):
state = self.query(f":OUT{channel.value}?").strip()
logging.debug(f"(PROG) output {channel.value} state: {state}")
match (state):
case "ON":
self.set_output(channel, False)
case "OFF":
self.set_output(channel, True)
case _:
raise UndefinedValueError(state, "ON or OFF")
def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]:
low: float = float(self.query(f":OUTP{channel.value}:VOLL:LOW?"))
high: float = float(self.query(f":OUTP{channel.value}:VOLL:HIGH?"))
logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}")
return low, high
def get_output_impedance(self, channel: OutputChannel) -> float:
impedance = float(self.query(f":OUTP{channel.value}:IMP?"))
logging.debug(f"(PROG) output {channel.value} impedance: {impedance}")
return impedance
def get_output_load(self, channel: OutputChannel) -> float:
load = float(self.query(f":OUTP{channel.value}:LOAD?"))
logging.debug(f"(PROG) output {channel.value} load: {load}")
return load
def get_output_signal(self, channel: OutputChannel) -> str:
signal = self.query(f":SOUR{channel.value}:APPL?").strip()
logging.debug(f"(PROG) output {channel.value} signal: {signal}")
return signal
def get_output_state(self, channel: OutputChannel) -> str:
state = self.query(f":OUTP{channel.value}?").strip()
logging.debug(f"(PROG) output {channel.value} state: {state}")
return state
def set_dc(self, channel: OutputChannel, offset: float):
logging.debug(f"(PROG) set dc signal with offset: {offset}")
self.write(f":SOUR{channel.value}:APPL:DC 1,1,{offset}")
def set_sine_wave(
self,
channel: OutputChannel,
freq: float = 1e3,
amp: float = 5.0,
offset: float = 0.0,
phase: int = 0,
):
if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]:
raise ValueOutOfBoundsError(SIN_RANGE, freq)
if phase < 0 and phase > 360:
raise ValueOutOfBoundsError((0, 360), phase)
logging.debug(
f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}")
def set_square_wave(
self,
channel: OutputChannel, # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5.0, # Sets the amplitude
offset: float = 0.0, # Sets the amplitude offset
phase: int = 0, # Sets the phase shift
):
check_bounds(SQU_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(
f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}")
def set_ramp(
self,
channel: OutputChannel, # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5, # Sets the amplitude
offset: float = 0, # Sets the amplitude offset
phase: int = 0, # Sets the phase shift
):
check_bounds(RAMP_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(
f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}"
)
self.write(f":SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}")
def set_sweep(
self,
channel: OutputChannel, # Sets the output channel of the sweep function
amp: float = 5, # Sets the amplitude of the sweeped signal
offset: float = 0, # Sets the offset voltage of the sweeped signal
phase: int = 0, # Sets the phase shift of the sweeped signal
signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped
htime_start: float = 0, # Sets the start hold time of the sweep function
htime_stop: float = 0, # Sets the stop hold time of the sweep function
freq_start: float = 100, # Sets the sweep starting frequency
freq_stop: float = 1e3, # Sets the sweep stopping frequency
marker: bool = False, # Enables/Disables setting the marker frequency manually
freq_marker: float = 550, # Sets the marker frequency at whic the Sync signal changes from high to low
rtime: float = 0, # Sets the return time of the sweep function
time: float = 1, # Sets the sweep time
spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type
step: int = 2, # Sets the number of steps of the sweep function
trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only)
trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL, # Sets the sweep trigger source
):
time_bounds: tuple[float, float] = (0, 500)
command_header = f":SOUR{channel.value}:SWE"
check_bounds(time_bounds, htime_start)
check_bounds(time_bounds, htime_stop)
check_bounds(time_bounds, rtime)
check_bounds((2, 1024), step)
check_bounds((1e-3, 599.0), time)
match signal_type:
case SweepSignalType.SINE:
self.set_sine_wave(channel, amp=amp, offset=offset, phase=phase)
case SweepSignalType.SQUARE:
self.set_square_wave(channel, amp=amp, offset=offset, phase=phase)
case SweepSignalType.RAMP:
self.set_ramp(channel, amp=amp, offset=offset, phase=phase)
self.write(f":SOUR:FREQ:STAR {freq_start}")
self.write(f":SOUR:FREQ:STOP {freq_stop}")
if marker:
self.write(f":SOUR:MARK ON")
self.write(f":SOUR:MARK:FREQ {freq_marker}")
else:
self.write(f":SOUR:MARK OFF")
self.write(f"{command_header}:SPAC {spacing}")
self.write(f"{command_header}:STEP {step}")
match trigger_source:
case SweepTriggerSource.INTERNAL:
self.write(f"{command_header}:TRIG:SOUR INT")
self.write(f"{command_header}:HTIM:STAR {htime_start}")
self.write(f"{command_header}:HTIM {htime_stop}")
self.write(f"{command_header}:RTIM {rtime}")
self.write(f"{command_header}:TIME {time}")
case SweepTriggerSource.EXTERNAL:
self.write(f"{command_header}:TRIG:SOUR EXT")
if trigger_slope == SweepTriggerSlope.POSITIVE:
self.write(f"{command_header}:TRIG:SLOP POS")
elif trigger_slope == SweepTriggerSlope.NEGATIVE:
self.write(f"{command_header}:TRIG:SLOP NEG")
else:
UndefinedValueError(
trigger_slope,
"SweepTriggerSlope.Positive or SweepTriggerSlope.Negative",
)
case SweepTriggerSource.MANUAL:
self.write(f"{command_header}:TRIG:SOUR MAN")
case _:
UndefinedValueError(
trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]"
)
self.write(f"{command_header}:STAT ON")
def trigger_sweep(self, channel: OutputChannel):
self.write(f":SOUR{channel.value}:SWE:TRIG:IMM")

View File

@@ -0,0 +1,6 @@
from .comm_method import CommMethod
from .output_channel import OutputChannel
from .sweep_spacing import SweepSpacing
from .sweep_trigger_slope import SweepTriggerSlope
from .sweep_trigger_source import SweepTriggerSource
from .sweep_signal_type import SweepSignalType

View File

@@ -0,0 +1,5 @@
from enum import Enum
class CommMethod(Enum):
USB = 0
LAN = 1

View File

@@ -0,0 +1,5 @@
from enum import Enum
class OutputChannel(Enum):
ONE = 1
TWO = 2

View File

@@ -0,0 +1,6 @@
from enum import Enum
class SweepSignalType(Enum):
SINE = 1
SQUARE = 2
RAMP = 3

View File

@@ -0,0 +1,6 @@
from enum import Enum
class SweepSpacing(Enum):
LIN = 1
LOG = 2
STEP = 3

View File

@@ -0,0 +1,5 @@
from enum import Enum
class SweepTriggerSlope(Enum):
POSITIVE = 1
NEGATIVE = 2

View File

@@ -0,0 +1,6 @@
from enum import Enum
class SweepTriggerSource(Enum):
INTERNAL = 1
EXTERNAL = 2
MANUAL = 3

View File

@@ -0,0 +1,3 @@
from .undefined_communication_method_error import UndefinedCommunicationMethodError
from .value_out_of_bounds_error import ValueOutOfBoundsError
from .undefined_value_error import UndefinedValueError

View File

@@ -0,0 +1,5 @@
class UndefinedCommunicationMethodError(Exception):
def __init__(self, port: str):
method = port.split( "::" )[0]
super().__init__( f"ERROR: Undefined Communication Exception, Method \"{method}\" is not recognized in Port \"{port}\"." )

View File

@@ -0,0 +1,4 @@
class UndefinedValueError(Exception):
def __init__(self, value: str, expected: str):
super().__init__( f"ERROR: Undefined Value, expected: {expected}, value: {value}." )

View File

@@ -0,0 +1,4 @@
class ValueOutOfBoundsError(Exception):
def __init__(self, bounds: tuple[float, float] | tuple[int, int], value: float | int):
super().__init__( f"ERROR: Value out of expected bounds, Min: {bounds[0]}, Max: {bounds[1]}, Value: {value}." )