"""
S7 protocol implementation.
Handles S7 PDU encoding/decoding and protocol operations.
"""
import struct
import logging
from datetime import datetime
from typing import List, Dict, Any
from enum import IntEnum
from .datatypes import S7Area, S7WordLen, S7DataTypes
from .error import S7ProtocolError, S7StalePacketError, S7PacketLostError, get_protocol_error_message
logger = logging.getLogger(__name__)
[docs]
class S7Function(IntEnum):
    """Function codes placed in the first parameter byte of S7 request PDUs."""

    # Variable access
    READ_AREA = 0x04
    WRITE_AREA = 0x05

    # Block download
    REQUEST_DOWNLOAD = 0x1A
    DOWNLOAD_BLOCK = 0x1B
    DOWNLOAD_ENDED = 0x1C

    # Block upload
    START_UPLOAD = 0x1D
    UPLOAD = 0x1E
    END_UPLOAD = 0x1F

    # PLC control
    PLC_CONTROL = 0x28
    PLC_STOP = 0x29

    # Connection negotiation
    SETUP_COMMUNICATION = 0xF0
[docs]
class S7PDUType(IntEnum):
    """PDU type codes stored in the second byte of the S7 header."""

    REQUEST = 0x01
    # Acknowledge without data (e.g., write responses)
    ACK = 0x02
    # Acknowledge with data (e.g., read responses)
    ACK_DATA = 0x03
    USERDATA = 0x07
[docs]
class S7UserDataGroup(IntEnum):
    """Function groups of USER_DATA PDUs (names follow s7_types.h)."""

    PROGRAMMER = 0x01   # grProgrammer
    CYCLIC_DATA = 0x02  # grCyclicData
    BLOCK_INFO = 0x03   # grBlocksInfo
    SZL = 0x04          # grSZL
    SECURITY = 0x05     # grPassword
    TIME = 0x07         # grClock
[docs]
class S7UserDataSubfunction(IntEnum):
    """Subfunction codes of USER_DATA PDUs, grouped by function group.

    The numeric codes repeat across groups. Because this is a single enum,
    a repeated value becomes an alias of the first member declared with it
    (e.g. ``READ_SZL`` and ``GET_CLOCK`` are aliases of ``LIST_ALL``); only
    the integer value is meaningful on the wire.
    """

    # --- Block info group ---
    LIST_ALL = 0x01             # SFun_ListAll
    LIST_BLOCKS_OF_TYPE = 0x02  # SFun_ListBoT
    BLOCK_INFO = 0x03           # SFun_BlkInfo

    # --- SZL group ---
    READ_SZL = 0x01             # SFun_ReadSZL
    SYSTEM_STATE = 0x02         # System state request

    # --- Clock group ---
    GET_CLOCK = 0x01
    SET_CLOCK = 0x02
# Lookup table: S7 data-section return code -> human-readable description.
# Codes that are absent here are reported as "Unknown error" by
# get_return_code_description().
S7_RETURN_CODES: Dict[int, str] = {
    # General / access
    0x00: "Reserved",
    0x01: "Hardware error",
    0x03: "Accessing the object not allowed",
    0x05: "Invalid address",
    0x06: "Data type not supported",
    0x07: "Data type inconsistent",
    0x0A: "Object does not exist",
    # Block handling
    0x10: "Invalid block type number",
    0x11: "Block not found in storage medium",
    0x12: "Block already exists",
    0x13: "Block is protected",
    0x14: "Block download without proper block first",
    0x19: "Block download sequence error",
    # Memory / resources
    0x1A: "Insufficient working memory",
    0x1B: "Insufficient load memory",
    0x1C: "Not enough work retentive data (instance DBs)",
    0x1D: "Interface error",
    0x1E: "Delete block refused",
    0x20: "Invalid parameter",
    0x21: "PG resource error (max connections reached)",
    # Success marker
    0xFF: "Success",
}
[docs]
def get_return_code_description(return_code: int) -> str:
    """Return the description for an S7 return code.

    Args:
        return_code: Return code taken from an S7 data section.

    Returns:
        The matching entry of ``S7_RETURN_CODES``, or ``"Unknown error"``
        when the code is not listed there.
    """
    return S7_RETURN_CODES.get(return_code, "Unknown error")
[docs]
class S7Protocol:
"""
S7 protocol implementation.
Handles encoding and decoding of S7 PDUs for communication with Siemens PLCs.
"""
def __init__(self) -> None:
self.sequence = 0 # Message sequence counter
def _next_sequence(self) -> int:
"""Get next sequence number for S7 PDU."""
self.sequence = (self.sequence + 1) & 0xFFFF
return self.sequence
[docs]
def validate_pdu_reference(self, response_sequence: int) -> None:
    """Validate the PDU reference number from a response.

    Compares the response sequence number against the expected (current)
    sequence. Because the counter is 16 bits wide and wraps around (see
    ``_next_sequence``), the comparison uses the modular distance between
    the two values rather than a plain ``<`` / ``>``. That way a response
    carrying 0xFFFF is still recognised as stale right after the counter
    wrapped to 0.

    Args:
        response_sequence: Sequence number from the response PDU.

    Raises:
        S7StalePacketError: If response is older than expected (stale).
        S7PacketLostError: If response is ahead of expected (packet loss).
    """
    expected = self.sequence
    if response_sequence == expected:
        return

    # Distance from expected to response, modulo 2**16. Values in the upper
    # half of the range mean the response lies "behind" the expected number.
    ahead_by = (response_sequence - expected) & 0xFFFF
    if ahead_by >= 0x8000:
        raise S7StalePacketError(f"Stale packet: expected sequence {expected}, got {response_sequence}")
    raise S7PacketLostError(f"Packet lost: expected sequence {expected}, got {response_sequence}")
