"""
Legacy S7 client implementation.
Pure Python implementation of the classic S7 protocol. For new projects,
use :class:`s7.Client` instead, which supports all PLC models and
automatically selects the best protocol.
"""
import copy
import logging
import random
import struct
import sys
import threading
import time
from typing import List, Any, Optional, Tuple, Union, Callable, cast
from datetime import datetime
from ctypes import (
c_int,
Array,
memmove,
)
from .connection import ISOTCPConnection
from .s7protocol import S7Protocol, get_return_code_description
from .datatypes import S7WordLen
from .error import S7Error, S7ConnectionError, S7ProtocolError, S7StalePacketError, S7TimeoutError
from .client_base import ClientMixin
from .log import PLCLoggerAdapter, OperationLogger
from .optimizer import ReadItem, ReadPacket, sort_items, merge_items, packetize, extract_results
from .tags import Tag, _STRING_RE
from . import util
from .szl import parse_cp_info_szl, parse_cpu_info_szl, parse_order_code_szl, parse_protection_szl
from .type import (
Area,
Block,
BlocksList,
S7CpuInfo,
TS7BlockInfo,
S7DataItem,
S7CpInfo,
S7OrderCode,
S7Protection,
S7SZL,
S7SZLList,
WordLen,
Parameter,
CDataArrayType,
)
# Integer values of every ``Area`` enum member, collected once at import time.
_VALID_AREA_VALUES: frozenset[int] = frozenset(a.value for a in Area)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _decode_tag(tag: Tag, data: bytearray) -> Any:
    """Turn the raw bytes read for *tag* into a typed Python value.

    Args:
        tag: Tag describing the datatype, element count, total size and bit.
        data: Raw bytes that were read for the tag.

    Returns:
        A string for FSTRING/STRING/WSTRING tags, a list of decoded scalars
        when ``tag.count`` is greater than one, otherwise a single scalar.

    Raises:
        ValueError: If the datatype is not supported by ``_decode_scalar``.
    """
    type_name = tag.datatype.upper()

    # String types carry their declared length inside the datatype name.
    string_match = _STRING_RE.match(type_name)
    if string_match:
        flavour = string_match.group(1)
        declared_len = int(string_match.group(2))
        if flavour == "FSTRING":
            return util.get_fstring(data, 0, declared_len)
        if flavour == "STRING":
            return util.get_string(data, 0)
        if flavour == "WSTRING":
            return util.get_wstring(data, 0)

    if tag.count <= 1:
        return _decode_scalar(type_name, data, tag.bit)

    # Array: slice the buffer into equally sized elements and decode each one.
    stride = tag.size // tag.count
    elements = []
    for index in range(tag.count):
        chunk = data[index * stride : (index + 1) * stride]
        elements.append(_decode_scalar(type_name, chunk, tag.bit))
    return elements
def _decode_scalar(datatype: str, data: bytearray, bit: int) -> Any:
    """Decode one scalar of type *datatype* from the start of *data*.

    Args:
        datatype: Upper-case datatype name (e.g. ``"INT"``, ``"REAL"``).
        data: Buffer holding the value at offset 0.
        bit: Bit index, only used for ``"BOOL"``.

    Returns:
        Whatever the matching ``util.get_*`` helper returns.

    Raises:
        ValueError: If *datatype* is not one of the supported names.
    """
    if datatype == "BOOL":
        # BOOL is the only type whose getter takes a bit position.
        return util.get_bool(data, 0, bit)

    offset_only_types = (
        "BYTE", "SINT", "USINT", "CHAR",
        "INT", "UINT", "WORD", "WCHAR", "DATE",
        "DINT", "UDINT", "DWORD", "REAL", "TIME", "TOD",
        "LINT", "ULINT", "LWORD", "LREAL", "LTIME", "LTOD", "LDT",
        "DT", "DTL",
    )
    if datatype not in offset_only_types:
        raise ValueError(f"Unsupported tag datatype: {datatype}")

    # Each listed type is read by the helper named ``util.get_<lower-case type>``.
    reader = getattr(util, "get_" + datatype.lower())
    return reader(data, 0)
def _encode_tag(tag: Tag, buf: bytearray, value: Any) -> None:
    """Write *value* into *buf* using the encoding required by *tag*.

    Args:
        tag: Tag describing the datatype, element count, total size and bit.
        buf: Destination buffer, modified in place starting at offset 0.
        value: Value to encode; an iterable of values when ``tag.count > 1``.

    Raises:
        ValueError: If the datatype is not supported by ``_encode_scalar``.
    """
    type_name = tag.datatype.upper()

    string_match = _STRING_RE.match(type_name)
    if string_match:
        flavour = string_match.group(1)
        declared_len = int(string_match.group(2))
        if flavour in ("FSTRING", "STRING", "WSTRING"):
            # The three string setters share one signature and are named
            # ``util.set_<lower-case kind>``.
            string_writer = getattr(util, "set_" + flavour.lower())
            string_writer(buf, 0, value, declared_len)
            return

    if tag.count > 1:
        # Array: each element occupies an equal share of the tag's size.
        stride = tag.size // tag.count
        for index, element in enumerate(value):
            _encode_scalar(type_name, buf, index * stride, element, tag.bit)
    else:
        _encode_scalar(type_name, buf, 0, value, tag.bit)
def _encode_scalar(datatype: str, buf: bytearray, offset: int, value: Any, bit: int) -> None:
    """Encode one scalar of type *datatype* into *buf* at *offset*.

    Args:
        datatype: Upper-case datatype name (e.g. ``"INT"``, ``"REAL"``).
        buf: Destination buffer, modified in place.
        offset: Byte offset inside *buf* where the value is written.
        value: Value to encode.
        bit: Bit index, only used for ``"BOOL"``.

    Raises:
        ValueError: If *datatype* is not one of the supported names.
    """
    if datatype == "BOOL":
        # BOOL is the only type whose setter takes a bit position.
        util.set_bool(buf, offset, bit, value)
        return

    offset_only_types = (
        "BYTE", "USINT", "SINT", "CHAR",
        "INT", "UINT", "WORD", "WCHAR", "DATE",
        "DINT", "UDINT", "DWORD", "REAL", "TIME", "TOD",
        "LINT", "ULINT", "LWORD", "LREAL", "LTIME", "LTOD", "LDT",
        "DT", "DTL",
    )
    if datatype not in offset_only_types:
        raise ValueError(f"Unsupported tag datatype: {datatype}")

    # Each listed type is written by the helper named ``util.set_<lower-case type>``.
    writer = getattr(util, "set_" + datatype.lower())
    writer(buf, offset, value)
class _OptimizationPlan:
    """Cached optimization plan for repeated read_multi_vars calls with the same layout.

    Attributes:
        cache_key: Tuple of ints identifying the request layout the plan was built for.
        packets: Read packets of the plan.
        read_items: Read items of the plan.
    """

    def __init__(self, cache_key: tuple[int, ...], packets: list[ReadPacket], read_items: list[ReadItem]):
        # The plan is a plain record: all three arguments are stored as given.
        self.cache_key, self.packets, self.read_items = cache_key, packets, read_items
[docs]
class Client(ClientMixin):
"""
Legacy S7 client for classic PUT/GET communication.
Supports S7-300, S7-400, S7-1200 and S7-1500 PLCs via the classic S7
protocol. For new projects, use :class:`s7.Client` instead, which
automatically selects the best protocol for any supported PLC.
Examples:
>>> from s7 import Client
>>> client = Client()
>>> client.connect("192.168.1.10", 0, 1)
>>> data = client.db_read(1, 0, 4)
>>> client.disconnect()
"""
MAX_VARS = 20 # Max variables per multi-read/multi-write request
[docs]
def __init__(
    self,
    lib_location: Optional[str] = None,
    *,
    auto_reconnect: bool = False,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 30.0,
    heartbeat_interval: float = 0,
    on_disconnect: Optional[Callable[[], None]] = None,
    on_reconnect: Optional[Callable[[], None]] = None,
    **kwargs: Any,
):
    """
    Set up an unconnected S7 client.

    Args:
        lib_location: Ignored. Kept for backwards compatibility.
        auto_reconnect: Enable automatic reconnection on connection loss.
        max_retries: Maximum number of reconnection attempts.
        retry_delay: Initial delay between reconnection attempts in seconds.
        backoff_factor: Multiplier for exponential backoff between retries.
        max_delay: Maximum delay between reconnection attempts in seconds.
        heartbeat_interval: Interval in seconds for heartbeat probes (0=disabled).
        on_disconnect: Optional callback invoked when connection is lost.
        on_reconnect: Optional callback invoked after successful reconnection.
        **kwargs: Ignored. Kept for backwards compatibility.
    """
    # -- Transport and protocol state (filled in by connect) --
    self.connection: Optional[ISOTCPConnection] = None
    self.protocol = S7Protocol()
    self.connected = False
    self.host = ""
    self.port = 102
    self.rack = 0
    self.slot = 0
    self.pdu_length = 480  # Negotiated PDU length

    # -- TSAP defaults and connection type --
    self.local_tsap = 0x0100  # Default local TSAP
    self.remote_tsap = 0x0102  # Default remote TSAP
    self.connection_type = 1  # PG

    # -- Session password (none until one is set) --
    self.session_password: Optional[str] = None

    # -- Timing and error bookkeeping --
    self._exec_time = 0
    self.last_error = 0

    # -- Parameter table --
    self._params = {
        Parameter.LocalPort: 0,
        Parameter.RemotePort: 102,
        Parameter.PingTimeout: 750,
        Parameter.SendTimeout: 10,
        Parameter.RecvTimeout: 3000,
        Parameter.SrcRef: 256,
        Parameter.DstRef: 0,
        Parameter.SrcTSap: 256,
        Parameter.PDURequest: 480,
    }

    # -- Multi-read optimizer --
    self._opt_plan: Optional[_OptimizationPlan] = None
    self.multi_read_max_gap: int = 5
    self.use_optimizer: bool = True
    self.max_parallel: int = 1

    # -- Async operation bookkeeping --
    self._async_pending = False
    self._async_result: Optional[bytearray] = None
    self._async_error: Optional[int] = None
    self._last_error = 0

    # -- Auto-reconnect configuration --
    self._auto_reconnect = auto_reconnect
    self._max_retries = max_retries
    self._retry_delay = retry_delay
    self._backoff_factor = backoff_factor
    self._max_delay = max_delay
    self._on_disconnect = on_disconnect
    self._on_reconnect = on_reconnect

    # -- Heartbeat configuration and runtime state --
    self._heartbeat_interval = heartbeat_interval
    self._heartbeat_thread: Optional[threading.Thread] = None
    self._heartbeat_stop_event = threading.Event()
    self._is_alive = False

    # Re-entrant lock shared by request exchange, reconnection and heartbeat.
    self._reconnect_lock = threading.RLock()

    # Structured logger with PLC context (updated on connect)
    self.logger: PLCLoggerAdapter = PLCLoggerAdapter(logger)
    self.logger.info("S7Client initialized (pure Python implementation)")
@property
def is_alive(self) -> bool:
    """Report whether the connection is considered alive.

    With the heartbeat disabled (interval <= 0) this is simply the
    ``connected`` flag; otherwise it is the ``_is_alive`` flag maintained
    by the heartbeat and connection logic.
    """
    heartbeat_disabled = self._heartbeat_interval <= 0
    return self.connected if heartbeat_disabled else self._is_alive
def _get_connection(self) -> ISOTCPConnection:
    """Return the active connection object.

    Raises:
        S7ConnectionError: If no connection object is currently set.
    """
    active = self.connection
    if active is not None:
        return active
    raise S7ConnectionError("Not connected to PLC")
def _send_receive(self, request: bytes, max_stale_retries: int = 3) -> dict[str, Any]:
    """Send one request and return the first response with a valid PDU reference.

    The request is sent once. Responses are then received and parsed until
    one passes ``validate_pdu_reference``; a response rejected as stale is
    discarded and the next one is read. The whole exchange runs under
    ``_reconnect_lock``.

    Args:
        request: Complete S7 PDU to send.
        max_stale_retries: Max times to retry receive on stale packets.

    Returns:
        Parsed S7 response dict.

    Raises:
        S7ConnectionError: If there is no connection object.
        S7ProtocolError: If more than *max_stale_retries* stale packets are
            received, or no receive attempt was possible at all.

    Any other exception raised while sending, receiving, parsing or
    validating propagates unchanged.
    """
    conn = self._get_connection()
    with self._reconnect_lock:
        conn.send_data(request)

        attempt = 0
        while attempt <= max_stale_retries:
            response = self.protocol.parse_response(conn.receive_data())
            try:
                self.protocol.validate_pdu_reference(response["sequence"])
            except S7StalePacketError:
                if attempt >= max_stale_retries:
                    raise S7ProtocolError(f"Max stale packet retries ({max_stale_retries}) exceeded")
                logger.warning(f"Stale packet (attempt {attempt + 1}/{max_stale_retries}), retrying receive")
            else:
                return response
            attempt += 1

        # Only reachable when the loop body never ran (negative retry count).
        raise S7ProtocolError("Failed to receive valid response")
