"""
Snap7 client used for connection to a siemens7 server.
"""
import logging
import re
from ctypes import c_int, c_char_p, byref, sizeof, c_uint16, c_int32, c_byte, c_ulong, Array
from ctypes import c_void_p, create_string_buffer
from datetime import datetime
from typing import Union, Tuple, Optional, List
import snap7
from snap7.common import check_error, load_library, ipv4
from snap7.exceptions import Snap7Exception
from snap7.types import S7Object, buffer_type, buffer_size, BlocksList, S7CpuInfo, S7DataItem, S7SZL, S7OrderCode, \
S7Protection, S7SZLList, S7CpInfo
from snap7.types import TS7BlockInfo, param_types, cpu_statuses
logger = logging.getLogger(__name__)
[docs]def error_wrap(func):
"""Parses a s7 error code returned the decorated function."""
def f(*args, **kw):
code = func(*args, **kw)
check_error(code, context="client")
return f
[docs]class Client:
"""
A snap7 client
"""
def __init__(self):
self._read_callback = None
self._callback = None
self._pointer = None
self._library = load_library()
self.create()
def __del__(self):
self.destroy()
[docs] def create(self):
"""
create a SNAP7 client.
"""
logger.info("creating snap7 client")
self._library.Cli_Create.restype = c_void_p
self._pointer = S7Object(self._library.Cli_Create())
[docs] def destroy(self) -> Optional[int]:
"""
destroy a client.
"""
logger.info("destroying snap7 client")
if self._pointer:
return self._library.Cli_Destroy(byref(self._pointer))
self._pointer = None
return None
[docs] def plc_stop(self) -> int:
"""
stops a client
"""
logger.info("stopping plc")
return self._library.Cli_PlcStop(self._pointer)
[docs] def plc_cold_start(self) -> int:
"""
cold starts a client
"""
logger.info("cold starting plc")
return self._library.Cli_PlcColdStart(self._pointer)
[docs] def plc_hot_start(self) -> int:
"""
hot starts a client
"""
logger.info("hot starting plc")
return self._library.Cli_PlcHotStart(self._pointer)
[docs] def get_cpu_state(self) -> str:
"""
Retrieves CPU state from client
"""
state = c_int(0)
self._library.Cli_GetPlcStatus(self._pointer, byref(state))
try:
status_string = cpu_statuses[state.value]
except KeyError:
raise Snap7Exception(f"The cpu state ({state.value}) is invalid")
logger.debug(f"CPU state is {status_string}")
return status_string
[docs] def get_cpu_info(self) -> S7CpuInfo:
"""
Retrieves CPU info from client
"""
info = snap7.types.S7CpuInfo()
result = self._library.Cli_GetCpuInfo(self._pointer, byref(info))
check_error(result, context="client")
return info
@error_wrap
def disconnect(self) -> int:
"""
disconnect a client.
"""
logger.info("disconnecting snap7 client")
return self._library.Cli_Disconnect(self._pointer)
@error_wrap
def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int:
"""
Connect to a S7 server.
:param address: IP address of server
:param rack: rack on server
:param slot: slot on server.
:param tcpport: port of server, 102 is standard
"""
logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}")
self.set_param(snap7.types.RemotePort, tcpport)
return self._library.Cli_ConnectTo(
self._pointer, c_char_p(address.encode()),
c_int(rack), c_int(slot))
[docs] def db_read(self, db_number: int, start: int, size: int) -> bytearray:
"""This is a lean function of Cli_ReadArea() to read PLC DB.
:returns: user buffer.
"""
logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}")
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
data = (type_ * size)()
result = (self._library.Cli_DBRead(
self._pointer, db_number, start, size,
byref(data)))
check_error(result, context="client")
return bytearray(data)
@error_wrap
def db_write(self, db_number: int, start: int, data: bytearray) -> int:
"""
Writes to a DB object.
:param start: write offset
:param data: bytearray
:param db_number: target DB number
"""
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}")
return self._library.Cli_DBWrite(self._pointer, db_number, start, size,
byref(cdata))
[docs] def delete(self, block_type: str, block_num: int) -> int:
"""
Deletes a block
:param block_type: Type of block
:param block_num: Bloc number
"""
logger.info("deleting block")
blocktype = snap7.types.block_types[block_type]
result = self._library.Cli_Delete(self._pointer, blocktype, block_num)
return result
[docs] def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]:
"""
Uploads a full block body from AG.
The whole block (including header and footer) is copied into the user
buffer.
:param block_num: Number of Block
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = snap7.types.block_types[_type]
result = self._library.Cli_FullUpload(self._pointer, block_type,
block_num, byref(_buffer),
byref(size))
check_error(result, context="client")
return bytearray(_buffer)[:size.value], size.value
[docs] def upload(self, block_num: int) -> bytearray:
"""
Uploads a block body from AG
:param block_num: bytearray
"""
logger.debug(f"db_upload block_num: {block_num}")
block_type = snap7.types.block_types['DB']
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
result = self._library.Cli_Upload(self._pointer, block_type, block_num,
byref(_buffer), byref(size))
check_error(result, context="client")
logger.info(f'received {size} bytes')
return bytearray(_buffer)
@error_wrap
def download(self, data: bytearray, block_num: int = -1) -> int:
"""
Downloads a DB data into the AG.
A whole block (including header and footer) must be available into the
user buffer.
:param block_num: New Block number (or -1)
:param data: the user buffer
"""
type_ = c_byte
size = len(data)
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._library.Cli_Download(self._pointer, block_num,
byref(cdata), size)
[docs] def db_get(self, db_number: int) -> bytearray:
"""Uploads a DB from AG.
"""
logger.debug(f"db_get db_number: {db_number}")
_buffer = buffer_type()
result = self._library.Cli_DBGet(
self._pointer, db_number, byref(_buffer),
byref(c_int(buffer_size)))
check_error(result, context="client")
return bytearray(_buffer)
[docs] def read_area(self, area: str, dbnumber: int, start: int, size: int) -> bytearray:
"""This is the main function to read data from a PLC.
With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
:param dbnumber: The DB number, only used when area= S7AreaDB
:param start: offset to start writing
:param size: number of units to read
"""
assert area in snap7.types.areas.values()
if area == snap7.types.S7AreaTM:
wordlen = snap7.types.S7WLTimer
elif area == snap7.types.S7AreaCT:
wordlen = snap7.types.S7WLCounter
else:
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
logger.debug(f"reading area: {area} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen}")
data = (type_ * size)()
result = self._library.Cli_ReadArea(self._pointer, area, dbnumber, start,
size, wordlen, byref(data))
check_error(result, context="client")
return bytearray(data)
@error_wrap
def write_area(self, area: str, dbnumber: int, start: int, data: bytearray) -> int:
"""This is the main function to write data into a PLC. It's the
complementary function of Cli_ReadArea(), the parameters and their
meanings are the same. The only difference is that the data is
transferred from the buffer pointed by pUsrData into PLC.
:param dbnumber: The DB number, only used when area= S7AreaDB
:param start: offset to start writing
:param data: a bytearray containing the payload
"""
if area == snap7.types.S7AreaTM:
wordlen = snap7.types.S7WLTimer
elif area == snap7.types.S7AreaCT:
wordlen = snap7.types.S7WLCounter
else:
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
size = len(data)
logger.debug(f"writing area: {area} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen} type: {type_}")
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._library.Cli_WriteArea(self._pointer, area, dbnumber, start,
size, wordlen, byref(cdata))
[docs] def read_multi_vars(self, items) -> Tuple[int, S7DataItem]:
"""This function read multiple variables from the PLC.
:param items: list of S7DataItem objects
:returns: a tuple with the return code and a list of data items
"""
result = self._library.Cli_ReadMultiVars(self._pointer, byref(items),
c_int32(len(items)))
check_error(result, context="client")
return result, items
[docs] def list_blocks(self) -> BlocksList:
"""Returns the AG blocks amount divided by type.
:returns: a snap7.types.BlocksList object.
"""
logger.debug("listing blocks")
blocksList = BlocksList()
result = self._library.Cli_ListBlocks(self._pointer, byref(blocksList))
check_error(result, context="client")
logger.debug(f"blocks: {blocksList}")
return blocksList
[docs] def list_blocks_of_type(self, blocktype, size: int) -> Union[int, Array]:
"""This function returns the AG list of a specified block type."""
blocktype = snap7.types.block_types.get(blocktype)
if not blocktype:
raise Snap7Exception("The blocktype parameter was invalid")
logger.debug(f"listing blocks of type: {blocktype} size: {size}")
if size == 0:
return 0
data = (c_uint16 * size)()
count = c_int(size)
result = self._library.Cli_ListBlocksOfType(
self._pointer, blocktype,
byref(data),
byref(count))
logger.debug(f"number of items found: {count}")
check_error(result, context="client")
return data
[docs] def get_block_info(self, blocktype: str, db_number: int) -> TS7BlockInfo:
"""Returns the block information for the specified block."""
blocktype_ = snap7.types.block_types.get(blocktype)
if not blocktype_:
raise Snap7Exception("The blocktype parameter was invalid")
logger.debug(f"retrieving block info for block {db_number} of type {blocktype_}")
data = TS7BlockInfo()
result = self._library.Cli_GetAgBlockInfo(self._pointer, blocktype_, db_number, byref(data))
check_error(result, context="client")
return data
@error_wrap
def set_session_password(self, password: str) -> int:
"""Send the password to the PLC to meet its security level."""
assert len(password) <= 8, 'maximum password length is 8'
return self._library.Cli_SetSessionPassword(self._pointer,
c_char_p(password.encode()))
@error_wrap
def clear_session_password(self) -> int:
"""Clears the password set for the current session (logout)."""
return self._library.Cli_ClearSessionPassword(self._pointer)
[docs] def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int):
"""
Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates.
This function must be called just before Cli_Connect().
:param address: PLC/Equipment IPV4 Address, for example "192.168.1.12"
:param local_tsap: Local TSAP (PC TSAP)
:param remote_tsap: Remote TSAP (PLC TSAP)
"""
assert re.match(ipv4, address), f'{address} is invalid ipv4'
result = self._library.Cli_SetConnectionParams(self._pointer, address,
c_uint16(local_tsap),
c_uint16(remote_tsap))
if result != 0:
raise Snap7Exception("The parameter was invalid")
[docs] def set_connection_type(self, connection_type: int):
"""
Sets the connection resource type, i.e the way in which the Clients
connects to a PLC.
:param connection_type: 1 for PG, 2 for OP, 3 to 10 for S7 Basic
"""
result = self._library.Cli_SetConnectionType(self._pointer,
c_uint16(connection_type))
if result != 0:
raise Snap7Exception("The parameter was invalid")
[docs] def get_connected(self) -> bool:
"""
Returns the connection status
:returns: a boolean that indicates if connected.
"""
connected = c_int32()
result = self._library.Cli_GetConnected(self._pointer, byref(connected))
check_error(result, context="client")
return bool(connected)
[docs] def ab_read(self, start: int, size: int) -> bytearray:
"""
This is a lean function of Cli_ReadArea() to read PLC process outputs.
"""
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
data = (type_ * size)()
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._library.Cli_ABRead(self._pointer, start, size,
byref(data))
check_error(result, context="client")
return bytearray(data)
[docs] def ab_write(self, start: int, data: bytearray) -> int:
"""
This is a lean function of Cli_WriteArea() to write PLC process
outputs
"""
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
return self._library.Cli_ABWrite(
self._pointer, start, size, byref(cdata))
[docs] def as_ab_read(self, start: int, size: int, data) -> int:
"""
This is the asynchronous counterpart of client.ab_read().
"""
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._library.Cli_AsABRead(self._pointer, start, size,
byref(data))
check_error(result, context="client")
return result
[docs] def as_ab_write(self, start: int, data: bytearray) -> int:
"""
This is the asynchronous counterpart of Cli_ABWrite.
"""
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
result = self._library.Cli_AsABWrite(
self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs] def as_compress(self, time: int) -> int:
"""
This is the asynchronous counterpart of client.compress().
"""
result = self._library.Cli_AsCompress(self._pointer, time)
check_error(result, context="client")
return result
def as_copy_ram_to_rom(self, timeout: int = 1) -> int:
result = self._library.Cli_AsCopyRamToRom(self._pointer)
check_error(result, context="client")
return result
def as_ct_read(self, start: int, amount: int, data) -> int:
# Cli_CTRead
result = self._library.Cli_AsCTRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return result
def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
# Cli_CTWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLCounter]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsCTWrite(self._pointer, start, amount, byref(cdata))
check_error(result, context="client")
return result
def as_db_fill(self, db_number: int, filler) -> int:
result = self._library.Cli_AsDBFill(self._pointer, db_number, filler)
check_error(result, context="client")
return result
[docs] def as_db_get(self, db_number: int, _buffer, size) -> bytearray:
"""
This is the asynchronous counterpart of Cli_DBGet.
"""
result = self._library.Cli_AsDBGet(self._pointer, db_number, byref(_buffer), byref(size))
check_error(result, context="client")
return result
def as_db_read(self, db_number: int, start: int, size: int, data) -> Array:
result = self._library.Cli_AsDBRead(self._pointer, db_number, start, size, byref(data))
check_error(result, context="client")
return result
def as_db_write(self, db_number: int, start: int, size: int, data) -> int:
result = self._library.Cli_AsDBWrite(self._pointer, db_number, start, size, byref(data))
check_error(result, context="client")
return result
[docs] def as_download(self, data: bytearray, block_num: int) -> int:
"""
Downloads a DB data into the AG asynchronously.
A whole block (including header and footer) must be available into the
user buffer.
:param block_num: New Block number (or -1)
:param data: the user buffer
"""
size = len(data)
type_ = c_byte * len(data)
cdata = type_.from_buffer_copy(data)
result = self._library.Cli_AsDownload(self._pointer, block_num, byref(cdata), size)
check_error(result)
return result
@error_wrap
def compress(self, time: int) -> int:
"""
Performs the Memory compress action.
:param time: Maximum time expected to complete the operation (ms).
"""
return self._library.Cli_Compress(self._pointer, time)
@error_wrap
def set_param(self, number: int, value: int) -> int:
"""Sets an internal Server object parameter.
"""
logger.debug(f"setting param number {number} to {value}")
type_ = param_types[number]
return self._library.Cli_SetParam(self._pointer, number, byref(type_(value)))
[docs] def get_param(self, number: int) -> int:
"""Reads an internal Client object parameter.
"""
logger.debug(f"retreiving param number {number}")
type_ = param_types[number]
value = type_()
code = self._library.Cli_GetParam(self._pointer, c_int(number), byref(value))
check_error(code)
return value.value
[docs] def get_pdu_length(self) -> int:
"""
Returns info about the PDU length.
"""
logger.info("getting PDU length")
requested_ = c_uint16()
negotiated_ = c_uint16()
code = self._library.Cli_GetPduLength(self._pointer, byref(requested_), byref(negotiated_))
check_error(code)
return negotiated_.value
[docs] def get_plc_datetime(self) -> datetime:
"""
Get date and time from PLC.
:return: date and time as datetime
"""
type_ = c_int32
buffer = (type_ * 9)()
result = self._library.Cli_GetPlcDateTime(self._pointer, byref(buffer))
check_error(result, context="client")
return datetime(
year=buffer[5] + 1900,
month=buffer[4] + 1,
day=buffer[3],
hour=buffer[2],
minute=buffer[1],
second=buffer[0]
)
@error_wrap
def set_plc_datetime(self, dt: datetime) -> int:
"""
Set date and time in PLC
:param dt: date and time as datetime
"""
type_ = c_int32
buffer = (type_ * 9)()
buffer[0] = dt.second
buffer[1] = dt.minute
buffer[2] = dt.hour
buffer[3] = dt.day
buffer[4] = dt.month - 1
buffer[5] = dt.year - 1900
return self._library.Cli_SetPlcDateTime(self._pointer, byref(buffer))
[docs] def check_as_completion(self, p_value) -> int:
"""
Method to check Status of an async request. Result contains if the check was successful,
not the data value itself
:param p_value: Pointer where result of this check shall be written.
:return: 0 - Job is done successfully
:return: 1 - Job is either pending or contains s7errors
"""
result = self._library.Cli_CheckAsCompletion(self._pointer, p_value)
check_error(result, context="client")
return result
def set_as_callback(self, pfn_clicompletion, p_usr):
# Cli_SetAsCallback
result = self._library.Cli_SetAsCallback(self._pointer, pfn_clicompletion, p_usr)
check_error(result, context='client')
return result
[docs] def wait_as_completion(self, timeout: int) -> int:
"""
Snap7 Cli_WaitAsCompletion representative.
:param timeout: ms to wait for async job
:return: Result of request in int format
"""
# Cli_WaitAsCompletion
result = self._library.Cli_WaitAsCompletion(self._pointer, c_ulong(timeout))
check_error(result, context="client")
return result
def _prepare_as_read_area(self, area: str, size: int) -> Tuple[int, Array]:
if area not in snap7.types.areas.values():
raise NotImplementedError(f"{area} is not implemented in snap7.types")
if area == snap7.types.S7AreaTM:
wordlen = snap7.types.S7WLTimer
elif area == snap7.types.S7AreaCT:
wordlen = snap7.types.S7WLCounter
else:
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[wordlen]
usrdata = (type_ * size)()
return wordlen, usrdata
[docs] def as_read_area(self, area: str, dbnumber: int, start: int, size: int, wordlen: int, pusrdata) -> int:
"""This is the main function to read data from a PLC.
With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
:param area: chosen memory_area
:param dbnumber: The DB number, only used when area= S7AreaDB
:param start: offset to start writing
:param size: number of units to read
:param pusrdata:
:param wordlen:
"""
logger.debug(f"reading area: {area} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen}")
result = self._library.Cli_AsReadArea(self._pointer, area, dbnumber, start, size, wordlen, pusrdata)
check_error(result, context="client")
return result
def _prepare_as_write_area(self, area: str, data: bytearray) -> Tuple[int, Array]:
if area not in snap7.types.areas.values():
raise NotImplementedError(f"{area} is not implemented in snap7.types")
if area == snap7.types.S7AreaTM:
wordlen = snap7.types.S7WLTimer
elif area == snap7.types.S7AreaCT:
wordlen = snap7.types.S7WLCounter
else:
wordlen = snap7.types.S7WLByte
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
cdata = (type_ * len(data)).from_buffer_copy(data)
return wordlen, cdata
[docs] def as_write_area(self, area: str, dbnumber: int, start: int, size: int, wordlen: int, pusrdata) -> int:
"""This is the main function to write data into a PLC. It's the
complementary function of Cli_ReadArea(), the parameters and their
meanings are the same. The only difference is that the data is
transferred from the buffer pointed by pUsrData into PLC.
:param area: chosen memory_area
:param dbnumber: The DB number, only used when area= S7AreaDB
:param start: offset to start writing
"""
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
logger.debug(f"writing area: {area} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen} type: {type_}")
cdata = (type_ * len(pusrdata)).from_buffer_copy(pusrdata)
res = self._library.Cli_AsWriteArea(self._pointer, area, dbnumber, start, size, wordlen, byref(cdata))
check_error(res, context="client")
return res
def as_eb_read(self, start: int, size: int, data) -> int:
# Cli_AsEBRead
result = self._library.Cli_AsEBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return result
def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
# Cli_AsEBWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsEBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
def as_full_upload(self, _type: str, block_num: int) -> int:
# Cli_AsFullUpload
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = snap7.types.block_types[_type]
result = self._library.Cli_AsFullUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
def as_list_blocks_of_type(self, blocktype, data, count) -> int:
blocktype = snap7.types.block_types.get(blocktype)
if not blocktype:
raise Snap7Exception("The blocktype parameter was invalid")
result = self._library.Cli_AsListBlocksOfType(self._pointer, blocktype, byref(data), byref(count))
check_error(result, context="client")
return result
def as_mb_read(self, start: int, size: int, data) -> int:
# Cli_AsMBRead
result = self._library.Cli_AsMBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return result
def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
# Cli_AsMBWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsMBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
return result
def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int:
# Cli_AsReadSZL
result = self._library.Cli_AsReadSZL(self._pointer, ssl_id, index, byref(s7_szl), byref(size))
check_error(result, context="client")
return result
def as_read_szl_list(self, szl_list, items_count) -> int:
# Cli_AsReadSZLList
result = self._library.Cli_AsReadSZLList(self._pointer, byref(szl_list), byref(items_count))
check_error(result, context="client")
return result
def as_tm_read(self, start: int, amount: int, data) -> bytearray:
# Cli_ASTMRead
result = self._library.Cli_AsTMRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return result
def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
# Cli_TMWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLTimer]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsTMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
def as_upload(self, block_num: int, _buffer, size) -> int:
block_type = snap7.types.block_types['DB']
result = self._library.Cli_AsUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
def copy_ram_to_rom(self, timeout: int = 1) -> int:
# Cli_CopyRamToRom
result = self._library.Cli_CopyRamToRom(self._pointer, timeout)
check_error(result, context="client")
return result
def ct_read(self, start: int, amount: int) -> bytearray:
# Cli_CTRead
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLCounter]
data = (type_ * amount)()
result = self._library.Cli_CTRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
def ct_write(self, start: int, amount: int, data: bytearray) -> int:
# Cli_CTWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLCounter]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_CTWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
def db_fill(self, db_number: int, filler: int) -> int:
# Cli_DBFill
result = self._library.Cli_DBFill(self._pointer, db_number, filler)
check_error(result)
return result
def eb_read(self, start: int, size: int) -> bytearray:
# Cli_EBRead
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
data = (type_ * size)()
result = self._library.Cli_EBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
def eb_write(self, start: int, size: int, data: bytearray) -> int:
# Cli_EBWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_EBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
return result
def error_text(self, error: int) -> str:
# Cli_ErrorText
text_length = c_int(256)
error_code = c_int32(error)
text = create_string_buffer(buffer_size)
response = self._library.Cli_ErrorText(error_code, byref(text), text_length)
check_error(response)
result = bytearray(text)[:text_length.value].decode().strip('\x00')
return result
def get_cp_info(self) -> S7CpInfo:
# Cli_GetCpInfo
cp_info = S7CpInfo()
result = self._library.Cli_GetCpInfo(self._pointer, byref(cp_info))
check_error(result)
return cp_info
def get_exec_time(self) -> int:
# Cli_GetExecTime
time = c_int32()
result = self._library.Cli_GetExecTime(self._pointer, byref(time))
check_error(result)
return time.value
def get_last_error(self) -> int:
# Cli_GetLastError
last_error = c_int32()
result = self._library.Cli_GetLastError(self._pointer, byref(last_error))
check_error(result)
return last_error.value
def get_order_code(self) -> S7OrderCode:
# Cli_GetOrderCode
order_code = S7OrderCode()
result = self._library.Cli_GetOrderCode(self._pointer, byref(order_code))
check_error(result)
return order_code
def get_pg_block_info(self, block: bytearray) -> TS7BlockInfo:
block_info = TS7BlockInfo()
size = c_int(len(block))
buffer = (c_byte * len(block)).from_buffer_copy(block)
result = self._library.Cli_GetPgBlockInfo(self._pointer, byref(buffer), byref(block_info), size)
check_error(result)
return block_info
def get_protection(self) -> S7Protection:
# Cli_GetProtection
s7_protection = S7Protection()
result = self._library.Cli_GetProtection(self._pointer, byref(s7_protection))
check_error(result)
return s7_protection
def iso_exchange_buffer(self, data: bytearray) -> bytearray:
# Cli_IsoExchangeBuffer
size = c_int(len(data))
cdata = (c_byte * len(data)).from_buffer_copy(data)
response = self._library.Cli_IsoExchangeBuffer(self._pointer, byref(cdata), byref(size))
check_error(response)
result = bytearray(cdata)[:size.value]
return result
def mb_read(self, start: int, size: int) -> bytearray:
# Cli_MBRead
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
data = (type_ * size)()
result = self._library.Cli_MBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
def mb_write(self, start: int, size: int, data: bytearray) -> int:
# Cli_MBWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_MBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
return result
def read_szl(self, ssl_id: int, index: int = 0x0000) -> S7SZL:
# Cli_ReadSZL
s7_szl = S7SZL()
size = c_int(sizeof(s7_szl))
result = self._library.Cli_ReadSZL(self._pointer, ssl_id, index, byref(s7_szl), byref(size))
check_error(result, context="client")
return s7_szl
def read_szl_list(self) -> bytearray:
# Cli_ReadSZLList
szl_list = S7SZLList()
items_count = c_int(sizeof(szl_list))
response = self._library.Cli_ReadSZLList(self._pointer, byref(szl_list), byref(items_count))
check_error(response, context="client")
result = bytearray(szl_list.List)[:items_count.value]
return result
def set_plc_system_datetime(self) -> int:
# Cli_SetPlcSystemDateTime
result = self._library.Cli_SetPlcSystemDateTime(self._pointer)
check_error(result)
return result
def tm_read(self, start: int, amount: int) -> bytearray:
# Cli_TMRead
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLTimer]
data = (type_ * amount)()
result = self._library.Cli_TMRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
def tm_write(self, start: int, amount: int, data: bytearray) -> int:
# Cli_TMWrite
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLTimer]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_TMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
return result
def write_multi_vars(self, items: List[S7DataItem]) -> int:
# Cli_WriteMultiVars
items_count = c_int32(len(items))
data = bytearray()
for item in items:
data += bytearray(item)
cdata = (S7DataItem * len(items)).from_buffer_copy(data)
result = self._library.Cli_WriteMultiVars(self._pointer, byref(cdata), items_count)
check_error(result, context="client")
return result