[docs]
def build_read_request(self, area: S7Area, db_number: int, start: int, word_len: S7WordLen, count: int) -> bytes:
    """
    Build an S7 read request PDU for a single item.

    Args:
        area: Memory area to read from
        db_number: DB number (for DB area)
        start: Start address/offset
        word_len: Data word length
        count: Number of items to read

    Returns:
        Complete S7 PDU (header followed by the parameter section)
    """
    # Header packed with ">BBHHHH" (10 bytes): protocol ID, PDU type,
    # reserved, sequence, parameter length (14) and data length (0 for a read).
    pdu_header = struct.pack(">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), 0x000E, 0x0000)

    item_spec = S7DataTypes.encode_address(area, db_number, start, word_len, count)

    # Function code, item count (1) and the 0x12 variable-specification
    # marker are written explicitly; the first byte of the encoded address
    # is therefore skipped.
    param_section = struct.pack(">BBB", S7Function.READ_AREA, 0x01, 0x12) + item_spec[1:]

    return pdu_header + param_section
[docs]
def build_write_request(self, area: S7Area, db_number: int, start: int, word_len: S7WordLen, data: bytes) -> bytes:
    """
    Build an S7 write request PDU for a single item.

    Args:
        area: Memory area to write to
        db_number: DB number (for DB area)
        start: Start address/offset
        word_len: Data word length
        data: Data to write

    Returns:
        Complete S7 PDU (header, parameter section, data section)
    """
    payload_size = len(data)

    # The item count is derived from the payload size and the size of one item.
    unit_size = S7DataTypes.get_size_bytes(word_len, 1)
    item_count = payload_size // unit_size

    # Parameters: function + item count + marker (3) plus the remaining
    # address specification (11). Data: 4-byte item header plus payload.
    pdu_header = struct.pack(
        ">BBHHHH",
        0x32,                    # Protocol ID
        S7PDUType.REQUEST,       # PDU type
        0x0000,                  # Reserved
        self._next_sequence(),   # Sequence
        3 + 11,                  # Parameter length
        4 + payload_size,        # Data length
    )

    # The 0x12 marker is written here, so the first byte of the encoded
    # address is skipped.
    item_spec = S7DataTypes.encode_address(area, db_number, start, word_len, item_count)
    param_section = struct.pack(">BBB", S7Function.WRITE_AREA, 0x01, 0x12) + item_spec[1:]

    # The data section uses its own transport-size codes, distinct from the
    # word-length code in the address specification:
    #   0x03 = BIT, 0x04 = byte-oriented data, 0x05 = INT, 0x06 = DINT, 0x07 = REAL
    data_transport_sizes = {
        S7WordLen.BIT: 0x03,
        S7WordLen.BYTE: 0x04,
        S7WordLen.CHAR: 0x04,
        S7WordLen.WORD: 0x04,
        S7WordLen.INT: 0x05,
        S7WordLen.DWORD: 0x04,
        S7WordLen.DINT: 0x06,
        S7WordLen.REAL: 0x07,
        S7WordLen.COUNTER: 0x04,
        S7WordLen.TIMER: 0x04,
    }
    # Unlisted word lengths fall back to the byte-oriented code.
    transport_size = data_transport_sizes.get(word_len, 0x04)

    # Item header: reserved/error byte, transport size, payload length in bits.
    item_header = struct.pack(">BBH", 0x00, transport_size, payload_size * 8)

    return pdu_header + param_section + item_header + data
[docs]
def build_setup_communication_request(self, max_amq_caller: int = 1, max_amq_callee: int = 1, pdu_length: int = 480) -> bytes:
    """
    Build the S7 setup communication request used to negotiate
    communication parameters with the PLC.

    Args:
        max_amq_caller: Max AMQ caller value to propose
        max_amq_callee: Max AMQ callee value to propose
        pdu_length: PDU length to propose

    Returns:
        Complete S7 PDU (10-byte header followed by 8 parameter bytes)
    """
    # Header and parameter section are packed in one call; with ">" there is
    # no padding, so the bytes equal header + parameters packed separately.
    return struct.pack(
        ">BBHHHH" "BBHHH",
        # --- header ---
        0x32,                             # Protocol ID
        S7PDUType.REQUEST,                # PDU type
        0x0000,                           # Reserved
        self._next_sequence(),            # Sequence
        0x0008,                           # Parameter length (8 bytes)
        0x0000,                           # Data length
        # --- parameters ---
        S7Function.SETUP_COMMUNICATION,   # Function code
        0x00,                             # Reserved
        max_amq_caller,                   # Max AMQ caller
        max_amq_callee,                   # Max AMQ callee
        pdu_length,                       # PDU length
    )
[docs]
def build_plc_control_request(self, operation: str) -> bytes:
    """
    Build PLC control request.

    Args:
        operation: Control operation ('stop', 'hot_start', 'cold_start')

    Returns:
        Complete S7 PDU for PLC control

    Raises:
        ValueError: If ``operation`` is not one of the supported names
    """
    # operation -> (function code, restart type). "stop" carries no restart
    # type; the start variants share code 0x28 and differ only in the
    # restart type byte (1 = warm, 2 = cold).
    plans = {
        "stop": (0x29, None),
        "hot_start": (0x28, 1),
        "cold_start": (0x28, 2),
    }
    if operation not in plans:
        raise ValueError(f"Unknown PLC control operation: {operation}")

    function_code, restart_type = plans[operation]
    if restart_type is None:
        param_section = struct.pack(">B", function_code)
    else:
        param_section = struct.pack(">BB", function_code, restart_type)

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def check_control_response(self, response: Dict[str, Any]) -> None:
    """
    Check PLC control response for errors.

    Only the ``error_code`` entry of the parsed response is inspected; a
    missing entry is treated as success.

    Args:
        response: Parsed S7 response

    Raises:
        S7ProtocolError: If control operation failed
    """
    if response.get("error_code", 0) == 0:
        return
    raise S7ProtocolError(f"PLC control failed with error: {response['error_code']}")
[docs]
def build_compress_request(self) -> bytes:
    """
    Build PLC control request for memory compression.

    Sends function PLC_CONTROL with the PI service name "_MSZL".

    Returns:
        Complete S7 PDU for compress request
    """
    # NOTE(review): the PI service name is kept exactly as in the existing
    # implementation — confirm against the target PLC/server.
    service_name = b"_MSZL"

    # Layout: function code (1) + seven zero bytes + name length (1) + name.
    # "7x" emits the seven zero bytes.
    param_section = struct.pack(">B7xB", S7Function.PLC_CONTROL, len(service_name)) + service_name

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def build_copy_ram_to_rom_request(self) -> bytes:
    """
    Build PLC control request for copying RAM to ROM.

    Sends function PLC_CONTROL with the PI service name "_MSZL" preceded by
    a one-character file-system identifier.

    Returns:
        Complete S7 PDU for copy RAM to ROM request
    """
    # NOTE(review): service name and identifier are kept exactly as in the
    # existing implementation — confirm against the target PLC/server.
    service_name = b"_MSZL"
    file_system = b"P"  # P = passive file system (ROM)

    # Layout: function code (1) + six zero bytes + file-id length (1)
    # + service-name length (1), then file id and service name.
    fixed_part = struct.pack(">B6xBB", S7Function.PLC_CONTROL, len(file_system), len(service_name))
    param_section = fixed_part + file_system + service_name

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
# ========================================================================
# Block Transfer PDU Builders (Upload/Download)
# ========================================================================
[docs]
def build_start_upload_request(self, block_type: int, block_num: int) -> bytes:
    """
    Build start upload request.

    Args:
        block_type: Block type code (0x38=OB, 0x41=DB, 0x42=SDB, 0x43=FC, 0x44=SFC, 0x45=FB, 0x46=SFB)
        block_num: Block number

    Returns:
        Complete S7 PDU for start upload request
    """
    # Block address as ASCII text: block type as two upper-case hex digits,
    # block number as five zero-padded decimal digits, then the letter "A".
    # Example: block_type=0x41, block_num=1 -> b"4100001A".
    block_address = f"{block_type:02X}{block_num:05d}A".encode("ascii")

    # Layout: function code (1) + status (1) + reserved (1), both zero via
    # "2x", + upload ID (4, zero when starting) + address length (1) + address.
    fixed_part = struct.pack(">B2xIB", S7Function.START_UPLOAD, 0x00000000, len(block_address))
    param_section = fixed_part + block_address

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def build_upload_request(self, upload_id: int) -> bytes:
    """
    Build upload request to get block data.

    Args:
        upload_id: Upload ID from start upload response

    Returns:
        Complete S7 PDU for upload request
    """
    # Layout: function code (1) + status (1) + reserved (1), both zero via
    # "2x", + upload ID (4).
    param_section = struct.pack(">B2xI", S7Function.UPLOAD, upload_id)

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def build_end_upload_request(self, upload_id: int) -> bytes:
    """
    Build end upload request.

    Args:
        upload_id: Upload ID from start upload response

    Returns:
        Complete S7 PDU for end upload request
    """
    # Same parameter layout as the upload request, with the END_UPLOAD
    # function code: function (1) + two zero bytes + upload ID (4).
    param_section = struct.pack(">B2xI", S7Function.END_UPLOAD, upload_id)

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def parse_start_upload_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
    """
    Parse start upload response.

    Reads the raw parameter bytes from ``response["raw_parameters"]``:
    bytes 4..7 hold the upload ID (big-endian), byte 8 the length of an
    ASCII decimal string that follows and gives the block length.

    Args:
        response: Parsed S7 response

    Returns:
        Dictionary with upload_id and block_length; a field that cannot be
        extracted keeps its default of 0
    """
    result = {"upload_id": 0, "block_length": 0}

    # A missing entry and an explicit None are both treated as "no parameters".
    raw_params = response.get("raw_parameters") or b""
    if len(raw_params) < 8:
        return result

    result["upload_id"] = struct.unpack_from(">I", raw_params, 4)[0]

    if len(raw_params) > 8:
        text_len = raw_params[8]
        text_end = 9 + text_len
        # ">=": the length string may be the very last thing in the
        # parameters, in which case it ends exactly at len(raw_params).
        if len(raw_params) >= text_end:
            try:
                result["block_length"] = int(raw_params[9:text_end])
            except ValueError:
                # Empty or non-numeric length string: keep the default.
                pass

    return result
[docs]
def parse_upload_response(self, response: Dict[str, Any]) -> bytes:
    """
    Parse upload response and extract block data.

    Args:
        response: Parsed S7 response

    Returns:
        The payload stored under ``response["data"]["data"]``, returned
        unchanged when it is longer than two bytes; ``b""`` otherwise
        (including when the response has no data section)
    """
    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data: bytes = data_info.get("data") or b""

    # Payloads of two bytes or fewer are treated as carrying no block data.
    return raw_data if len(raw_data) > 2 else b""
[docs]
def build_download_request(self, block_type: int, block_num: int, block_data: bytes) -> bytes:
    """
    Build request download request.

    Args:
        block_type: Block type code
        block_num: Block number
        block_data: Block data to download (only its length is encoded here)

    Returns:
        Complete S7 PDU for request download
    """
    # Block address: type as two hex digits, number as five decimal digits, "P".
    block_address = f"{block_type:02X}{block_num:05d}P".encode("ascii")
    # Block length as a zero-padded decimal string of at least six digits.
    length_text = f"{len(block_data):06d}".encode("ascii")

    # Layout: function code (1) + three zero bytes ("3x") + address length (1)
    # + address + length-string length (1) + length string.
    param_section = (
        struct.pack(">B3xB", S7Function.REQUEST_DOWNLOAD, len(block_address))
        + block_address
        + struct.pack(">B", len(length_text))
        + length_text
    )

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
[docs]
def build_delete_block_request(self, block_type: int, block_num: int) -> bytes:
    """
    Build delete block request.

    Uses PLC_CONTROL with PI service "_DELE" for block deletion.

    Args:
        block_type: Block type code
        block_num: Block number

    Returns:
        Complete S7 PDU for delete block request
    """
    service_name = b"_DELE"
    # Block specification: type as two hex digits, number as five decimal
    # digits, then the file-system letter "P".
    block_spec = f"{block_type:02X}{block_num:05d}P".encode("ascii")

    # Layout: function code (1) + five zero bytes + block-spec length (1)
    # + service-name length (1) + one zero byte, then block spec and name.
    fixed_part = struct.pack(">B5xBBx", S7Function.PLC_CONTROL, len(block_spec), len(service_name))
    param_section = fixed_part + block_spec + service_name

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.REQUEST, 0x0000, self._next_sequence(), len(param_section), 0x0000
    )
    return pdu_header + param_section