def _send_receive_with_reconnect(self, request_builder: Callable[[], bytes], max_stale_retries: int = 3) -> dict[str, Any]:
    """Run a request/response exchange, reconnecting once on connection loss.

    The request is built and exchanged via ``_send_receive``. If that raises
    ``S7ConnectionError`` or ``OSError`` and auto-reconnect is enabled, the
    client reconnects, asks *request_builder* for a fresh request and tries
    exactly one more time. With auto-reconnect disabled the error is
    re-raised as is.

    Args:
        request_builder: Callable that builds the request bytes. It is called
            again after a reconnection.
        max_stale_retries: Max times to retry receive on stale packets.

    Returns:
        Parsed S7 response dict.
    """
    try:
        return self._send_receive(request_builder(), max_stale_retries)
    except (S7ConnectionError, OSError) as e:
        if self._auto_reconnect:
            logger.warning(f"Connection lost during operation: {e}")
            self._do_reconnect()
            # Second and final attempt with a rebuilt request.
            return self._send_receive(request_builder(), max_stale_retries)
        raise
def _do_reconnect(self) -> None:
    """Perform reconnection with exponential backoff and jitter.

    Runs entirely under ``_reconnect_lock``. If the existing connection still
    passes ``check_connection()`` the method returns immediately. Otherwise
    the ``on_disconnect`` callback (if any) is invoked once, and up to
    ``_max_retries`` attempts are made to open a new ``ISOTCPConnection``
    with the stored host/port/TSAPs, create a fresh ``S7Protocol`` and run
    ``_setup_communication()``. On success the ``on_reconnect`` callback (if
    any) is invoked. Exceptions raised by either callback are logged at
    debug level and otherwise ignored.

    Raises:
        S7ConnectionError: If all reconnection attempts fail (also when
            ``_max_retries`` allows no attempt at all).
    """
    with self._reconnect_lock:
        # Check if another thread already reconnected
        if self.connected and self.connection is not None:
            try:
                if self.connection.check_connection():
                    return
            except Exception:
                # A failing liveness check is treated like a dead connection.
                pass
        self._is_alive = False
        if self._on_disconnect is not None:
            try:
                self._on_disconnect()
            except Exception:
                logger.debug("on_disconnect callback raised an exception", exc_info=True)
        delay = self._retry_delay
        last_error: Optional[Exception] = None
        for attempt in range(1, self._max_retries + 1):
            logger.info(f"Reconnection attempt {attempt}/{self._max_retries}")
            # Clean up old connection
            try:
                if self.connection is not None:
                    self.connection.disconnect()
                    self.connection = None
            except Exception:
                # Best-effort cleanup: errors while closing the old link are ignored.
                pass
            self.connected = False
            try:
                # Re-establish connection using stored parameters
                self.connection = ISOTCPConnection(
                    host=self.host, port=self.port, local_tsap=self.local_tsap, remote_tsap=self.remote_tsap
                )
                self.connection.connect()
                # Re-create protocol to reset sequence counters
                self.protocol = S7Protocol()
                self._setup_communication()
                self.connected = True
                self._is_alive = True
                logger.info(f"Reconnected to {self.host}:{self.port}")
                if self._on_reconnect is not None:
                    try:
                        self._on_reconnect()
                    except Exception:
                        logger.debug("on_reconnect callback raised an exception", exc_info=True)
                return
            except Exception as e:
                # Remember the failure so the final error message can report it.
                last_error = e
                logger.warning(f"Reconnection attempt {attempt} failed: {e}")
            # No sleep after the final attempt.
            if attempt < self._max_retries:
                # Exponential backoff with jitter
                # (jitter is up to 10% of the current delay; the sleep is capped at _max_delay)
                jitter = random.uniform(0, delay * 0.1)
                sleep_time = min(delay + jitter, self._max_delay)
                logger.debug(f"Waiting {sleep_time:.2f}s before next attempt")
                time.sleep(sleep_time)
                delay = min(delay * self._backoff_factor, self._max_delay)
        raise S7ConnectionError(f"Reconnection failed after {self._max_retries} attempts: {last_error}")
def _start_heartbeat(self) -> None:
    """Launch the heartbeat thread, unless the heartbeat is disabled.

    Does nothing when the configured interval is zero or negative. Otherwise
    clears the stop event, marks the connection alive and starts a daemon
    thread named ``s7-heartbeat`` that runs ``_heartbeat_loop``.
    """
    if self._heartbeat_interval <= 0:
        return

    self._heartbeat_stop_event.clear()
    self._is_alive = True

    worker = threading.Thread(name="s7-heartbeat", target=self._heartbeat_loop, daemon=True)
    self._heartbeat_thread = worker
    worker.start()
    logger.debug(f"Heartbeat started with interval {self._heartbeat_interval}s")
def _stop_heartbeat(self) -> None:
    """Signal the heartbeat thread to stop and wait for it to finish.

    The stop event is always set. If a heartbeat thread exists it is joined
    for at most ``_heartbeat_interval + 2`` seconds and then forgotten.
    """
    self._heartbeat_stop_event.set()

    worker = self._heartbeat_thread
    if worker is None:
        return

    worker.join(timeout=self._heartbeat_interval + 2)
    self._heartbeat_thread = None
    logger.debug("Heartbeat stopped")
def _heartbeat_loop(self) -> None:
    """Background loop that periodically probes the PLC connection.

    Each cycle waits ``_heartbeat_interval`` seconds on the stop event and
    exits as soon as the event is set. While the client is not connected the
    loop only attempts a reconnect (when auto-reconnect is enabled).
    Otherwise it probes the PLC with ``get_cpu_state()`` under
    ``_reconnect_lock``. A failing probe marks the client as not alive and
    not connected, and triggers a reconnect when auto-reconnect is enabled.
    """
    while not self._heartbeat_stop_event.is_set():
        # wait() returns True when the event was set during the wait.
        if self._heartbeat_stop_event.wait(timeout=self._heartbeat_interval):
            break  # Stop event was set
        if not self.connected:
            self._is_alive = False
            if self._auto_reconnect:
                try:
                    self._do_reconnect()
                except S7ConnectionError:
                    logger.warning("Heartbeat reconnection failed")
            # Nothing to probe in this cycle; wait for the next interval.
            continue
        try:
            with self._reconnect_lock:
                # Re-check under the lock: state may have changed while waiting for it.
                if self.connected and self.connection is not None:
                    self.get_cpu_state()
                    self._is_alive = True
        except Exception as e:
            logger.warning(f"Heartbeat probe failed: {e}")
            self._is_alive = False
            self.connected = False
            if self._auto_reconnect:
                try:
                    self._do_reconnect()
                except S7ConnectionError:
                    logger.warning("Heartbeat reconnection failed")
[docs]
def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "Client":
    """
    Connect to S7 PLC.

    Stores the target parameters, derives the remote TSAP from rack and slot,
    opens the ISO-on-TCP connection and runs ``_setup_communication()``. On
    success the heartbeat is started (if configured) and, when the optimizer
    is enabled, ``max_parallel`` is auto-tuned. On any failure the client is
    disconnected again before the error is raised.

    Args:
        address: PLC IP address
        rack: Rack number
        slot: Slot number
        tcp_port: TCP port (default 102)

    Returns:
        Self for method chaining

    Raises:
        S7Error: Re-raised unchanged when the failure already is an S7Error.
        S7ConnectionError: Wrapping any other exception.
    """
    self.host, self.port = address, tcp_port
    self.rack, self.slot = rack, slot
    self._params[Parameter.RemotePort] = tcp_port

    # Remote TSAP: rack and slot encoded as per S7 specification
    self.remote_tsap = 0x0100 | (rack << 5) | slot

    try:
        began = time.time()

        # Establish ISO on TCP connection
        link = ISOTCPConnection(
            host=address, port=tcp_port, local_tsap=self.local_tsap, remote_tsap=self.remote_tsap
        )
        self.connection = link
        link.connect()

        # Setup communication and negotiate PDU length
        self._setup_communication()

        self.connected = True
        self._is_alive = True
        self._exec_time = int((time.time() - began) * 1000)

        self.logger.update_context(plc_host=address, rack=rack, slot=slot, protocol="legacy")
        self.logger.info(f"Connected to {address}:{tcp_port} rack {rack} slot {slot}")

        # Start heartbeat if configured
        self._start_heartbeat()

        # Auto-tune parallel dispatch based on PDU size
        if self.use_optimizer:
            self._auto_tune_parallel()
    except S7Error:
        # Library errors keep their type; just roll back the partial connection.
        self.disconnect()
        raise
    except Exception as e:
        self.disconnect()
        raise S7ConnectionError(f"Connection failed: {e}")
    return self
[docs]
def connect_routed(
    self,
    host: str,
    router_rack: int,
    router_slot: int,
    subnet: int,
    dest_rack: int,
    dest_slot: int,
    port: int = 102,
    timeout: float = 5.0,
) -> "Client":
    """Connect to an S7 PLC via a routing gateway on another subnet.

    The gateway PLC (identified by *host*, *router_rack*, *router_slot*)
    forwards the connection to the target PLC (identified by *subnet*,
    *dest_rack*, *dest_slot*) through S7 routing parameters embedded in
    the COTP Connection Request.

    After a successful connection the same post-connect steps as in
    :meth:`connect` are performed: the alive flag is set, the logger context
    is updated, the heartbeat is started (if configured) and ``max_parallel``
    is auto-tuned when the optimizer is enabled. On any failure the client is
    disconnected again before the error is raised.

    .. warning:: This method is experimental and may change in future versions.

    Args:
        host: IP address of the routing gateway PLC
        router_rack: Rack number of the gateway PLC
        router_slot: Slot number of the gateway PLC
        subnet: Subnet ID of the target network (0x0000-0xFFFF)
        dest_rack: Rack number of the destination PLC
        dest_slot: Slot number of the destination PLC
        port: TCP port (default 102)
        timeout: Connection timeout in seconds

    Returns:
        Self for method chaining

    Raises:
        S7Error: Re-raised unchanged when the failure already is an S7Error.
        S7ConnectionError: Wrapping any other exception (chained to it).
    """
    self.host = host
    self.port = port
    self.rack = router_rack
    self.slot = router_slot
    self._params[Parameter.RemotePort] = port

    # Remote TSAP targets the gateway rack/slot
    self.remote_tsap = 0x0100 | (router_rack << 5) | router_slot

    try:
        start_time = time.time()
        self.connection = ISOTCPConnection(
            host=host,
            port=port,
            local_tsap=self.local_tsap,
            remote_tsap=self.remote_tsap,
        )
        self.connection.set_routing(subnet, dest_rack, dest_slot)
        self.connection.connect(timeout=timeout)

        # Setup communication and negotiate PDU length
        self._setup_communication()

        self.connected = True
        # Without this, is_alive would report False on a live routed link
        # whenever a heartbeat interval is configured.
        self._is_alive = True
        self._exec_time = int((time.time() - start_time) * 1000)

        self.logger.update_context(plc_host=host, rack=router_rack, slot=router_slot, protocol="legacy")
        logger.info(
            f"Connected (routed) to {host}:{port} via rack {router_rack} slot {router_slot}, "
            f"subnet {subnet:#06x} -> rack {dest_rack} slot {dest_slot}"
        )

        # Start heartbeat if configured, mirroring connect().
        # NOTE(review): reconnection re-creates the connection from the stored
        # host/port/TSAPs only; routing parameters do not appear to be
        # re-applied there -- confirm before relying on auto_reconnect with
        # routed connections.
        self._start_heartbeat()

        # Auto-tune parallel dispatch based on PDU size
        if self.use_optimizer:
            self._auto_tune_parallel()
    except Exception as e:
        self.disconnect()
        if isinstance(e, S7Error):
            raise
        raise S7ConnectionError(f"Routed connection failed: {e}") from e
    return self
[docs]
def disconnect(self) -> int:
    """Disconnect from S7 PLC.

    Stops the heartbeat, closes the underlying connection (if any) and
    resets the connection-related state. The state is reset even when
    closing the connection raises, so the client never keeps pointing at a
    connection that failed to shut down; the exception itself still
    propagates to the caller.

    Returns:
        0 on success
    """
    # Stop heartbeat first
    self._stop_heartbeat()

    try:
        if self.connection:
            self.connection.disconnect()
    finally:
        # Always drop the connection object and cached state.
        self.connection = None
        self.connected = False
        self._is_alive = False
        self._opt_plan = None

    logger.info(f"Disconnected from {self.host}:{self.port}")
    return 0
[docs]
def create(self) -> None:
    """Do nothing; this method exists only for API compatibility."""
    return None
[docs]
def destroy(self) -> None:
    """Tear down the client by disconnecting it.

    The return value of ``disconnect()`` is discarded.
    """
    self.disconnect()
