Source code for snap7.client

"""
Snap7 client used for connection to a siemens7 server.
"""
import re
from ctypes import c_int, c_char_p, byref, sizeof, c_uint16, c_int32, c_byte
from ctypes import c_void_p

import logging

import snap7
from snap7 import six
from snap7.snap7types import S7Object, buffer_type, buffer_size, BlocksList
from snap7.snap7types import TS7BlockInfo, param_types, cpu_statuses

from snap7.common import check_error, load_library, ipv4
from snap7.snap7exceptions import Snap7Exception

logger = logging.getLogger(__name__)


[docs]def error_wrap(func): """Parses a s7 error code returned the decorated function.""" def f(*args, **kw): code = func(*args, **kw) check_error(code, context="client") return f
[docs]class Client(object): """ A snap7 client """ def __init__(self): self.library = load_library() self.pointer = False self.create()
[docs] def create(self): """ create a SNAP7 client. """ logger.info("creating snap7 client") self.library.Cli_Create.restype = c_void_p self.pointer = S7Object(self.library.Cli_Create())
[docs] def destroy(self): """ destroy a client. """ logger.info("destroying snap7 client") return self.library.Cli_Destroy(byref(self.pointer))
[docs] def plc_stop(self): """ stops a client """ logger.info("stopping plc") return self.library.Cli_PlcStop(self.pointer)
[docs] def plc_cold_start(self): """ cold starts a client """ logger.info("cold starting plc") return self.library.Cli_PlcColdStart(self.pointer)
[docs] def plc_hot_start(self): """ hot starts a client """ logger.info("hot starting plc") return self.library.Cli_PlcColdStart(self.pointer)
[docs] def get_cpu_state(self): """ Retrieves CPU state from client """ state = c_int(0) self.library.Cli_GetPlcStatus(self.pointer,byref(state)) try: status_string = cpu_statuses[state.value] except KeyError: status_string = None if not status_string: raise Snap7Exception("The cpu state (%s) is invalid" % state.value) logging.debug("CPU state is %s" % status_string) return status_string
[docs] def get_cpu_info(self): """ Retrieves CPU info from client """ info = snap7.snap7types.S7CpuInfo() result = self.library.Cli_GetCpuInfo(self.pointer, byref(info)) check_error(result, context="client") return info
@error_wrap def disconnect(self): """ disconnect a client. """ logger.info("disconnecting snap7 client") return self.library.Cli_Disconnect(self.pointer) @error_wrap def connect(self, address, rack, slot, tcpport=102): """ Connect to a S7 server. :param address: IP address of server :param rack: rack on server :param slot: slot on server. """ logger.info("connecting to %s:%s rack %s slot %s" % (address, tcpport, rack, slot)) self.set_param(snap7.snap7types.RemotePort, tcpport) return self.library.Cli_ConnectTo( self.pointer, c_char_p(six.b(address)), c_int(rack), c_int(slot))
[docs] def db_read(self, db_number, start, size): """This is a lean function of Cli_ReadArea() to read PLC DB. :returns: user buffer. """ logger.debug("db_read, db_number:%s, start:%s, size:%s" % (db_number, start, size)) type_ = snap7.snap7types.wordlen_to_ctypes[snap7.snap7types.S7WLByte] data = (type_ * size)() result = (self.library.Cli_DBRead( self.pointer, db_number, start, size, byref(data))) check_error(result, context="client") return bytearray(data)
@error_wrap def db_write(self, db_number, start, data): """ Writes to a DB object. :param start: write offset :param data: bytearray """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] size = len(data) cdata = (type_ * size).from_buffer(data) logger.debug("db_write db_number:%s start:%s size:%s data:%s" % (db_number, start, size, data)) return self.library.Cli_DBWrite(self.pointer, db_number, start, size, byref(cdata))
[docs] def full_upload(self, _type, block_num): """ Uploads a full block body from AG. The whole block (including header and footer) is copied into the user buffer. :param block_num: Number of Block """ _buffer = buffer_type() size = c_int(sizeof(_buffer)) block_type = snap7.snap7types.block_types[_type] result = self.library.Cli_FullUpload(self.pointer, block_type, block_num, byref(_buffer), byref(size)) check_error(result, context="client") return bytearray(_buffer), size.value
[docs] def upload(self, block_num): """ Uploads a block body from AG :param data: bytearray """ logger.debug("db_upload block_num: %s" % (block_num)) block_type = snap7.snap7types.block_types['DB'] _buffer = buffer_type() size = c_int(sizeof(_buffer)) result = self.library.Cli_Upload(self.pointer, block_type, block_num, byref(_buffer), byref(size)) check_error(result, context="client") logger.info('received %s bytes' % size) return bytearray(_buffer)
@error_wrap def download(self, data, block_num=-1): """ Downloads a DB data into the AG. A whole block (including header and footer) must be available into the user buffer. :param block_num: New Block number (or -1) :param data: the user buffer """ type_ = c_byte size = len(data) cdata = (type_ * len(data)).from_buffer(data) result = self.library.Cli_Download(self.pointer, block_num, byref(cdata), size) return result
[docs] def db_get(self, db_number): """Uploads a DB from AG. """ logging.debug("db_get db_number: %s" % db_number) _buffer = buffer_type() result = self.library.Cli_DBGet( self.pointer, db_number, byref(_buffer), byref(c_int(buffer_size))) check_error(result, context="client") return bytearray(_buffer)
[docs] def read_area(self, area, dbnumber, start, size): """This is the main function to read data from a PLC. With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters. :param dbnumber: The DB number, only used when area= S7AreaDB :param start: offset to start writing :param size: number of units to read """ assert area in snap7.snap7types.areas.values() wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] logging.debug("reading area: %s dbnumber: %s start: %s: amount %s: " "wordlen: %s" % (area, dbnumber, start, size, wordlen)) data = (type_ * size)() result = self.library.Cli_ReadArea(self.pointer, area, dbnumber, start, size, wordlen, byref(data)) check_error(result, context="client") return bytearray(data)
@error_wrap def write_area(self, area, dbnumber, start, data): """This is the main function to write data into a PLC. It's the complementary function of Cli_ReadArea(), the parameters and their meanings are the same. The only difference is that the data is transferred from the buffer pointed by pUsrData into PLC. :param dbnumber: The DB number, only used when area= S7AreaDB :param start: offset to start writing :param data: a bytearray containing the payload """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] size = len(data) logging.debug("writing area: %s dbnumber: %s start: %s: size %s: " "type: %s" % (area, dbnumber, start, size, type_)) cdata = (type_ * len(data)).from_buffer(data) return self.library.Cli_WriteArea(self.pointer, area, dbnumber, start, size, wordlen, byref(cdata))
[docs] def read_multi_vars(self, items): """This function read multiple variables from the PLC. :param items: list of S7DataItem objects :returns: a tuple with the return code and a list of data items """ result = self.library.Cli_ReadMultiVars(self.pointer, byref(items), c_int32(len(items))) check_error(result, context="client") return result, items
[docs] def list_blocks(self): """Returns the AG blocks amount divided by type. :returns: a snap7.types.BlocksList object. """ logging.debug("listing blocks") blocksList = BlocksList() result = self.library.Cli_ListBlocks(self.pointer, byref(blocksList)) check_error(result, context="client") logging.debug("blocks: %s" % blocksList) return blocksList
[docs] def list_blocks_of_type(self, blocktype, size): """This function returns the AG list of a specified block type.""" blocktype = snap7.snap7types.block_types.get(blocktype) if not blocktype: raise Snap7Exception("The blocktype parameter was invalid") logging.debug("listing blocks of type: %s size: %s" % (blocktype, size)) data = (c_int * 10)() count = c_int(size) result = self.library.Cli_ListBlocksOfType( self.pointer, blocktype, byref(data), byref(count)) logging.debug("number of items found: %s" % count) check_error(result, context="client") return data
[docs] def get_block_info(self, blocktype, db_number): """Returns the block information for the specified block.""" blocktype = snap7.snap7types.block_types.get(blocktype) if not blocktype: raise Snap7Exception("The blocktype parameter was invalid") logging.debug("retrieving block info for block %s of type %s" % (db_number, blocktype)) data = TS7BlockInfo() result = self.library.Cli_GetAgBlockInfo( self.pointer, blocktype, db_number, byref(data)) check_error(result, context="client") return data
@error_wrap def set_session_password(self, password): """Send the password to the PLC to meet its security level.""" assert len(password) <= 8, 'maximum password length is 8' return self.library.Cli_SetSessionPassword(self.pointer, c_char_p(six.b(password))) @error_wrap def clear_session_password(self): """Clears the password set for the current session (logout).""" return self.library.Cli_ClearSessionPassword(self.pointer)
[docs] def set_connection_params(self, address, local_tsap, remote_tsap): """ Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates. This function must be called just before Cli_Connect(). :param address: PLC/Equipment IPV4 Address, for example "192.168.1.12" :param local_tsap: Local TSAP (PC TSAP) :param remote_tsap: Remote TSAP (PLC TSAP) """ assert re.match(ipv4, address), '%s is invalid ipv4' % address result = self.library.Cli_SetConnectionParams(self.pointer, address, c_uint16(local_tsap), c_uint16(remote_tsap)) if result != 0: raise Snap7Exception("The parameter was invalid")
[docs] def set_connection_type(self, connection_type): """ Sets the connection resource type, i.e the way in which the Clients connects to a PLC. :param connection_type: 1 for PG, 2 for OP, 3 to 10 for S7 Basic """ result = self.library.Cli_SetConnectionType(self.pointer, c_uint16(connection_type)) if result != 0: raise Snap7Exception("The parameter was invalid")
[docs] def get_connected(self): """ Returns the connection status :returns: a boolean that indicates if connected. """ connected = c_int32() result = self.library.Cli_GetConnected(self.pointer, byref(connected)) check_error(result, context="client") return bool(connected)
[docs] def ab_read(self, start, size): """ This is a lean function of Cli_ReadArea() to read PLC process outputs. """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] data = (type_ * size)() logging.debug("ab_read: start: %s: size %s: " % (start, size)) result = self.library.Cli_ABRead(self.pointer, start, size, byref(data)) check_error(result, context="client") return bytearray(data)
[docs] def ab_write(self, start, data): """ This is a lean function of Cli_WriteArea() to write PLC process outputs """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] size = len(data) cdata = (type_ * size).from_buffer(data) logging.debug("ab write: start: %s: size: %s: " % (start, size)) return self.library.Cli_ABWrite( self.pointer, start, size, byref(cdata))
[docs] def as_ab_read(self, start, size): """ This is the asynchronous counterpart of client.ab_read(). """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] data = (type_ * size)() logging.debug("ab_read: start: %s: size %s: " % (start, size)) result = self.library.Cli_AsABRead(self.pointer, start, size, byref(data)) check_error(result, context="client") return bytearray(data)
[docs] def as_ab_write(self, start, data): """ This is the asynchronous counterpart of Cli_ABWrite. """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] size = len(data) cdata = (type_ * size).from_buffer(data) logging.debug("ab write: start: %s: size: %s: " % (start, size)) return self.library.Cli_AsABWrite( self.pointer, start, size, byref(cdata))
@error_wrap def as_compress(self, time): """ This is the asynchronous counterpart of client.compress(). """ return self.library.Cli_AsCompress(self.pointer, time)
[docs] def copy_ram_to_rom(self): """ """ return self.library.Cli_AsCopyRamToRom(self.pointer)
[docs] def as_ct_read(self): """ """ return self.library.Cli_AsCTRead(self.pointer)
[docs] def as_ct_write(self): """ """ return self.library.Cli_AsCTWrite(self.pointer)
[docs] def as_db_fill(self): """ """ return self.library.Cli_AsDBFill(self.pointer)
[docs] def as_db_get(self, db_number): """ This is the asynchronous counterpart of Cli_DBGet. """ logging.debug("db_get db_number: %s" % db_number) _buffer = buffer_type() result = self.library.Cli_AsDBGet(self.pointer, db_number, byref(_buffer), byref(c_int(buffer_size))) check_error(result, context="client") return bytearray(_buffer)
[docs] def as_db_read(self, db_number, start, size): """ This is the asynchronous counterpart of Cli_DBRead. :returns: user buffer. """ logger.debug("db_read, db_number:%s, start:%s, size:%s" % (db_number, start, size)) type_ = snap7.snap7types.wordlen_to_ctypes[snap7.snap7types.S7WLByte] data = (type_ * size)() result = (self.library.Cli_AsDBRead(self.pointer, db_number, start, size, byref(data))) check_error(result, context="client") return bytearray(data)
[docs] def as_db_write(self, db_number, start, data): """ """ wordlen = snap7.snap7types.S7WLByte type_ = snap7.snap7types.wordlen_to_ctypes[wordlen] size = len(data) cdata = (type_ * size).from_buffer(data) logger.debug("db_write db_number:%s start:%s size:%s data:%s" % (db_number, start, size, data)) return self.library.Cli_AsDBWrite( self.pointer, db_number, start, size, byref(cdata))
@error_wrap def as_download(self, data, block_num=-1): """ Downloads a DB data into the AG asynchronously. A whole block (including header and footer) must be available into the user buffer. :param block_num: New Block number (or -1) :param data: the user buffer """ size = len(data) type_ = c_byte * len(data) cdata = type_.from_buffer(data) return self.library.Cli_AsDownload(self.pointer, block_num, byref(cdata), size) @error_wrap def compress(self, time): """ Performs the Memory compress action. :param time: Maximum time expected to complete the operation (ms). """ return self.library.Cli_Compress(self.pointer, time) @error_wrap def set_param(self, number, value): """Sets an internal Server object parameter. """ logger.debug("setting param number %s to %s" % (number, value)) type_ = param_types[number] return self.library.Cli_SetParam(self.pointer, number, byref(type_(value)))
[docs] def get_param(self, number): """Reads an internal Client object parameter. """ logger.debug("retreiving param number %s" % number) type_ = param_types[number] value = type_() code = self.library.Cli_GetParam(self.pointer, c_int(number), byref(value)) check_error(code) return value.value
[docs] def get_pdu_length(self): """ Returns info about the PDU length. """ logger.info("getting PDU length") requested_ = c_uint16() negotiated_ = c_uint16() code = self.library.Cli_GetPduLength(self.pointer, byref(requested_), byref(negotiated_)) check_error(code) return negotiated_.value