# ========================================================================
# USER_DATA PDU Builders (Chunk 3 of protocol implementation)
# ========================================================================
[docs]
def build_list_blocks_request(self) -> bytes:
    """
    Build USER_DATA request for listing all blocks.

    The PDU consists of a 10-byte S7 header, an 8-byte USER_DATA parameter
    section and a 4-byte data section.

    Returns:
        Complete S7 PDU for list blocks request
    """
    param_section = bytes(
        (
            0x00,                             # Reserved
            0x01,                             # Parameter count
            0x12,                             # Type/length header
            0x04,                             # Length of following data
            0x11,                             # Method (0x11 = request)
            0x43,                             # Type (4=request) | Group (3=grBlocksInfo)
            S7UserDataSubfunction.LIST_ALL,   # Subfunction (0x01 = list all)
            0x00,                             # DataRef (0x00 for initial request)
        )
    )

    # Data section: return value 0x0A, transport size 0, 16-bit length 0.
    data_section = bytes((0x0A, 0x00, 0x00, 0x00))

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def build_list_blocks_of_type_request(self, block_type: int) -> bytes:
    """
    Build USER_DATA request for listing blocks of a specific type.

    Args:
        block_type: Block type code (e.g., 0x41 for DB)

    Returns:
        Complete S7 PDU for list blocks of type request
    """
    param_section = bytes(
        (
            0x00,                                        # Reserved
            0x01,                                        # Parameter count
            0x12,                                        # Type/length header
            0x04,                                        # Length of following data
            0x11,                                        # Method (0x11 = request)
            0x43,                                        # Type (4=request) | Group (3=grBlocksInfo)
            S7UserDataSubfunction.LIST_BLOCKS_OF_TYPE,   # Subfunction (0x02)
            0x00,                                        # DataRef (0x00 for initial request)
        )
    )

    # 4-byte payload: 0x30 indicator, block type, then the trailing bytes
    # 0x0A 0x00 (per the Snap7 C format noted in the original code).
    payload = struct.pack(">BB", 0x30, block_type) + bytes((0x0A, 0x00))
    # Data section header: return value 0x0A, transport size 0, length 4.
    data_section = struct.pack(">BBH", 0x0A, 0x00, 0x0004) + payload

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def parse_list_blocks_response(self, response: Dict[str, Any]) -> Dict[str, int]:
    """
    Parse list blocks response and extract block counts.

    The payload under ``response["data"]["data"]`` is read as consecutive
    4-byte entries: indicator 0x30, block type code, 16-bit big-endian
    count. Entries with another indicator or an unknown type are skipped,
    as is an incomplete trailing entry.

    Args:
        response: Parsed S7 response

    Returns:
        Dictionary mapping block type names to counts (0 for types that do
        not appear in the response)
    """
    # Block type code -> result key.
    type_to_name = {
        0x38: "OBCount",   # Organization Block
        0x41: "DBCount",   # Data Block
        0x42: "SDBCount",  # System Data Block
        0x43: "FCCount",   # Function
        0x44: "SFCCount",  # System Function
        0x45: "FBCount",   # Function Block
        0x46: "SFBCount",  # System Function Block
    }
    result = {
        "OBCount": 0,
        "FBCount": 0,
        "FCCount": 0,
        "SFBCount": 0,
        "SFCCount": 0,
        "DBCount": 0,
        "SDBCount": 0,
    }

    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data = data_info.get("data") or b""

    # Visit every offset at which a complete 4-byte entry is available.
    for offset in range(0, len(raw_data) - 3, 4):
        indicator, block_type, count = struct.unpack_from(">BBH", raw_data, offset)
        if indicator == 0x30 and block_type in type_to_name:
            result[type_to_name[block_type]] = count

    return result
