"""
ISO on TCP connection management (RFC 1006).
Implements TPKT (Transport Service on top of TCP) and COTP (Connection Oriented
Transport Protocol) layers for S7 communication.
"""
import socket
import struct
import logging
from enum import IntEnum
from typing import Optional, Type
from types import TracebackType
from .error import S7ConnectionError, S7TimeoutError
[docs]
class TPDUSize(IntEnum):
"""TPDU sizes per ISO 8073 / RFC 905.
The value is the exponent: actual size = 2^value bytes.
"""
S_128 = 0x07
S_256 = 0x08
S_512 = 0x09
S_1024 = 0x0A
S_2048 = 0x0B
S_4096 = 0x0C
S_8192 = 0x0D
logger = logging.getLogger(__name__)
[docs]
class ISOTCPConnection:
"""
ISO on TCP connection implementation.
Handles the transport layer for S7 communication including:
- TCP socket management
- TPKT framing (RFC 1006)
- COTP connection setup and data transfer
- PDU size negotiation
"""
# COTP PDU types
COTP_CR = 0xE0 # Connection Request
COTP_CC = 0xD0 # Connection Confirm
COTP_DR = 0x80 # Disconnect Request
COTP_DC = 0xC0 # Disconnect Confirm
COTP_DT = 0xF0 # Data Transfer
COTP_ED = 0x10 # Expedited Data
COTP_AK = 0x60 # Data Acknowledgment
COTP_EA = 0x20 # Expedited Acknowledgment
COTP_RJ = 0x50 # Reject
COTP_ER = 0x70 # Error
# COTP parameter codes (ISO 8073)
COTP_PARAM_PDU_SIZE = 0xC0
COTP_PARAM_CALLING_TSAP = 0xC1
COTP_PARAM_CALLED_TSAP = 0xC2
[docs]
def __init__(
self,
host: str,
port: int = 102,
local_tsap: int = 0x0100,
remote_tsap: int = 0x0102,
tpdu_size: TPDUSize = TPDUSize.S_1024,
):
"""
Initialize ISO TCP connection.
Args:
host: Target PLC IP address
port: TCP port (default 102 for S7)
local_tsap: Local Transport Service Access Point
remote_tsap: Remote Transport Service Access Point
tpdu_size: TPDU size to request during COTP negotiation
"""
self.host = host
self.port = port
self.local_tsap = local_tsap
self.remote_tsap = remote_tsap
self.tpdu_size = tpdu_size
self.socket: Optional[socket.socket] = None
self.connected = False
self.pdu_size = 240 # Default PDU size, negotiated during connection
self.timeout = 5.0 # Default timeout in seconds
# Connection parameters
self.src_ref = 0x0001 # Source reference
self.dst_ref = 0x0000 # Destination reference (assigned by peer)
[docs]
def connect(self, timeout: float = 5.0) -> None:
"""
Establish ISO on TCP connection.
Args:
timeout: Connection timeout in seconds
"""
self.timeout = timeout
try:
# Step 1: TCP connection
self._tcp_connect()
# Step 2: ISO connection (COTP handshake)
self._iso_connect()
self.connected = True
logger.info(f"Connected to {self.host}:{self.port}, PDU size: {self.pdu_size}")
except Exception as e:
self.disconnect()
if isinstance(e, (S7ConnectionError, S7TimeoutError)):
raise
else:
raise S7ConnectionError(f"Connection failed: {e}")
[docs]
def disconnect(self) -> None:
"""Disconnect from S7 device."""
if self.socket:
try:
if self.connected:
# Send COTP disconnect request
self._send_cotp_disconnect()
self.socket.close()
except Exception:
pass # Ignore errors during disconnect
finally:
self.socket = None
self.connected = False
logger.info(f"Disconnected from {self.host}:{self.port}")
[docs]
def send_data(self, data: bytes) -> None:
"""
Send data over ISO connection.
Args:
data: S7 PDU data to send
"""
if not self.connected or self.socket is None:
raise S7ConnectionError("Not connected")
# Wrap data in COTP Data Transfer PDU
cotp_data = self._build_cotp_dt(data)
# Wrap in TPKT frame
tpkt_frame = self._build_tpkt(cotp_data)
# Send over TCP
try:
self.socket.sendall(tpkt_frame)
logger.debug(f"Sent {len(tpkt_frame)} bytes")
except socket.error as e:
self.connected = False
raise S7ConnectionError(f"Send failed: {e}")
[docs]
def receive_data(self) -> bytes:
"""
Receive data from ISO connection.
Returns:
S7 PDU data
"""
if not self.connected:
raise S7ConnectionError("Not connected")
try:
# Receive TPKT header (4 bytes)
tpkt_header = self._recv_exact(4)
# Parse TPKT header
version, reserved, length = struct.unpack(">BBH", tpkt_header)
if version != 3:
raise S7ConnectionError(f"Invalid TPKT version: {version}")
# Receive remaining data
remaining = length - 4
if remaining <= 0:
raise S7ConnectionError("Invalid TPKT length")
payload = self._recv_exact(remaining)
# Parse COTP header and extract data
return self._parse_cotp_data(payload)
except socket.timeout:
self.connected = False
raise S7TimeoutError("Receive timeout")
except socket.error as e:
self.connected = False
raise S7ConnectionError(f"Receive failed: {e}")
def _tcp_connect(self) -> None:
"""Establish TCP connection."""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout)
try:
self.socket.connect((self.host, self.port))
logger.debug(f"TCP connected to {self.host}:{self.port}")
except socket.error as e:
raise S7ConnectionError(f"TCP connection failed: {e}")
def _iso_connect(self) -> None:
"""Establish ISO connection using COTP handshake."""
if self.socket is None:
raise S7ConnectionError("Socket not initialized")
# Send Connection Request
cr_pdu = self._build_cotp_cr()
tpkt_frame = self._build_tpkt(cr_pdu)
self.socket.sendall(tpkt_frame)
logger.debug("Sent COTP Connection Request")
# Receive Connection Confirm
tpkt_header = self._recv_exact(4)
version, reserved, length = struct.unpack(">BBH", tpkt_header)
if version != 3:
raise S7ConnectionError(f"Invalid TPKT version in response: {version}")
payload = self._recv_exact(length - 4)
self._parse_cotp_cc(payload)
logger.debug("Received COTP Connection Confirm")
def _build_tpkt(self, payload: bytes) -> bytes:
"""
Build TPKT frame.
TPKT Header (4 bytes):
- Version (1 byte): Always 3
- Reserved (1 byte): Always 0
- Length (2 bytes): Total frame length including header
"""
length = len(payload) + 4
return struct.pack(">BBH", 3, 0, length) + payload
def _build_cotp_cr(self) -> bytes:
"""
Build COTP Connection Request PDU.
COTP CR format:
- PDU Length: Length of COTP header (excluding this byte)
- PDU Type: 0xE0 (Connection Request)
- Destination Reference: 2 bytes
- Source Reference: 2 bytes
- Class/Option: 1 byte
- Parameters: Variable length
"""
# Basic COTP CR without parameters
base_pdu = struct.pack(
">BBHHB",
6, # PDU length (header without parameters)
self.COTP_CR, # PDU type
0x0000, # Destination reference (0 for CR)
self.src_ref, # Source reference
0x00, # Class/option (Class 0, no extended formats)
)
# Add TSAP parameters
tsap_length = 2 # TSAP values are 2 bytes (unsigned short)
# Calling TSAP (local)
calling_tsap = struct.pack(">BBH", self.COTP_PARAM_CALLING_TSAP, tsap_length, self.local_tsap)
# Called TSAP (remote)
called_tsap = struct.pack(">BBH", self.COTP_PARAM_CALLED_TSAP, tsap_length, self.remote_tsap)
# PDU Size parameter (ISO 8073 code, e.g. 0x0A = 1024 bytes)
pdu_size_param = struct.pack(">BBB", self.COTP_PARAM_PDU_SIZE, 1, self.tpdu_size)
parameters = calling_tsap + called_tsap + pdu_size_param
# Update PDU length to include parameters
total_length = 6 + len(parameters)
pdu = struct.pack(">B", total_length) + base_pdu[1:] + parameters
return pdu
def _parse_cotp_cc(self, data: bytes) -> None:
"""
Parse COTP Connection Confirm PDU.
Extracts destination reference and negotiated PDU size.
"""
if len(data) < 7:
raise S7ConnectionError("Invalid COTP CC: too short")
pdu_len, pdu_type, dst_ref, src_ref, class_opt = struct.unpack(">BBHHB", data[:7])
if pdu_type != self.COTP_CC:
raise S7ConnectionError(f"Expected COTP CC, got {pdu_type:#02x}")
self.dst_ref = dst_ref
# Parse parameters if present
if len(data) > 7:
self._parse_cotp_parameters(data[7:])
def _parse_cotp_parameters(self, params: bytes) -> None:
"""Parse COTP parameters from Connection Confirm."""
offset = 0
while offset < len(params):
if offset + 2 > len(params):
break
param_code = params[offset]
param_len = params[offset + 1]
if offset + 2 + param_len > len(params):
break
param_data = params[offset + 2 : offset + 2 + param_len]
if param_code == self.COTP_PARAM_PDU_SIZE:
# PDU Size parameter
if param_len == 1:
# ISO 8073 code: size = 2^code
self.pdu_size = 1 << param_data[0]
elif param_len == 2:
# Raw 2-byte value
self.pdu_size = struct.unpack(">H", param_data)[0]
logger.debug(f"Negotiated PDU size: {self.pdu_size}")
else:
logger.debug(f"Unsupported COTP parameter: code={param_code:#04x}, length={param_len}")
offset += 2 + param_len
def _build_cotp_dt(self, data: bytes) -> bytes:
"""
Build COTP Data Transfer PDU.
COTP DT format:
- PDU Length: 2 (fixed for DT)
- PDU Type: 0xF0 (Data Transfer)
- EOT + Number: 0x80 (End of TSDU, sequence number 0)
- Data: Variable length
"""
header = struct.pack(">BBB", 2, self.COTP_DT, 0x80)
return header + data
def _parse_cotp_data(self, cotp_pdu: bytes) -> bytes:
"""
Parse COTP Data Transfer PDU and extract S7 data.
"""
if len(cotp_pdu) < 3:
raise S7ConnectionError("Invalid COTP DT: too short")
pdu_len, pdu_type, eot_num = struct.unpack(">BBB", cotp_pdu[:3])
if pdu_type != self.COTP_DT:
raise S7ConnectionError(f"Expected COTP DT, got {pdu_type:#02x}")
return cotp_pdu[3:] # Return data portion
def _send_cotp_disconnect(self) -> None:
"""Send COTP Disconnect Request."""
if self.socket is None:
return # Nothing to disconnect
dr_pdu = struct.pack(
">BBHHBB",
6, # PDU length
self.COTP_DR, # PDU type
self.dst_ref, # Destination reference
self.src_ref, # Source reference
0x00, # Reason (normal disconnect)
0x00, # Additional info
)
tpkt_frame = self._build_tpkt(dr_pdu)
try:
self.socket.sendall(tpkt_frame)
except socket.error:
pass # Ignore errors during disconnect
def _recv_exact(self, size: int) -> bytes:
"""
Receive exactly the specified number of bytes.
Args:
size: Number of bytes to receive
Returns:
Received data
Raises:
S7ConnectionError: If connection is lost
S7TimeoutError: If timeout occurs
"""
if self.socket is None:
raise S7ConnectionError("Socket not initialized")
data = bytearray()
while len(data) < size:
try:
chunk = self.socket.recv(size - len(data))
if not chunk:
self.connected = False
raise S7ConnectionError("Connection closed by peer")
data.extend(chunk)
except socket.timeout:
self.connected = False
raise S7TimeoutError("Receive timeout")
except socket.error as e:
self.connected = False
raise S7ConnectionError(f"Receive error: {e}")
return bytes(data)
[docs]
def check_connection(self) -> bool:
"""Check if the TCP connection is still alive.
Uses a non-blocking socket peek to detect broken connections.
"""
if not self.connected or self.socket is None:
return False
try:
original_timeout = self.socket.gettimeout()
self.socket.settimeout(0)
try:
data = self.socket.recv(1, socket.MSG_PEEK)
if not data:
self.connected = False
return False
return True
except BlockingIOError:
# No data available but connection is still alive
return True
except (socket.error, OSError):
self.connected = False
return False
finally:
self.socket.settimeout(original_timeout)
except Exception:
return False
[docs]
def __enter__(self) -> "ISOTCPConnection":
"""Context manager entry."""
return self
[docs]
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Context manager exit."""
self.disconnect()