"""
Pure Python S7 partner implementation.
S7 peer-to-peer communication for bidirectional data exchange.
Unlike client-server where client requests and server responds,
partners have equal rights and can send data asynchronously.
"""
import socket
import struct
import logging
import threading
from typing import Optional, Tuple, Callable, Type
from queue import Queue, Empty
from typing import Any
from datetime import datetime
from types import TracebackType
from ctypes import c_int32, c_uint32
from .connection import ISOTCPConnection
from .error import S7Error, S7ConnectionError
from .type import Parameter
logger = logging.getLogger(__name__)
[docs]
class PartnerStatus:
"""Partner status constants."""
STOPPED = 0
RUNNING = 1
CONNECTED = 2
[docs]
class Partner:
"""
Pure Python S7 partner implementation.
Implements peer-to-peer S7 communication where both partners can
send and receive data asynchronously. Supports both active (initiates
connection) and passive (waits for connection) modes.
Examples:
>>> import snap7
>>> partner = snap7.Partner(active=True)
>>> partner.start_to("0.0.0.0", "192.168.1.10", 0x0100, 0x0102)
>>> partner.set_send_data(b"Hello")
>>> partner.b_send()
>>> partner.stop()
"""
[docs]
def __init__(self, active: bool = False, **kwargs: object) -> None:
"""
Initialize S7 partner.
Args:
active: If True, this partner initiates the connection.
If False, this partner waits for incoming connections.
**kwargs: Ignored. Kept for backwards compatibility.
"""
self.active = active
self.connected = False
self.running = False
# Connection parameters
self.local_ip = "0.0.0.0"
self.remote_ip = ""
self.local_tsap = 0x0100
self.remote_tsap = 0x0102
self.port = 1102 # Non-privileged port (was 102)
self.local_port = 0 # Let OS choose
self.remote_port = 1102 # Non-privileged port (was 102)
# Socket and connection
self._socket: Optional[socket.socket] = None
self._server_socket: Optional[socket.socket] = None # For passive mode
self._connection: Optional[ISOTCPConnection] = None
# Statistics
self.bytes_sent = 0
self.bytes_recv = 0
self.send_errors = 0
self.recv_errors = 0
# Timing
self.last_send_time = 0
self.last_recv_time = 0
# Callbacks
self._recv_callback: Optional[Callable[[bytes], None]] = None
self._send_callback_fn: Optional[Callable[[int], None]] = None
# Async operation support
self._async_send_queue: Queue[Any] = Queue()
self._async_recv_queue: Queue[Any] = Queue()
self._async_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# Last error
self.last_error = 0
# Buffer for send/recv operations
self._send_data: Optional[bytes] = None
self._recv_data: Optional[bytes] = None
self._async_send_in_progress = False
self._async_send_result = 0
logger.info(f"S7 Partner initialized (active={active}, pure Python implementation)")
[docs]
def create(self, active: bool = False) -> None:
"""
Creates a Partner.
Note: For pure Python implementation, the partner is created in __init__.
This method exists for API compatibility.
Args:
active: If True, this partner initiates connections
"""
pass
[docs]
def destroy(self) -> int:
"""
Destroy the Partner.
Returns:
0 on success
"""
self.stop()
return 0
[docs]
def start(self) -> int:
"""
Start the partner with default parameters.
Returns:
0 on success
"""
return self.start_to(self.local_ip, self.remote_ip, self.local_tsap, self.remote_tsap)
[docs]
def start_to(self, local_ip: str, remote_ip: str, local_tsap: int, remote_tsap: int) -> int:
"""
Start the partner with specific connection parameters.
Args:
local_ip: Local IP address to bind to
remote_ip: Remote partner IP address (for active mode)
local_tsap: Local TSAP
remote_tsap: Remote TSAP
Returns:
0 on success
"""
self.local_ip = local_ip
self.remote_ip = remote_ip
self.local_tsap = local_tsap
self.remote_tsap = remote_tsap
try:
if self.active:
# Active mode: initiate connection to remote partner
self._connect_to_remote()
else:
# Passive mode: start listening for incoming connections
self._start_listening()
self.running = True
# Start async processing thread
self._stop_event.clear()
self._async_thread = threading.Thread(target=self._async_processor, daemon=True)
self._async_thread.start()
logger.info(f"Partner started ({'active' if self.active else 'passive'} mode)")
return 0
except Exception as e:
self.last_error = -1
logger.error(f"Partner start failed: {e}")
raise S7ConnectionError(f"Partner start failed: {e}")
[docs]
def stop(self) -> int:
"""
Stop the partner and disconnect.
Returns:
0 on success
"""
self._stop_event.set()
if self._async_thread and self._async_thread.is_alive():
self._async_thread.join(timeout=2.0)
if self._connection:
self._connection.disconnect()
self._connection = None
if self._server_socket:
try:
self._server_socket.close()
except Exception:
pass
self._server_socket = None
if self._socket:
try:
self._socket.close()
except Exception:
pass
self._socket = None
self.connected = False
self.running = False
logger.info("Partner stopped")
return 0
[docs]
def b_send(self) -> int:
"""
Send data synchronously (blocking).
Note: Call set_send_data() first to set the data to send.
Returns:
0 on success
"""
if self._send_data is None:
return -1
if not self.connected or self._connection is None:
self.send_errors += 1
raise S7ConnectionError("Not connected")
start_time = datetime.now()
try:
# Build partner data PDU
pdu = self._build_partner_data_pdu(self._send_data)
# Send via ISO connection
self._connection.send_data(pdu)
# Wait for acknowledgment
ack_data = self._connection.receive_data()
self._parse_partner_ack(ack_data)
self.bytes_sent += len(self._send_data)
self.last_send_time = int((datetime.now() - start_time).total_seconds() * 1000)
logger.debug(f"Sent {len(self._send_data)} bytes synchronously")
return 0
except Exception as e:
self.send_errors += 1
self.last_error = -1
logger.error(f"Synchronous send failed: {e}")
raise S7ConnectionError(f"Send failed: {e}")
[docs]
def b_recv(self) -> int:
"""
Receive data synchronously (blocking).
Returns:
0 on success
"""
if not self.connected or self._connection is None:
self.recv_errors += 1
self._recv_data = None
return -1
start_time = datetime.now()
try:
# Receive partner data
data = self._connection.receive_data()
received = self._parse_partner_data_pdu(data)
# Send acknowledgment
ack = self._build_partner_ack()
self._connection.send_data(ack)
self.bytes_recv += len(received)
self.last_recv_time = int((datetime.now() - start_time).total_seconds() * 1000)
self._recv_data = received
# Call receive callback if set
if self._recv_callback:
self._recv_callback(received)
logger.debug(f"Received {len(received)} bytes synchronously")
return 0
except socket.timeout:
self._recv_data = None
return 1 # Timeout
except Exception as e:
self.recv_errors += 1
self.last_error = -1
self._recv_data = None
logger.error(f"Synchronous receive failed: {e}")
return -1
[docs]
def as_b_send(self) -> int:
"""
Send data asynchronously (non-blocking).
Note: Call set_send_data() first to set the data to send.
Returns:
0 on success (send initiated)
"""
if self._send_data is None:
return -1
if not self.connected:
self.send_errors += 1
return -1
self._async_send_in_progress = True
self._async_send_result = 1 # In progress
# Queue the send operation
self._async_send_queue.put(self._send_data)
logger.debug(f"Async send initiated for {len(self._send_data)} bytes")
return 0
[docs]
def check_as_b_send_completion(self) -> Tuple[str, c_int32]:
"""
Check if async send completed.
Returns:
Tuple of (status_string, operation_result)
"""
if self._async_send_in_progress:
return "job in progress", c_int32(0)
return_values = {
0: "job complete",
1: "job in progress",
-2: "invalid handled supplied",
}
result = self._async_send_result
return return_values.get(0, "unknown"), c_int32(result)
[docs]
def wait_as_b_send_completion(self, timeout: int = 0) -> int:
"""
Wait for async send to complete.
Args:
timeout: Timeout in milliseconds (0 for infinite)
Returns:
0 on success, non-zero on error/timeout
Raises:
RuntimeError: If no async operation is in progress
"""
if not self._async_send_in_progress:
raise RuntimeError("No async send operation in progress")
# Wait for completion
wait_time = timeout / 1000.0 if timeout > 0 else None
start = datetime.now()
while self._async_send_in_progress:
if wait_time is not None:
elapsed = (datetime.now() - start).total_seconds()
if elapsed >= wait_time:
return -1 # Timeout
threading.Event().wait(0.01) # Small sleep
return self._async_send_result
[docs]
def check_as_b_recv_completion(self) -> int:
"""
Check if async receive completed.
Returns:
0 if data available, 1 if in progress
"""
try:
self._recv_data = self._async_recv_queue.get_nowait()
return 0 # Data available
except Empty:
return 1 # No data yet
[docs]
def get_status(self) -> c_int32:
"""
Get partner status.
Returns:
Status code (0=stopped, 1=running, 2=connected)
"""
if self.connected:
return c_int32(PartnerStatus.CONNECTED)
elif self.running:
return c_int32(PartnerStatus.RUNNING)
else:
return c_int32(PartnerStatus.STOPPED)
[docs]
def get_stats(self) -> Tuple[c_uint32, c_uint32, c_uint32, c_uint32]:
"""
Get partner statistics.
Returns:
Tuple of (bytes_sent, bytes_recv, send_errors, recv_errors)
"""
return (c_uint32(self.bytes_sent), c_uint32(self.bytes_recv), c_uint32(self.send_errors), c_uint32(self.recv_errors))
[docs]
def get_times(self) -> Tuple[c_int32, c_int32]:
"""
Get last operation times.
Returns:
Tuple of (last_send_time_ms, last_recv_time_ms)
"""
return c_int32(self.last_send_time), c_int32(self.last_recv_time)
[docs]
def get_last_error(self) -> c_int32:
"""
Get last error code.
Returns:
Last error code
"""
return c_int32(self.last_error)
[docs]
def get_param(self, parameter: Parameter) -> int:
"""
Get partner parameter.
Args:
parameter: Parameter to read
Returns:
Parameter value
"""
param_values = {
Parameter.LocalPort: self.local_port,
Parameter.RemotePort: self.remote_port,
Parameter.PingTimeout: 750,
Parameter.SendTimeout: 10,
Parameter.RecvTimeout: 3000,
Parameter.SrcRef: 256,
Parameter.DstRef: 0,
Parameter.PDURequest: 480,
Parameter.WorkInterval: 100,
Parameter.BSendTimeout: 3000,
Parameter.BRecvTimeout: 3000,
Parameter.RecoveryTime: 500,
Parameter.KeepAliveTime: 5000,
}
value = param_values.get(parameter)
if value is None:
raise RuntimeError(f"Parameter {parameter} not supported")
logger.debug(f"Getting parameter {parameter} = {value}")
return value
[docs]
def set_param(self, parameter: Parameter, value: int) -> int:
"""
Set partner parameter.
Args:
parameter: Parameter to set
value: Value to set
Returns:
0 on success
"""
# Some parameters cannot be set
if parameter == Parameter.RemotePort:
raise RuntimeError(f"Cannot set parameter {parameter}")
if parameter == Parameter.LocalPort:
self.local_port = value
logger.debug(f"Setting parameter {parameter} to {value}")
return 0
[docs]
def set_recv_callback(self) -> int:
"""
Sets the user callback for incoming data.
Returns:
0 on success
"""
logger.debug("set_recv_callback called")
return 0
[docs]
def set_send_callback(self) -> int:
"""
Sets the user callback for completed async sends.
Returns:
0 on success
"""
logger.debug("set_send_callback called")
return 0
[docs]
def set_send_data(self, data: bytes) -> None:
"""
Set data to be sent by b_send() or as_b_send().
Args:
data: Data to send
"""
self._send_data = data
[docs]
def get_recv_data(self) -> Optional[bytes]:
"""
Get data received by b_recv().
Returns:
Received data or None
"""
return self._recv_data
def _connect_to_remote(self) -> None:
"""Connect to remote partner (active mode)."""
if not self.remote_ip:
raise S7ConnectionError("Remote IP not specified for active partner")
self._connection = ISOTCPConnection(
host=self.remote_ip, port=self.port, local_tsap=self.local_tsap, remote_tsap=self.remote_tsap
)
self._connection.connect()
self._socket = self._connection.socket
self.connected = True
logger.info(f"Connected to remote partner at {self.remote_ip}:{self.port}")
def _start_listening(self) -> None:
"""Start listening for incoming connections (passive mode)."""
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Try to use SO_REUSEPORT if available (Linux, macOS) for faster port reuse
if hasattr(socket, "SO_REUSEPORT"):
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self._server_socket.bind((self.local_ip, self.port))
self._server_socket.listen(1)
self._server_socket.settimeout(1.0) # Allow periodic check
logger.info(f"Partner listening on {self.local_ip}:{self.port}")
# Start accept thread
accept_thread = threading.Thread(target=self._accept_connection, daemon=True)
accept_thread.start()
def _accept_connection(self) -> None:
"""Accept incoming connection in passive mode."""
if self._server_socket is None:
return
while self.running and not self._stop_event.is_set():
try:
client_sock, addr = self._server_socket.accept()
# Create connection object
self._socket = client_sock
self._connection = ISOTCPConnection(
host=addr[0], port=addr[1], local_tsap=self.local_tsap, remote_tsap=self.remote_tsap
)
self._connection.socket = client_sock
self._connection.connected = True
self.connected = True
logger.info(f"Partner connection accepted from {addr}")
break
except socket.timeout:
continue
except Exception as e:
if self.running:
logger.error(f"Accept failed: {e}")
break
def _async_processor(self) -> None:
"""Background thread for processing async operations."""
while not self._stop_event.is_set():
# Process async sends
try:
data = self._async_send_queue.get(timeout=0.1)
try:
# Temporarily set send data and call b_send
old_data = self._send_data
self._send_data = data
result = self.b_send()
self._send_data = old_data
self._async_send_result = result
if self._send_callback_fn:
self._send_callback_fn(result)
except Exception as e:
self._async_send_result = -1
logger.error(f"Async send failed: {e}")
finally:
self._async_send_in_progress = False
except Empty:
pass
except Exception:
break
def _build_partner_data_pdu(self, data: bytes) -> bytes:
"""
Build partner data PDU.
Args:
data: Data to send
Returns:
PDU bytes
"""
# S7 partner data PDU format:
# Header + Data
header = struct.pack(
">BBHH",
0x32, # Protocol ID (S7)
0x07, # Partner PDU type
len(data), # Data length high
0x0000, # Reserved
)
return header + data
def _parse_partner_data_pdu(self, pdu: bytes) -> bytes:
"""
Parse partner data PDU.
Args:
pdu: PDU bytes
Returns:
Extracted data
"""
if len(pdu) < 6:
raise S7Error("Invalid partner PDU: too short")
# Skip header
return pdu[6:]
def _build_partner_ack(self) -> bytes:
"""Build partner acknowledgment PDU."""
return struct.pack(
">BBHH",
0x32, # Protocol ID
0x08, # ACK type
0x0000, # Reserved
0x0000, # Status OK
)
def _parse_partner_ack(self, pdu: bytes) -> None:
"""Parse partner acknowledgment PDU."""
if len(pdu) < 6:
raise S7Error("Invalid partner ACK: too short")
protocol_id, pdu_type = struct.unpack(">BB", pdu[:2])
if pdu_type != 0x08:
raise S7Error(f"Expected partner ACK, got {pdu_type:#02x}")
[docs]
def __enter__(self) -> "Partner":
"""Context manager entry."""
return self
[docs]
def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
"""Context manager exit."""
self.destroy()
[docs]
def __del__(self) -> None:
"""Destructor."""
try:
self.stop()
except Exception:
pass