[docs]
def parse_list_blocks_of_type_response(self, response: Dict[str, Any]) -> List[int]:
    """
    Parse list blocks of type response and extract block numbers.

    The payload under ``response["data"]["data"]`` is read as consecutive
    4-byte entries whose first two bytes are the big-endian block number
    (the original code attributes the layout to TDataFunGetBotItem:
    BlockNum(2) + Unknown(1) + BlockLang(1)). An incomplete trailing entry
    is ignored.

    Args:
        response: Parsed S7 response

    Returns:
        List of block numbers, in payload order
    """
    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data = data_info.get("data") or b""

    return [
        struct.unpack_from(">H", raw_data, offset)[0]
        for offset in range(0, len(raw_data) - 3, 4)
    ]
[docs]
def build_get_block_info_request(self, block_type: int, block_num: int) -> bytes:
    """
    Build USER_DATA request for getting block information.

    Args:
        block_type: Block type code (0x38=OB, 0x41=DB, 0x42=SDB, 0x43=FC, 0x44=SFC, 0x45=FB, 0x46=SFB)
        block_num: Block number

    Returns:
        Complete S7 PDU for get block info request
    """
    param_section = bytes(
        (
            0x00,                               # Reserved
            0x01,                               # Parameter count
            0x12,                               # Type/length header
            0x04,                               # Length of following data
            0x11,                               # Method (0x11 = request)
            0x43,                               # Type (4=request) | Group (3=grBlocksInfo)
            S7UserDataSubfunction.BLOCK_INFO,   # Subfunction (0x03)
            0x00,                               # DataRef (0x00 for initial request)
        )
    )

    # Payload: 0x30, block type byte, the letter "A", then the block number
    # as five zero-padded ASCII digits (e.g. 1 -> "00001").
    number_text = f"{block_num:05d}".encode("ascii")
    payload = struct.pack(">BB", 0x30, block_type) + b"A" + number_text

    # Data section header: return value 0x0A, transport size 0, payload length.
    data_section = struct.pack(">BBH", 0x0A, 0x00, len(payload)) + payload

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def parse_get_block_info_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
    """
    Parse get block info response.

    Fields are read at fixed offsets from ``response["data"]["data"]``
    (the original code attributes the layout to TResDataBlockInfo). A
    payload shorter than 78 bytes, or a response without a data section,
    yields the default values.

    Args:
        response: Parsed S7 response

    Returns:
        Dictionary with block info fields; integer fields default to 0 and
        raw byte fields (dates, author, family, header) to ``b""``
    """
    result: Dict[str, Any] = {
        "block_type": 0,
        "block_number": 0,
        "block_lang": 0,
        "block_flags": 0,
        "mc7_size": 0,
        "load_size": 0,
        "local_data": 0,
        "sbb_length": 0,
        "checksum": 0,
        "version": 0,
        "code_date": b"",
        "intf_date": b"",
        "author": b"",
        "family": b"",
        "header": b"",
    }

    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data = data_info.get("data") or b""
    if len(raw_data) < 78:
        return result

    def u16(offset: int) -> int:
        """Read a big-endian unsigned 16-bit value at ``offset``."""
        return struct.unpack_from(">H", raw_data, offset)[0]

    # Single-byte fields
    result["block_type"] = raw_data[1]
    result["block_flags"] = raw_data[9]
    result["block_lang"] = raw_data[10]
    result["version"] = raw_data[66]

    # Multi-byte big-endian integers
    result["block_number"] = u16(12)
    result["load_size"] = struct.unpack_from(">I", raw_data, 14)[0]
    result["sbb_length"] = u16(34)
    result["local_data"] = u16(38)
    result["mc7_size"] = u16(40)
    result["checksum"] = u16(68)

    # Dates are returned as their raw 6 bytes (original layout note:
    # ms(2) + days(2) + reserved(2)).
    result["code_date"] = raw_data[22:28]
    result["intf_date"] = raw_data[28:34]

    # Fixed-width 8-byte text fields, returned undecoded.
    result["author"] = raw_data[42:50]
    result["family"] = raw_data[50:58]
    result["header"] = raw_data[58:66]

    return result