[docs]
def get_connected(self) -> bool:
    """Check whether the client currently has a working connection.

    Returns False straight away when the client is flagged as disconnected
    or has no connection object. Otherwise the answer comes from the
    connection's own ``check_connection()``.
    """
    if self.connected and self.connection is not None:
        return self.connection.check_connection()
    return False
[docs]
def db_read_array(self, db_number: int, start: int, count: int, fmt: str = ">f") -> list[Any]:
    """Read *count* consecutive values of one struct format from a DB.

    A single ``db_read`` of ``count * calcsize(fmt)`` bytes is issued at
    byte offset *start*; each element is then unpacked from that buffer.

    Args:
        db_number: DB number to read from.
        start: Start byte offset.
        count: Number of values to read.
        fmt: :mod:`struct` format for a single value (default ``">f"`` = REAL).

    Returns:
        List with the first unpacked field of every element.

    Examples:
        Read 10 REAL values starting at DB1.0::

            values = client.db_read_array(1, 0, 10, ">f")

        Read 20 INT values starting at DB1.100::

            values = client.db_read_array(1, 100, 20, ">h")
    """
    element = struct.Struct(fmt)
    raw = self.db_read(db_number, start, element.size * count)

    unpacked = []
    for index in range(count):
        fields = element.unpack_from(raw, index * element.size)
        unpacked.append(fields[0])
    return unpacked
[docs]
def db_write_array(self, db_number: int, start: int, values: list[Any], fmt: str = ">f") -> int:
    """Pack *values* with one struct format and write them to a DB.

    All values are packed back-to-back into one buffer, which is written
    with a single ``db_write`` at byte offset *start*.

    Args:
        db_number: DB number to write to.
        start: Start byte offset.
        values: List of values to write.
        fmt: :mod:`struct` format for a single value (default ``">f"`` = REAL).

    Returns:
        The result of ``db_write`` (0 on success).

    Examples:
        Write three REAL values starting at DB1.0::

            client.db_write_array(1, 0, [1.0, 2.0, 3.0], ">f")
    """
    element = struct.Struct(fmt)
    payload = bytearray(element.size * len(values))
    for index, item in enumerate(values):
        element.pack_into(payload, index * element.size, item)
    return self.db_write(db_number, start, payload)
[docs]
def read_tag(self, tag: "Union[Tag, str]") -> Any:
    """Read a typed value by :class:`Tag` or address string.

    A string is first parsed with ``Tag.from_string`` (e.g.
    ``"DB1.DBX0.0:BOOL"``, ``"DB1:10:INT"``, ``"M10.5:BOOL"``). The tag's
    area is then read with ``read_area`` and the raw bytes are decoded
    according to the tag's datatype.

    Args:
        tag: A :class:`Tag` instance or a parseable address string.

    Returns:
        The decoded value (its type depends on the tag's datatype).

    Raises:
        NotImplementedError: If the tag is symbolic.

    Example::

        client.read_tag("DB1.DBX0.0:BOOL")             # bit
        client.read_tag("DB1.DBD4:REAL")               # float
        client.read_tag("DB1:20:STRING[30]")           # variable-length string
        client.read_tag(Tag(Area.DB, 1, 0, "REAL"))    # from Tag instance
    """
    if isinstance(tag, str):
        target = Tag.from_string(tag)
    else:
        target = tag

    if target.is_symbolic:
        raise NotImplementedError(
            "Symbolic (LID-based) tag access requires S7CommPlus. Use s7.Client instead of snap7.Client."
        )

    raw = self.read_area(Area(target.area), target.db_number, target.byte_offset, target.size)
    return _decode_tag(target, bytearray(raw))
[docs]
def write_tag(self, tag: "Union[Tag, str]", value: Any) -> int:
    """Write a typed value by :class:`Tag` or address string.

    A string is first parsed with ``Tag.from_string``. The value is encoded
    into a buffer of the tag's size and written with ``write_area``. For
    BOOL tags the current byte is read first so the other bits of that byte
    are written back unchanged.

    Args:
        tag: A :class:`Tag` instance or a parseable address string.
        value: The value to write (type must match the tag's datatype).

    Returns:
        The result of ``write_area`` (0 on success).

    Raises:
        NotImplementedError: If the tag is symbolic.
    """
    if isinstance(tag, str):
        target = Tag.from_string(tag)
    else:
        target = tag

    if target.is_symbolic:
        raise NotImplementedError(
            "Symbolic (LID-based) tag access requires S7CommPlus. Use s7.Client instead of snap7.Client."
        )

    payload = bytearray(target.size)

    # For BOOL writes, seed the buffer with the current byte to preserve other bits
    if target.datatype.upper() == "BOOL":
        existing = self.read_area(Area(target.area), target.db_number, target.byte_offset, 1)
        payload[0] = existing[0]

    _encode_tag(target, payload, value)
    return self.write_area(Area(target.area), target.db_number, target.byte_offset, payload)
[docs]
def db_read(self, db_number: int, start: int, size: int) -> bytearray:
    """
    Read bytes from a DB.

    Delegates to ``read_area`` for the DB area; the call is wrapped in an
    ``OperationLogger`` context named ``db_read``.

    Args:
        db_number: DB number to read from
        start: Start byte offset
        size: Number of bytes to read

    Returns:
        Data read from DB
    """
    operation = OperationLogger(self.logger, "db_read", db=db_number, start=start, size=size)
    with operation:
        payload = self.read_area(Area.DB, db_number, start, size)
    return payload
[docs]
def db_write(self, db_number: int, start: int, data: bytearray) -> int:
    """
    Write bytes to a DB.

    Logs the request at debug level and delegates to ``write_area`` for the
    DB area. The result of ``write_area`` is ignored; 0 is always returned
    when no exception is raised.

    Args:
        db_number: DB number to write to
        start: Start byte offset
        data: Data to write

    Returns:
        0 on success
    """
    logger.debug(f"db_write: DB{db_number}, start={start}, size={len(data)}")
    self.write_area(area=Area.DB, db_number=db_number, start=start, data=data)
    return 0
[docs]
def db_get(self, db_number: int, size: int = 0) -> bytearray:
    """
    Get entire DB.

    Uses get_block_info() to determine the DB size automatically.
    If the PLC does not support get_block_info() or reports an
    incorrect size (common on S7-1200/1500), pass the size parameter
    explicitly.

    When *size* is given explicitly, a read failure propagates unchanged.
    The "auto-detected size" hint is only raised when the size really was
    auto-detected, and it is chained to the original error.

    Args:
        db_number: DB number to read
        size: DB size in bytes. If 0 (or negative), the size is determined
            automatically via get_block_info(); 65536 is used when the
            reported MC7Size is not positive.

    Returns:
        Entire DB contents

    Raises:
        S7Error: If the read fails.
    """
    if size > 0:
        # Caller-supplied size: nothing was auto-detected, so the real error
        # is more useful than a hint about block-info sizes.
        return self.db_read(db_number, 0, size)

    block_info = self.get_block_info(Block.DB, db_number)
    size = block_info.MC7Size if block_info.MC7Size > 0 else 65536
    try:
        return self.db_read(db_number, 0, size)
    except S7Error as err:
        raise S7Error(
            f"db_get failed for DB{db_number} with auto-detected size {size}. "
            f"Some PLCs (e.g. S7-1200) report incorrect MC7Size in block info. "
            f"Try passing the actual DB size explicitly: client.db_get({db_number}, size=<actual_size>)"
        ) from err
[docs]
def db_fill(self, db_number: int, filler: int, size: int = 0) -> int:
    """
    Fill a DB with a filler byte.

    Uses get_block_info() to determine the DB size automatically.
    If the PLC does not support get_block_info() or reports an
    incorrect size (common on S7-1200/1500), pass the size parameter
    explicitly.

    When *size* is given explicitly, a write failure propagates unchanged.
    The "auto-detected size" hint is only raised when the size really was
    auto-detected, and it is chained to the original error.

    Args:
        db_number: DB number to fill
        filler: Byte value to fill with (0-255)
        size: DB size in bytes. If 0 (or negative), the size is determined
            automatically via get_block_info(); 65536 is used when the
            reported MC7Size is not positive.

    Returns:
        0 on success

    Raises:
        ValueError: If *filler* is not a valid byte value.
        S7Error: If the write fails.
    """
    auto_sized = size <= 0
    if auto_sized:
        block_info = self.get_block_info(Block.DB, db_number)
        size = block_info.MC7Size if block_info.MC7Size > 0 else 65536

    # Repeat a one-byte buffer instead of building a temporary list of ints.
    data = bytearray([filler]) * size

    if not auto_sized:
        # Caller-supplied size: let the real error surface.
        return self.db_write(db_number, 0, data)

    try:
        return self.db_write(db_number, 0, data)
    except S7Error as err:
        raise S7Error(
            f"db_fill failed for DB{db_number} with auto-detected size {size}. "
            f"Some PLCs (e.g. S7-1200) report incorrect MC7Size in block info. "
            f"Try passing the actual DB size explicitly: client.db_fill({db_number}, {filler}, size=<actual_size>)"
        ) from err
[docs]
def read_area(self, area: Area, db_number: int, start: int, size: int, word_len: Optional[WordLen] = None) -> bytearray:
    """
    Read data from memory area.

    Requests no larger than ``_max_read_size()`` are sent as one exchange;
    larger ones are split into consecutive chunks whose results are
    concatenated. Every exchange goes through the reconnect-aware
    send/receive path. The elapsed time in milliseconds is stored in
    ``_exec_time``.

    Args:
        area: Memory area to read from
        db_number: DB number (for DB area only)
        start: Start address
        size: Number of items to read (for TM/CT: timers/counters, for others: bytes)
        word_len: Optional word length override. If None, defaults to area-based logic
            (TIMER for TM, COUNTER for CT, BYTE for others).

    Returns:
        Data read from area
    """
    began = time.time()

    # Map area enum to native area
    native_area = self._map_area(area)

    # Determine word length
    if word_len is None:
        if area == Area.TM:
            wire_len = S7WordLen.TIMER
        elif area == Area.CT:
            wire_len = S7WordLen.COUNTER
        else:
            wire_len = S7WordLen.BYTE
    else:
        wire_len = S7WordLen(word_len)

    def fetch(address: int, amount: int) -> Any:
        """Read *amount* items at *address* in one request/response exchange."""

        def build() -> bytes:
            # Built lazily so a retry after reconnect uses the current protocol object.
            return self.protocol.build_read_request(
                area=native_area, db_number=db_number, start=address, word_len=wire_len, count=amount
            )

        reply = self._send_receive_with_reconnect(build)
        return self.protocol.extract_read_data(reply, wire_len, amount)

    limit = self._max_read_size()

    if size <= limit:
        # Fits into a single request
        payload = bytearray(fetch(start, size))
        self._exec_time = int((time.time() - began) * 1000)
        return payload

    # Too large for one PDU: read chunk by chunk
    collected = bytearray()
    done = 0
    while done < size:
        step = min(size - done, limit)
        collected.extend(fetch(start + done, step))
        done += step

    self._exec_time = int((time.time() - began) * 1000)
    return collected
[docs]
def write_area(self, area: Area, db_number: int, start: int, data: bytearray, word_len: Optional[WordLen] = None) -> int:
    """
    Write data to memory area.

    Payloads no larger than ``_max_write_size()`` are sent as one exchange;
    larger ones are split into consecutive chunks. Every exchange goes
    through the reconnect-aware send/receive path and its response is
    checked with ``check_write_response``. The elapsed time in milliseconds
    is stored in ``_exec_time``.

    Args:
        area: Memory area to write to
        db_number: DB number (for DB area only)
        start: Start address
        data: Data to write
        word_len: Optional word length override. If None, defaults to area-based logic
            (TIMER for TM, COUNTER for CT, BYTE for others).

    Returns:
        0 on success
    """
    began = time.time()

    # Map area enum to native area
    native_area = self._map_area(area)

    # Determine word length
    if word_len is None:
        if area == Area.TM:
            wire_len = S7WordLen.TIMER
        elif area == Area.CT:
            wire_len = S7WordLen.COUNTER
        else:
            wire_len = S7WordLen.BYTE
    else:
        wire_len = S7WordLen(word_len)

    def push(address: int, payload: bytearray) -> None:
        """Write *payload* at *address* in one request/response exchange."""

        def build() -> bytes:
            # Built lazily so a retry after reconnect uses the current protocol object.
            return self.protocol.build_write_request(
                area=native_area, db_number=db_number, start=address, word_len=wire_len, data=bytes(payload)
            )

        reply = self._send_receive_with_reconnect(build)
        self.protocol.check_write_response(reply)

    limit = self._max_write_size()
    total = len(data)

    if total <= limit:
        # Fits into a single request
        push(start, data)
        self._exec_time = int((time.time() - began) * 1000)
        return 0

    # Too large for one PDU: write chunk by chunk
    done = 0
    while done < total:
        step = min(total - done, limit)
        push(start + done, data[done : done + step])
        done += step

    self._exec_time = int((time.time() - began) * 1000)
    return 0
