Source code for snap7.partner

"""
Pure Python S7 partner implementation.

S7 peer-to-peer communication for bidirectional data exchange.
Unlike client-server where client requests and server responds,
partners have equal rights and can send data asynchronously.
"""

import socket
import struct
import logging
import sys
import threading
from typing import Optional, Tuple, Callable, Type
from queue import Queue, Empty
from typing import Any
from datetime import datetime
from types import TracebackType
from ctypes import c_int32, c_uint32

from .connection import ISOTCPConnection
from .error import S7Error, S7ConnectionError, S7TimeoutError
from .s7protocol import S7Protocol, S7PDUType
from .type import Parameter

logger = logging.getLogger(__name__)

# S7 partner/push function group
_PUSH_FUNC_GROUP = 0x06

# Partner push subfunctions
_PUSH_SUBFUNCTION_BSEND = 0x01  # bsend data push


[docs] class PartnerStatus: """Partner status constants.""" STOPPED = 0 RUNNING = 1 CONNECTED = 2
[docs] class Partner: """ Pure Python S7 partner implementation. Implements peer-to-peer S7 communication where both partners can send and receive data asynchronously. Supports both active (initiates connection) and passive (waits for connection) modes. Examples: >>> import snap7 >>> partner = snap7.Partner(active=True) >>> partner.start_to("0.0.0.0", "192.168.1.10", 0x0100, 0x0102) >>> partner.set_send_data(b"Hello") >>> partner.b_send() >>> partner.stop() """
[docs] def __init__(self, active: bool = False, **kwargs: object) -> None: """ Initialize S7 partner. Args: active: If True, this partner initiates the connection. If False, this partner waits for incoming connections. **kwargs: Ignored. Kept for backwards compatibility. """ self.active = active self.connected = False self.running = False # Connection parameters self.local_ip = "0.0.0.0" self.remote_ip = "" self.local_tsap = 0x0100 self.remote_tsap = 0x0102 self.port = 1102 # Non-privileged port (was 102) self.local_port = 0 # Let OS choose self.remote_port = 1102 # Non-privileged port (was 102) # Socket and connection self._socket: Optional[socket.socket] = None self._server_socket: Optional[socket.socket] = None # For passive mode self._connection: Optional[ISOTCPConnection] = None # S7 protocol handler (for setup communication and PDU formatting) self._protocol = S7Protocol() self.pdu_length = 480 # R-ID for bsend/brecv matching (default 0, can be set by caller) self.r_id: int = 0 # R-ID received from the last incoming PDU self._recv_r_id: int = 0 # Socket timeout (seconds) used by the async receive listener self.recv_timeout: float = 0.2 # Statistics self.bytes_sent = 0 self.bytes_recv = 0 self.send_errors = 0 self.recv_errors = 0 # Timing self.last_send_time = 0 self.last_recv_time = 0 # Callbacks self._recv_callback: Optional[Callable[[bytes], None]] = None self._send_callback_fn: Optional[Callable[[int], None]] = None # Async operation support self._async_send_queue: Queue[Any] = Queue() self._async_recv_queue: Queue[Any] = Queue() self._async_thread: Optional[threading.Thread] = None self._recv_listener_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._io_lock = threading.RLock() # Last error self.last_error = 0 # Buffer for send/recv operations self._send_data: Optional[bytes] = None self._recv_data: Optional[bytes] = None self._async_send_in_progress = False self._async_send_result = 0 self._async_recv_in_progress = False self._async_recv_result = 0 self._async_recv_started = False logger.info(f"S7 Partner initialized (active={active}, pure Python implementation)")
[docs] def create(self, active: bool = False) -> None: """ Creates a Partner. Note: For pure Python implementation, the partner is created in __init__. This method exists for API compatibility. Args: active: If True, this partner initiates connections """ pass
[docs] def destroy(self) -> int: """ Destroy the Partner. Returns: 0 on success """ self.stop() return 0
[docs] def start(self) -> int: """ Start the partner with default parameters. Returns: 0 on success """ return self.start_to(self.local_ip, self.remote_ip, self.local_tsap, self.remote_tsap)
[docs] def start_to(self, local_ip: str, remote_ip: str, local_tsap: int, remote_tsap: int) -> int: """ Start the partner with specific connection parameters. Args: local_ip: Local IP address to bind to remote_ip: Remote partner IP address (for active mode) local_tsap: Local TSAP remote_tsap: Remote TSAP Returns: 0 on success """ self.local_ip = local_ip self.remote_ip = remote_ip self.local_tsap = local_tsap self.remote_tsap = remote_tsap try: if self.active: # Active mode: initiate connection to remote partner self._connect_to_remote() else: # Passive mode: start listening for incoming connections self._start_listening() self.running = True # Start async processing thread self._stop_event.clear() self._async_thread = threading.Thread(target=self._async_processor, daemon=True) self._async_thread.start() logger.info(f"Partner started ({'active' if self.active else 'passive'} mode)") return 0 except Exception as e: self.last_error = -1 logger.error(f"Partner start failed: {e}") raise S7ConnectionError(f"Partner start failed: {e}")
[docs] def stop(self) -> int: """ Stop the partner and disconnect. Returns: 0 on success """ self._stop_event.set() self._async_recv_in_progress = False if self._async_thread and self._async_thread.is_alive(): self._async_thread.join(timeout=2.0) if self._recv_listener_thread and self._recv_listener_thread.is_alive(): self._recv_listener_thread.join(timeout=2.0) if self._connection: self._connection.disconnect() self._connection = None if self._server_socket: try: self._server_socket.close() except Exception: pass self._server_socket = None if self._socket: try: self._socket.close() except Exception: pass self._socket = None self.connected = False self.running = False logger.info("Partner stopped") return 0
[docs] def b_send(self) -> int: """ Send data synchronously (blocking). Note: Call set_send_data() first to set the data to send. Returns: 0 on success """ if self._send_data is None: return -1 if not self.connected or self._connection is None: self.send_errors += 1 raise S7ConnectionError("Not connected") start_time = datetime.now() try: # Build partner data PDU pdu = self._build_partner_data_pdu(self._send_data) with self._io_lock: # Send via ISO connection self._connection.send_data(pdu) # Wait for acknowledgment ack_data = self._connection.receive_data() self._parse_partner_ack(ack_data) self.bytes_sent += len(self._send_data) self.last_send_time = int((datetime.now() - start_time).total_seconds() * 1000) logger.debug(f"Sent {len(self._send_data)} bytes synchronously") return 0 except Exception as e: self.send_errors += 1 self.last_error = -1 logger.error(f"Synchronous send failed: {e}") raise S7ConnectionError(f"Send failed: {e}")
[docs] def b_recv(self) -> int: """ Receive data synchronously (blocking). Returns: 0 on success """ if not self.connected or self._connection is None: self.recv_errors += 1 self._recv_data = None return -1 start_time = datetime.now() try: with self._io_lock: # Receive partner data data = self._connection.receive_data() received, r_id, pdu_ref = self._parse_partner_data_pdu(data) # Send acknowledgment with the same PDU reference ack = self._build_partner_ack(pdu_ref) self._connection.send_data(ack) self.bytes_recv += len(received) self.last_recv_time = int((datetime.now() - start_time).total_seconds() * 1000) self._recv_data = received self._recv_r_id = r_id # Call receive callback if set if self._recv_callback: self._recv_callback(received) logger.debug(f"Received {len(received)} bytes synchronously") return 0 except socket.timeout: self._recv_data = None return 1 # Timeout except Exception as e: self.recv_errors += 1 self.last_error = -1 self._recv_data = None logger.error(f"Synchronous receive failed: {e}") return -1
[docs] def as_b_send(self) -> int: """ Send data asynchronously (non-blocking). Note: Call set_send_data() first to set the data to send. Returns: 0 on success (send initiated) """ if self._send_data is None: return -1 if not self.connected: self.send_errors += 1 return -1 self._async_send_in_progress = True self._async_send_result = 1 # In progress # Queue the send operation self._async_send_queue.put(self._send_data) logger.debug(f"Async send initiated for {len(self._send_data)} bytes") return 0
[docs] def check_as_b_send_completion(self) -> Tuple[str, c_int32]: """ Check if async send completed. Returns: Tuple of (status_string, operation_result) """ if self._async_send_in_progress: return "job in progress", c_int32(0) return_values = { 0: "job complete", 1: "job in progress", -2: "invalid handled supplied", } result = self._async_send_result return return_values.get(0, "unknown"), c_int32(result)
[docs] def wait_as_b_send_completion(self, timeout: int = 0) -> int: """ Wait for async send to complete. Args: timeout: Timeout in milliseconds (0 for infinite) Returns: 0 on success, non-zero on error/timeout Raises: RuntimeError: If no async operation is in progress """ if not self._async_send_in_progress: raise RuntimeError("No async send operation in progress") # Wait for completion wait_time = timeout / 1000.0 if timeout > 0 else None start = datetime.now() while self._async_send_in_progress: if wait_time is not None: elapsed = (datetime.now() - start).total_seconds() if elapsed >= wait_time: return -1 # Timeout threading.Event().wait(0.01) # Small sleep return self._async_send_result
[docs] def as_b_recv(self) -> int: """ Start asynchronous receive (non-blocking). Begins listening for incoming partner data in the background. Use :meth:`check_as_b_recv_completion` or :meth:`wait_as_b_recv_completion` to check for results. Returns: 0 on success (receive initiated), -1 on error """ if not self.connected: self.recv_errors += 1 return -1 if self._async_recv_in_progress: return -1 self._async_recv_in_progress = True self._async_recv_started = True self._async_recv_result = 1 # In progress if self._recv_listener_thread is None or not self._recv_listener_thread.is_alive(): self._recv_listener_thread = threading.Thread(target=self._recv_listener, daemon=True) self._recv_listener_thread.start() logger.debug("Async receive initiated") return 0
[docs] def check_as_b_recv_completion(self) -> int: """ Check if async receive completed. Returns: 0 if data available, 1 if in progress, -1 on error """ if self._async_recv_result == -1: return -1 try: self._recv_data = self._async_recv_queue.get_nowait() return 0 # Data available except Empty: return 1 # No data yet
[docs] def wait_as_b_recv_completion(self, timeout: int = 0) -> int: """ Wait for async receive to complete. Args: timeout: Timeout in milliseconds (0 for infinite) Returns: 0 on success, -1 on timeout/error Raises: RuntimeError: If no async receive was ever started """ if not self._async_recv_in_progress: if self._async_recv_started: # Listener already finished before wait was called self._async_recv_started = False return self._async_recv_result raise RuntimeError("No async receive operation in progress") wait_time = timeout / 1000.0 if timeout > 0 else None start = datetime.now() while self._async_recv_in_progress: if wait_time is not None: elapsed = (datetime.now() - start).total_seconds() if elapsed >= wait_time: return -1 threading.Event().wait(0.01) return self._async_recv_result
[docs] def get_status(self) -> c_int32: """ Get partner status. Returns: Status code (0=stopped, 1=running, 2=connected) """ if self.connected: return c_int32(PartnerStatus.CONNECTED) elif self.running: return c_int32(PartnerStatus.RUNNING) else: return c_int32(PartnerStatus.STOPPED)
[docs] def get_stats(self) -> Tuple[c_uint32, c_uint32, c_uint32, c_uint32]: """ Get partner statistics. Returns: Tuple of (bytes_sent, bytes_recv, send_errors, recv_errors) """ return (c_uint32(self.bytes_sent), c_uint32(self.bytes_recv), c_uint32(self.send_errors), c_uint32(self.recv_errors))
[docs] def get_times(self) -> Tuple[c_int32, c_int32]: """ Get last operation times. Returns: Tuple of (last_send_time_ms, last_recv_time_ms) """ return c_int32(self.last_send_time), c_int32(self.last_recv_time)
[docs] def get_last_error(self) -> c_int32: """ Get last error code. Returns: Last error code """ return c_int32(self.last_error)
[docs] def get_param(self, parameter: Parameter) -> int: """ Get partner parameter. Args: parameter: Parameter to read Returns: Parameter value """ param_values = { Parameter.LocalPort: self.local_port, Parameter.RemotePort: self.remote_port, Parameter.PingTimeout: 750, Parameter.SendTimeout: 10, Parameter.RecvTimeout: 3000, Parameter.SrcRef: 256, Parameter.DstRef: 0, Parameter.PDURequest: 480, Parameter.WorkInterval: 100, Parameter.BSendTimeout: 3000, Parameter.BRecvTimeout: 3000, Parameter.RecoveryTime: 500, Parameter.KeepAliveTime: 5000, } value = param_values.get(parameter) if value is None: raise RuntimeError(f"Parameter {parameter} not supported") logger.debug(f"Getting parameter {parameter} = {value}") return value
[docs] def set_param(self, parameter: Parameter, value: int) -> int: """ Set partner parameter. Args: parameter: Parameter to set value: Value to set Returns: 0 on success """ # Some parameters cannot be set if parameter == Parameter.RemotePort: raise RuntimeError(f"Cannot set parameter {parameter}") if parameter == Parameter.LocalPort: self.local_port = value logger.debug(f"Setting parameter {parameter} to {value}") return 0
[docs] def set_recv_callback(self, callback: Optional[Callable[[bytes], None]] = None) -> int: """ Register a callback for incoming data. The callback is invoked with the received bytes whenever data arrives via :meth:`b_recv` or async receive. Args: callback: Function called with received data, or ``None`` to clear. Returns: 0 on success """ self._recv_callback = callback logger.debug(f"Receive callback {'set' if callback else 'cleared'}") return 0
[docs] def set_send_callback(self, callback: Optional[Callable[[int], None]] = None) -> int: """ Register a callback for completed async sends. Args: callback: Function called with the result code, or ``None`` to clear. Returns: 0 on success """ self._send_callback_fn = callback logger.debug(f"Send callback {'set' if callback else 'cleared'}") return 0
[docs] def set_send_data(self, data: bytes) -> None: """ Set data to be sent by b_send() or as_b_send(). Args: data: Data to send """ self._send_data = data
[docs] def get_recv_data(self) -> Optional[bytes]: """ Get data received by b_recv() or async receive. Returns: Received data or None """ return self._recv_data
[docs] def get_recv_r_id(self) -> int: """ Get the R-ID from the last received PDU. Returns: R-ID value (0 if no data has been received yet) """ return self._recv_r_id
def _connect_to_remote(self) -> None: """Connect to remote partner (active mode). Performs COTP connection followed by S7 Communication Setup to negotiate PDU size with the remote partner. """ if not self.remote_ip: raise S7ConnectionError("Remote IP not specified for active partner") self._connection = ISOTCPConnection( host=self.remote_ip, port=self.port, local_tsap=self.local_tsap, remote_tsap=self.remote_tsap ) self._connection.connect() self._socket = self._connection.socket # Perform S7 Communication Setup (negotiate PDU size) self._setup_communication() self.connected = True logger.info(f"Connected to remote partner at {self.remote_ip}:{self.port}") def _start_listening(self) -> None: """Start listening for incoming connections (passive mode).""" self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Try to use SO_REUSEPORT if available (Linux, macOS) for faster port reuse if hasattr(socket, "SO_REUSEPORT"): self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self._server_socket.bind((self.local_ip, self.port)) self._server_socket.listen(1) self._server_socket.settimeout(1.0) # Allow periodic check logger.info(f"Partner listening on {self.local_ip}:{self.port}") # Start accept thread accept_thread = threading.Thread(target=self._accept_connection, daemon=True) accept_thread.start() def _accept_connection(self) -> None: """Accept incoming connection in passive mode. After accepting the TCP connection, handles the COTP Connection Request from the active partner and performs S7 Communication Setup. """ if self._server_socket is None: return while self.running and not self._stop_event.is_set(): try: client_sock, addr = self._server_socket.accept() client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # Create connection object self._socket = client_sock self._connection = ISOTCPConnection( host=addr[0], port=addr[1], local_tsap=self.local_tsap, remote_tsap=self.remote_tsap ) self._connection.socket = client_sock # Handle COTP Connection Request from active partner self._handle_cotp_cr(client_sock) self._connection.connected = True # Wait for and handle S7 Communication Setup from active partner self._handle_setup_communication() self.connected = True logger.info(f"Partner connection accepted from {addr}") break except socket.timeout: continue except Exception as e: if self.running: logger.error(f"Accept failed: {e}") break def _async_processor(self) -> None: """Background thread for processing async send operations.""" while not self._stop_event.is_set(): try: data = self._async_send_queue.get(timeout=0.1) try: old_data = self._send_data self._send_data = data with self._io_lock: result = self.b_send() self._send_data = old_data self._async_send_result = result if self._send_callback_fn: self._send_callback_fn(result) except Exception as e: self._async_send_result = -1 logger.error(f"Async send failed: {e}") finally: self._async_send_in_progress = False except Empty: pass except Exception: break def _recv_listener(self) -> None: """Background thread that listens for incoming partner data. Runs while ``_async_recv_in_progress`` is set. Uses a short socket timeout so the thread can be stopped cleanly and releases ``_io_lock`` between attempts to allow sends to proceed. """ while not self._stop_event.is_set() and self._async_recv_in_progress: conn = self._connection if not self.connected or conn is None or conn.socket is None: break try: with self._io_lock: old_timeout = conn.socket.gettimeout() conn.socket.settimeout(self.recv_timeout) try: data = conn.receive_data() received, r_id, pdu_ref = self._parse_partner_data_pdu(data) ack = self._build_partner_ack(pdu_ref) conn.send_data(ack) finally: try: if conn.socket is not None: conn.socket.settimeout(old_timeout) except OSError: pass except (S7TimeoutError, socket.timeout): # Timeout is expected — restore connected flag since # ISOTCPConnection.receive_data() sets it to False on timeout if conn is not None: conn.connected = True continue except Exception as e: self.recv_errors += 1 self._async_recv_result = -1 self._async_recv_in_progress = False logger.error(f"Async receive failed: {e}") break # Data received successfully self._recv_data = received self._recv_r_id = r_id self._async_recv_queue.put(received) self.bytes_recv += len(received) self._async_recv_result = 0 self._async_recv_in_progress = False if self._recv_callback: self._recv_callback(received) logger.debug(f"Async received {len(received)} bytes") def _setup_communication(self) -> None: """Perform S7 Communication Setup after COTP connection. Sends a Setup Communication request and parses the negotiated PDU length from the response. This is required before any S7 data exchange can take place. """ if self._connection is None: raise S7ConnectionError("No connection for S7 setup") request = self._protocol.build_setup_communication_request(max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length) self._connection.send_data(request) response_data = self._connection.receive_data() response = self._protocol.parse_response(response_data) if response.get("parameters") and "pdu_length" in response["parameters"]: self.pdu_length = response["parameters"]["pdu_length"] logger.info(f"S7 Communication Setup complete, PDU length: {self.pdu_length}") def _handle_cotp_cr(self, sock: socket.socket) -> None: """Handle incoming COTP Connection Request and send Connection Confirm. Used by passive partner to complete the COTP handshake initiated by the active partner. """ # Receive TPKT header (4 bytes) tpkt_header = self._recv_exact_from(sock, 4) version, _, length = struct.unpack(">BBH", tpkt_header) if version != 3: raise S7ConnectionError(f"Invalid TPKT version: {version}") payload = self._recv_exact_from(sock, length - 4) if len(payload) < 7: raise S7ConnectionError("COTP CR too short") pdu_type = payload[1] if pdu_type != 0xE0: # COTP_CR raise S7ConnectionError(f"Expected COTP CR (0xE0), got {pdu_type:#04x}") # Build and send Connection Confirm if self._connection is None: raise S7ConnectionError("No connection object") cc_pdu = struct.pack( ">BBHHB", 6, # PDU length 0xD0, # COTP_CC self._connection.src_ref, # Destination reference (our src_ref) 0x0001, # Source reference 0x00, # Class 0 ) # Add PDU size parameter cc_pdu += struct.pack(">BBB", 0xC0, 1, 0x0A) # 1024 bytes # Update length byte total_len = len(cc_pdu) - 1 cc_pdu = struct.pack(">B", total_len) + cc_pdu[1:] tpkt = struct.pack(">BBH", 3, 0, len(cc_pdu) + 4) + cc_pdu sock.sendall(tpkt) logger.debug("Sent COTP Connection Confirm") def _handle_setup_communication(self) -> None: """Handle incoming S7 Communication Setup request from active partner. Receives the setup request, parses it, and sends back a setup response with the negotiated PDU length. """ if self._connection is None: raise S7ConnectionError("No connection for S7 setup") request_data = self._connection.receive_data() if len(request_data) < 10: raise S7ConnectionError("S7 setup request too short") protocol_id, pdu_type = struct.unpack(">BB", request_data[:2]) if protocol_id != 0x32 or pdu_type != S7PDUType.REQUEST: raise S7ConnectionError(f"Expected S7 setup request, got type {pdu_type:#04x}") # Parse the request to get sequence number and requested PDU length _, _, _, sequence, param_len, _ = struct.unpack(">BBHHHH", request_data[:10]) requested_pdu = self.pdu_length if param_len >= 8: params = request_data[10 : 10 + param_len] if len(params) >= 8: _, _, _, _, requested_pdu = struct.unpack(">BBHHH", params[:8]) negotiated_pdu = min(requested_pdu, self.pdu_length) self.pdu_length = negotiated_pdu # Build and send setup response response = struct.pack( ">BBHHHHBB", 0x32, S7PDUType.ACK_DATA, 0x0000, sequence, 0x0008, # param length 0x0000, # data length 0x00, # error class 0x00, # error code ) response += struct.pack( ">BBHHH", 0xF0, # Setup Communication function code 0x00, 1, # max_amq_caller 1, # max_amq_callee negotiated_pdu, ) self._connection.send_data(response) logger.info(f"S7 Communication Setup complete (passive), PDU length: {negotiated_pdu}") @staticmethod def _recv_exact_from(sock: socket.socket, size: int) -> bytes: """Receive exactly *size* bytes from a socket.""" data = bytearray() while len(data) < size: chunk = sock.recv(size - len(data)) if not chunk: raise S7ConnectionError("Connection closed during receive") data.extend(chunk) return bytes(data) def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> bytes: """Build an S7 USERDATA PDU for partner data push (bsend). The PDU uses the standard S7 USERDATA header (10 bytes) followed by a parameter section that identifies this as a PBC (Program Block Communication) push with the R-ID and a variable specification block, and a data section with the payload. Args: data: Payload to send. r_id: Request ID for bsend/brecv matching. Falls back to ``self.r_id``. Returns: Complete S7 PDU bytes (without COTP/TPKT framing). """ if r_id is None: r_id = self.r_id sequence = self._protocol._next_sequence() # Parameter section: USERDATA header (12 bytes) param = struct.pack( ">BBBBBBBBBBH", 0x00, # reserved 0x01, # parameter count 0x12, # type header 0x08, # length of following parameter data 0x12, # method: extended parameter 0x46, # type 4 (request) | group 6 (PBC BSEND) _PUSH_SUBFUNCTION_BSEND, 0x00, # sequence number (always 0 for PBC) 0x00, # data unit reference number 0x00, # last data unit 0x0000, # error code ) # Data section: header + variable spec + R-ID + payload length + payload # Variable specification: type=0x12, len=0x06, syntax_id=0x13, reserved=0x00 var_spec = struct.pack(">BBBB", 0x12, 0x06, 0x13, 0x00) # R-ID (4 bytes) + payload length (2 bytes) var_spec += struct.pack(">IH", r_id, len(data)) # Data header: return_code=0xFF, transport_size=0x09, length=varspec+data data_section = struct.pack(">BBH", 0xFF, 0x09, len(var_spec) + len(data)) + var_spec + data # S7 USERDATA header (10 bytes) header = struct.pack( ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, sequence, len(param), len(data_section), ) return header + param + data_section def _parse_partner_data_pdu(self, pdu: bytes) -> Tuple[bytes, int, int]: """Parse an incoming partner data push PDU and extract the payload. Returns: Tuple of (payload, r_id, pdu_ref). *r_id* and *pdu_ref* are extracted from the variable specification block and the S7 header respectively. If the variable specification is absent both default to ``0``. """ if len(pdu) < 6: raise S7Error("Invalid partner PDU: too short") protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) if protocol_id != 0x32: raise S7Error(f"Invalid protocol ID: {protocol_id:#04x}") if pdu_type == S7PDUType.USERDATA: if len(pdu) < 10: raise S7Error("USERDATA partner PDU too short") _, _, _, pdu_ref, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10]) data_offset = 10 + param_len if data_offset + 4 > len(pdu): raise S7Error("Partner data section too short") # Skip 4-byte data section header (return_code, transport_size, length) payload = pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" # Parse PBC variable specification block if present # Format: 12 06 13 00 [R-ID 4 bytes] [length 2 bytes] = 10 bytes r_id = 0 if len(payload) >= 2 and payload[0] == 0x12: var_len = payload[1] if var_len == 0x06 and len(payload) >= 8: syntax_id = payload[2] if syntax_id == 0x13: (r_id,) = struct.unpack(">I", payload[4:8]) # skip var spec header (2) + body (var_len) + length field (2) payload = payload[2 + var_len + 2 :] return payload, r_id, pdu_ref else: raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") def _build_partner_ack(self, pdu_ref: int = 0) -> bytes: """Build an S7 USERDATA acknowledgment PDU for a received bsend. The PLC expects the same PDU reference in the ACK as in the data PDU it sent. Args: pdu_ref: Protocol Data Unit reference echoed from the data PDU. Returns: Complete S7 PDU bytes. """ sequence = self._protocol._next_sequence() param = struct.pack( ">BBBBBBBB", 0x00, 0x01, 0x12, 0x08, # length: 4 base + 2 (dur/ldu) + 2 (error code) 0x12, # method: response 0x86, # type 8 (response) | group 6 (push) _PUSH_SUBFUNCTION_BSEND, sequence & 0xFF, ) param += struct.pack(">BBH", 0x00, 0x00, 0x0000) # dur, ldu, error_code # Data section: return code 0x0a, transport size 0x00, length 0x0000 data = struct.pack(">BBH", 0x0A, 0x00, 0x0000) header = struct.pack( ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, pdu_ref, len(param), len(data), ) return header + param + data def _parse_partner_ack(self, pdu: bytes) -> None: """Parse a partner acknowledgment PDU. Validates that the PDU is a proper S7 USERDATA response for a push acknowledgment and checks for error codes. """ if len(pdu) < 6: raise S7Error("Invalid partner ACK: too short") protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) if protocol_id != 0x32: raise S7Error(f"Invalid protocol ID in ACK: {protocol_id:#04x}") if pdu_type != S7PDUType.USERDATA: raise S7Error(f"Expected partner ACK (USERDATA), got {pdu_type:#04x}") # Check for error code in parameter section if len(pdu) >= 10: _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) param = pdu[10 : 10 + param_len] # Parameter layout: 00 01 12 LL [method tg sf seq ...] [error_code] if len(param) >= 4: sub_len = param[3] if sub_len >= 8 and len(param) >= 12: # Error code is at offset 10-11 within param (bytes 6-7 after 12 LL) error_code = struct.unpack(">H", param[10:12])[0] if error_code != 0: raise S7Error(f"Partner ACK error: {error_code:#06x}")
[docs] def __enter__(self) -> "Partner": """Context manager entry.""" return self
[docs] def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> None: """Context manager exit.""" self.destroy()
def __del__(self) -> None: # Best-effort cleanup on garbage collection. Prefer stop() or a # `with` block; during interpreter shutdown module globals may # already be None, so we skip finalization and swallow errors. if sys.is_finalizing(): return try: self.stop() except Exception: pass