"""
Snap7 client used for connection to a siemens 7 server.
"""
import re
import logging
from ctypes import CFUNCTYPE, byref, create_string_buffer, sizeof
from ctypes import Array, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p
from datetime import datetime
from typing import Any, Callable, List, Optional, Tuple, Union, Type
from .error import error_wrap, check_error
from types import TracebackType
from snap7.common import ipv4, load_library
from snap7.protocol import Snap7CliProtocol
from snap7.type import S7SZL, Area, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem, Block
from snap7.type import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from snap7.type import S7Object, buffer_size, buffer_type, cpu_statuses
from snap7.type import CDataArrayType, Parameter
logger = logging.getLogger(__name__)
[docs]
class Client:
"""
A snap7 client
Examples:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("127.0.0.1", 0, 0, 1102)
>>> client.get_connected()
True
>>> data = client.db_read(1, 0, 4)
>>> data
bytearray(b"\\x00\\x00\\x00\\x00")
>>> data[3] = 0b00000001
>>> data
bytearray(b'\\x00\\x00\\x00\\x01')
>>> client.db_write(1, 0, data)
"""
_lib: Snap7CliProtocol
_read_callback = None
_callback = None
_s7_client: S7Object
[docs]
def __init__(self, lib_location: Optional[str] = None):
"""Creates a new `Client` instance.
Args:
lib_location: Full path to the snap7.dll file. Optional.
Examples:
>>> import snap7
>>> client = snap7.client.Client() # If the `snap7.dll` file is in the path location
>>> client2 = snap7.client.Client(lib_location="/path/to/snap7.dll") # If the dll is in another location
<snap7.client.Client object at 0x0000028B257128E0>
"""
self._lib: Snap7CliProtocol = load_library(lib_location)
self.create()
def __enter__(self) -> "Client":
return self
def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.destroy()
def __del__(self) -> None:
self.destroy()
[docs]
def create(self) -> None:
"""Creates a SNAP7 client."""
logger.info("creating snap7 client")
self._lib.Cli_Create.restype = S7Object
self._s7_client = S7Object(self._lib.Cli_Create())
[docs]
def destroy(self) -> Optional[int]:
"""Destroys the Client object.
Returns:
Error code from snap7 library.
Examples:
>>> Client().destroy()
640719840
"""
logger.info("destroying snap7 client")
if self._lib and self._s7_client is not None:
return self._lib.Cli_Destroy(byref(self._s7_client))
self._s7_client = None # type: ignore[assignment]
return None
[docs]
def plc_stop(self) -> int:
"""Puts the CPU in STOP mode
Returns:
Error code from snap7 library.
"""
logger.info("stopping plc")
return self._lib.Cli_PlcStop(self._s7_client)
[docs]
def plc_cold_start(self) -> int:
"""Puts the CPU in RUN mode performing a COLD START.
Returns:
Error code from snap7 library.
"""
logger.info("cold starting plc")
return self._lib.Cli_PlcColdStart(self._s7_client)
[docs]
def plc_hot_start(self) -> int:
"""Puts the CPU in RUN mode performing an HOT START.
Returns:
Error code from snap7 library.
"""
logger.info("hot starting plc")
return self._lib.Cli_PlcHotStart(self._s7_client)
[docs]
def get_cpu_state(self) -> str:
"""Returns the CPU status (running/stopped)
Returns:
Description of the cpu state.
Raises:
:obj:`ValueError`: if the cpu state is invalid.
Examples:
>>> Client().get_cpu_state()
'S7CpuStatusRun'
"""
state = c_int(0)
self._lib.Cli_GetPlcStatus(self._s7_client, byref(state))
try:
status_string = cpu_statuses[state.value]
except KeyError:
raise ValueError(f"The cpu state ({state.value}) is invalid")
logger.debug(f"CPU state is {status_string}")
return status_string
[docs]
def get_cpu_info(self) -> S7CpuInfo:
"""Returns some information about the AG.
Returns:
:obj:`S7CpuInfo`: data structure with the information.
Examples:
>>> cpu_info = Client().get_cpu_info()
>>> print(cpu_info)
<S7CpuInfo ModuleTypeName: b'CPU 315-2 PN/DP'
SerialNumber: b'S C-C2UR28922012'
ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment'
ModuleName: 'CPU 315-2 PN/DP' >
"""
info = S7CpuInfo()
result = self._lib.Cli_GetCpuInfo(self._s7_client, byref(info))
check_error(result, context="client")
return info
@error_wrap(context="client")
def disconnect(self) -> int:
"""Disconnect a client.
Returns:
Error code from snap7 library.
"""
logger.info("disconnecting snap7 client")
return self._lib.Cli_Disconnect(self._s7_client)
[docs]
def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "Client":
"""Connects a Client Object to a PLC.
Args:
address: IP address of the PLC.
rack: rack number where the PLC is located.
slot: slot number where the CPU is located.
tcp_port: port of the PLC.
Returns:
The snap7 Logo instance
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0) # port is implicit = 102.
"""
logger.info(f"connecting to {address}:{tcp_port} rack {rack} slot {slot}")
self.set_param(parameter=Parameter.RemotePort, value=tcp_port)
check_error(self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot)))
return self
[docs]
def db_read(self, db_number: int, start: int, size: int) -> bytearray:
"""Reads a part of a DB from a PLC
Note:
Use it only for reading DBs, not Marks, Inputs, Outputs.
Args:
db_number: number of the DB to be read.
start: byte index from where is start to read from.
size: amount of bytes to be read.
Returns:
Buffer read.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.db_read(1, 10, 4) # reads the db number 1 starting from the byte 10 until byte 14.
>>> buffer
bytearray(b'\\x00\\x00')
"""
logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}")
type_ = WordLen.Byte.ctype
data = (type_ * size)()
result = self._lib.Cli_DBRead(self._s7_client, db_number, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
@error_wrap(context="client")
def db_write(self, db_number: int, start: int, data: bytearray) -> int:
"""Writes a part of a DB into a PLC.
Args:
db_number: number of the DB to be written.
start: byte index to start writing to.
data: buffer to be written.
Returns:
Buffer written.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = bytearray([0b00000001])
>>> client.db_write(1, 10, buffer) # writes the bit number 0 from the byte 10 to TRUE.
"""
word_len = WordLen.Byte
type_ = word_len.ctype
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}")
return self._lib.Cli_DBWrite(self._s7_client, db_number, start, size, byref(cdata))
[docs]
def delete(self, block_type: Block, block_num: int) -> int:
"""Delete a block into AG.
Args:
block_type: type of block.
block_num: block number.
Returns:
Error code from snap7 library.
"""
logger.info("deleting block")
result = self._lib.Cli_Delete(self._s7_client, block_type.ctype, block_num)
return result
[docs]
def full_upload(self, block_type: Block, block_num: int) -> Tuple[bytearray, int]:
"""Uploads a block from AG with Header and Footer infos.
The whole block (including header and footer) is copied into the user
buffer.
Args:
block_type: type of block.
block_num: number of block.
Returns:
Tuple of the buffer and size.
"""
buffer = buffer_type()
size = c_int(sizeof(buffer))
result = self._lib.Cli_FullUpload(self._s7_client, block_type.ctype, block_num, byref(buffer), byref(size))
check_error(result, context="client")
return bytearray(buffer)[: size.value], size.value
[docs]
def upload(self, block_num: int) -> bytearray:
"""Uploads a block from AG.
Note:
Upload means from the PLC to the PC.
Args:
block_num: block to be uploaded.
Returns:
Buffer with the uploaded block.
"""
logger.debug(f"db_upload block_num: {block_num}")
buffer = buffer_type()
size = c_int(sizeof(buffer))
result = self._lib.Cli_Upload(self._s7_client, Block.DB.ctype, block_num, byref(buffer), byref(size))
check_error(result, context="client")
logger.info(f"received {size} bytes")
return bytearray(buffer)
@error_wrap(context="client")
def download(self, data: bytearray, block_num: int = -1) -> int:
"""Download a block into AG.
A whole block (including header and footer) must be available into the
user buffer.
Note:
Download means from the PC to the PLC.
Args:
data: buffer data.
block_num: new block number.
Returns:
Error code from snap7 library.
"""
type_ = c_byte
size = len(data)
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._lib.Cli_Download(self._s7_client, block_num, byref(cdata), size)
[docs]
def db_get(self, db_number: int) -> bytearray:
"""Uploads a DB from AG using DBRead.
Note:
This method can't be used for 1200/1500 PLCs.
Args:
db_number: db number to be read from.
Returns:
Buffer with the data read.
Example:
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.db_get(1) # reads the db number 1.
>>> buffer
bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00...<truncated>\\x00\\x00")
"""
logger.debug(f"db_get db_number: {db_number}")
_buffer = buffer_type()
result = self._lib.Cli_DBGet(self._s7_client, db_number, byref(_buffer), byref(c_int(buffer_size)))
check_error(result, context="client")
return bytearray(_buffer)
[docs]
def read_area(self, area: Area, db_number: int, start: int, size: int) -> bytearray:
"""Read a data area from a PLC
With this you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
Args:
area: area to be read from.
db_number: The DB number, only used when area=Areas.DB
start: byte index to start reading.
size: number of bytes to read.
Returns:
Buffer with the data read.
Example:
>>> from snap7 import Client, Area
>>> Client().connect("192.168.0.1", 0, 0)
>>> buffer = Client().read_area(Area.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
>>> buffer
bytearray(b'\\x00\\x00')
"""
if area not in Area:
raise ValueError(f"{area} is not implemented in types")
elif area == Area.TM:
word_len = WordLen.Timer
elif area == Area.CT:
word_len = WordLen.Counter
else:
word_len = WordLen.Byte
type_ = word_len.ctype
logger.debug(
f"reading area: {area.name} db_number: {db_number} start: {start} amount: {size} "
f"word_len: {word_len.name}={word_len}"
)
data = (type_ * size)()
result = self._lib.Cli_ReadArea(self._s7_client, area, db_number, start, size, word_len, byref(data))
check_error(result, context="client")
return bytearray(data)
@error_wrap(context="client")
def write_area(self, area: Area, db_number: int, start: int, data: bytearray) -> int:
"""Writes a data area into a PLC.
Args:
area: area to be written.
db_number: number of the db to be written to. In case of Inputs, Marks or Outputs, this should be equal to 0
start: byte index to start writting.
data: buffer to be written.
Returns:
Snap7 error code.
Exmaple:
>>> from util.db import DB
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = bytearray([0b00000001])
# Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
>>> client.write_area(DB, 1, 10, buffer)
"""
if area == Area.TM:
word_len = WordLen.Timer
elif area == Area.CT:
word_len = WordLen.Counter
else:
word_len = WordLen.Byte
type_ = WordLen.Byte.ctype
size = len(data)
logger.debug(
f"writing area: {area.name} db_number: {db_number} start: {start}: size {size}: "
f"word_len {word_len.name}={word_len} type: {type_}"
)
cdata = (type_ * len(data)).from_buffer_copy(data)
return self._lib.Cli_WriteArea(self._s7_client, area, db_number, start, size, word_len, byref(cdata))
[docs]
def read_multi_vars(self, items: Array[S7DataItem]) -> Tuple[int, Array[S7DataItem]]:
"""Reads different kind of variables from a PLC simultaneously.
Args:
items: list of items to be read.
Returns:
Tuple of the return code from the snap7 library and the list of items.
"""
result = self._lib.Cli_ReadMultiVars(self._s7_client, byref(items), c_int32(len(items)))
check_error(result, context="client")
return result, items
[docs]
def list_blocks(self) -> BlocksList:
"""Returns the AG blocks amount divided by type.
Returns:
Block list structure object.
Examples:
>>> print(Client().list_blocks())
<block list count OB: 0 FB: 0 FC: 0 SFB: 0 SFC: 0x0 DB: 1 SDB: 0>
"""
logger.debug("listing blocks")
block_list = BlocksList()
result = self._lib.Cli_ListBlocks(self._s7_client, byref(block_list))
check_error(result, context="client")
logger.debug(f"blocks: {block_list}")
return block_list
[docs]
def list_blocks_of_type(self, block_type: Block, size: int) -> Union[int, Array[c_uint16]]:
"""This function returns the AG list of a specified block type.
Args:
block_type: specified block type.
size: size of the block type.
Returns:
If size is 0, it returns a 0, otherwise an `Array` of specified block type.
Raises:
:obj:`ValueError`: if the `block_type` is not valid.
"""
logger.debug(f"listing blocks of type: {block_type} size: {size}")
if size == 0:
return 0
data = (c_uint16 * size)()
count = c_int(size)
result = self._lib.Cli_ListBlocksOfType(self._s7_client, block_type.ctype, byref(data), byref(count))
logger.debug(f"number of items found: {count}")
check_error(result, context="client")
return data
[docs]
def get_block_info(self, block_type: Block, db_number: int) -> TS7BlockInfo:
"""Returns detailed information about a block present in AG.
Args:
block_type: specified block type.
db_number: number of db to get information from.
Returns:
Structure of information from block.
Raises:
:obj:`ValueError`: if the `blocktype` is not valid.
Examples:
>>> block_info = Client().get_block_info("DB", 1)
>>> print(block_info)
Block type: 10
Block number: 1
Block language: 5
Block flags: 1
MC7Size: 100
Load memory size: 192
Local data: 0
SBB Length: 20
Checksum: 0
Version: 1
Code date: b'1999/11/17'
Interface date: b'1999/11/17'
Author: b''
Family: b''
Header: b''
"""
logger.debug(f"retrieving block info for block {db_number} of type {block_type}")
data = TS7BlockInfo()
result = self._lib.Cli_GetAgBlockInfo(self._s7_client, block_type.ctype, db_number, byref(data))
check_error(result, context="client")
return data
@error_wrap(context="client")
def set_session_password(self, password: str) -> int:
"""Send the password to the PLC to meet its security level.
Args:
password: password to set.
Returns:
Snap7 code.
Raises:
:obj:`ValueError`: if the length of the `password` is more than 8 characters.
"""
if len(password) > 8:
raise ValueError("Maximum password length is 8")
return self._lib.Cli_SetSessionPassword(self._s7_client, c_char_p(password.encode()))
@error_wrap(context="client")
def clear_session_password(self) -> int:
"""Clears the password set for the current session (logout).
Returns:
Snap7 code.
"""
return self._lib.Cli_ClearSessionPassword(self._s7_client)
[docs]
def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int) -> None:
"""Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates.
Note:
This function must be called just before `Cli_Connect()`.
Args:
address: PLC/Equipment IPV4 Address, for example "192.168.1.12"
local_tsap: Local TSAP (PC TSAP)
remote_tsap: Remote TSAP (PLC TSAP)
Raises:
:obj:`ValueError`: if the `address` is not a valid IPV4.
:obj:`ValueError`: if the result of setting the connection params is
different from 0.
"""
if not re.match(ipv4, address):
raise ValueError(f"{address} is invalid ipv4")
result = self._lib.Cli_SetConnectionParams(self._s7_client, address.encode(), c_uint16(local_tsap), c_uint16(remote_tsap))
if result != 0:
raise ValueError("The parameter was invalid")
[docs]
def set_connection_type(self, connection_type: int) -> None:
"""Sets the connection resource type, i.e. the way in which the Clients connect to a PLC.
Args:
connection_type: 1 for PG, 2 for OP, 3 to 10 for S7 Basic
Raises:
:obj:`ValueError`: if the result of setting the connection type is
different from 0.
"""
result = self._lib.Cli_SetConnectionType(self._s7_client, c_uint16(connection_type))
if result != 0:
raise ValueError("The parameter was invalid")
[docs]
def get_connected(self) -> bool:
"""Returns the connection status
Note:
Sometimes returns True, while connection is lost.
Returns:
True if is connected, otherwise false.
"""
connected = c_int32()
result = self._lib.Cli_GetConnected(self._s7_client, byref(connected))
check_error(result, context="client")
return bool(connected)
[docs]
def ab_read(self, start: int, size: int) -> bytearray:
"""Reads a part of IPU area from a PLC.
Args:
start: byte index from where start to read.
size: amount of bytes to read.
Returns:
Buffer with the data read.
"""
word_len = WordLen.Byte
type_ = word_len.ctype
data = (type_ * size)()
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._lib.Cli_ABRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs]
def ab_write(self, start: int, data: bytearray) -> int:
"""Writes a part of IPU area into a PLC.
Args:
start: byte index from where start to write.
data: buffer with the data to be written.
Returns:
Snap7 code.
"""
word_len = WordLen.Byte
type_ = word_len.ctype
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
return self._lib.Cli_ABWrite(self._s7_client, start, size, byref(cdata))
[docs]
def as_ab_read(self, start: int, size: int, data: Union[Array[c_byte], CDataArrayType]) -> int:
"""Reads a part of IPU area from a PLC asynchronously.
Args:
start: byte index from where start to read.
size: amount of bytes to read.
data: buffer where the data will be place.
Returns:
Snap7 code.
"""
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._lib.Cli_AsABRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return result
[docs]
def as_ab_write(self, start: int, data: bytearray) -> int:
"""Writes a part of IPU area into a PLC asynchronously.
Args:
start: byte index from where start to write.
data: buffer with the data to be written.
Returns:
Snap7 code.
"""
word_len = WordLen.Byte
type_ = word_len.ctype
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
result = self._lib.Cli_AsABWrite(self._s7_client, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs]
def as_compress(self, time: int) -> int:
"""Performs the Compress action asynchronously.
Args:
time: timeout.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsCompress(self._s7_client, time)
check_error(result, context="client")
return result
[docs]
def as_copy_ram_to_rom(self, timeout: int = 1) -> int:
"""Performs the Copy Ram to Rom action asynchronously.
Args:
timeout: time to wait until fail.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsCopyRamToRom(self._s7_client, timeout)
check_error(result, context="client")
return result
[docs]
def as_ct_read(self, start: int, amount: int, data: CDataArrayType) -> int:
"""Reads counters from a PLC asynchronously.
Args:
start: byte index to start to read from.
amount: amount of bytes to read.
data: buffer where the value read will be place.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsCTRead(self._s7_client, start, amount, byref(data))
check_error(result, context="client")
return result
[docs]
def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write counters into a PLC.
Args:
start: byte index to start to write from.
amount: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Counter.ctype
cdata = (type_ * amount).from_buffer_copy(data)
result = self._lib.Cli_AsCTWrite(self._s7_client, start, amount, byref(cdata))
check_error(result, context="client")
return result
[docs]
def as_db_fill(self, db_number: int, filler: int) -> int:
"""Fills a DB in AG with a given byte.
Args:
db_number: number of DB to fill.
filler: buffer to fill with.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsDBFill(self._s7_client, db_number, filler)
check_error(result, context="client")
return result
[docs]
def as_db_get(self, db_number: int, data: CDataArrayType, size: int) -> int:
"""Uploads a DB from AG using DBRead.
Note:
This method will not work in 1200/1500.
Args:
db_number: number of DB to get.
data: buffer where the data read will be place.
size: amount of bytes to be read.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsDBGet(self._s7_client, db_number, byref(data), byref(c_int(size)))
check_error(result, context="client")
return result
[docs]
def as_db_read(self, db_number: int, start: int, size: int, data: CDataArrayType) -> int:
"""Reads a part of a DB from a PLC.
Args:
db_number: number of DB to be read.
start: byte index from where start to read from.
size: amount of bytes to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
Examples:
>>> import ctypes
>>> content = (ctypes.c_uint8 * size)() # In this ctypes array data will be stored.
>>> Client().as_db_read(1, 0, size, content)
0
"""
result = self._lib.Cli_AsDBRead(self._s7_client, db_number, start, size, byref(data))
check_error(result, context="client")
return result
[docs]
def as_db_write(self, db_number: int, start: int, size: int, data: CDataArrayType) -> int:
"""Writes a part of a DB into a PLC.
Args:
db_number: number of DB to be written.
start: byte index from where start to write to.
size: amount of bytes to write.
data: buffer to be written.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsDBWrite(self._s7_client, db_number, start, size, byref(data))
check_error(result, context="client")
return result
[docs]
def as_download(self, data: bytearray, block_num: int) -> int:
"""Download a block into AG asynchronously.
Note:
A whole block (including header and footer) must be available into the user buffer.
Args:
block_num: new block number.
data: buffer where the data will be place.
Returns:
Snap7 code.
"""
size = len(data)
type_ = c_byte * len(data)
cdata = type_.from_buffer_copy(data)
result = self._lib.Cli_AsDownload(self._s7_client, block_num, byref(cdata), size)
check_error(result)
return result
@error_wrap(context="client")
def compress(self, time: int) -> int:
"""Performs the Compress action.
Args:
time: timeout.
Returns:
Snap7 code.
"""
return self._lib.Cli_Compress(self._s7_client, time)
@error_wrap(context="client")
def set_param(self, parameter: Parameter, value: int) -> int:
"""Writes an internal Server Parameter.
Args:
parameter: the parameter to be written.
value: value to be written.
Returns:
Snap7 code.
"""
logger.debug(f"setting param number {parameter} to {value}")
return self._lib.Cli_SetParam(self._s7_client, parameter, byref(parameter.ctype(value)))
[docs]
def get_param(self, parameter: Parameter) -> int:
"""Reads an internal Server parameter.
Args:
parameter: number of argument to be read.
Return:
Value of the param read.
"""
logger.debug(f"retrieving param number {parameter}")
value = parameter.ctype()
code = self._lib.Cli_GetParam(self._s7_client, c_int(parameter), byref(value))
check_error(code)
return value.value
[docs]
def get_pdu_length(self) -> int:
"""Returns info about the PDU length (requested and negotiated).
Returns:
PDU length.
Examples:
>>> Client().get_pdu_length()
480
"""
logger.info("getting PDU length")
requested_ = c_uint16()
negotiated_ = c_uint16()
code = self._lib.Cli_GetPduLength(self._s7_client, byref(requested_), byref(negotiated_))
check_error(code)
return negotiated_.value
[docs]
def get_plc_datetime(self) -> datetime:
"""Returns the PLC date/time.
Returns:
Date and time as datetime
Examples:
>>> Client().get_plc_datetime()
datetime.datetime(2021, 4, 6, 12, 12, 36)
"""
type_ = c_int32
buffer = (type_ * 9)()
result = self._lib.Cli_GetPlcDateTime(self._s7_client, byref(buffer))
check_error(result, context="client")
return datetime(
year=buffer[5] + 1900, month=buffer[4] + 1, day=buffer[3], hour=buffer[2], minute=buffer[1], second=buffer[0]
)
@error_wrap(context="client")
def set_plc_datetime(self, dt: datetime) -> int:
"""Sets the PLC date/time with a given value.
Args:
dt: datetime to be set.
Returns:
Snap7 code.
"""
type_ = c_int32
buffer = (type_ * 9)()
buffer[0] = dt.second
buffer[1] = dt.minute
buffer[2] = dt.hour
buffer[3] = dt.day
buffer[4] = dt.month - 1
buffer[5] = dt.year - 1900
return self._lib.Cli_SetPlcDateTime(self._s7_client, byref(buffer))
[docs]
def check_as_completion(self, p_value: c_int) -> int:
"""Method to check Status of an async request.
Result contains if the check was successful, not the data value itself
Args:
p_value: Pointer where result of this check shall be written.
Returns:
Snap7 code. If 0 - Job is done successfully. If 1 - Job is either pending or contains s7errors
"""
result = self._lib.Cli_CheckAsCompletion(self._s7_client, byref(p_value))
check_error(result, context="client")
return result
[docs]
def set_as_callback(self, call_back: Callable[..., Any]) -> int:
"""
Sets the user callback that is called when an asynchronous data sent is complete.
"""
logger.info("setting event callback")
callback_wrap: Callable[..., Any] = CFUNCTYPE(None, c_void_p, c_int, c_int)
def wrapper(_: None, op_code: int, op_result: int) -> int:
"""Wraps python function into a ctypes function
Args:
_: not used
op_code:
op_result:
Returns:
Should return an int
"""
logger.info(f"callback event: op_code: {op_code} op_result: {op_result}")
call_back(op_code, op_result)
return 0
self._callback = callback_wrap(wrapper)
data = c_void_p()
result = self._lib.Cli_SetAsCallback(self._s7_client, self._callback, data)
check_error(result, context="client")
return result
[docs]
def wait_as_completion(self, timeout: int) -> int:
"""Snap7 Cli_WaitAsCompletion representative.
Args:
timeout: ms to wait for async job
Returns:
Snap7 code.
"""
# Cli_WaitAsCompletion
result = self._lib.Cli_WaitAsCompletion(self._s7_client, c_ulong(timeout))
check_error(result, context="client")
return result
[docs]
def as_read_area(self, area: Area, db_number: int, start: int, size: int, word_len: WordLen, data: CDataArrayType) -> int:
"""Reads a data area from a PLC asynchronously.
With this you can read DB, Inputs, Outputs, Markers, Timers and Counters.
Args:
area: memory area to be read from.
db_number: The DB number, only used when area=Areas.DB
start: offset to start writing
size: number of units to read
data: buffer where the data will be place.
word_len: length of the word to be read.
Returns:
Snap7 code.
"""
logger.debug(
f"reading area: {area.name} db_number: {db_number} start: {start} amount: {size} "
f"word_len: {word_len.name}={word_len.value}"
)
result = self._lib.Cli_AsReadArea(self._s7_client, area, db_number, start, size, word_len, byref(data))
check_error(result, context="client")
return result
[docs]
def as_write_area(self, area: Area, db_number: int, start: int, size: int, word_len: WordLen, data: CDataArrayType) -> int:
"""Writes a data area into a PLC asynchronously.
Args:
area: memory area to be written.
db_number: The DB number, only used when area=Areas.DB
start: offset to start writing.
size: amount of bytes to be written.
word_len: length of the word to be written.
data: buffer to be written.
Returns:
Snap7 code.
"""
type_ = WordLen.Byte.ctype
logger.debug(
f"writing area: {area.name} db_number: {db_number} "
f"start: {start}: size {size}: "
f"word_len {word_len} type: {type_}"
)
cdata = (type_ * len(data)).from_buffer_copy(data)
res = self._lib.Cli_AsWriteArea(self._s7_client, area, db_number, start, size, word_len.value, byref(cdata))
check_error(res, context="client")
return res
[docs]
def as_eb_read(self, start: int, size: int, data: CDataArrayType) -> int:
"""Reads a part of IPI area from a PLC asynchronously.
Args:
start: byte index from where to start reading from.
size: amount of bytes to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsEBRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return result
[docs]
def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of IPI area into a PLC.
Args:
start: byte index from where to start writing from.
size: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Byte.ctype
cdata = (type_ * size).from_buffer_copy(data)
result = self._lib.Cli_AsEBWrite(self._s7_client, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs]
def as_full_upload(self, block_type: Block, block_num: int) -> int:
"""Uploads a block from AG with Header and Footer infos.
Note:
Upload means from PLC to PC.
Args:
block_type: type of block.
block_num: number of block to upload.
Returns:
Snap7 code.
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
result = self._lib.Cli_AsFullUpload(self._s7_client, block_type.ctype, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
[docs]
def as_list_blocks_of_type(self, block_type: Block, data: CDataArrayType, count: int) -> int:
"""Returns the AG blocks list of a given type.
Args:
block_type: block type.
data: buffer where the data will be place.
count: pass.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsListBlocksOfType(self._s7_client, block_type.ctype, byref(data), byref(c_int(count)))
check_error(result, context="client")
return result
[docs]
def as_mb_read(self, start: int, size: int, data: CDataArrayType) -> int:
"""Reads a part of Markers area from a PLC.
Args:
start: byte index from where to start to read from.
size: amount of byte to read.
data: buffer where the data read will be place.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsMBRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return result
[docs]
def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Markers area into a PLC.
Args:
start: byte index from where to start to write to.
size: amount of byte to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Byte.ctype
cdata = (type_ * size).from_buffer_copy(data)
result = self._lib.Cli_AsMBWrite(self._s7_client, start, size, byref(cdata))
check_error(result, context="client")
return result
[docs]
def as_read_szl(self, id_: int, index: int, data: S7SZL, size: int) -> int:
"""Reads a partial list of given ID and Index.
Args:
id_: The list ID
index: The list index
data: the user buffer
size: buffer size available
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsReadSZL(self._s7_client, id_, index, byref(data), byref(c_int(size)))
check_error(result, context="client")
return result
[docs]
def as_read_szl_list(self, data: S7SZLList, items_count: int) -> int:
"""Reads the list of partial lists available in the CPU.
Args:
data: the user buffer list
items_count: buffer capacity
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsReadSZLList(self._s7_client, byref(data), byref(c_int(items_count)))
check_error(result, context="client")
return result
[docs]
def as_tm_read(self, start: int, amount: int, data: CDataArrayType) -> int:
"""Reads timers from a PLC.
Args:
start: byte index to start read from.
amount: amount of bytes to read.
data: buffer where the data will be placed.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsTMRead(self._s7_client, start, amount, byref(data))
check_error(result, context="client")
return result
[docs]
def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write timers into a PLC.
Args:
start: byte index to start writing to.
amount: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Timer.ctype
cdata = (type_ * amount).from_buffer_copy(data)
result = self._lib.Cli_AsTMWrite(self._s7_client, start, amount, byref(cdata))
check_error(result)
return result
[docs]
def as_upload(self, block_num: int, data: CDataArrayType, size: int) -> int:
"""Uploads a block from AG.
Note:
Uploads means from PLC to PC.
Args:
block_num: block number to upload.
data: buffer where the data will be place.
size: amount of bytes to upload.
Returns:
Snap7 code.
"""
result = self._lib.Cli_AsUpload(self._s7_client, Block.DB.ctype, block_num, byref(data), byref(c_int(size)))
check_error(result, context="client")
return result
[docs]
def copy_ram_to_rom(self, timeout: int = 1) -> int:
"""Performs the Copy Ram to Rom action.
Args:
timeout: timeout time.
Returns:
Snap7 code.
"""
result = self._lib.Cli_CopyRamToRom(self._s7_client, timeout)
check_error(result, context="client")
return result
[docs]
def ct_read(self, start: int, amount: int) -> bytearray:
"""Reads counters from a PLC.
Args:
start: byte index to start read from.
amount: amount of bytes to read.
Returns:
Buffer read.
"""
type_ = WordLen.Counter.ctype
data = (type_ * amount)()
result = self._lib.Cli_CTRead(self._s7_client, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs]
def ct_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write counters into a PLC.
Args:
start: byte index to start write to.
amount: amount of bytes to write.
data: buffer data to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Counter.ctype
cdata = (type_ * amount).from_buffer_copy(data)
result = self._lib.Cli_CTWrite(self._s7_client, start, amount, byref(cdata))
check_error(result)
return result
[docs]
def db_fill(self, db_number: int, filler: int) -> int:
"""Fills a DB in AG with a given byte.
Args:
db_number: db number to fill.
filler: value filler.
Returns:
Snap7 code.
"""
result = self._lib.Cli_DBFill(self._s7_client, db_number, filler)
check_error(result)
return result
[docs]
def eb_read(self, start: int, size: int) -> bytearray:
"""Reads a part of IPI area from a PLC.
Args:
start: byte index to start read from.
size: amount of bytes to read.
Returns:
Data read.
"""
type_ = WordLen.Byte.ctype
data = (type_ * size)()
result = self._lib.Cli_EBRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs]
def eb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of IPI area into a PLC.
Args:
start: byte index to be written.
size: amount of bytes to write.
data: data to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Byte.ctype
cdata = (type_ * size).from_buffer_copy(data)
result = self._lib.Cli_EBWrite(self._s7_client, start, size, byref(cdata))
check_error(result)
return result
[docs]
def error_text(self, error: int) -> str:
"""Returns a textual explanation of a given error number.
Args:
error: error number.
Returns:
Text error.
"""
text_length = c_int(256)
error_code = c_int32(error)
text = create_string_buffer(buffer_size)
response = self._lib.Cli_ErrorText(error_code, text, text_length)
check_error(response)
result = bytearray(text)[: text_length.value].decode().strip("\x00")
return result
[docs]
def get_cp_info(self) -> S7CpInfo:
"""Returns some information about the CP (communication processor).
Returns:
Structure object containing the CP information.
"""
cp_info = S7CpInfo()
result = self._lib.Cli_GetCpInfo(self._s7_client, byref(cp_info))
check_error(result)
return cp_info
[docs]
def get_exec_time(self) -> int:
"""Returns the last job execution time in milliseconds.
Returns:
Execution time value.
"""
time = c_int32()
result = self._lib.Cli_GetExecTime(self._s7_client, byref(time))
check_error(result)
return time.value
[docs]
def get_last_error(self) -> int:
"""Returns the last job result.
Returns:
Returns the last error value.
"""
last_error = c_int32()
result = self._lib.Cli_GetLastError(self._s7_client, byref(last_error))
check_error(result)
return last_error.value
[docs]
def get_order_code(self) -> S7OrderCode:
"""Returns the CPU order code.
Returns:
Order of the code in a structure object.
"""
order_code = S7OrderCode()
result = self._lib.Cli_GetOrderCode(self._s7_client, byref(order_code))
check_error(result)
return order_code
[docs]
def get_pg_block_info(self, block: bytearray) -> TS7BlockInfo:
"""Returns detailed information about a block loaded in memory.
Args:
block: buffer where the data will be place.
Returns:
Structure object that contains the block information.
"""
block_info = TS7BlockInfo()
size = c_int(len(block))
buffer = (c_byte * len(block)).from_buffer_copy(block)
result = self._lib.Cli_GetPgBlockInfo(self._s7_client, byref(buffer), byref(block_info), size)
check_error(result)
return block_info
[docs]
def get_protection(self) -> S7Protection:
"""Gets the CPU protection level info.
Returns:
Structure object with protection attributes.
"""
s7_protection = S7Protection()
result = self._lib.Cli_GetProtection(self._s7_client, byref(s7_protection))
check_error(result)
return s7_protection
[docs]
def iso_exchange_buffer(self, data: bytearray) -> bytearray:
"""Exchanges a given S7 PDU (protocol data unit) with the CPU.
Args:
data: buffer to exchange.
Returns:
Snap7 code.
"""
size = c_int(len(data))
cdata = (c_byte * len(data)).from_buffer_copy(data)
response = self._lib.Cli_IsoExchangeBuffer(self._s7_client, byref(cdata), byref(size))
check_error(response)
result = bytearray(cdata)[: size.value]
return result
[docs]
def mb_read(self, start: int, size: int) -> bytearray:
"""Reads a part of Markers area from a PLC.
Args:
start: byte index to be read from.
size: amount of bytes to read.
Returns:
Buffer with the data read.
"""
type_ = WordLen.Byte.ctype
data = (type_ * size)()
result = self._lib.Cli_MBRead(self._s7_client, start, size, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs]
def mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Markers area into a PLC.
Args:
start: byte index to be written.
size: amount of bytes to write.
data: buffer to write.
Returns:
Snap7 code.
"""
type_ = WordLen.Byte.ctype
cdata = (type_ * size).from_buffer_copy(data)
result = self._lib.Cli_MBWrite(self._s7_client, start, size, byref(cdata))
check_error(result)
return result
[docs]
def read_szl(self, id_: int, index: int = 0) -> S7SZL:
"""Reads a partial list of given ID and Index.
Args:
id_: ssl id to be read.
index: index to be read.
Returns:
SZL structure object.
"""
s7_szl = S7SZL()
size = c_int(sizeof(s7_szl))
result = self._lib.Cli_ReadSZL(self._s7_client, id_, index, byref(s7_szl), byref(size))
check_error(result, context="client")
return s7_szl
[docs]
def read_szl_list(self) -> bytearray:
"""Reads the list of partial lists available in the CPU.
Returns:
Buffer read.
"""
szl_list = S7SZLList()
items_count = c_int(sizeof(szl_list))
response = self._lib.Cli_ReadSZLList(self._s7_client, byref(szl_list), byref(items_count))
check_error(response, context="client")
result = bytearray(szl_list.List)[: items_count.value]
return result
[docs]
def set_plc_system_datetime(self) -> int:
"""Sets the PLC date/time with the host (PC) date/time.
Returns:
Snap7 code.
"""
result = self._lib.Cli_SetPlcSystemDateTime(self._s7_client)
check_error(result)
return result
[docs]
def tm_read(self, start: int, amount: int) -> bytearray:
"""Reads timers from a PLC.
Args:
start: byte index from where is start to read from.
amount: amount of byte to be read.
Returns:
Buffer read.
"""
type_ = WordLen.Timer.ctype
data = (type_ * amount)()
result = self._lib.Cli_TMRead(self._s7_client, start, amount, byref(data))
check_error(result, context="client")
return bytearray(data)
[docs]
def tm_write(self, start: int, amount: int, data: bytearray) -> int:
"""Write timers into a PLC.
Args:
start: byte index from where is start to write to.
amount: amount of byte to be written.
data: data to be written.
Returns:
Snap7 code.
"""
type_ = WordLen.Timer.ctype
cdata = (type_ * amount).from_buffer_copy(data)
result = self._lib.Cli_TMWrite(self._s7_client, start, amount, byref(cdata))
check_error(result)
return result
[docs]
def write_multi_vars(self, items: List[S7DataItem]) -> int:
"""Writes different kind of variables into a PLC simultaneously.
Args:
items: list of items to be written.
Returns:
Snap7 code.
"""
items_count = c_int32(len(items))
data = bytearray()
for item in items:
data += bytearray(item)
cdata = (S7DataItem * len(items)).from_buffer_copy(data)
result = self._lib.Cli_WriteMultiVars(self._s7_client, byref(cdata), items_count)
check_error(result, context="client")
return result