[docs]
def read_multi_vars(self, items: Union[List[dict[str, Any]], "Array[S7DataItem]"]) -> Tuple[int, Any]:
    """Read multiple variables.

    Three paths exist:

    * a ctypes ``Array[S7DataItem]`` is read item by item with ``read_area``
      and the bytes are copied into each item's ``pData`` (when set);
    * a list of dicts with two or more entries, with the optimizer enabled,
      is handed to ``_read_multi_vars_optimized``;
    * any other list of dicts is read item by item with ``read_area``.

    .. warning::
        The read optimizer is **experimental** and may change in future
        versions. Disable it with ``client.use_optimizer = False`` if you
        encounter issues.

    Args:
        items: List of item specifications (dicts with ``area``, ``start``,
            ``size``, and optionally ``db_number``) **or** a ctypes
            ``Array[S7DataItem]``.

    Returns:
        Tuple of (result_code, data) where *data* is either the updated
        ctypes array or a list of bytearrays in the original item order.
        An empty *items* is returned as is with result code 0.

    Raises:
        ValueError: If more than MAX_VARS items are requested.
    """
    if not items:
        return (0, items)

    if len(items) > self.MAX_VARS:
        raise ValueError(f"Too many items: {len(items)} exceeds MAX_VARS ({self.MAX_VARS})")

    # ctypes S7DataItem array: legacy path, one read per item
    if hasattr(items, "_type_") and hasattr(items[0], "Area"):
        for entry in cast("Array[S7DataItem]", items):
            payload = self.read_area(Area(entry.Area), entry.DBNumber, entry.Start, entry.Amount)
            if entry.pData:
                for position, byte in enumerate(payload):
                    entry.pData[position] = byte
        return (0, items)

    specs = cast(List[dict[str, Any]], items)

    # Two or more dict items with the optimizer on: merged/packed reads
    if len(specs) > 1 and self.use_optimizer:
        return self._read_multi_vars_optimized(specs)

    # Single item or optimizer disabled: plain one-by-one reads
    plain_results: list[bytearray] = [
        self.read_area(spec["area"], spec.get("db_number", 0), spec["start"], spec["size"]) for spec in specs
    ]
    return (0, plain_results)
# PDU size → max_parallel mapping. Smaller PDUs indicate older/smaller
# PLCs with fewer resources, so we stay sequential for safety.
# Each entry is (minimum PDU size, max_parallel). _auto_tune_parallel scans
# the list in order and uses the first entry whose threshold the negotiated
# PDU length reaches, so entries must stay sorted from largest to smallest.
_PARALLEL_THRESHOLDS: list[Tuple[int, int]] = [
    (960, 8),
    (480, 4),
    (240, 2),
]
def _auto_tune_parallel(self) -> None:
    """Pick *max_parallel* from the negotiated PDU size.

    Walks ``_PARALLEL_THRESHOLDS`` in order and adopts the parallelism of
    the first entry whose threshold does not exceed ``pdu_length``. When no
    entry qualifies, requests stay sequential (``max_parallel = 1``). The
    chosen value is logged at INFO level.
    """
    self.max_parallel = next(
        (parallel for threshold, parallel in self._PARALLEL_THRESHOLDS if self.pdu_length >= threshold),
        1,  # No threshold reached: stay sequential
    )
    logger.info(f"Auto-tuned max_parallel={self.max_parallel} (PDU={self.pdu_length})")
def _send_receive_parallel(self, requests: list[Tuple[int, bytes]]) -> dict[int, dict[str, Any]]:
"""Fire multiple S7 requests back-to-back and collect responses by sequence number.
All PDUs are sent on the single TCP connection before reading any
responses. Responses are matched to requests via the S7 sequence
number in the header (bytes 4-5).
.. warning::
This method is **experimental** and part of the read optimizer.
Args:
requests: ``(packet_index, pdu_bytes)`` pairs.
Returns:
Dict mapping *packet_index* to the parsed response dict.
"""
conn = self._get_connection()
with self._reconnect_lock:
# Build seq_num → packet_index lookup
pending: dict[int, int] = {}
for packet_index, pdu in requests:
seq = struct.unpack(">H", pdu[4:6])[0]
pending[seq] = packet_index
# Send all requests back-to-back
for _, pdu in requests:
conn.send_data(pdu)
# Receive responses, matching by sequence number
results: dict[int, dict[str, Any]] = {}
remaining = len(requests)
deadline = time.monotonic() + conn.timeout
while remaining > 0:
wait_time = deadline - time.monotonic()
if wait_time <= 0:
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
if not conn.data_available(timeout=wait_time):
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
response_data = conn.receive_data()
response = self.protocol.parse_response(response_data)
resp_seq = response["sequence"]
if resp_seq in pending:
packet_index = pending.pop(resp_seq)
results[packet_index] = response
remaining -= 1
else:
logger.warning(f"Discarding unexpected response with sequence {resp_seq}")
return results
def _read_multi_vars_optimized(self, dict_items: List[dict[str, Any]]) -> Tuple[int, List[bytearray]]:
    """Read many items through the merge + packetize optimizer.

    The items are turned into ``ReadItem`` objects, sorted, merged into
    blocks and packed into packets. The resulting plan is stored on
    ``self._opt_plan`` and reused as long as the item layout (area, DB
    number, offset and length of every item, in order) is unchanged.
    Packets holding exactly one block are fetched with :meth:`read_area`;
    the others are sent as multi-read requests, in parallel when
    ``max_parallel`` is above 1 and more than one such packet exists.

    Args:
        dict_items: Item dicts with ``area``, ``start``, ``size`` and
            optionally ``db_number`` (defaults to 0).

    Returns:
        Tuple of (0, list of bytearrays in original order).
    """
    read_items: list[ReadItem] = [
        ReadItem(
            area=int(spec["area"]),
            db_number=spec.get("db_number", 0),
            byte_offset=spec["start"],
            bit_offset=0,
            byte_length=spec["size"],
            index=position,
        )
        for position, spec in enumerate(dict_items)
    ]

    # The flattened item layout identifies a plan in the cache
    layout: list[Any] = []
    for item in read_items:
        layout.extend((item.area, item.db_number, item.byte_offset, item.byte_length))
    cache_key = tuple(layout)

    if self._opt_plan is not None and self._opt_plan.cache_key == cache_key:
        packets = self._opt_plan.packets
    else:
        sorted_ri = sort_items(read_items)
        max_block = self._max_read_size()
        blocks = merge_items(sorted_ri, max_gap=self.multi_read_max_gap, max_block_size=max_block)
        packets = packetize(blocks, self.pdu_length)
        self._opt_plan = _OptimizationPlan(cache_key, packets, read_items)

    # Buffers are filled in below; work on a copy so the cached plan stays clean
    working_packets = copy.deepcopy(packets)

    packet_requests: list[Tuple[int, bytes, ReadPacket]] = []
    for pkt_idx, packet in enumerate(working_packets):
        block_specs = [(blk.area, blk.db_number, blk.start_offset, blk.byte_length) for blk in packet.blocks]
        if len(block_specs) != 1:
            packet_requests.append((pkt_idx, self.protocol.build_multi_read_request(block_specs), packet))
            continue
        # A lone block is cheaper as a plain area read than as a multi-read
        only = packet.blocks[0]
        area = Area(only.area) if only.area in _VALID_AREA_VALUES else Area.DB
        only.buffer = self.read_area(area, only.db_number, only.start_offset, only.byte_length)

    if packet_requests:
        go_parallel = self.max_parallel > 1 and len(packet_requests) > 1
        execute = self._execute_packets_parallel if go_parallel else self._execute_packets_sequential
        execute(packet_requests)

    # Hand the data back per item, in the caller's original order
    return (0, extract_results(working_packets, len(dict_items)))
def _execute_packets_sequential(self, packet_requests: list[Tuple[int, bytes, ReadPacket]]) -> None:
    """Send each multi-block request in turn and store the returned data.

    For every ``(index, request, packet)`` triple the request goes through
    :meth:`_send_receive`, the per-block payloads are extracted from the
    response, and each payload is assigned to the ``buffer`` of the block at
    the same position in ``packet.blocks``.
    """
    for _, request, packet in packet_requests:
        reply = self._send_receive(request)
        payloads = self.protocol.extract_multi_read_data(reply, len(packet.blocks))
        for block, payload in zip(packet.blocks, payloads):
            block.buffer = payload
def _execute_packets_parallel(self, packet_requests: list[Tuple[int, bytes, ReadPacket]]) -> None:
    """Run multi-block packets in batches of up to *max_parallel*.

    Each batch is handed to :meth:`_send_receive_parallel`. The response for
    every packet is then looked up by packet index, and the extracted
    per-block payloads are assigned to the ``buffer`` of the matching blocks.
    """
    for first in range(0, len(packet_requests), self.max_parallel):
        batch = packet_requests[first : first + self.max_parallel]
        responses = self._send_receive_parallel([(pkt_idx, pdu) for pkt_idx, pdu, _ in batch])
        for pkt_idx, _, packet in batch:
            payloads = self.protocol.extract_multi_read_data(responses[pkt_idx], len(packet.blocks))
            for block, payload in zip(packet.blocks, payloads):
                block.buffer = payload
[docs]
def write_multi_vars(self, items: Union[List[dict[str, Any]], List[S7DataItem]]) -> int:
    """
    Write multiple variables.

    Every item is written with its own :meth:`write_area` call, in list order.

    Args:
        items: Either dicts with ``area``, ``start``, ``data`` and optionally
            ``db_number`` (defaults to 0), or ``S7DataItem`` structures.
            For the latter, ``Amount`` bytes are taken from ``pData``; a null
            ``pData`` results in ``Amount`` zero bytes being written.

    Returns:
        0 on success

    Raises:
        ValueError: If more than MAX_VARS items are requested
    """
    if not items:
        return 0
    if len(items) > self.MAX_VARS:
        raise ValueError(f"Too many items: {len(items)} exceeds MAX_VARS ({self.MAX_VARS})")

    # The first element decides which of the two item flavours we were given
    if not hasattr(items[0], "Area"):
        for spec in cast(List[dict[str, Any]], items):
            self.write_area(spec["area"], spec.get("db_number", 0), spec["start"], spec["data"])
        return 0

    for entry in cast(List[S7DataItem], items):
        area = Area(entry.Area)
        db_number, start, size = entry.DBNumber, entry.Start, entry.Amount
        # Copy the payload out of the ctypes buffer
        payload = bytearray(size)
        if entry.pData:
            for position in range(size):
                payload[position] = entry.pData[position]
        self.write_area(area, db_number, start, payload)
    return 0