[docs]
def build_read_szl_request(self, szl_id: int, szl_index: int) -> bytes:
    """
    Build USER_DATA request for reading SZL (System Status List).

    Args:
        szl_id: SZL identifier
        szl_index: SZL index

    Returns:
        Complete S7 PDU for read SZL request
    """
    param_section = bytes(
        (
            0x00,                             # Reserved
            0x01,                             # Parameter count
            0x12,                             # Type/length header
            0x04,                             # Length of following data
            0x11,                             # Method (0x11 = request)
            0x44,                             # Type (4=request) | Group (4=grSZL)
            S7UserDataSubfunction.READ_SZL,   # Subfunction (0x01)
            0x00,                             # DataRef (0x00 for initial request)
        )
    )

    # Data section: return value 0x0A, transport size 0, length 4, followed
    # by the 16-bit SZL ID and the 16-bit SZL index.
    szl_selector = struct.pack(">HH", szl_id, szl_index)
    data_section = struct.pack(">BBH", 0x0A, 0x00, 0x0004) + szl_selector

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def build_userdata_followup_request(self, group: int, subfunction: int, sequence_number: int) -> bytes:
    """
    Build USERDATA follow-up request for multi-packet responses.

    The parameter section matches an initial request except that the last
    byte (DataRef) carries ``sequence_number`` instead of zero.

    Args:
        group: USERDATA group (e.g., 4 for SZL, 3 for block info)
        subfunction: Subfunction code
        sequence_number: Sequence number from the previous response

    Returns:
        Complete S7 PDU for follow-up request
    """
    # High nibble 4 marks a request; only the low nibble of group is used.
    request_type_and_group = 0x40 | (group & 0x0F)

    param_section = struct.pack(
        ">8B",
        0x00,                     # Reserved
        0x01,                     # Parameter count
        0x12,                     # Type/length header
        0x04,                     # Length of following data
        0x11,                     # Method (0x11 = request)
        request_type_and_group,   # Type | Group
        subfunction,              # Subfunction
        sequence_number,          # DataRef from previous response
    )

    # Minimal data section: return value 0x0A, transport size 0, length 0.
    data_section = bytes((0x0A, 0x00, 0x00, 0x00))

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def parse_read_szl_response(self, response: Dict[str, Any], first_fragment: bool = True) -> Dict[str, Any]:
    """
    Parse read SZL response.

    Args:
        response: Parsed S7 response
        first_fragment: If True (default), parse SZL header (ID+Index).
            If False, treat all data as raw payload (follow-up fragments).

    Returns:
        Dictionary with SZL ID, Index, and data. For a first fragment with
        fewer than four payload bytes, or a response without a data
        section, the defaults (0, 0, ``b""``) are returned.
    """
    result: Dict[str, Any] = {
        "szl_id": 0,
        "szl_index": 0,
        "data": b"",
    }

    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data = data_info.get("data") or b""

    if not first_fragment:
        # Follow-up fragments don't include the SZL header.
        result["data"] = raw_data
        return result

    if len(raw_data) < 4:
        return result

    # SZL header: ID (2 bytes) + Index (2 bytes), both big-endian.
    result["szl_id"], result["szl_index"] = struct.unpack_from(">HH", raw_data, 0)
    result["data"] = raw_data[4:]
    return result
