"""
Snap7 client used for connection to a siemens 7 server.
"""
import re
import logging
from ctypes import byref, create_string_buffer, sizeof
from ctypes import Array, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p
from datetime import datetime
from typing import List, Optional, Tuple, Union
from ..common import check_error, ipv4, load_library
from ..types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem
from ..types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from ..types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types
from ..types import S7CpuInfo, RemotePort, wordlen_to_ctypes, block_types
logger = logging.getLogger(__name__)
[docs]def error_wrap(func):
"""Parses a s7 error code returned the decorated function."""
def f(*args, **kw):
code = func(*args, **kw)
check_error(code, context="client")
return f
[docs]class Client:
"""
A snap7 client
Examples:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("127.0.0.1", 0, 0, 1102)
>>> client.get_connected()
True
>>> data = client.db_read(1, 0, 4)
>>> data
bytearray(b"\\x00\\x00\\x00\\x00")
>>> data[3] = 0b00000001
>>> data
bytearray(b'\\x00\\x00\\x00\\x01')
>>> client.db_write(1, 0, data)
"""
[docs] def __init__(self, lib_location: Optional[str] = None):
"""Creates a new `Client` instance.
Args:
lib_location: Full path to the snap7.dll file. Optional.
Examples:
>>> import snap7
>>> client = snap7.client.Client() # If the `snap7.dll` file is in the path location
>>> client = snap7.client.Client(lib_location="/path/to/snap7.dll") # If the `snap7.dll` file is in another location
>>> client
<snap7.client.Client object at 0x0000028B257128E0>
"""
self._read_callback = None
self._callback = None
self._pointer = None
self._library = load_library(lib_location)
self.create()
def __del__(self):
self.destroy()
[docs] def create(self):
"""Creates a SNAP7 client.
"""
logger.info("creating snap7 client")
self._library.Cli_Create.restype = c_void_p
self._pointer = S7Object(self._library.Cli_Create())
[docs] def destroy(self) -> Optional[int]:
"""Destroys the Client object.
Returns:
Error code from snap7 library.
Examples:
>>> client.destroy()
640719840
"""
logger.info("destroying snap7 client")
if self._pointer:
return self._library.Cli_Destroy(byref(self._pointer))
self._pointer = None
return None
[docs] def plc_stop(self) -> int:
"""Puts the CPU in STOP mode
Returns:
Error code from snap7 library.
"""
logger.info("stopping plc")
return self._library.Cli_PlcStop(self._pointer)
[docs] def plc_cold_start(self) -> int:
"""Puts the CPU in RUN mode performing a COLD START.
Returns:
Error code from snap7 library.
"""
logger.info("cold starting plc")
return self._library.Cli_PlcColdStart(self._pointer)
[docs] def plc_hot_start(self) -> int:
"""Puts the CPU in RUN mode performing an HOT START.
Returns:
Error code from snap7 library.
"""
logger.info("hot starting plc")
return self._library.Cli_PlcHotStart(self._pointer)
[docs] def get_cpu_state(self) -> str:
"""Returns the CPU status (running/stopped)
Returns:
Description of the cpu state.
Raises:
:obj:`ValueError`: if the cpu state is invalid.
Examples:
>>> client.get_cpu_statE()
'S7CpuStatusRun'
"""
state = c_int(0)
self._library.Cli_GetPlcStatus(self._pointer, byref(state))
try:
status_string = cpu_statuses[state.value]
except KeyError:
raise ValueError(f"The cpu state ({state.value}) is invalid")
logger.debug(f"CPU state is {status_string}")
return status_string
[docs] def get_cpu_info(self) -> S7CpuInfo:
"""Returns some information about the AG.
Returns:
:obj:`S7CpuInfo`: data structure with the information.
Examples:
>>> cpu_info = client.get_cpu_info()
>>> print(cpu_info)
"<S7CpuInfo ModuleTypeName: b'CPU 315-2 PN/DP'
SerialNumber: b'S C-C2UR28922012'
ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment'
ModuleName: b'CPU 315-2 PN/DP'>
"""
info = S7CpuInfo()
result = self._library.Cli_GetCpuInfo(self._pointer, byref(info))
check_error(result, context="client")
return info
@error_wrap
def disconnect(self) -> int:
"""Disconnect a client.
Returns:
Error code from snap7 library.
"""
logger.info("disconnecting snap7 client")
return self._library.Cli_Disconnect(self._pointer)
@error_wrap
def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int:
"""Connects a Client Object to a PLC.
Args:
address: IP address of the PLC.
rack: rack number where the PLC is located.
slot: slot number where the CPU is located.
tcpport: port of the PLC.
Returns:
Error code from snap7 library.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0) # port is implicit = 102.
"""
logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}")
self.set_param(RemotePort, tcpport)
return self._library.Cli_ConnectTo(
self._pointer, c_char_p(address.encode()),
c_int(rack), c_int(slot))
[docs] def db_read(self, db_number: int, start: int, size: int) -> bytearray:
"""Reads a part of a DB from a PLC
Note:
Use it only for reading DBs, not Marks, Inputs, Outputs.
Args:
db_number: number of the DB to be read.
start: byte index from where is start to read from.
size: amount of bytes to be read.
Returns:
Buffer read.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.db_read(1, 10, 4) # reads the db number 1 starting from the byte 10 until byte 14.
>>> buffer
bytearray(b'\\x00\\x00')
"""
logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}")
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = (self._library.Cli_DBRead(
self._pointer, db_number, start, size,
byref(data)))
check_error(result, context="client")
return bytearray(data)
@error_wrap
def db_write(self, db_number: int, start: int, data: bytearray) -> int:
"""Writes a part of a DB into a PLC.
Args:
db_number: number of the DB to be read.
start: byte index to start writing to.
data: buffer to be write.
Returns:
Buffer written.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = bytearray([0b00000001])
>>> client.db_write(1, 10, buffer) # writes the bit number 0 from the byte 10 to TRUE.
"""
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}")
return self._library.Cli_DBWrite(self._pointer, db_number, start, size,
byref(cdata))
[docs] def delete(self, block_type: str, block_num: int) -> int:
"""Delete a block into AG.
Args:
block_type: type of block.
block_num: block number.
Returns:
Error code from snap7 library.
"""
logger.info("deleting block")
blocktype = block_types[block_type]
result = self._library.Cli_Delete(self._pointer, blocktype, block_num)
return result
[docs] def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]:
"""Uploads a block from AG with Header and Footer infos.
The whole block (including header and footer) is copied into the user
buffer.
Args:
_type: type of block.
block_num: number of block.
Returns:
Tuple of the buffer and size.
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = block_types[_type]
result = self._library.Cli_FullUpload(self._pointer, block_type,
block_num, byref(_buffer),
byref(size))
check_error(result, context="client")
return bytearray(_buffer)[:size.value], size.value
[docs] def upload(self, block_num: int) -> bytearray:
"""Uploads a block from AG.
Note:
Upload means from the PLC to the PC.
Args:
block_num: block to be upload.
Returns:
Buffer with the uploaded block.
"""
logger.debug(f"db_upload block_num: {block_num}")
block_type = block_types['DB']
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
result = self._library.Cli_Upload(self._pointer, block_type, block_num,
byref(_buffer), byref(size))
check_error(result, context="client")
logger.info(f'received {size} bytes')
return bytearray(_buffer)
@error_wrap
def download(self, data: bytearray, block_num: int = -1) -> int:
"""Download a block into AG.
A whole block (including header and footer) must be available into the
user buffer.
Note:
Download means from the PC to the PLC.
Args:
data: buffer data.
block_num: new block number.
Returns:
Error code from snap7 library.
"""
type_ = c_byte
size = len(data)
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._library.Cli_Download(self._pointer, block_num,
byref(cdata), size)
[docs] def db_get(self, db_number: int) -> bytearray:
"""Uploads a DB from AG using DBRead.
Note:
This method can't be use for 1200/1500 PLCs.
Args:
db_number: db number to be read from.
Returns:
Buffer with the data read.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.db_get(1) # reads the db number 1.
>>> buffer
bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00...<truncated>\\x00\\x00")
"""
logger.debug(f"db_get db_number: {db_number}")
_buffer = buffer_type()
result = self._library.Cli_DBGet(
self._pointer, db_number, byref(_buffer),
byref(c_int(buffer_size)))
check_error(result, context="client")
return bytearray(_buffer)
[docs] def read_area(self, area: Areas, dbnumber: int, start: int, size: int) -> bytearray:
"""Reads a data area from a PLC
With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
Args:
area: area to be read from.
dbnumber: number of the db to be read from. In case of Inputs, Marks or Outputs, this should be equal to 0.
start: byte index to start reading.
size: number of bytes to read.
Returns:
Buffer with the data read.
Raises:
:obj:`ValueError`: if the area is not defined in the `Areas`
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.read_area(Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
>>> buffer
bytearray(b'\\x00\\x00')
"""
if area not in Areas:
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
logger.debug(f"reading area: {area.name} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen.name}={wordlen.value}")
data = (type_ * size)()
result = self._library.Cli_ReadArea(self._pointer, area.value, dbnumber, start,
size, wordlen.value, byref(data))
check_error(result, context="client")
return bytearray(data)
@error_wrap
def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) -> int:
"""Writes a data area into a PLC.
Args:
area: area to be write.
dbnumber: number of the db to be write to. In case of Inputs, Marks or Outputs, this should be equal to 0.
start: byte index to start writting.
data: buffer to be write.
Returns:
Snap7 error code.
Exmaple:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = bytearray([0b00000001])
>>> client.write_area(Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
"""
if area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[WordLen.Byte.value]
size = len(data)
logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen.name}={wordlen.value} type: {type_}")
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._library.Cli_WriteArea(self._pointer, area.value, dbnumber, start,
size, wordlen.value, byref(cdata))
[docs] def read_multi_vars(self, items) -> Tuple[int, S7DataItem]:
"""Reads different kind of variables from a PLC simultaneously.
Args:
items: list of items to be read.
Returns:
Tuple with the return code from the snap7 library and the list of items.
"""
result = self._library.Cli_ReadMultiVars(self._pointer, byref(items),
c_int32(len(items)))
check_error(result, context="client")
return result, items
[docs] def list_blocks(self) -> BlocksList:
"""Returns the AG blocks amount divided by type.
Returns:
Block list structure object.
Examples:
>>> block_list = client.list_blocks()
>>> print(block_list)
<block list count OB: 0 FB: 0 FC: 0 SFB: 0 SFC: 0x0 DB: 1 SDB: 0>
"""
logger.debug("listing blocks")
blocksList = BlocksList()
result = self._library.Cli_ListBlocks(self._pointer, byref(blocksList))
check_error(result, context="client")
logger.debug(f"blocks: {blocksList}")
return blocksList
[docs] def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]:
"""This function returns the AG list of a specified block type.
Args:
blocktype: specified block type.
size: size of the block type.
Returns:
If size is 0, it returns a 0, otherwise an `Array` of specified block type.
Raises:
:obj:`ValueError`: if the `blocktype` is not valid.
"""
_blocktype = block_types.get(blocktype)
if not _blocktype:
raise ValueError("The blocktype parameter was invalid")
logger.debug(f"listing blocks of type: {_blocktype} size: {size}")
if size == 0:
return 0
data = (c_uint16 * size)()
count = c_int(size)
result = self._library.Cli_ListBlocksOfType(
self._pointer, _blocktype,
byref(data),
byref(count))
logger.debug(f"number of items found: {count}")
check_error(result, context="client")
return data
[docs] def get_block_info(self, blocktype: str, db_number: int) -> TS7BlockInfo:
"""Returns detailed information about a block present in AG.
Args:
blocktype: specified block type.
db_number: number of db to get information from.
Returns:
Structure of information from block.
Raises:
:obj:`ValueError`: if the `blocktype` is not valid.
Examples:
>>> block_info = client.get_block_info("DB", 1)
>>> print(block_info)
Block type: 10
Block number: 1
Block language: 5
Block flags: 1
MC7Size: 100
Load memory size: 192
Local data: 0
SBB Length: 20
Checksum: 0
Version: 1
Code date: b'1999/11/17'
Interface date: b'1999/11/17'
Author: b''
Family: b''
Header: b''
"""
blocktype_ = block_types.get(blocktype)
if not blocktype_:
raise ValueError("The blocktype parameter was invalid")
logger.debug(f"retrieving block info for block {db_number} of type {blocktype_}")
data = TS7BlockInfo()
result = self._library.Cli_GetAgBlockInfo(self._pointer, blocktype_, db_number, byref(data))
check_error(result, context="client")
return data
@error_wrap
def set_session_password(self, password: str) -> int:
"""Send the password to the PLC to meet its security level.
Args:
password: password to set.
Returns:
Snap7 code.
Raises:
:obj:`ValueError`: if the length of the `password` is more than 8 characters.
"""
if len(password) > 8:
raise ValueError("Maximum password length is 8")
return self._library.Cli_SetSessionPassword(self._pointer,
c_char_p(password.encode()))
@error_wrap
def clear_session_password(self) -> int:
"""Clears the password set for the current session (logout).
Returns:
Snap7 code.
"""
return self._library.Cli_ClearSessionPassword(self._pointer)
[docs] def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int) -> None:
"""Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates.
Note:
This function must be called just before `Cli_Connect()`.
Args:
address: PLC/Equipment IPV4 Address, for example "192.168.1.12"
local_tsap: Local TSAP (PC TSAP)
remote_tsap: Remote TSAP (PLC TSAP)
Raises:
:obj:`ValueError`: if the `address` is not a valid IPV4.
:obj:`ValueError`: if the result of setting the connection params is
different than 0.
"""
if not re.match(ipv4, address):
raise ValueError(f"{address} is invalid ipv4")
result = self._library.Cli_SetConnectionParams(self._pointer, address,
c_uint16(local_tsap),
c_uint16(remote_tsap))
if result != 0:
raise ValueError("The parameter was invalid")
[docs] def set_connection_type(self, connection_type: int):
""" Sets the connection resource type, i.e the way in which the Clients connects to a PLC.
Args:
connection_type: 1 for PG, 2 for OP, 3 to 10 for S7 Basic
Raises:
:obj:`ValueError`: if the result of setting the connection type is
different than 0.
"""
result = self._library.Cli_SetConnectionType(self._pointer,
c_uint16(connection_type))
if result != 0:
raise ValueError("The parameter was invalid")
[docs] def get_connected(self) -> bool:
"""Returns the connection status
Note:
Sometimes returns True, while connection is lost.
Returns:
True if is connected, otherwise false.
"""
connected = c_int32()
result = self._library.Cli_GetConnected(self._pointer, byref(connected))
check_error(result, context="client")
return bool(connected)
[docs] def ab_read(self, start: int, size: int) -> bytearray:
"""Reads a part of IPU area from a PLC.
Args:
start: byte index from where start to read.
size: amount of bytes to read.
Returns:
Buffer with the data read.
"""
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
data = (type_ * size)()
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._library.Cli_ABRead(self._pointer, start, size,
byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def ab_write(self, start: int, data: bytearray) -> int:
"""Writes a part of IPU area into a PLC.
Args:
start: byte index from where start to write.
data: buffer with the data to be written.
Returns:
Snap7 code.
"""
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
return self._library.Cli_ABWrite(
self._pointer, start, size, byref(cdata))
[docs] def as_ab_read(self, start: int, size: int, data) -> int:
"""Reads a part of IPU area from a PLC asynchronously.
Args:
start: byte index from where start to read.
size: amount of bytes to read.
data: buffer where the data will be place.
Returns:
Snap7 code.
"""
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._library.Cli_AsABRead(self._pointer, start, size,
byref(data))
check_error(result, context="client")
return result
[docs] def as_ab_write(self, start: int, data: bytearray) -> int:
"""Writes a part of IPU area into a PLC asynchronously.
Args:
start: byte index from where start to write.
data: buffer with the data to be written.
Returns:
Snap7 code.
"""
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
result = self._library.Cli_AsABWrite(
self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs] def as_compress(self, time: int) -> int:
""" Performs the Compress action asynchronously.
Args:
time: timeout.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsCompress(self._pointer, time)
check_error(result, context="client")
return result
[docs] def as_copy_ram_to_rom(self, timeout: int = 1) -> int:
"""Performs the Copy Ram to Rom action asynchronously.
Args:
timeout: time to wait unly fail.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsCopyRamToRom(self._pointer, timeout)
check_error(result, context="client")
return result
[docs] def as_ct_read(self, start: int, amount: int, data) -> int:
"""Reads counters from a PLC asynchronously.
Args:
start: byte index to start to read from.
amount: amount of bytes to read.
data: buffer where the value read will be place.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsCTRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return result
[docs] def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write counters into a PLC.
Args:
start: byte index to start to write from.
amount: amount of bytes to write.
data: buffer to be write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Counter.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsCTWrite(self._pointer, start, amount, byref(cdata))
check_error(result, context="client")
return result
[docs] def as_db_fill(self, db_number: int, filler) -> int:
"""Fills a DB in AG with a given byte.
Args:
db_number: number of DB to fill.
filler: buffer to fill with.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsDBFill(self._pointer, db_number, filler)
check_error(result, context="client")
return result
[docs] def as_db_get(self, db_number: int, _buffer, size) -> bytearray:
"""Uploads a DB from AG using DBRead.
Note:
This method will not work in 1200/1500.
Args:
db_number: number of DB to get.
_buffer: buffer where the data read will be place.
size: amount of bytes to be read.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsDBGet(self._pointer, db_number, byref(_buffer), byref(size))
check_error(result, context="client")
return result
[docs] def as_db_read(self, db_number: int, start: int, size: int, data) -> Array:
"""Reads a part of a DB from a PLC.
Args:
db_number: number of DB to be read.
start: byte index from where start to read from.
size: amount of bytes to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
Examples:
>>> import ctypes
>>> data = (ctypes.c_uint8 * size_to_read)() # In this ctypes array data will be stored.
>>> result = client.as_db_read(1, 0, size_to_read, data)
>>> result # 0 = success
0
"""
result = self._library.Cli_AsDBRead(self._pointer, db_number, start, size, byref(data))
check_error(result, context="client")
return result
[docs] def as_db_write(self, db_number: int, start: int, size: int, data) -> int:
"""Writes a part of a DB into a PLC.
Args:
db_number: number of DB to be write.
start: byte index from where start to write to.
size: amount of bytes to write.
data: buffer to be write.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsDBWrite(self._pointer, db_number, start, size, byref(data))
check_error(result, context="client")
return result
[docs] def as_download(self, data: bytearray, block_num: int) -> int:
"""Download a block into AG asynchronously.
Note:
A whole block (including header and footer) must be available into the user buffer.
Args:
block_num: new block number.
data: buffer where the data will be place.
Returns:
Snap7 code.
"""
size = len(data)
type_ = c_byte * len(data)
cdata = type_.from_buffer_copy(data)
result = self._library.Cli_AsDownload(self._pointer, block_num, byref(cdata), size)
check_error(result)
return result
@error_wrap
def compress(self, time: int) -> int:
"""Performs the Compress action.
Args:
time: timeout.
Returns:
Snap7 code.
"""
return self._library.Cli_Compress(self._pointer, time)
@error_wrap
def set_param(self, number: int, value: int) -> int:
"""Writes an internal Server Parameter.
Args:
number: number of argument to be written.
value: value to be written.
Returns:
Snap7 code.
"""
logger.debug(f"setting param number {number} to {value}")
type_ = param_types[number]
return self._library.Cli_SetParam(self._pointer, number, byref(type_(value)))
[docs] def get_param(self, number: int) -> int:
"""Reads an internal Server parameter.
Args:
number: number of argument to be read.
Return:
Value of the param read.
"""
logger.debug(f"retreiving param number {number}")
type_ = param_types[number]
value = type_()
code = self._library.Cli_GetParam(self._pointer, c_int(number), byref(value))
check_error(code)
return value.value
[docs] def get_pdu_length(self) -> int:
"""Returns info about the PDU length (requested and negotiated).
Returns:
PDU length.
Examples:
>>> client.get_pdu_length()
480
"""
logger.info("getting PDU length")
requested_ = c_uint16()
negotiated_ = c_uint16()
code = self._library.Cli_GetPduLength(self._pointer, byref(requested_), byref(negotiated_))
check_error(code)
return negotiated_.value
[docs] def get_plc_datetime(self) -> datetime:
"""Returns the PLC date/time.
Returns:
Date and time as datetime
Examples:
>>> client.get_plc_datetime()
datetime.datetime(2021, 4, 6, 12, 12, 36)
"""
type_ = c_int32
buffer = (type_ * 9)()
result = self._library.Cli_GetPlcDateTime(self._pointer, byref(buffer))
check_error(result, context="client")
return datetime(
year=buffer[5] + 1900,
month=buffer[4] + 1,
day=buffer[3],
hour=buffer[2],
minute=buffer[1],
second=buffer[0]
)
@error_wrap
def set_plc_datetime(self, dt: datetime) -> int:
"""Sets the PLC date/time with a given value.
Args:
dt: datetime to be set.
Returns:
Snap7 code.
"""
type_ = c_int32
buffer = (type_ * 9)()
buffer[0] = dt.second
buffer[1] = dt.minute
buffer[2] = dt.hour
buffer[3] = dt.day
buffer[4] = dt.month - 1
buffer[5] = dt.year - 1900
return self._library.Cli_SetPlcDateTime(self._pointer, byref(buffer))
[docs] def check_as_completion(self, p_value) -> int:
"""Method to check Status of an async request. Result contains if the check was successful, not the data value itself
Args:
p_value: Pointer where result of this check shall be written.
Returns:
Snap7 code. If 0 - Job is done successfully. If 1 - Job is either pending or contains s7errors
"""
result = self._library.Cli_CheckAsCompletion(self._pointer, p_value)
check_error(result, context="client")
return result
def set_as_callback(self, pfn_clicompletion, p_usr):
# Cli_SetAsCallback
result = self._library.Cli_SetAsCallback(self._pointer, pfn_clicompletion, p_usr)
check_error(result, context='client')
return result
[docs] def wait_as_completion(self, timeout: int) -> int:
"""Snap7 Cli_WaitAsCompletion representative.
Args:
timeout: ms to wait for async job
Returns:
Snap7 code.
"""
# Cli_WaitAsCompletion
result = self._library.Cli_WaitAsCompletion(self._pointer, c_ulong(timeout))
check_error(result, context="client")
return result
def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]:
if area not in Areas:
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[wordlen.value]
usrdata = (type_ * size)()
return wordlen, usrdata
[docs] def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int:
"""Reads a data area from a PLC asynchronously.
With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
Args:
area: memory area to be read from.
dbnumber: The DB number, only used when area=Areas.DB
start: offset to start writing
size: number of units to read
pusrdata: buffer where the data will be place.
wordlen: length of the word to be read.
Returns:
Snap7 code.
"""
logger.debug(f"reading area: {area.name} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen.name}={wordlen.value}")
result = self._library.Cli_AsReadArea(self._pointer, area.value, dbnumber, start, size, wordlen.value, pusrdata)
check_error(result, context="client")
return result
def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array]:
if area not in Areas:
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * len(data)).from_buffer_copy(data)
return wordlen, cdata
[docs] def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int:
"""Writes a data area into a PLC asynchronously.
Args:
area: memory area to be written.
dbnumber: The DB number, only used when area=Areas.DB
start: offset to start writing.
size: amount of bytes to be written.
wordlen: length of the word to be written.
pusrdata: buffer to be written.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen} type: {type_}")
cdata = (type_ * len(pusrdata)).from_buffer_copy(pusrdata)
res = self._library.Cli_AsWriteArea(self._pointer, area.value, dbnumber, start, size, wordlen.value, byref(cdata))
check_error(res, context="client")
return res
[docs] def as_eb_read(self, start: int, size: int, data) -> int:
"""Reads a part of IPI area from a PLC asynchronously.
Args:
start: byte index from where to start reading from.
size: amount of bytes to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsEBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return result
[docs] def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of IPI area into a PLC.
Args:
start: byte index from where to start writing from.
size: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsEBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs] def as_full_upload(self, _type: str, block_num: int) -> int:
"""Uploads a block from AG with Header and Footer infos.
Note:
Upload means from PLC to PC.
Args:
_type: type of block.
block_num: number of block to upload.
Returns:
Snap7 code.
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = block_types[_type]
result = self._library.Cli_AsFullUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
[docs] def as_list_blocks_of_type(self, blocktype: str, data, count) -> int:
"""Returns the AG blocks list of a given type.
Args:
blocktype: block type.
data: buffer where the data will be place.
count: pass.
Returns:
Snap7 code.
Raises:
:obj:`ValueError`: if the `blocktype` is invalid
"""
_blocktype = block_types.get(blocktype)
if not _blocktype:
raise ValueError("The blocktype parameter was invalid")
result = self._library.Cli_AsListBlocksOfType(self._pointer, _blocktype, byref(data), byref(count))
check_error(result, context="client")
return result
[docs] def as_mb_read(self, start: int, size: int, data) -> int:
"""Reads a part of Merkers area from a PLC.
Args:
start: byte index from where to start to read from.
size: amount of byte to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsMBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return result
[docs] def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Merkers area into a PLC.
Args:
start: byte index from where to start to write to.
size: amount of byte to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsMBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs] def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int:
"""Reads a partial list of given ID and Index.
Args:
ssl_id: TODO
index: TODO
s7_szl: TODO
size: TODO
Returns:
Snap7 code.
"""
result = self._library.Cli_AsReadSZL(self._pointer, ssl_id, index, byref(s7_szl), byref(size))
check_error(result, context="client")
return result
[docs] def as_read_szl_list(self, szl_list, items_count) -> int:
"""Reads the list of partial lists available in the CPU.
Args:
szl_list: TODO
items_count: TODO
Returns:
Snap7 code.
"""
result = self._library.Cli_AsReadSZLList(self._pointer, byref(szl_list), byref(items_count))
check_error(result, context="client")
return result
[docs] def as_tm_read(self, start: int, amount: int, data) -> bytearray:
"""Reads timers from a PLC.
Args:
start: byte index to start read from.
amount: amount of bytes to read.
data: buffer where the data will be placed.
Returns:
Snap7 code.
"""
result = self._library.Cli_AsTMRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return result
[docs] def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write timers into a PLC.
Args:
start: byte index to start writing to.
amount: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Timer.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsTMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
[docs] def as_upload(self, block_num: int, _buffer, size) -> int:
"""Uploads a block from AG.
Note:
Uploads means from PLC to PC.
Args:
block_num: block number to upload.
_buffer: buffer where the data will be place.
size: amount of bytes to uplaod.
Returns:
Snap7 code.
"""
block_type = block_types['DB']
result = self._library.Cli_AsUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
[docs] def copy_ram_to_rom(self, timeout: int = 1) -> int:
"""Performs the Copy Ram to Rom action.
Args:
timeout: timeout time.
Returns:
Snap7 code.
"""
result = self._library.Cli_CopyRamToRom(self._pointer, timeout)
check_error(result, context="client")
return result
[docs] def ct_read(self, start: int, amount: int) -> bytearray:
"""Reads counters from a PLC.
Args:
start: byte index to start read from.
amount: amount of bytes to read.
Returns:
Buffer read.
"""
type_ = wordlen_to_ctypes[WordLen.Counter.value]
data = (type_ * amount)()
result = self._library.Cli_CTRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def ct_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write counters into a PLC.
Args:
start: byte index to start write to.
amount: amount of bytes to write.
data: buffer data to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Counter.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_CTWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
[docs] def db_fill(self, db_number: int, filler: int) -> int:
"""Fills a DB in AG with a given byte.
Args:
db_number: db number to fill.
filler: value filler.
Returns:
Snap7 code.
"""
result = self._library.Cli_DBFill(self._pointer, db_number, filler)
check_error(result)
return result
[docs] def eb_read(self, start: int, size: int) -> bytearray:
"""Reads a part of IPI area from a PLC.
Args:
start: byte index to start read from.
size: amount of bytes to read.
Returns:
Data read.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = self._library.Cli_EBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def eb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of IPI area into a PLC.
Args:
start: byte index to be written.
size: amount of bytes to write.
data: data to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_EBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
return result
[docs] def error_text(self, error: int) -> str:
"""Returns a textual explanation of a given error number.
Args:
error: error number.
Returns:
Text error.
"""
text_length = c_int(256)
error_code = c_int32(error)
text = create_string_buffer(buffer_size)
response = self._library.Cli_ErrorText(error_code, byref(text), text_length)
check_error(response)
result = bytearray(text)[:text_length.value].decode().strip('\x00')
return result
[docs] def get_cp_info(self) -> S7CpInfo:
"""Returns some information about the CP (communication processor).
Returns:
Structure object containing the CP information.
"""
cp_info = S7CpInfo()
result = self._library.Cli_GetCpInfo(self._pointer, byref(cp_info))
check_error(result)
return cp_info
[docs] def get_exec_time(self) -> int:
"""Returns the last job execution time in milliseconds.
Returns:
Execution time value.
"""
time = c_int32()
result = self._library.Cli_GetExecTime(self._pointer, byref(time))
check_error(result)
return time.value
[docs] def get_last_error(self) -> int:
"""Returns the last job result.
Returns:
Returns the last error value.
"""
last_error = c_int32()
result = self._library.Cli_GetLastError(self._pointer, byref(last_error))
check_error(result)
return last_error.value
[docs] def get_order_code(self) -> S7OrderCode:
"""Returns the CPU order code.
Returns:
Order of the code in a structure object.
"""
order_code = S7OrderCode()
result = self._library.Cli_GetOrderCode(self._pointer, byref(order_code))
check_error(result)
return order_code
[docs] def get_pg_block_info(self, block: bytearray) -> TS7BlockInfo:
"""Returns detailed information about a block loaded in memory.
Args:
block: buffer where the data will be place.
Returns:
Structure object that contains the block information.
"""
block_info = TS7BlockInfo()
size = c_int(len(block))
buffer = (c_byte * len(block)).from_buffer_copy(block)
result = self._library.Cli_GetPgBlockInfo(self._pointer, byref(buffer), byref(block_info), size)
check_error(result)
return block_info
[docs] def get_protection(self) -> S7Protection:
"""Gets the CPU protection level info.
Returns:
Structure object with protection attributes.
"""
s7_protection = S7Protection()
result = self._library.Cli_GetProtection(self._pointer, byref(s7_protection))
check_error(result)
return s7_protection
[docs] def iso_exchange_buffer(self, data: bytearray) -> bytearray:
"""Exchanges a given S7 PDU (protocol data unit) with the CPU.
Args:
data: buffer to exchange.
Returns:
Snap7 code.
"""
size = c_int(len(data))
cdata = (c_byte * len(data)).from_buffer_copy(data)
response = self._library.Cli_IsoExchangeBuffer(self._pointer, byref(cdata), byref(size))
check_error(response)
result = bytearray(cdata)[:size.value]
return result
[docs] def mb_read(self, start: int, size: int) -> bytearray:
"""Reads a part of Merkers area from a PLC.
Args:
start: byte index to be read from.
size: amount of bytes to read.
Returns:
Buffer with the data read.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = self._library.Cli_MBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Merkers area into a PLC.
Args:
start: byte index to be written.
size: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_MBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
return result
[docs] def read_szl(self, ssl_id: int, index: int = 0x0000) -> S7SZL:
"""Reads a partial list of given ID and Index.
Args:
ssl_id: ssl id to be read.
index: index to be read.
Returns:
SZL structure object.
"""
s7_szl = S7SZL()
size = c_int(sizeof(s7_szl))
result = self._library.Cli_ReadSZL(self._pointer, ssl_id, index, byref(s7_szl), byref(size))
check_error(result, context="client")
return s7_szl
[docs] def read_szl_list(self) -> bytearray:
"""Reads the list of partial lists available in the CPU.
Returns:
Buffer read.
"""
szl_list = S7SZLList()
items_count = c_int(sizeof(szl_list))
response = self._library.Cli_ReadSZLList(self._pointer, byref(szl_list), byref(items_count))
check_error(response, context="client")
result = bytearray(szl_list.List)[:items_count.value]
return result
[docs] def set_plc_system_datetime(self) -> int:
"""Sets the PLC date/time with the host (PC) date/time.
Returns:
Snap7 code.
"""
result = self._library.Cli_SetPlcSystemDateTime(self._pointer)
check_error(result)
return result
[docs] def tm_read(self, start: int, amount: int) -> bytearray:
"""Reads timers from a PLC.
Args:
start: byte index from where is start to read from.
amount: amount of byte to be read.
Returns:
Buffer read.
"""
wordlen = WordLen.Timer
type_ = wordlen_to_ctypes[wordlen.value]
data = (type_ * amount)()
result = self._library.Cli_TMRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def tm_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write timers into a PLC.
Args:
start: byte index from where is start to write to.
amount: amount of byte to be written.
data: data to be write.
Returns:
Snap7 code.
"""
wordlen = WordLen.Timer
type_ = wordlen_to_ctypes[wordlen.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_TMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
[docs] def write_multi_vars(self, items: List[S7DataItem]) -> int:
"""Writes different kind of variables into a PLC simultaneously.
Args:
items: list of items to be written.
Returns:
Snap7 code.
"""
items_count = c_int32(len(items))
data = bytearray()
for item in items:
data += bytearray(item)
cdata = (S7DataItem * len(items)).from_buffer_copy(data)
result = self._library.Cli_WriteMultiVars(self._pointer, byref(cdata), items_count)
check_error(result, context="client")
return result