[docs]
def list_blocks(self) -> BlocksList:
    """
    List blocks available in PLC.

    Sends an S7 USER_DATA request and parses the answer.

    Returns:
        Block list structure with counts for each block type

    Raises:
        S7ConnectionError: If the client is not connected.
        S7ProtocolError: If the response data section reports an error.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")

    response = self._send_receive(self.protocol.build_list_blocks_request())

    # A return code other than 0xFF in the data section is an error; a
    # missing or non-dict data section is treated as success.
    section = response.get("data", {})
    return_code = section.get("return_code", 0xFF) if isinstance(section, dict) else 0xFF
    if return_code == 0xFF:
        return self.protocol.parse_list_blocks(response)

    desc = get_return_code_description(return_code)
    raise S7ProtocolError(f"List blocks failed: {desc} (0x{return_code:02x})")
[docs]
def list_blocks_of_type(self, block_type: Block, max_count: int) -> List[int]:
    """
    List blocks of a specific type.

    Sends an S7 USER_DATA request. While the response parameters report that
    more data follows (``last_data_unit`` != 0), follow-up fragments are
    requested and their payloads concatenated before parsing.

    Args:
        block_type: Type of blocks to list. Types missing from the internal
            code table are requested as DB (0x41).
        max_count: Maximum number of blocks to return

    Returns:
        List of block numbers, cut with ``[:max_count]``. If a follow-up
        fragment reports an error, or the transfer is still unfinished after
        100 fragments, the numbers gathered so far are returned and a
        warning is logged.

    Raises:
        S7ConnectionError: If the client is not connected.
        S7ProtocolError: If the first response carries an error return code.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")
    conn = self._get_connection()

    # Map Block enum to S7 block type codes
    block_type_codes = {
        Block.OB: 0x38,  # Organization Block
        Block.DB: 0x41,  # Data Block
        Block.SDB: 0x42,  # System Data Block
        Block.FC: 0x43,  # Function
        Block.SFC: 0x44,  # System Function
        Block.FB: 0x45,  # Function Block
        Block.SFB: 0x46,  # System Function Block
    }
    type_code = block_type_codes.get(block_type, 0x41)  # Default to DB

    # Build and send list blocks of type request
    request = self.protocol.build_list_blocks_of_type_request(type_code)
    response = self._send_receive(request)

    # Check for errors in data section (any return code other than 0xFF)
    data_info = response.get("data", {})
    return_code = data_info.get("return_code", 0xFF) if isinstance(data_info, dict) else 0xFF
    if return_code != 0xFF:
        desc = get_return_code_description(return_code)
        raise S7ProtocolError(f"List blocks of type failed: {desc} (0x{return_code:02x})")

    # Accumulate raw data across fragments
    accumulated_data = bytearray(data_info.get("data", b"") if isinstance(data_info, dict) else b"")

    # Multi-packet state taken from the first response
    params = response.get("parameters", {})
    last_data_unit = params.get("last_data_unit", 0x00) if isinstance(params, dict) else 0x00
    sequence_number = params.get("sequence_number", 0) if isinstance(params, dict) else 0
    group = params.get("group", 0x03) if isinstance(params, dict) else 0x03
    subfunction = params.get("subfunction", 0x02) if isinstance(params, dict) else 0x02

    # Accumulate follow-up fragments
    max_fragments = 100  # Safety limit
    for _ in range(max_fragments):
        if last_data_unit == 0x00:
            break
        followup = self.protocol.build_userdata_followup_request(group, subfunction, sequence_number)
        conn.send_data(followup)
        response_data = conn.receive_data()
        response = self.protocol.parse_response(response_data)

        # Check for errors
        data_info = response.get("data", {})
        return_code = data_info.get("return_code", 0xFF) if isinstance(data_info, dict) else 0xFF
        if return_code != 0xFF:
            # Still best-effort (return what we have), but no longer silent
            desc = get_return_code_description(return_code)
            logger.warning(
                f"List blocks of type: follow-up fragment failed: {desc} (0x{return_code:02x}); "
                "returning partial block list"
            )
            break
        accumulated_data.extend(data_info.get("data", b"") if isinstance(data_info, dict) else b"")

        # Update multi-packet state
        params = response.get("parameters", {})
        last_data_unit = params.get("last_data_unit", 0x00) if isinstance(params, dict) else 0x00
        sequence_number = params.get("sequence_number", 0) if isinstance(params, dict) else 0
    else:
        # Loop ran to the limit; only complain if the PLC still announces more data
        if last_data_unit != 0x00:
            logger.warning(
                f"List blocks of type: transfer not finished after {max_fragments} follow-up fragments; "
                "returning partial block list"
            )

    # Parse block numbers from accumulated data
    combined_response: dict[str, Any] = {"data": {"data": bytes(accumulated_data)}}
    block_numbers = self.protocol.parse_list_blocks_of_type_response(combined_response)

    # Limit to max_count
    return block_numbers[:max_count]
[docs]
def get_cpu_info(self) -> S7CpuInfo:
    """Get CPU component identification (SZL 0x001C).

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        szl = self.read_szl(0x001C, 0)
        return parse_cpu_info_szl(szl)
    raise S7ConnectionError("Not connected to PLC")
[docs]
def get_cpu_state(self) -> str:
    """
    Get CPU state.

    Sends the request built by ``protocol.build_cpu_state_request()`` and
    returns whatever ``protocol.extract_cpu_state()`` reads from the reply.

    Returns:
        CPU state string
    """
    reply = self._send_receive(self.protocol.build_cpu_state_request())
    return self.protocol.extract_cpu_state(reply)
[docs]
def get_block_info(self, block_type: Block, db_number: int) -> TS7BlockInfo:
    """
    Get block information.

    Sends an S7 USER_DATA request for one block and parses the answer.

    Args:
        block_type: Type of block. Types missing from the internal code
            table are requested as DB (0x41).
        db_number: Block number

    Returns:
        Block information structure

    Raises:
        S7ConnectionError: If the client is not connected.
        S7ProtocolError: If the response data section reports an error.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")

    # S7 wire code per block type; unknown types fall back to DB
    wire_codes = {
        Block.OB: 0x38,
        Block.DB: 0x41,
        Block.SDB: 0x42,
        Block.FC: 0x43,
        Block.SFC: 0x44,
        Block.FB: 0x45,
        Block.SFB: 0x46,
    }
    type_code = wire_codes.get(block_type, 0x41)

    response = self._send_receive(self.protocol.build_get_block_info_request(type_code, db_number))

    # A return code other than 0xFF in the data section is an error
    section = response.get("data", {})
    return_code = section.get("return_code", 0xFF) if isinstance(section, dict) else 0xFF
    if return_code == 0xFF:
        return self.protocol.parse_get_block_info(response)

    desc = get_return_code_description(return_code)
    raise S7ProtocolError(f"Get block info failed: {desc} (0x{return_code:02x})")
[docs]
def upload(self, block_num: int) -> bytearray:
    """
    Upload block from PLC.

    Runs the START_UPLOAD, UPLOAD, END_UPLOAD request sequence. The block
    type is fixed to DB (0x41) and a single UPLOAD request is issued.

    Args:
        block_num: Block number to upload

    Returns:
        Block data

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")

    # Step 1: open the upload (0x41 = DB) and learn its ID; 1 if the reply has none
    start_reply = self._send_receive(self.protocol.build_start_upload_request(0x41, block_num))
    upload_id = self.protocol.parse_start_upload_response(start_reply).get("upload_id", 1)

    # Step 2: fetch the data
    data_reply = self._send_receive(self.protocol.build_upload_request(upload_id))
    block_data = self.protocol.parse_upload_response(data_reply)

    # Step 3: close the upload; the reply content is not inspected
    self._send_receive(self.protocol.build_end_upload_request(upload_id))

    logger.info(f"Uploaded {len(block_data)} bytes from block {block_num}")
    return bytearray(block_data)
[docs]
def download(self, data: bytearray, block_num: int = -1) -> int:
    """
    Download block to PLC.

    Runs the REQUEST_DOWNLOAD, DOWNLOAD_BLOCK, DOWNLOAD_ENDED request
    sequence. The block type is fixed to DB (0x41) and the whole payload is
    sent in one DOWNLOAD_BLOCK PDU flagged as the last packet.

    Args:
        data: Block data to download
        block_num: Block number. With -1 the number is read big-endian from
            bytes 6-7 of ``data``, or defaults to 1 when ``data`` is
            shorter than 8 bytes.

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")
    conn = self._get_connection()

    def _exchange(params: bytes, payload: bytes) -> None:
        # Prefix params/payload with a 10-byte S7 request header, send, parse the reply
        header = struct.pack(
            ">BBHHHH",
            0x32,  # Protocol ID
            0x01,  # PDU type REQUEST
            0x0000,  # Reserved
            self.protocol._next_sequence(),  # Sequence
            len(params),  # Parameter length
            len(payload),  # Data length
        )
        conn.send_data(header + params + payload)
        self.protocol.parse_response(conn.receive_data())

    # Block type 0x41 = DB
    block_type = 0x41
    if block_num == -1:
        block_num = struct.unpack(">H", data[6:8])[0] if len(data) >= 8 else 1

    # Step 1: Request download
    self._send_receive(self.protocol.build_download_request(block_type, block_num, bytes(data)))

    # Step 2: Download block (0x1B = DOWNLOAD_BLOCK, status 0x01 = last packet, reserved byte)
    _exchange(
        struct.pack(">BBB", 0x1B, 0x01, 0x00),
        struct.pack(">HH", len(data), 0x00FB) + bytes(data),
    )

    # Step 3: Download ended (0x1C = DOWNLOAD_ENDED), no data section
    _exchange(struct.pack(">B", 0x1C), b"")

    logger.info(f"Downloaded {len(data)} bytes to block {block_num}")
    return 0
[docs]
def delete(self, block_type: Block, block_num: int) -> int:
    """Delete a block from PLC.

    Sends the request built by ``protocol.build_delete_block_request()`` and
    validates the reply with ``protocol.check_control_response()``.

    Args:
        block_type: Type of block (DB, OB, FB, FC, etc.). Types missing from
            the internal code table are sent as DB (0x41).
        block_num: Block number to delete

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")

    # S7 wire code per block type; unknown types fall back to DB
    wire_codes = {
        Block.OB: 0x38,
        Block.DB: 0x41,
        Block.SDB: 0x42,
        Block.FC: 0x43,
        Block.SFC: 0x44,
        Block.FB: 0x45,
        Block.SFB: 0x46,
    }
    type_code = wire_codes.get(block_type, 0x41)

    reply = self._send_receive(self.protocol.build_delete_block_request(type_code, block_num))
    self.protocol.check_control_response(reply)

    logger.info(f"Deleted block {block_type.name} {block_num}")
    return 0
[docs]
def full_upload(self, block_type: Block, block_num: int) -> Tuple[bytearray, int]:
    """Upload a block from PLC and wrap it in a header and footer.

    Runs the START_UPLOAD, UPLOAD, END_UPLOAD request sequence, then frames
    the uploaded bytes with a header packed locally by this method and a
    footer of four zero bytes.

    Args:
        block_type: Type of block (DB, OB, FB, FC, etc.). Types missing from
            the internal code table are requested as DB (0x41).
        block_num: Block number to upload

    Returns:
        Tuple of (buffer, size) where buffer is header + uploaded data +
        footer and size is ``len(buffer)``.

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")

    # S7 wire code per block type; unknown types fall back to DB
    wire_codes = {
        Block.OB: 0x38,
        Block.DB: 0x41,
        Block.SDB: 0x42,
        Block.FC: 0x43,
        Block.SFC: 0x44,
        Block.FB: 0x45,
        Block.SFB: 0x46,
    }
    type_code = wire_codes.get(block_type, 0x41)

    # Step 1: open the upload and learn its ID; 1 if the reply has none
    start_reply = self._send_receive(self.protocol.build_start_upload_request(type_code, block_num))
    upload_id = self.protocol.parse_start_upload_response(start_reply).get("upload_id", 1)

    # Step 2: fetch the data
    data_reply = self._send_receive(self.protocol.build_upload_request(upload_id))
    block_data = self.protocol.parse_upload_response(data_reply)

    # Step 3: close the upload; the reply content is not inspected
    self._send_receive(self.protocol.build_end_upload_request(upload_id))

    # NOTE(review): the header below packs to 12 bytes and the footer is 4
    # bytes (16 bytes of framing), yet the length field adds 14 - confirm
    # which value is intended.
    payload_len = len(block_data)
    block_header = struct.pack(
        ">BBHBBBBHH",
        0x70,  # Block type marker
        block_type.value,  # Block type
        block_num,  # Block number
        0x00,  # Language
        0x00,  # Properties
        0x00,  # Reserved
        0x00,  # Reserved
        payload_len + 14,  # Block length field
        payload_len,  # MC7 code length
    )
    block_footer = b"\x00" * 4
    full_block = bytearray(block_header + block_data + block_footer)

    logger.info(f"Full upload of block {block_type.name} {block_num}: {len(full_block)} bytes")
    return full_block, len(full_block)
[docs]
def plc_stop(self) -> int:
    """Stop PLC CPU.

    Sends the ``"stop"`` control request and validates the reply with
    ``protocol.check_control_response()``.

    Returns:
        0 on success
    """
    reply = self._send_receive(self.protocol.build_plc_control_request("stop"))
    self.protocol.check_control_response(reply)
    return 0
[docs]
def plc_hot_start(self) -> int:
    """Hot start PLC CPU.

    Sends the ``"hot_start"`` control request and validates the reply with
    ``protocol.check_control_response()``.

    Returns:
        0 on success
    """
    reply = self._send_receive(self.protocol.build_plc_control_request("hot_start"))
    self.protocol.check_control_response(reply)
    return 0
[docs]
def plc_cold_start(self) -> int:
    """Cold start PLC CPU.

    Sends the ``"cold_start"`` control request and validates the reply with
    ``protocol.check_control_response()``.

    Returns:
        0 on success
    """
    reply = self._send_receive(self.protocol.build_plc_control_request("cold_start"))
    self.protocol.check_control_response(reply)
    return 0
[docs]
def get_plc_datetime(self) -> datetime:
    """
    Get PLC date/time.

    Sends the request built by ``protocol.build_get_clock_request()`` and
    returns the value parsed by ``protocol.parse_get_clock_response()``.

    Returns:
        PLC date and time

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        reply = self._send_receive(self.protocol.build_get_clock_request())
        return self.protocol.parse_get_clock_response(reply)
    raise S7ConnectionError("Not connected to PLC")
[docs]
def set_plc_datetime(self, dt: datetime) -> int:
    """
    Set PLC date/time.

    Sends the request built by ``protocol.build_set_clock_request(dt)``.
    The reply is not inspected here.

    Args:
        dt: Date and time to set

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        self._send_receive(self.protocol.build_set_clock_request(dt))
        logger.info(f"Set PLC datetime to {dt}")
        return 0
    raise S7ConnectionError("Not connected to PLC")