[docs]
def build_get_clock_request(self) -> bytes:
    """
    Build USER_DATA request for reading PLC clock.

    Returns:
        Complete S7 PDU for get clock request
    """
    param_section = bytes(
        (
            0x00,                              # Reserved
            0x01,                              # Parameter count
            0x12,                              # Type/length header
            0x04,                              # Length of following data
            0x11,                              # Method (0x11 = request)
            0x47,                              # Type (4=request) | Group (7=grClock)
            S7UserDataSubfunction.GET_CLOCK,   # Subfunction (0x01)
            0x00,                              # DataRef (0x00 for initial request)
        )
    )

    # Empty data section: return value 0x0A, transport size 0, length 0.
    data_section = bytes((0x0A, 0x00, 0x00, 0x00))

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def build_set_clock_request(self, dt: "datetime") -> bytes:
    """
    Build USER_DATA request for setting PLC clock.

    The time is sent as eight bytes: a zero byte, then year (last two
    digits), month, day, hour, minute and second in BCD, then the day of
    the week (1 = Monday).

    Args:
        dt: Datetime to set

    Returns:
        Complete S7 PDU for set clock request
    """

    def bcd(number: int) -> int:
        """Encode a two-digit decimal number with one digit per nibble."""
        tens, ones = divmod(number, 10)
        return (tens << 4) | ones

    calendar_fields = (dt.year % 100, dt.month, dt.day, dt.hour, dt.minute, dt.second)
    clock_bytes = struct.pack(
        ">8B",
        0x00,                                         # Reserved
        *(bcd(field) for field in calendar_fields),   # Year .. second (BCD)
        (dt.weekday() + 1) & 0x0F,                    # Day of week (1=Monday)
    )

    param_section = bytes(
        (
            0x00,                              # Reserved
            0x01,                              # Parameter count
            0x12,                              # Type/length header
            0x04,                              # Length of following data
            0x11,                              # Method (0x11 = request)
            0x47,                              # Type (4=request) | Group (7=grClock)
            S7UserDataSubfunction.SET_CLOCK,   # Subfunction (0x02)
            0x00,                              # DataRef (0x00 for initial request)
        )
    )

    # Data section header: return value 0x0A, transport size 0, payload length.
    data_section = struct.pack(">BBH", 0x0A, 0x00, len(clock_bytes)) + clock_bytes

    pdu_header = struct.pack(
        ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, self._next_sequence(), len(param_section), len(data_section)
    )
    return pdu_header + param_section + data_section
