# modbus_rt 模块 API 文档 ## API ### class data_trans(_modbus_rt._data_trans): ``` python def reg2reg(self,val:int):... ``` ``` python def regs2regs(self,val:list):... ``` ``` python def regs2bytes(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2str(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2signed(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2int(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2uint(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2long(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2float(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def regs2double(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def bytes2regs(self,val:any,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def str2regs(self,val:str,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def signed2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def int2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def uint2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def long2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def float2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ``` python def double2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):... ``` ### class rtu(_modbus_rt._rtu): ``` python def __init__(self,mode=SLAVE):... ``` ``` python def set_serial(self,devname:str,baudrate=115200,bytesize=8,parity='N',stopbits=1,xonxoff=0):... ``` ``` python def set_over_type(self,over_type:int):... ``` ``` python def set_net(self,ip='',port=502,type=SOCK_STREAM):... ``` ``` python def set_ip(self,ip:str):... ``` ``` python def set_port(self,port:int):... ``` ``` python def set_type(self,type:int):... ``` ``` python def set_p2p(self,p2p_flag:int):... ``` ``` python def open(self):... ``` ``` python def isopen(self):... ``` ``` python def close(self):... ``` ``` python def set_addr(self,addr:int):... 
``` ``` python def set_strict(self,strict:int):... ``` ``` python def add_block(self,name:str,type:int,addr:int,nums:int):... ``` ``` python def regs_binding(self,regs:bytes,type:int,addr:int,nums:int):... ``` ``` python def set_pre_ans_callback(self,cb):... ``` ``` python def set_done_callback(self,cb):... ``` ``` python def set_dev_binding(self,flag:int):... ``` ``` python def set_server(self,saddr:str,sport:int):... ``` ``` python def get_saddr(self):... ``` ``` python def excuse(self,dir_slave:int,type_function:int,addr:int,*val):... ``` ``` python def download(self,slave:int,file_dev:str,file_master:str):... ``` ``` python def upload(self,slave:int,file_dev:str,file_master:str):... ``` ### class ascii(rtu): ### class tcp(_modbus_rt._tcp): ``` python def __init__(self,mode=SLAVE):... ``` ``` python def set_net(self,ip='',port=502,type=SOCK_STREAM):... ``` ``` python def set_ip(self,ip:str):... ``` ``` python def set_port(self,port:int):... ``` ``` python def set_type(self,type:int):... ``` ``` python def set_p2p(self,p2p_flag:int):... ``` ``` python def open(self):... ``` ``` python def isopen(self):... ``` ``` python def close(self):... ``` ``` python def set_addr(self,addr:int):... ``` ``` python def set_strict(self,strict:int):... ``` ``` python def add_block(self,name:str,type:int,addr:int,nums:int):... ``` ``` python def regs_binding(self,regs:bytes,type:int,addr:int,nums:int):... ``` ``` python def set_pre_ans_callback(self,cb):... ``` ``` python def set_done_callback(self,cb):... ``` ``` python def set_dev_binding(self,flag:int):... ``` ``` python def set_server(self,saddr:str,sport:int):... ``` ``` python def get_saddr(self):... ``` ``` python def excuse(self,dir_slave:int,type_function:int,addr:int,*val):... ``` ``` python def download(self,slave:int,file_dev:str,file_master:str):... ``` ``` python def upload(self,slave:int,file_dev:str,file_master:str):... 
``` ## Examples ### tcp2rtu_dtu.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "/dev/ttyUSB0" ip_addr = "" rm = modbus_rt.rtu(cst.MASTER) rm.set_serial(serial_name) rm.open() ts = modbus_rt.tcp() ts.set_net(ip_addr, 502, cst.SOCK_STREAM) def pre_call(evt) : slave = evt.slave function = evt.function addr = evt.addr quantity = evt.quantity if cst.READ_HOLDING_REGISTERS == function: data = rm.excuse(slave, function, addr, quantity) ts.excuse(cst.WRITE, cst.REGISTERS, addr, quantity, data) elif cst.READ_DISCRETE_INPUTS == function: data = rm.excuse(slave, function, addr, quantity) ts.excuse(cst.WRITE, cst.INPUTS, addr, quantity, data) def done_call(evt) : slave = evt.slave function = evt.function addr = evt.addr quantity = evt.quantity if cst.WRITE_SINGLE_COIL == function: data = ts.excuse(cst.READ, cst.CIOLS, addr, 1) rm.excuse(slave, function, addr, data[0]) elif cst.WRITE_SINGLE_REGISTER == function: data = ts.excuse(cst.READ, cst.REGISTERS, addr, 1) rm.excuse(slave, function, addr, data[0]) elif cst.WRITE_MULTIPLE_COILS == function: data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity) rm.excuse(slave, function, addr, quantity, data) elif cst.WRITE_MULTIPLE_REGISTERS == function: data = ts.excuse(cst.READ, cst.REGISTERS, addr, quantity) rm.excuse(slave, function, addr, quantity, data) ts.add_block("A", 0, 20000, 10) ts.add_block("B", 1, 10000, 16) ts.add_block("C", 4, 0, 10) ts.set_strict(0) ts.set_pre_ans_callback(pre_call) ts.set_done_callback(done_call) ts.open() ``` ### rtu_slave_for_udp.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" rsu = modbus_rt.rtu() rsu.set_over_type(cst.OVER_NET) rsu.set_net(ip_addr, 502, cst.SOCK_DGRAM) rsu.add_block("A",cst.REGISTERS, 0, 10) rsu.open() # rsu.excuse(0,4,0,5) # rsu.excuse(1,4,0,5,[1,2,34,5,6]) # rsu.excuse(0,4,0,5) ``` ### udp_slave.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" tsu = modbus_rt.tcp() 
tsu.set_net(ip_addr, 502, cst.SOCK_DGRAM) tsu.add_block("A",cst.REGISTERS, 0, 10) tsu.open() # tsu.excuse(0,4,0,5) # tsu.excuse(1,4,0,5,[1,2,34,5,6]) # tsu.excuse(0,4,0,5) ``` ### rtu_slave.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "uart1" rs = modbus_rt.rtu() rs.set_serial(serial_name) rs.add_block("A",cst.REGISTERS, 0, 10) rs.open() # rs.excuse(0,4,0,5) # rs.excuse(1,4,0,5,[1,2,34,5,6]) # rs.excuse(0,4,0,5) ``` ### dtu_stm32.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "uart4" ip_addr = "" rm = modbus_rt.rtu(cst.MASTER) rm.set_serial(serial_name) rm.open() ts = modbus_rt.tcp() ts.set_net(ip_addr, 502, cst.SOCK_STREAM) def pre_call(evt) : slave = evt.slave function = evt.function addr = evt.addr quantity = evt.quantity if cst.READ_DISCRETE_INPUTS == function: if addr >= 0 and addr <= 16 : data = rm.excuse(slave, function, addr + 10000, quantity) ts.excuse(cst.WRITE, cst.INPUTS, addr, quantity, data) elif cst.READ_COILS == function: if addr >= 0 and addr <= 16 : data = rm.excuse(slave, function, addr + 20000, quantity) ts.excuse(cst.WRITE, cst.CIOLS, addr, quantity, data) def done_call(evt) : slave = evt.slave function = evt.function addr = evt.addr quantity = evt.quantity if cst.WRITE_SINGLE_COIL == function: if addr >= 0 and addr <= 16 : data = ts.excuse(cst.READ, cst.CIOLS, addr, 1) rm.excuse(slave, function, addr + 20000, data[0]) elif cst.WRITE_MULTIPLE_COILS == function: if addr >= 0 and addr <= 16 : data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity) rm.excuse(slave, function, addr + 20000, quantity, data) ts.set_strict(0) ts.set_pre_ans_callback(pre_call) ts.set_done_callback(done_call) ts.open() ``` ### ascii_slave.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "COM11" ascii_s = modbus_rt.ascii() ascii_s.set_serial(serial_name) ascii_s.add_block("A",cst.REGISTERS, 0, 10) ascii_s.open() # ascii_s.excuse(0,4,0,5) # ascii_s.excuse(1,4,0,5,[1,2,34,5,6]) # 
ascii_s.excuse(0,4,0,5) ``` ### tcp_master.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" tm = modbus_rt.tcp(cst.MASTER) tm.set_net(ip_addr, 0, cst.SOCK_STREAM) tm.set_server(ip_addr, 502) tm.open() # tm.excuse(1,3,0,5) # tm.excuse(1,16,0,5,[21,23,24,25,65]) # tm.excuse(1,3,0,5) ``` ### rtu_slave_for_tcp.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" rst = modbus_rt.rtu() rst.set_over_type(cst.OVER_NET) rst.set_net(ip_addr, 502, cst.SOCK_STREAM) rst.add_block("A",cst.REGISTERS, 0, 10) rst.open() # rst.excuse(0,4,0,5) # rst.excuse(1,4,0,5,[1,2,34,5,6]) # rst.excuse(0,4,0,5) ``` ### rtu_master.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "COM11" rm = modbus_rt.rtu(cst.MASTER) rm.set_serial(serial_name) rm.open() # rm.excuse(1,3,0,5) # rm.excuse(1,16,0,5,[21,23,24,25,65]) # rm.excuse(1,3,0,5) ``` ### tcp_slave.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" ts = modbus_rt.tcp() ts.set_net(ip_addr, 502, cst.SOCK_STREAM) ts.add_block("A",cst.REGISTERS, 0, 10) ts.open() # ts.excuse(0,4,0,5) # ts.excuse(1,4,0,5,[1,2,34,5,6]) # ts.excuse(0,4,0,5) ``` ### ascii_master.py ```python import modbus_rt import modbus_rt_defines as cst serial_name = "COM11" ascii_m = modbus_rt.ascii(cst.MASTER) ascii_m.set_serial(serial_name) ascii_m.open() # ascii_m.excuse(1,3,0,5) # ascii_m.excuse(1,16,0,5,[21,23,24,25,65]) # ascii_m.excuse(1,3,0,5) ``` ### udp_master.py ```python import modbus_rt import modbus_rt_defines as cst ip_addr = "192.168.28.150" tmu = modbus_rt.tcp(cst.MASTER) tmu.set_net(ip_addr, 0, cst.SOCK_DGRAM) tmu.open() # tmu.set_server("255.255.255.255", 502) # tmu.excuse(1,3,0,5) # tmu.get_saddr() # tmu.excuse(1,16,0,5,[21,23,24,25,65]) # tmu.excuse(1,3,0,5) ```