[docs]
def set_plc_system_datetime(self) -> int:
    """Set PLC time to system time.

    Takes ``datetime.now()`` (naive local time) and passes it to
    :meth:`set_plc_datetime`.

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        current_time = datetime.now()
        self.set_plc_datetime(current_time)
        logger.info(f"Set PLC time to current system time: {current_time}")
        return 0
    raise S7ConnectionError("Not connected to PLC")
[docs]
def compress(self, timeout: int) -> int:
    """
    Compress PLC memory.

    Sends the request built by ``protocol.build_compress_request()`` and
    validates the reply with ``protocol.check_control_response()``.

    Args:
        timeout: Timeout in milliseconds. This method only reports the value
            in its log message; it does not apply it to the exchange.

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        reply = self._send_receive(self.protocol.build_compress_request())
        self.protocol.check_control_response(reply)
        logger.info(f"Compress PLC memory completed (timeout={timeout}ms)")
        return 0
    raise S7ConnectionError("Not connected to PLC")
[docs]
def copy_ram_to_rom(self, timeout: int = 0) -> int:
    """
    Copy RAM to ROM.

    Sends the request built by ``protocol.build_copy_ram_to_rom_request()``
    and validates the reply with ``protocol.check_control_response()``.

    Args:
        timeout: Timeout in milliseconds. This method only reports the value
            in its log message; it does not apply it to the exchange.

    Returns:
        0 on success

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        reply = self._send_receive(self.protocol.build_copy_ram_to_rom_request())
        self.protocol.check_control_response(reply)
        logger.info(f"Copy RAM to ROM completed (timeout={timeout}ms)")
        return 0
    raise S7ConnectionError("Not connected to PLC")
[docs]
def get_cp_info(self) -> S7CpInfo:
    """Get communication processor info (SZL 0x0131).

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        szl = self.read_szl(0x0131, 0)
        return parse_cp_info_szl(szl)
    raise S7ConnectionError("Not connected to PLC")
[docs]
def get_order_code(self) -> S7OrderCode:
    """Get module order code and firmware version (SZL 0x0011).

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        szl = self.read_szl(0x0011, 0)
        return parse_order_code_szl(szl)
    raise S7ConnectionError("Not connected to PLC")
[docs]
def get_protection(self) -> S7Protection:
    """Get protection settings (SZL 0x0232).

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        szl = self.read_szl(0x0232, 0)
        return parse_protection_szl(szl)
    raise S7ConnectionError("Not connected to PLC")
[docs]
def read_szl(self, ssl_id: int, index: int = 0) -> S7SZL:
    """
    Read SZL (System Status List).

    Sends an S7 USER_DATA request. While the response parameters report that
    more data follows (``last_data_unit`` != 0), follow-up fragments are
    requested and their payloads appended to the first one.

    Args:
        ssl_id: SZL ID
        index: SZL index

    Returns:
        ``S7SZL`` structure. ``Header.LengthDR`` is assigned the total number
        of payload bytes received, ``Header.NDR`` is set to 1 and ``Data``
        holds the payload, cut off at the capacity of ``Data``. If a
        follow-up fragment reports an error, or the transfer is still
        unfinished after 100 fragments, the data gathered so far is returned
        and a warning is logged.

    Raises:
        S7ConnectionError: If the client is not connected.
        RuntimeError: If the first response carries an error return code.
    """
    if not self.get_connected():
        raise S7ConnectionError("Not connected to PLC")
    conn = self._get_connection()

    # Build and send read SZL request
    request = self.protocol.build_read_szl_request(ssl_id, index)
    response = self._send_receive(request)

    # Check for errors in data section (for USERDATA - return_code != 0xFF means error)
    data_info = response.get("data", {})
    return_code = data_info.get("return_code", 0xFF) if isinstance(data_info, dict) else 0xFF
    if return_code != 0xFF:
        desc = get_return_code_description(return_code)
        raise RuntimeError(f"Read SZL failed: {desc} (0x{return_code:02x})")

    # Parse first fragment (includes SZL header)
    szl_result = self.protocol.parse_read_szl_response(response)
    accumulated_data = bytearray(szl_result["data"])

    # Multi-packet state taken from the first response
    params = response.get("parameters", {})
    last_data_unit = params.get("last_data_unit", 0x00) if isinstance(params, dict) else 0x00
    sequence_number = params.get("sequence_number", 0) if isinstance(params, dict) else 0
    group = params.get("group", 0x04) if isinstance(params, dict) else 0x04
    subfunction = params.get("subfunction", 0x01) if isinstance(params, dict) else 0x01

    # Accumulate follow-up fragments
    max_fragments = 100  # Safety limit
    for _ in range(max_fragments):
        if last_data_unit == 0x00:
            break
        followup = self.protocol.build_userdata_followup_request(group, subfunction, sequence_number)
        conn.send_data(followup)
        response_data = conn.receive_data()
        response = self.protocol.parse_response(response_data)

        # Check for errors
        data_info = response.get("data", {})
        return_code = data_info.get("return_code", 0xFF) if isinstance(data_info, dict) else 0xFF
        if return_code != 0xFF:
            # Still best-effort (return what we have), but no longer silent
            desc = get_return_code_description(return_code)
            logger.warning(
                f"Read SZL 0x{ssl_id:04x}: follow-up fragment failed: {desc} (0x{return_code:02x}); "
                "returning partial data"
            )
            break

        # Parse follow-up fragment (no SZL header)
        fragment = self.protocol.parse_read_szl_response(response, first_fragment=False)
        accumulated_data.extend(fragment["data"])

        # Update multi-packet state
        params = response.get("parameters", {})
        last_data_unit = params.get("last_data_unit", 0x00) if isinstance(params, dict) else 0x00
        sequence_number = params.get("sequence_number", 0) if isinstance(params, dict) else 0
    else:
        # Loop ran to the limit; only complain if the PLC still announces more data
        if last_data_unit != 0x00:
            logger.warning(
                f"Read SZL 0x{ssl_id:04x}: transfer not finished after {max_fragments} follow-up fragments; "
                "returning partial data"
            )

    # Build S7SZL structure
    szl = S7SZL()
    szl.Header.LengthDR = len(accumulated_data)
    szl.Header.NDR = 1

    # Copy data to SZL.Data array, never writing past its capacity
    copy_len = min(len(accumulated_data), len(szl.Data))
    if copy_len < len(accumulated_data):
        logger.warning(
            f"Read SZL 0x{ssl_id:04x}: {len(accumulated_data)} bytes received but only {copy_len} fit into SZL.Data"
        )
    for i, b in enumerate(accumulated_data[:copy_len]):
        szl.Data[i] = b
    return szl
[docs]
def read_szl_list(self) -> bytes:
    """
    Read list of available SZL IDs.

    Reads SZL ID 0x0000 (index 0) through :meth:`read_szl` and returns the
    first ``Header.LengthDR`` bytes of its data unparsed.

    Returns:
        SZL list data

    Raises:
        S7ConnectionError: If the client is not connected.
    """
    if self.get_connected():
        szl = self.read_szl(0x0000, 0)
        return bytes(szl.Data[: szl.Header.LengthDR])
    raise S7ConnectionError("Not connected to PLC")
[docs]
def read_diagnostic_buffer(self) -> list[dict[str, Any]]:
    """Read the PLC diagnostic buffer.

    .. warning:: This method is **experimental** and may change.

    Reads SZL 0x00A0 (index 0) through :meth:`read_szl` and cuts the payload
    into fixed 20-byte records, kept in the order they were delivered. A
    trailing partial record is ignored.

    Returns:
        List of diagnostic buffer entries. Each entry is a dict with keys
        ``event_id`` (int, big-endian from bytes 0-1), ``timestamp``
        (``datetime`` decoded from bytes 2-9, or ``None`` when those bytes
        do not form a valid date) and ``info`` (hex string of bytes 10-19).
    """
    # SZL ID 0x00A0, index 0 = diagnostic buffer
    szl = self.read_szl(0x00A0, 0)
    raw = bytes(szl.Data[: szl.Header.LengthDR])

    # NOTE(review): Siemens documents SZL 0x00A0 records as event ID, 10 bytes
    # of event info, then the 8-byte time stamp - confirm the offsets used here.
    entry_size = 20
    entries: list[dict[str, Any]] = []
    for offset in range(0, len(raw) - entry_size + 1, entry_size):
        (event_id,) = struct.unpack_from(">H", raw, offset)
        try:
            ts = self._parse_bcd_timestamp(raw[offset + 2 : offset + 10])
        except (ValueError, IndexError):
            # Undecodable timestamp bytes: keep the entry, drop the timestamp
            ts = None
        entries.append(
            {
                "event_id": event_id,
                "timestamp": ts,
                "info": raw[offset + 10 : offset + entry_size].hex(),
            }
        )
    return entries
@staticmethod
def _parse_bcd_timestamp(data: bytes) -> datetime:
"""Parse a BCD-encoded S7 timestamp (8 bytes) to datetime."""
def bcd(b: int) -> int:
return (b >> 4) * 10 + (b & 0x0F)
year = bcd(data[0])
year += 2000 if year < 90 else 1900
month = bcd(data[1])
day = bcd(data[2])
hour = bcd(data[3])
minute = bcd(data[4])
second = bcd(data[5])
return datetime(year, month, day, hour, minute, second)
[docs]
def iso_exchange_buffer(self, data: bytearray) -> bytearray:
    """
    Exchange raw ISO PDU.

    Sends ``data`` unchanged over the current connection and returns the
    next received payload without parsing it.

    Args:
        data: Raw PDU data

    Returns:
        Response PDU data
    """
    link = self._get_connection()
    link.send_data(bytes(data))
    return bytearray(link.receive_data())
# Convenience methods for specific memory areas
[docs]
def ab_read(self, start: int, size: int) -> bytearray:
    """Read from process output area (PA).

    Thin wrapper around :meth:`read_area` with ``Area.PA`` and DB number 0.

    Args:
        start: Start byte offset
        size: Number of bytes to read

    Returns:
        Data read from output area
    """
    output_bytes = self.read_area(Area.PA, 0, start, size)
    return output_bytes
[docs]
def ab_write(self, start: int, data: bytearray) -> int:
    """Write to process output area (PA).

    Thin wrapper around :meth:`write_area` with ``Area.PA`` and DB number 0.

    Args:
        start: Start byte offset
        data: Data to write

    Returns:
        The result of :meth:`write_area` (0 on success)
    """
    status = self.write_area(Area.PA, 0, start, data)
    return status
[docs]
def eb_read(self, start: int, size: int) -> bytearray:
    """Read from process input area (PE).

    Thin wrapper around :meth:`read_area` with ``Area.PE`` and DB number 0.

    Args:
        start: Start byte offset
        size: Number of bytes to read

    Returns:
        Data read from input area
    """
    input_bytes = self.read_area(Area.PE, 0, start, size)
    return input_bytes
[docs]
def eb_write(self, start: int, size: int, data: bytearray) -> int:
    """Write to process input area (PE).

    Args:
        start: Start byte offset
        size: Number of bytes to write; only ``data[:size]`` is sent
        data: Data to write

    Returns:
        The result of :meth:`write_area` (0 on success)
    """
    payload = data[:size]
    return self.write_area(Area.PE, 0, start, payload)
[docs]
def mb_read(self, start: int, size: int) -> bytearray:
    """Read from marker/flag area (MK).

    Thin wrapper around :meth:`read_area` with ``Area.MK`` and DB number 0.

    Args:
        start: Start byte offset
        size: Number of bytes to read

    Returns:
        Data read from marker area
    """
    marker_bytes = self.read_area(Area.MK, 0, start, size)
    return marker_bytes
[docs]
def mb_write(self, start: int, size: int, data: bytearray) -> int:
    """Write to marker/flag area (MK).

    Args:
        start: Start byte offset
        size: Number of bytes to write; only ``data[:size]`` is sent
        data: Data to write

    Returns:
        The result of :meth:`write_area` (0 on success)
    """
    payload = data[:size]
    return self.write_area(Area.MK, 0, start, payload)
[docs]
def tm_read(self, start: int, size: int) -> bytearray:
    """Read from timer area (TM).

    Thin wrapper around :meth:`read_area` with ``Area.TM`` and DB number 0.

    Args:
        start: Start offset
        size: Number of timers to read

    Returns:
        Timer data
    """
    # read_area handles word length
    timer_bytes = self.read_area(Area.TM, 0, start, size)
    return timer_bytes
[docs]
def tm_write(self, start: int, size: int, data: bytearray) -> int:
    """Write to timer area (TM).

    Args:
        start: Start offset
        size: Number of timers to write
        data: Timer data to write, exactly two bytes per timer

    Returns:
        The result of :meth:`write_area` (0 on success)

    Raises:
        ValueError: If ``len(data)`` is not ``size * 2``.
        RuntimeError: If :meth:`write_area` raises ``S7ProtocolError``; the
            original error is chained as the cause.
    """
    if size * 2 != len(data):
        raise ValueError(f"Data length {len(data)} doesn't match size {size * 2}")
    try:
        status = self.write_area(Area.TM, 0, start, data)
    except S7ProtocolError as e:
        raise RuntimeError(str(e)) from e
    return status