[docs]
def parse_get_clock_response(self, response: Dict[str, Any]) -> "datetime":
    """
    Parse get clock response.

    Bytes 1..6 of ``response["data"]["data"]`` are read as BCD-encoded
    year, month, day, hour, minute and second (byte 0 is skipped).
    Two-digit years 00-89 map to 2000-2089 and 90-99 to 1990-1999.

    Args:
        response: Parsed S7 response

    Returns:
        Datetime from PLC. When the response has no data section, fewer
        than 8 payload bytes, or fields that do not form a valid date, the
        current local time (without microseconds) is returned instead.
    """
    from datetime import datetime as dt_class

    # parse_response() stores "data": None when the PDU has no data
    # section, so a None entry must be treated like a missing one.
    data_info = response.get("data") or {}
    raw_data = data_info.get("data") or b""
    if len(raw_data) < 8:
        # Best-effort fallback when there is no usable clock payload.
        return dt_class.now().replace(microsecond=0)

    def from_bcd(value: int) -> int:
        """Decode one BCD byte (high nibble = tens, low nibble = ones)."""
        return ((value >> 4) * 10) + (value & 0x0F)

    year, month, day, hour, minute, second = (from_bcd(raw_data[i]) for i in range(1, 7))

    # Century pivot at 90: 00-89 -> 20xx, 90-99 -> 19xx.
    full_year = 2000 + year if year < 90 else 1900 + year

    try:
        return dt_class(full_year, month, day, hour, minute, second)
    except ValueError:
        # Out-of-range fields (e.g. invalid BCD digits): same fallback.
        return dt_class.now().replace(microsecond=0)
[docs]
def build_cpu_state_request(self) -> bytes:
    """
    Build CPU state request.

    This is a simplified request: a standard request header followed by a
    single parameter byte 0x04 (the READ_AREA function code value).

    Returns:
        Complete S7 PDU for CPU state query
    """
    # Header and the one-byte parameter section are packed in one call;
    # with ">" there is no padding between fields.
    return struct.pack(
        ">BBHHHH" "B",
        0x32,                    # Protocol ID
        S7PDUType.REQUEST,       # PDU type
        0x0000,                  # Reserved
        self._next_sequence(),   # Sequence
        0x0001,                  # Parameter length
        0x0000,                  # Data length
        0x04,                    # Parameter byte (READ_AREA code, used for simplicity)
    )
[docs]
def parse_response(self, pdu: bytes) -> Dict[str, Any]:
    """
    Parse S7 response PDU.

    Args:
        pdu: Complete S7 PDU

    Returns:
        Dictionary with ``sequence``, ``param_length``, ``data_length``,
        ``parameters`` and ``data`` (each None when the section is absent)
        and ``error_code``

    Raises:
        S7ProtocolError: If the PDU is too short, has a wrong protocol ID,
            is not an ACK / ACK_DATA / USERDATA PDU, reports a non-zero
            error class, or declares a section that extends past its end
    """
    if len(pdu) < 10:
        raise S7ProtocolError("PDU too short for S7 response header")

    # The PDU type byte decides which header layout applies.
    if pdu[1] == S7PDUType.USERDATA:
        # USERDATA: 10-byte header without error class/code.
        protocol_id, pdu_type, _reserved, sequence, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10])
        error_class = 0
        error_code = 0
        offset = 10
    else:
        # ACK / ACK_DATA: 12-byte header ending in error class and error code.
        if len(pdu) < 12:
            raise S7ProtocolError("PDU too short for ACK/ACK_DATA header")
        (
            protocol_id,
            pdu_type,
            _reserved,
            sequence,
            param_len,
            data_len,
            error_class,
            error_code,
        ) = struct.unpack(">BBHHHHBB", pdu[:12])
        offset = 12

    if protocol_id != 0x32:
        raise S7ProtocolError(f"Invalid protocol ID: {protocol_id:#02x}")

    accepted_types = (S7PDUType.ACK, S7PDUType.ACK_DATA, S7PDUType.USERDATA)
    if pdu_type not in accepted_types:
        raise S7ProtocolError(f"Expected response PDU, got {pdu_type}")

    # Error class in the high byte, error code in the low byte.
    combined_error = (error_class << 8) | error_code
    if error_class != 0:
        error_msg = get_protocol_error_message(combined_error)
        raise S7ProtocolError(
            f"S7 protocol error (class={error_class:#04x}, code={error_code:#04x}): {error_msg}",
            error_code=combined_error,
        )

    response = {
        "sequence": sequence,
        "param_length": param_len,
        "data_length": data_len,
        "parameters": None,
        "data": None,
        "error_code": combined_error,
    }

    # Parameter section, if declared, directly follows the header.
    if param_len > 0:
        params_end = offset + param_len
        if params_end > len(pdu):
            raise S7ProtocolError("Parameter section extends beyond PDU")
        response["parameters"] = self._parse_parameters(pdu[offset:params_end])
        offset = params_end

    # Data section, if declared, follows the parameter section.
    if data_len > 0:
        data_end = offset + data_len
        if data_end > len(pdu):
            raise S7ProtocolError("Data section extends beyond PDU")
        response["data"] = self._parse_data_section(pdu[offset:data_end])

    return response