[docs]
def ct_read(self, start: int, size: int) -> bytearray:
    """Read from counter area (CT).

    Thin wrapper around :meth:`read_area` with ``Area.CT`` and DB number 0.

    Args:
        start: Start offset
        size: Number of counters to read

    Returns:
        Counter data
    """
    # read_area handles word length
    counter_bytes = self.read_area(Area.CT, 0, start, size)
    return counter_bytes
[docs]
def ct_write(self, start: int, size: int, data: bytearray) -> int:
    """Write to counter area (CT).

    Args:
        start: Start offset
        size: Number of counters to write
        data: Counter data to write, exactly two bytes per counter

    Returns:
        The result of :meth:`write_area` (0 on success)

    Raises:
        ValueError: If ``len(data)`` is not ``size * 2``.
    """
    if size * 2 != len(data):
        raise ValueError(f"Data length {len(data)} doesn't match size {size * 2}")
    status = self.write_area(Area.CT, 0, start, data)
    return status
# Typed DB access methods
[docs]
def db_read_bool(self, db_number: int, byte_offset: int, bit_offset: int) -> bool:
    """Read a single bit from a DB.

    Reads the one byte at ``byte_offset`` and extracts the requested bit.

    Args:
        db_number: DB number
        byte_offset: Byte offset within the DB
        bit_offset: Bit offset within the byte (0-7)

    Returns:
        Boolean value
    """
    from .util import get_bool

    return get_bool(self.db_read(db_number, byte_offset, 1), 0, bit_offset)
[docs]
def db_write_bool(self, db_number: int, byte_offset: int, bit_offset: int, value: bool) -> None:
    """Write a single bit to a DB (preserving other bits in the byte).

    Done as a read of the containing byte, a bit update on that copy, and a
    write of the byte back to the DB.

    Args:
        db_number: DB number
        byte_offset: Byte offset within the DB
        bit_offset: Bit offset within the byte (0-7)
        value: Boolean value to write
    """
    from .util import set_bool

    current = self.db_read(db_number, byte_offset, 1)
    set_bool(current, 0, bit_offset, value)
    self.db_write(db_number, byte_offset, current)
[docs]
def db_read_byte(self, db_number: int, offset: int) -> int:
    """Read a BYTE (8-bit unsigned) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The byte value
    """
    return self.db_read(db_number, offset, 1)[0]
[docs]
def db_write_byte(self, db_number: int, offset: int, value: int) -> None:
    """Write a BYTE (8-bit unsigned) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB
        value: Value to write
    """
    from .util import set_byte

    encoded = bytearray(1)
    set_byte(encoded, 0, value)
    self.db_write(db_number, offset, encoded)
[docs]
def db_read_int(self, db_number: int, offset: int) -> int:
    """Read an INT (16-bit signed) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The decoded value
    """
    from .util import get_int

    return get_int(self.db_read(db_number, offset, 2), 0)
[docs]
def db_write_int(self, db_number: int, offset: int, value: int) -> None:
    """Write an INT (16-bit signed) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB
        value: Value to write
    """
    from .util import set_int

    encoded = bytearray(2)
    set_int(encoded, 0, value)
    self.db_write(db_number, offset, encoded)
[docs]
def db_read_uint(self, db_number: int, offset: int) -> int:
    """Read a UINT (16-bit unsigned) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The decoded value
    """
    from .util import get_uint

    return get_uint(self.db_read(db_number, offset, 2), 0)
[docs]
def db_write_uint(self, db_number: int, offset: int, value: int) -> None:
    """Write a UINT (16-bit unsigned) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB
        value: Value to write
    """
    from .util import set_uint

    encoded = bytearray(2)
    set_uint(encoded, 0, value)
    self.db_write(db_number, offset, encoded)
[docs]
def db_read_word(self, db_number: int, offset: int) -> int:
    """Read a WORD (16-bit unsigned) from a DB.

    The two bytes are combined big-endian (first byte is the high byte).

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The decoded value
    """
    raw = self.db_read(db_number, offset, 2)
    high, low = raw[0], raw[1]
    return (high << 8) | low
[docs]
def db_write_word(self, db_number: int, offset: int, value: int) -> None:
    """Write a WORD (16-bit unsigned) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB
        value: Value to write
    """
    from .util import set_word

    encoded = bytearray(2)
    set_word(encoded, 0, value)
    self.db_write(db_number, offset, encoded)
[docs]
def db_read_dint(self, db_number: int, offset: int) -> int:
    """Read a DINT (32-bit signed) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The decoded value
    """
    from .util import get_dint

    return get_dint(self.db_read(db_number, offset, 4), 0)
[docs]
def db_write_dint(self, db_number: int, offset: int, value: int) -> None:
    """Write a DINT (32-bit signed) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB
        value: Value to write
    """
    from .util import set_dint

    encoded = bytearray(4)
    set_dint(encoded, 0, value)
    self.db_write(db_number, offset, encoded)
[docs]
def db_read_udint(self, db_number: int, offset: int) -> int:
    """Read a UDINT (32-bit unsigned) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset within the DB

    Returns:
        The decoded value
    """
    from .util import get_udint

    return get_udint(self.db_read(db_number, offset, 4), 0)
[docs]
def db_write_udint(self, db_number: int, offset: int, value: int) -> None:
    """Write a UDINT (32-bit unsigned) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB
        value: Value to encode with ``util.set_udint``
    """
    from .util import set_udint

    payload = bytearray(4)  # four-byte scratch buffer filled by set_udint
    set_udint(payload, 0, value)
    self.db_write(db_number, offset, payload)
[docs]
def db_read_dword(self, db_number: int, offset: int) -> int:
    """Read a DWORD (32-bit unsigned) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB

    Returns:
        The value ``util.get_dword`` decodes from the four bytes read.
    """
    from .util import get_dword

    # A DWORD occupies four bytes, so exactly four are requested.
    return get_dword(self.db_read(db_number, offset, 4), 0)
[docs]
def db_write_dword(self, db_number: int, offset: int, value: int) -> None:
    """Write a DWORD (32-bit unsigned) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB
        value: Value to encode with ``util.set_dword``
    """
    from .util import set_dword

    payload = bytearray(4)  # four-byte scratch buffer filled by set_dword
    set_dword(payload, 0, value)
    self.db_write(db_number, offset, payload)
[docs]
def db_read_real(self, db_number: int, offset: int) -> float:
    """Read a REAL (32-bit float) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB

    Returns:
        The value ``util.get_real`` decodes from the four bytes read.
    """
    from .util import get_real

    # A REAL occupies four bytes, so exactly four are requested.
    return get_real(self.db_read(db_number, offset, 4), 0)
[docs]
def db_write_real(self, db_number: int, offset: int, value: float) -> None:
    """Write a REAL (32-bit float) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB
        value: Value to encode with ``util.set_real``
    """
    from .util import set_real

    payload = bytearray(4)  # four-byte scratch buffer filled by set_real
    set_real(payload, 0, value)
    self.db_write(db_number, offset, payload)
[docs]
def db_read_lreal(self, db_number: int, offset: int) -> float:
    """Read a LREAL (64-bit float) from a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB

    Returns:
        The value ``util.get_lreal`` decodes from the eight bytes read.
    """
    from .util import get_lreal

    # An LREAL occupies eight bytes, so exactly eight are requested.
    return get_lreal(self.db_read(db_number, offset, 8), 0)
[docs]
def db_write_lreal(self, db_number: int, offset: int, value: float) -> None:
    """Write a LREAL (64-bit float) to a DB.

    Args:
        db_number: DB number
        offset: Byte offset of the value inside the DB
        value: Value to encode with ``util.set_lreal``
    """
    from .util import set_lreal

    payload = bytearray(8)  # eight-byte scratch buffer filled by set_lreal
    set_lreal(payload, 0, value)
    self.db_write(db_number, offset, payload)
[docs]
def db_read_string(self, db_number: int, offset: int) -> str:
    """Read an S7 STRING from a DB.

    Two reads are issued: the 2-byte header first, whose first byte is
    taken as the maximum length, then the header plus that many bytes.

    Args:
        db_number: DB number
        offset: Byte offset of the STRING header inside the DB

    Returns:
        The text ``util.get_string`` decodes from the second read.
    """
    from .util import get_string

    capacity = self.db_read(db_number, offset, 2)[0]
    raw = self.db_read(db_number, offset, 2 + capacity)
    return get_string(raw, 0)
[docs]
def db_write_string(self, db_number: int, offset: int, value: str, max_length: int = 254) -> None:
    """Write an S7 STRING to a DB.

    A buffer of ``2 + max_length`` bytes is filled by ``util.set_string``
    and written in full, so the whole STRING slot is overwritten.

    Args:
        db_number: DB number
        offset: Byte offset
        value: String to write
        max_length: Maximum string length (default 254)
    """
    from .util import set_string

    total_size = 2 + max_length  # 2 header bytes plus the character slots
    payload = bytearray(total_size)
    set_string(payload, 0, value, max_length)
    self.db_write(db_number, offset, payload[:total_size])
[docs]
def db_read_wstring(self, db_number: int, offset: int) -> str:
    """Read an S7 WSTRING from a DB.

    Two reads are issued: the 4-byte header first, whose first two bytes
    (big-endian) are taken as the maximum length in characters, then the
    header plus two bytes per character.

    Args:
        db_number: DB number
        offset: Byte offset of the WSTRING header inside the DB

    Returns:
        The text ``util.get_wstring`` decodes from the second read.
    """
    from .util import get_wstring

    head = self.db_read(db_number, offset, 4)
    capacity = (head[0] << 8) | head[1]
    raw = self.db_read(db_number, offset, 4 + capacity * 2)
    return get_wstring(raw, 0)
[docs]
def db_write_wstring(self, db_number: int, offset: int, value: str, max_length: int = 254) -> None:
    """Write an S7 WSTRING to a DB.

    A buffer of ``4 + max_length * 2`` bytes is filled by
    ``util.set_wstring`` and written in full.

    Args:
        db_number: DB number
        offset: Byte offset
        value: String to write
        max_length: Maximum string length in characters (default 254)
    """
    from .util import set_wstring

    # 4 header bytes followed by two bytes per character slot.
    payload = bytearray(4 + max_length * 2)
    set_wstring(payload, 0, value, max_length)
    self.db_write(db_number, offset, payload)
# Async methods
[docs]
def as_ab_read(self, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from process output area.

    The read runs synchronously through ``ab_read``; the bytes are then
    copied element by element into ``data``.

    Args:
        start: Start offset passed to ``ab_read``
        size: Number of bytes passed to ``ab_read``
        data: Caller-supplied buffer that receives the bytes

    Returns:
        Always 0.
    """
    for position, byte_value in enumerate(self.ab_read(start, size)):
        data[position] = byte_value
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_ab_write(self, start: int, data: bytearray) -> int:
    """Async write to process output area.

    The write runs synchronously through ``ab_write``.

    Args:
        start: Start offset passed to ``ab_write``
        data: Bytes to write

    Returns:
        Always 0.
    """
    self.ab_write(start, data)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_compress(self, timeout: int) -> int:
    """Async compress PLC memory.

    The operation runs synchronously through ``compress``.

    Args:
        timeout: Timeout value forwarded to ``compress``

    Returns:
        Always 0.
    """
    self.compress(timeout)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_copy_ram_to_rom(self, timeout: int = 0) -> int:
    """Async copy RAM to ROM.

    The operation runs synchronously through ``copy_ram_to_rom``.

    Args:
        timeout: Timeout value forwarded to ``copy_ram_to_rom`` (default 0)

    Returns:
        Always 0.
    """
    self.copy_ram_to_rom(timeout)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_ct_read(self, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from counter area.

    The read runs synchronously through ``ct_read``; the raw bytes are then
    copied into the ctypes buffer ``data``.

    Args:
        start: Start index passed to ``ct_read``
        size: Item count passed to ``ct_read``
        data: ctypes buffer that receives the raw bytes

    Returns:
        Always 0.
    """
    from ctypes import sizeof

    raw = bytes(self.ct_read(start, size))
    # Never copy more than the destination can hold: an unbounded memmove
    # would write past the end of the caller's buffer.
    memmove(data, raw, min(len(raw), sizeof(data)))
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_ct_write(self, start: int, size: int, data: bytearray) -> int:
    """Async write to counter area.

    The write runs synchronously through ``ct_write``.

    Args:
        start: Start index passed to ``ct_write``
        size: Item count passed to ``ct_write``
        data: Bytes to write

    Returns:
        Always 0.
    """
    self.ct_write(start, size, data)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_db_fill(self, db_number: int, filler: int) -> int:
    """Async fill DB.

    The fill runs synchronously through ``db_fill``.

    Args:
        db_number: DB number
        filler: Fill value forwarded to ``db_fill``

    Returns:
        Always 0.
    """
    self.db_fill(db_number, filler)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_db_get(self, db_number: int, data: CDataArrayType, size: int) -> int:
    """Async get entire DB.

    The DB is fetched synchronously through ``db_get``; at most ``size``
    leading bytes are copied element by element into ``data``.

    Args:
        db_number: DB number
        data: Caller-supplied buffer that receives the bytes
        size: Maximum number of bytes to copy

    Returns:
        Always 0.
    """
    for position, byte_value in enumerate(self.db_get(db_number)[:size]):
        data[position] = byte_value
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_db_read(self, db_number: int, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from DB.

    The read runs synchronously through ``db_read``; the bytes are then
    copied element by element into ``data``.

    Args:
        db_number: DB number
        start: Start offset passed to ``db_read``
        size: Number of bytes passed to ``db_read``
        data: Caller-supplied buffer that receives the bytes

    Returns:
        Always 0.
    """
    for position, byte_value in enumerate(self.db_read(db_number, start, size)):
        data[position] = byte_value
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_db_write(self, db_number: int, start: int, size: int, data: CDataArrayType) -> int:
    """Async write to DB.

    The first ``size`` bytes of ``data`` are written synchronously through
    ``db_write``.

    Args:
        db_number: DB number
        start: Start offset passed to ``db_write``
        size: Number of leading bytes of ``data`` to write
        data: Source buffer

    Returns:
        Always 0.
    """
    self.db_write(db_number, start, bytearray(data)[:size])
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_download(self, data: bytearray, block_num: int = -1) -> int:
    """Async download block.

    The download runs synchronously through ``download``.

    Args:
        data: Block data forwarded to ``download``
        block_num: Block number forwarded to ``download`` (default -1)

    Returns:
        Always 0.
    """
    self.download(data, block_num)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_eb_read(self, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from input area.

    The read runs synchronously through ``eb_read``; the bytes are then
    copied element by element into ``data``.

    Args:
        start: Start offset passed to ``eb_read``
        size: Number of bytes passed to ``eb_read``
        data: Caller-supplied buffer that receives the bytes

    Returns:
        Always 0.
    """
    for position, byte_value in enumerate(self.eb_read(start, size)):
        data[position] = byte_value
    # NOTE(review): most as_* methods set this flag to True after a
    # successful call; here it is cleared, so a following
    # wait_as_completion() raises RuntimeError - confirm this is intentional.
    self._async_pending = False
    return 0
[docs]
def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
    """Async write to input area.

    The write runs synchronously through ``eb_write``.

    Args:
        start: Start offset passed to ``eb_write``
        size: Size passed to ``eb_write``
        data: Bytes to write

    Returns:
        Always 0.
    """
    self.eb_write(start, size, data)
    # NOTE(review): most as_* methods set this flag to True after a
    # successful call; here it is cleared, so a following
    # wait_as_completion() raises RuntimeError - confirm this is intentional.
    self._async_pending = False
    return 0
[docs]
def as_full_upload(self, block_type: Block, block_num: int) -> int:
    """Async full upload of block.

    Nothing is uploaded: both arguments are ignored.

    Args:
        block_type: Unused
        block_num: Unused

    Returns:
        Always 0.
    """
    # Unsupported operation: keeping the pending flag cleared makes a
    # following wait_as_completion() raise RuntimeError.
    self._async_pending = False
    return 0
[docs]
def as_list_blocks_of_type(self, block_type: Block, data: CDataArrayType, count: int) -> int:
    """Async list blocks of type.

    Nothing is listed: all arguments are ignored and ``data`` is left
    untouched.

    Args:
        block_type: Unused
        data: Unused
        count: Unused

    Returns:
        Always 0.
    """
    # Unsupported operation: keeping the pending flag cleared makes a
    # following wait_as_completion() raise RuntimeError.
    self._async_pending = False
    return 0
[docs]
def as_mb_read(self, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from marker area.

    The read runs synchronously through ``mb_read``; the bytes are then
    copied element by element into ``data``.

    Args:
        start: Start offset passed to ``mb_read``
        size: Number of bytes passed to ``mb_read``
        data: Caller-supplied buffer that receives the bytes

    Returns:
        Always 0.
    """
    for position, byte_value in enumerate(self.mb_read(start, size)):
        data[position] = byte_value
    # NOTE(review): most as_* methods set this flag to True after a
    # successful call; here it is cleared, so a following
    # wait_as_completion() raises RuntimeError - confirm this is intentional.
    self._async_pending = False
    return 0
[docs]
def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
    """Async write to marker area.

    The write runs synchronously through ``mb_write``.

    Args:
        start: Start offset passed to ``mb_write``
        size: Size passed to ``mb_write``
        data: Bytes to write

    Returns:
        Always 0.
    """
    self.mb_write(start, size, data)
    # NOTE(review): most as_* methods set this flag to True after a
    # successful call; here it is cleared, so a following
    # wait_as_completion() raises RuntimeError - confirm this is intentional.
    self._async_pending = False
    return 0
[docs]
def as_read_area(self, area: Area, db_number: int, start: int, size: int, wordlen: WordLen, data: CDataArrayType) -> int:
    """Async read from memory area.

    The read runs synchronously through ``read_area``; the raw bytes are
    then copied into the ctypes buffer ``data``.

    Args:
        area: Memory area passed to ``read_area``
        db_number: DB number passed to ``read_area``
        start: Start offset passed to ``read_area``
        size: Size passed to ``read_area``
        wordlen: Accepted for API compatibility; not forwarded to ``read_area``
        data: ctypes buffer that receives the raw bytes

    Returns:
        Always 0.
    """
    from ctypes import sizeof

    raw = bytes(self.read_area(area, db_number, start, size))
    # Never copy more than the destination can hold: an unbounded memmove
    # would write past the end of the caller's buffer.
    memmove(data, raw, min(len(raw), sizeof(data)))
    self._async_pending = True  # Mark operation as pending for wait_as_completion
    return 0
[docs]
def as_read_szl(self, ssl_id: int, index: int, szl: S7SZL, size: int) -> int:
    """Async read SZL.

    The SZL is read synchronously through ``read_szl``; its header is
    assigned to ``szl.Header`` and its data copied into ``szl.Data`` up to
    the shorter of the two data lengths.

    Args:
        ssl_id: SZL id passed to ``read_szl``
        index: SZL index passed to ``read_szl``
        szl: Caller-supplied structure that receives header and data
        size: Unused

    Returns:
        Always 0.
    """
    fetched = self.read_szl(ssl_id, index)
    szl.Header = fetched.Header
    copy_count = min(len(fetched.Data), len(szl.Data))
    for position in range(copy_count):
        szl.Data[position] = fetched.Data[position]
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_read_szl_list(self, szl_list: S7SZLList, items_count: int) -> int:
    """Async read SZL list.

    The list is read synchronously through ``read_szl_list``. The header is
    filled with a record length of 2 and a record count of
    ``len(data) // 2``, and the raw bytes are copied into ``szl_list.List``.

    Args:
        szl_list: Caller-supplied structure that receives header and list
        items_count: Unused

    Returns:
        Always 0.
    """
    raw = self.read_szl_list()
    szl_list.Header.LengthDR = 2
    szl_list.Header.NDR = len(raw) // 2
    # Copy raw bytes directly to preserve byte order; the copy is capped at
    # two bytes per destination list entry.
    capacity = len(szl_list.List) * 2
    memmove(szl_list.List, raw, min(len(raw), capacity))
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_tm_read(self, start: int, size: int, data: CDataArrayType) -> int:
    """Async read from timer area.

    The read runs synchronously through ``tm_read``; the raw bytes are then
    copied into the ctypes buffer ``data``.

    Args:
        start: Start index passed to ``tm_read``
        size: Item count passed to ``tm_read``
        data: ctypes buffer that receives the raw bytes

    Returns:
        Always 0.
    """
    from ctypes import sizeof

    raw = bytes(self.tm_read(start, size))
    # Never copy more than the destination can hold: an unbounded memmove
    # would write past the end of the caller's buffer.
    memmove(data, raw, min(len(raw), sizeof(data)))
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_tm_write(self, start: int, size: int, data: bytearray) -> int:
    """Async write to timer area.

    The write runs synchronously through ``tm_write``.

    Args:
        start: Start index passed to ``tm_write``
        size: Item count passed to ``tm_write``
        data: Bytes to write

    Returns:
        Always 0.
    """
    self.tm_write(start, size, data)
    # Flag the job so wait_as_completion() accepts the next call.
    self._async_pending = True
    return 0
[docs]
def as_upload(self, block_num: int, data: CDataArrayType, size: int) -> int:
    """Async upload block.

    Nothing is uploaded: all arguments are ignored and ``data`` is left
    untouched.

    Args:
        block_num: Unused
        data: Unused
        size: Unused

    Returns:
        Always 0.
    """
    # Unsupported operation: keeping the pending flag cleared makes a
    # following wait_as_completion() raise RuntimeError.
    self._async_pending = False
    return 0
[docs]
def as_write_area(self, area: Area, db_number: int, start: int, size: int, wordlen: WordLen, data: CDataArrayType) -> int:
    """Async write to memory area.

    The first ``size`` bytes of ``data`` are written synchronously through
    ``write_area``.

    Args:
        area: Memory area passed to ``write_area``
        db_number: DB number passed to ``write_area``
        start: Start offset passed to ``write_area``
        size: Number of leading bytes of ``data`` to write
        wordlen: Accepted for API compatibility; not forwarded to ``write_area``
        data: Source buffer

    Returns:
        Always 0.
    """
    self.write_area(area, db_number, start, bytearray(data)[:size])
    self._async_pending = True  # Mark operation as pending for wait_as_completion
    return 0
[docs]
def check_as_completion(self, status: "c_int") -> int:
    """Check async completion status.

    Args:
        status: Output parameter; its ``value`` is set to 0 (completed)

    Returns:
        Always 0.
    """
    # In pure Python every "async" call has already finished by the time it
    # returns, so the job is always reported as completed.
    completed = 0
    status.value = completed
    return 0
[docs]
def wait_as_completion(self, timeout: int) -> int:
    """Wait for async completion.

    In pure Python, async operations complete immediately, so this only
    consumes the pending flag.

    Args:
        timeout: Timeout value; 0 is treated as an immediate timeout

    Returns:
        0 when a job was pending and ``timeout`` is non-zero.

    Raises:
        RuntimeError: If no async operation is pending or timeout=0
    """
    # No pending job: raise for API compatibility.
    if not self._async_pending:
        raise RuntimeError(b"CLI : Job Timeout")
    # The pending job is consumed whether or not this call times out.
    self._async_pending = False
    # Simulate timeout behavior when timeout=0.
    if timeout == 0:
        raise RuntimeError(b"CLI : Job Timeout")
    return 0
[docs]
def set_as_callback(self, callback: Callable[[int, int], None]) -> int:
    """Set async callback.

    Args:
        callback: Callable stored on the instance as ``_async_callback``

    Returns:
        Always 0.
    """
    # The callable is only stored here; nothing in this method invokes it.
    self._async_callback = callback
    return 0
def _setup_communication(self) -> None:
"""Setup communication and negotiate PDU length.
Builds a setup-communication request through ``self.protocol`` with
``max_amq_caller=1``, ``max_amq_callee=1`` and the current
``self.pdu_length``, sends it via ``_send_receive`` and, when the response
parameters contain ``pdu_length``, stores that value in ``self.pdu_length``
and ``self._params[Parameter.PDURequest]``. The PDU length is logged at
INFO level.
"""
request = self.protocol.build_setup_communication_request(max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length)
response = self._send_receive(request)
# A response with missing or empty "parameters" carries no PDU length to adopt.
if response.get("parameters"):
params = response["parameters"]
if "pdu_length" in params:
# Adopt the PDU length reported in the response.
self.pdu_length = params["pdu_length"]
# Mirror the value into the parameter table under Parameter.PDURequest.
self._params[Parameter.PDURequest] = self.pdu_length
logger.info(f"Negotiated PDU length: {self.pdu_length}")
[docs]
def __enter__(self) -> "Client":
    """Context manager entry.

    Returns:
        This client itself; no connection is opened here.
    """
    client = self
    return client
[docs]
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Context manager exit.

    Calls ``disconnect()`` whether or not the ``with`` body raised. Nothing
    is returned, so an exception from the body is not suppressed.

    Args:
        exc_type: Exception type from the ``with`` body, unused
        exc_val: Exception instance from the ``with`` body, unused
        exc_tb: Traceback from the ``with`` body, unused
    """
    self.disconnect()
    return None
def __del__(self) -> None:
    """Best-effort cleanup on garbage collection.

    Prefer ``disconnect()`` or a ``with`` block over relying on this.
    """
    # During interpreter shutdown module globals may already be None, so
    # finalization is skipped entirely in that phase.
    if not sys.is_finalizing():
        try:
            self.disconnect()
        except Exception:
            # Deliberately swallowed: a finalizer must never raise.
            pass