def _parse_parameters(self, param_data: bytes) -> Dict[str, Any]:
"""Parse S7 parameter section."""
if len(param_data) < 1:
return {}
# Detect USERDATA response parameters:
# byte 0 = 0x00 (reserved), len >= 12, byte 2 = 0x12, byte 4 = 0x12 (method=response)
if param_data[0] == 0x00 and len(param_data) >= 12 and param_data[2] == 0x12 and param_data[4] == 0x12:
return self._parse_userdata_response_params(param_data)
function_code = param_data[0]
if function_code == S7Function.READ_AREA:
return self._parse_read_response_params(param_data)
elif function_code == S7Function.WRITE_AREA:
return self._parse_write_response_params(param_data)
elif function_code == S7Function.SETUP_COMMUNICATION:
return self._parse_setup_comm_response_params(param_data)
else:
return {"function_code": function_code}
def _parse_read_response_params(self, param_data: bytes) -> Dict[str, Any]:
"""Parse read area response parameters."""
if len(param_data) < 2:
raise S7ProtocolError("Read response parameters too short")
function_code = param_data[0]
item_count = param_data[1]
return {"function_code": function_code, "item_count": item_count}
def _parse_write_response_params(self, param_data: bytes) -> Dict[str, Any]:
"""Parse write area response parameters."""
if len(param_data) < 2:
raise S7ProtocolError("Write response parameters too short")
function_code = param_data[0]
item_count = param_data[1]
return {"function_code": function_code, "item_count": item_count}
def _parse_setup_comm_response_params(self, param_data: bytes) -> Dict[str, Any]:
"""Parse setup communication response parameters."""
if len(param_data) < 8:
raise S7ProtocolError("Setup communication response parameters too short")
function_code, reserved, max_amq_caller, max_amq_callee, pdu_length = struct.unpack(">BBHHH", param_data[:8])
return {
"function_code": function_code,
"max_amq_caller": max_amq_caller,
"max_amq_callee": max_amq_callee,
"pdu_length": pdu_length,
}
def _parse_userdata_response_params(self, param_data: bytes) -> Dict[str, Any]:
"""Parse USERDATA response parameter section (12 bytes).
Layout:
[0] Reserved (0x00)
[1] Parameter count (0x01)
[2] Type header (0x12)
[3] Length of following data (0x08 for response)
[4] Method (0x12 = response)
[5] Type (high nibble) | Group (low nibble)
[6] Subfunction
[7] Sequence number (used as DataRef in follow-up)
[8] Data unit reference
[9] Last data unit (0x00 = last, non-zero = more)
[10-11] Error code
"""
type_group = param_data[5]
group = type_group & 0x0F
subfunction = param_data[6]
sequence_number = param_data[7]
last_data_unit = param_data[9]
error_code = struct.unpack(">H", param_data[10:12])[0]
if error_code != 0:
error_msg = get_protocol_error_message(error_code)
logger.warning(f"USERDATA response error {error_code:#06x}: {error_msg}")
return {
"group": group,
"subfunction": subfunction,
"sequence_number": sequence_number,
"last_data_unit": last_data_unit,
"error_code": error_code,
}
def _parse_data_section(self, data_section: bytes) -> Dict[str, Any]:
"""Parse S7 data section."""
if len(data_section) == 1:
# Simple return code (for write responses)
return {"return_code": data_section[0], "transport_size": 0, "data_length": 0, "data": b""}
elif len(data_section) >= 4:
# Full data header
return_code = data_section[0]
transport_size = data_section[1]
data_length = struct.unpack(">H", data_section[2:4])[0]
# Extract actual data - length interpretation depends on transport_size
# Transport size 0x09 (octet string): byte length (USERDATA responses)
# Transport size 0x00: byte length (USERDATA requests)
# Transport size 0x04 (byte): bit length (READ_AREA responses)
if transport_size in (0x00, 0x09):
# USERDATA uses byte length directly
actual_data = data_section[4 : 4 + data_length]
else:
# READ_AREA responses use bit length
actual_data = data_section[4 : 4 + (data_length // 8)]
return {"return_code": return_code, "transport_size": transport_size, "data_length": data_length, "data": actual_data}
else:
return {"raw_data": data_section}
[docs]
def check_write_response(self, response: Dict[str, Any]) -> None:
    """
    Validate a parsed write response, raising if the write failed.

    The header error code is checked first. If it is clean, the data
    section return code (when a data section is present) must be 0xFF.
    A response with no header error and no data section counts as a
    successful write (ACK without data).

    Args:
        response: Parsed S7 response

    Raises:
        S7ProtocolError: If write operation failed
    """
    # Header-level failure: S7-1200/1500 returns error codes in the header
    # for write failures.
    header_error = response.get("error_code", 0)
    if header_error != 0:
        raise S7ProtocolError(f"Write operation failed with S7 error code: {header_error:#06x}")

    # ACK without a data section: nothing further to verify.
    data_info = response.get("data")
    if not data_info:
        return

    # A data section without a return code is treated as success (0xFF).
    return_code = data_info.get("return_code", 0xFF)
    if return_code == 0xFF:
        return

    desc = get_return_code_description(return_code)
    raise S7ProtocolError(f"Write operation failed: {desc} (0x{return_code:02x})")