Source code for snap7.s7protocol

"""
S7 protocol implementation.

Handles S7 PDU encoding/decoding and protocol operations.
"""

import struct
import logging
from datetime import datetime
from typing import List, Dict, Any
from enum import IntEnum

from .datatypes import S7Area, S7WordLen, S7DataTypes
from .error import S7ProtocolError, S7StalePacketError, S7PacketLostError, get_protocol_error_message

logger = logging.getLogger(__name__)


[docs] class S7Function(IntEnum): """S7 protocol function codes.""" READ_AREA = 0x04 WRITE_AREA = 0x05 REQUEST_DOWNLOAD = 0x1A DOWNLOAD_BLOCK = 0x1B DOWNLOAD_ENDED = 0x1C START_UPLOAD = 0x1D UPLOAD = 0x1E END_UPLOAD = 0x1F PLC_CONTROL = 0x28 PLC_STOP = 0x29 SETUP_COMMUNICATION = 0xF0
[docs] class S7PDUType(IntEnum): """S7 PDU type codes.""" REQUEST = 0x01 ACK = 0x02 # Acknowledge without data (e.g., write responses) ACK_DATA = 0x03 # Acknowledge with data (e.g., read responses) USERDATA = 0x07
[docs] class S7UserDataGroup(IntEnum): """S7 USER_DATA type groups (from s7_types.h).""" PROGRAMMER = 0x01 # grProgrammer CYCLIC_DATA = 0x02 # grCyclicData BLOCK_INFO = 0x03 # grBlocksInfo SZL = 0x04 # grSZL SECURITY = 0x05 # grPassword TIME = 0x07 # grClock
[docs] class S7UserDataSubfunction(IntEnum): """S7 USER_DATA subfunctions.""" # Block info subfunctions LIST_ALL = 0x01 # SFun_ListAll LIST_BLOCKS_OF_TYPE = 0x02 # SFun_ListBoT BLOCK_INFO = 0x03 # SFun_BlkInfo # SZL subfunctions READ_SZL = 0x01 # SFun_ReadSZL SYSTEM_STATE = 0x02 # System state request # Clock subfunctions GET_CLOCK = 0x01 SET_CLOCK = 0x02
# S7 data section return codes with human-readable descriptions S7_RETURN_CODES: Dict[int, str] = { 0x00: "Reserved", 0x01: "Hardware error", 0x03: "Accessing the object not allowed", 0x05: "Invalid address", 0x06: "Data type not supported", 0x07: "Data type inconsistent", 0x0A: "Object does not exist", 0x10: "Invalid block type number", 0x11: "Block not found in storage medium", 0x12: "Block already exists", 0x13: "Block is protected", 0x14: "Block download without proper block first", 0x19: "Block download sequence error", 0x1A: "Insufficient working memory", 0x1B: "Insufficient load memory", 0x1C: "Not enough work retentive data (instance DBs)", 0x1D: "Interface error", 0x1E: "Delete block refused", 0x20: "Invalid parameter", 0x21: "PG resource error (max connections reached)", 0xFF: "Success", }
[docs] def get_return_code_description(return_code: int) -> str: """Get human-readable description for S7 return code.""" if return_code in S7_RETURN_CODES: return S7_RETURN_CODES[return_code] return "Unknown error"
[docs] class S7Protocol: """ S7 protocol implementation. Handles encoding and decoding of S7 PDUs for communication with Siemens PLCs. """ def __init__(self) -> None: self.sequence = 0 # Message sequence counter def _next_sequence(self) -> int: """Get next sequence number for S7 PDU.""" self.sequence = (self.sequence + 1) & 0xFFFF return self.sequence
[docs] def validate_pdu_reference(self, response_sequence: int) -> None: """Validate the PDU reference number from a response. Compares the response sequence number against the expected (current) sequence. Args: response_sequence: Sequence number from the response PDU. Raises: S7StalePacketError: If response is older than expected (stale). S7PacketLostError: If response is ahead of expected (packet loss). """ if response_sequence < self.sequence: raise S7StalePacketError(f"Stale packet: expected sequence {self.sequence}, got {response_sequence}") elif response_sequence > self.sequence: raise S7PacketLostError(f"Packet lost: expected sequence {self.sequence}, got {response_sequence}")
[docs] def build_read_request(self, area: S7Area, db_number: int, start: int, word_len: S7WordLen, count: int) -> bytes: """ Build S7 read request PDU. Args: area: Memory area to read from db_number: DB number (for DB area) start: Start address/offset word_len: Data word length count: Number of items to read Returns: Complete S7 PDU """ # S7 Header (12 bytes) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence 0x000E, # Parameter length (14 bytes) 0x0000, # Data length (no data for read) ) # Parameter section (14 bytes) parameters = struct.pack( ">BBB", S7Function.READ_AREA, # Function code 0x01, # Item count 0x12, # Variable specification ) # Add address specification address_spec = S7DataTypes.encode_address(area, db_number, start, word_len, count) parameters += address_spec[1:] # Skip first byte (already included as 0x12) return header + parameters
[docs] def build_write_request(self, area: S7Area, db_number: int, start: int, word_len: S7WordLen, data: bytes) -> bytes: """ Build S7 write request PDU. Args: area: Memory area to write to db_number: DB number (for DB area) start: Start address/offset word_len: Data word length data: Data to write Returns: Complete S7 PDU """ # Calculate count from data length item_size = S7DataTypes.get_size_bytes(word_len, 1) count = len(data) // item_size # Parameter length: function + item count + address spec param_len = 3 + 11 # 14 bytes total # Data length: transport size + data data_len = 4 + len(data) # Transport size (4 bytes) + actual data # S7 Header header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence param_len, # Parameter length data_len, # Data length ) # Parameter section parameters = struct.pack( ">BBB", S7Function.WRITE_AREA, # Function code 0x01, # Item count 0x12, # Variable specification ) # Add address specification address_spec = S7DataTypes.encode_address(area, db_number, start, word_len, count) parameters += address_spec[1:] # Skip first byte # Map word_len to data section transport size # Data section uses different transport size codes than address specification: # - 0x03 = BIT # - 0x04 = BYTE/WORD/DWORD (byte-oriented data) # - 0x05 = INT # - 0x06 = DINT # - 0x07 = REAL # - 0x09 = OCTET STRING transport_size_map = { S7WordLen.BIT: 0x03, S7WordLen.BYTE: 0x04, S7WordLen.CHAR: 0x04, S7WordLen.WORD: 0x04, S7WordLen.INT: 0x05, S7WordLen.DWORD: 0x04, S7WordLen.DINT: 0x06, S7WordLen.REAL: 0x07, S7WordLen.COUNTER: 0x04, S7WordLen.TIMER: 0x04, } transport_size = transport_size_map.get(word_len, 0x04) # Data section data_section = ( struct.pack( ">BBH", 0x00, # Reserved/Error transport_size, # Transport size (proper S7 data section format) len(data) * 8, # Bit length (data length in bits) ) + data ) return header + parameters + data_section
[docs] def build_setup_communication_request(self, max_amq_caller: int = 1, max_amq_callee: int = 1, pdu_length: int = 480) -> bytes: """ Build S7 setup communication request. This negotiates communication parameters with the PLC. """ header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence 0x0008, # Parameter length (8 bytes) 0x0000, # Data length ) parameters = struct.pack( ">BBHHH", S7Function.SETUP_COMMUNICATION, # Function code 0x00, # Reserved max_amq_caller, # Max AMQ caller max_amq_callee, # Max AMQ callee pdu_length, # PDU length ) return header + parameters
[docs] def build_plc_control_request(self, operation: str) -> bytes: """ Build PLC control request. Args: operation: Control operation ('stop', 'hot_start', 'cold_start') Returns: Complete S7 PDU for PLC control """ # Map operations to S7 control codes control_codes = { "stop": 0x29, # PLC_STOP "hot_start": 0x28, # PLC_CONTROL (warm restart) "cold_start": 0x28, # PLC_CONTROL (cold restart) } if operation not in control_codes: raise ValueError(f"Unknown PLC control operation: {operation}") function_code = control_codes[operation] # Build control-specific parameters if operation == "stop": # Simple stop command param_data = struct.pack(">B", function_code) else: # Start commands with restart type restart_type = 1 if operation == "hot_start" else 2 # 1=warm, 2=cold param_data = struct.pack(">BB", function_code, restart_type) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def check_control_response(self, response: Dict[str, Any]) -> None: """ Check PLC control response for errors. Args: response: Parsed S7 response Raises: S7ProtocolError: If control operation failed """ # For now, just check that we got a response # In a full implementation, we would check specific error codes if response.get("error_code", 0) != 0: raise S7ProtocolError(f"PLC control failed with error: {response['error_code']}")
[docs] def build_compress_request(self) -> bytes: """ Build PLC control request for memory compression. Uses PI service "_MSZL" (compress memory). Returns: Complete S7 PDU for compress request """ # PI service command for compress pi_service = b"_MSZL" # Parameter section: function code + PI service # Format: func(1) + unknown(7) + pi_len(1) + pi_service param_data = ( struct.pack( ">BBBBBBBBB", S7Function.PLC_CONTROL, # 0x28 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved len(pi_service), # PI service length ) + pi_service ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def build_copy_ram_to_rom_request(self) -> bytes: """ Build PLC control request for copying RAM to ROM. Uses PI service "_MSZL" with file system parameters. Returns: Complete S7 PDU for copy RAM to ROM request """ # PI service command for copy RAM to ROM # Uses EP parameter for target file system pi_service = b"_MSZL" file_id = b"P" # P = passive file system (ROM) # Parameter section with file system identifier param_data = ( struct.pack( ">BBBBBBBBB", S7Function.PLC_CONTROL, # 0x28 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved len(file_id), # File ID length len(pi_service), # PI service length ) + file_id + pi_service ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
# ======================================================================== # Block Transfer PDU Builders (Upload/Download) # ========================================================================
[docs] def build_start_upload_request(self, block_type: int, block_num: int) -> bytes: """ Build start upload request. Args: block_type: Block type code (0x38=OB, 0x41=DB, 0x42=SDB, 0x43=FC, 0x44=SFC, 0x45=FB, 0x46=SFB) block_num: Block number Returns: Complete S7 PDU for start upload request """ # Block address string: e.g., "0A00001P" for DB1 # Format: block_type (2 hex) + block_num (5 digits) + file_system (1 char) block_addr = f"{block_type:02X}{block_num:05d}A".encode("ascii") # Parameters: function + status + reserved + upload_id + block_addr_len + block_addr param_data = ( struct.pack( ">BBBIB", S7Function.START_UPLOAD, # Function code 0x00, # Status 0x00, # Reserved (error code) 0x00000000, # Upload ID (0 for start) len(block_addr), # Block address length ) + block_addr ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def build_upload_request(self, upload_id: int) -> bytes: """ Build upload request to get block data. Args: upload_id: Upload ID from start upload response Returns: Complete S7 PDU for upload request """ param_data = struct.pack( ">BBBI", S7Function.UPLOAD, # Function code 0x00, # Status 0x00, # Reserved upload_id, # Upload ID ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def build_end_upload_request(self, upload_id: int) -> bytes: """ Build end upload request. Args: upload_id: Upload ID from start upload response Returns: Complete S7 PDU for end upload request """ param_data = struct.pack( ">BBBI", S7Function.END_UPLOAD, # Function code 0x00, # Status 0x00, # Reserved upload_id, # Upload ID ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def parse_start_upload_response(self, response: Dict[str, Any]) -> Dict[str, Any]: """ Parse start upload response. Returns: Dictionary with upload_id and block_length """ result = {"upload_id": 0, "block_length": 0} raw_params = response.get("raw_parameters", b"") if len(raw_params) >= 8: # Parse: function + status + reserved + upload_id result["upload_id"] = struct.unpack(">I", raw_params[4:8])[0] if len(raw_params) > 8: # Block length string follows len_field = raw_params[8] if len(raw_params) > 9 + len_field: length_str = raw_params[9 : 9 + len_field] try: result["block_length"] = int(length_str) except ValueError: pass return result
[docs] def parse_upload_response(self, response: Dict[str, Any]) -> bytes: """ Parse upload response and extract block data. Returns: Block data bytes """ data_info = response.get("data", {}) raw_data: bytes = data_info.get("data", b"") # Skip the data header if present (length + unknown bytes) if len(raw_data) > 2: return raw_data return b""
[docs] def build_download_request(self, block_type: int, block_num: int, block_data: bytes) -> bytes: """ Build request download request. Args: block_type: Block type code block_num: Block number block_data: Block data to download Returns: Complete S7 PDU for request download """ # Block address string block_addr = f"{block_type:02X}{block_num:05d}P".encode("ascii") # Block length as string length_str = f"{len(block_data):06d}".encode("ascii") # Parameters param_data = ( struct.pack( ">BBBBB", S7Function.REQUEST_DOWNLOAD, # Function code 0x00, # Status 0x00, # Reserved 0x00, # Reserved len(block_addr), # Block address length ) + block_addr + struct.pack(">B", len(length_str)) + length_str ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
[docs] def build_delete_block_request(self, block_type: int, block_num: int) -> bytes: """ Build delete block request. Uses PLC_CONTROL with PI service "_DELE" for block deletion. Args: block_type: Block type code block_num: Block number Returns: Complete S7 PDU for delete block request """ # PI service for delete pi_service = b"_DELE" # Block specification: type + number + filesystem block_spec = f"{block_type:02X}{block_num:05d}P".encode("ascii") # Parameter section param_data = ( struct.pack( ">BBBBBBBBB", S7Function.PLC_CONTROL, # 0x28 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved 0x00, # Reserved len(block_spec), # Block spec length len(pi_service), # PI service length 0x00, # Reserved ) + block_spec + pi_service ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length 0x0000, # Data length ) return header + param_data
# ======================================================================== # USER_DATA PDU Builders (Chunk 3 of protocol implementation) # ========================================================================
[docs] def build_list_blocks_request(self) -> bytes: """ Build USER_DATA request for listing all blocks. Returns: Complete S7 PDU for list blocks request """ # USER_DATA PDU format: # - S7 header (10 bytes) # - Parameter section (8 bytes for USER_DATA) # - Data section (4 bytes for list blocks) # Parameter section for USER_DATA request # Format: header + method + type|group + subfunction + seq param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x43, # Type (4=request) | Group (3=grBlocksInfo) S7UserDataSubfunction.LIST_ALL, # Subfunction (0x01 = list all) 0x00, # DataRef (0x00 for initial request) ) # Data section: return code placeholder data_section = struct.pack( ">BBH", 0x0A, # Return value (request) 0x00, # Transport size 0x0000, # Length (0 for request) ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def build_list_blocks_of_type_request(self, block_type: int) -> bytes: """ Build USER_DATA request for listing blocks of a specific type. Args: block_type: Block type code (e.g., 0x41 for DB) Returns: Complete S7 PDU for list blocks of type request """ # Parameter section for USER_DATA request param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x43, # Type (4=request) | Group (3=grBlocksInfo) S7UserDataSubfunction.LIST_BLOCKS_OF_TYPE, # Subfunction (0x02) 0x00, # DataRef (0x00 for initial request) ) # Data section: block type (0x30 prefix + type per Snap7 C format) data_section = struct.pack( ">BBHBBBB", 0x0A, # Return value (request) 0x00, # Transport size 0x0004, # Length (4 bytes) 0x30, # Block type indicator block_type, # Block type code 0x0A, # Trailing bytes per Snap7 C 0x00, ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def parse_list_blocks_response(self, response: Dict[str, Any]) -> Dict[str, int]: """ Parse list blocks response and extract block counts. Args: response: Parsed S7 response Returns: Dictionary mapping block type names to counts """ result = { "OBCount": 0, "FBCount": 0, "FCCount": 0, "SFBCount": 0, "SFCCount": 0, "DBCount": 0, "SDBCount": 0, } data_info = response.get("data", {}) raw_data = data_info.get("data", b"") if not raw_data: return result # Parse block entries (4 bytes each: 0x30 | type | count_hi | count_lo) # Block type codes type_to_name = { 0x38: "OBCount", # Organization Block 0x41: "DBCount", # Data Block 0x42: "SDBCount", # System Data Block 0x43: "FCCount", # Function 0x44: "SFCCount", # System Function 0x45: "FBCount", # Function Block 0x46: "SFBCount", # System Function Block } offset = 0 while offset + 4 <= len(raw_data): indicator = raw_data[offset] block_type = raw_data[offset + 1] count = struct.unpack(">H", raw_data[offset + 2 : offset + 4])[0] if indicator == 0x30 and block_type in type_to_name: result[type_to_name[block_type]] = count offset += 4 return result
[docs] def parse_list_blocks_of_type_response(self, response: Dict[str, Any]) -> List[int]: """ Parse list blocks of type response and extract block numbers. Args: response: Parsed S7 response Returns: List of block numbers """ result: List[int] = [] data_info = response.get("data", {}) raw_data = data_info.get("data", b"") if not raw_data: return result # Parse block entries (4 bytes each per TDataFunGetBotItem: # BlockNum(2) + Unknown(1) + BlockLang(1)) offset = 0 while offset + 4 <= len(raw_data): block_num = struct.unpack(">H", raw_data[offset : offset + 2])[0] result.append(block_num) offset += 4 return result
[docs] def build_get_block_info_request(self, block_type: int, block_num: int) -> bytes: """ Build USER_DATA request for getting block information. Args: block_type: Block type code (0x38=OB, 0x41=DB, 0x42=SDB, 0x43=FC, 0x44=SFC, 0x45=FB, 0x46=SFB) block_num: Block number Returns: Complete S7 PDU for get block info request """ # Parameter section for USER_DATA block info request param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x43, # Type (4=request) | Group (3=grBlocksInfo) S7UserDataSubfunction.BLOCK_INFO, # Subfunction (0x03) 0x00, # DataRef (0x00 for initial request) ) # Data section: [0x30, type, 'A', ASCII_num(5)] per Snap7 C format # Block number is 5-digit zero-padded ASCII (e.g., 1 -> "00001") block_num_ascii = f"{block_num:05d}".encode("ascii") data_payload = struct.pack(">BB", 0x30, block_type) + b"A" + block_num_ascii data_section = ( struct.pack( ">BBH", 0x0A, # Return value (request) 0x00, # Transport size len(data_payload), # Length ) + data_payload ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def parse_get_block_info_response(self, response: Dict[str, Any]) -> Dict[str, Any]: """ Parse get block info response. Args: response: Parsed S7 response Returns: Dictionary with block info fields """ result: Dict[str, Any] = { "block_type": 0, "block_number": 0, "block_lang": 0, "block_flags": 0, "mc7_size": 0, "load_size": 0, "local_data": 0, "sbb_length": 0, "checksum": 0, "version": 0, "code_date": b"", "intf_date": b"", "author": b"", "family": b"", "header": b"", } data_info = response.get("data", {}) raw_data = data_info.get("data", b"") if len(raw_data) < 78: return result # Parse block info structure per TResDataBlockInfo layout result["block_type"] = raw_data[1] result["block_flags"] = raw_data[9] result["block_lang"] = raw_data[10] result["block_number"] = struct.unpack(">H", raw_data[12:14])[0] result["load_size"] = struct.unpack(">I", raw_data[14:18])[0] result["sbb_length"] = struct.unpack(">H", raw_data[34:36])[0] result["local_data"] = struct.unpack(">H", raw_data[38:40])[0] result["mc7_size"] = struct.unpack(">H", raw_data[40:42])[0] result["version"] = raw_data[66] result["checksum"] = struct.unpack(">H", raw_data[68:70])[0] # Dates (6 bytes each: ms(2) + days(2) + reserved(2)) result["code_date"] = raw_data[22:28] result["intf_date"] = raw_data[28:34] # Strings (8 bytes each) result["author"] = raw_data[42:50] result["family"] = raw_data[50:58] result["header"] = raw_data[58:66] return result
[docs] def build_read_szl_request(self, szl_id: int, szl_index: int) -> bytes: """ Build USER_DATA request for reading SZL (System Status List). Args: szl_id: SZL identifier szl_index: SZL index Returns: Complete S7 PDU for read SZL request """ # Parameter section for USER_DATA SZL request param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x44, # Type (4=request) | Group (4=grSZL) S7UserDataSubfunction.READ_SZL, # Subfunction (0x01) 0x00, # DataRef (0x00 for initial request) ) # Data section: SZL ID and Index data_section = struct.pack( ">BBHHH", 0x0A, # Return value (request) 0x00, # Transport size 0x0004, # Length (4 bytes for ID + Index) szl_id, # SZL ID szl_index, # SZL Index ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def build_userdata_followup_request(self, group: int, subfunction: int, sequence_number: int) -> bytes: """ Build USERDATA follow-up request for multi-packet responses. Args: group: USERDATA group (e.g., 4 for SZL, 3 for block info) subfunction: Subfunction code sequence_number: Sequence number from the previous response Returns: Complete S7 PDU for follow-up request """ # Parameter section: same as initial but with DataRef = sequence_number type_group = 0x40 | (group & 0x0F) # Type 4 (request) | group param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) type_group, # Type | Group subfunction, # Subfunction sequence_number, # DataRef from previous response ) # Minimal data section for follow-up data_section = struct.pack( ">BBH", 0x0A, # Return value (request) 0x00, # Transport size 0x0000, # Length (0 bytes) ) header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def parse_read_szl_response(self, response: Dict[str, Any], first_fragment: bool = True) -> Dict[str, Any]: """ Parse read SZL response. Args: response: Parsed S7 response first_fragment: If True (default), parse SZL header (ID+Index). If False, treat all data as raw payload (follow-up fragments). Returns: Dictionary with SZL ID, Index, and data """ result: Dict[str, Any] = { "szl_id": 0, "szl_index": 0, "data": b"", } data_info = response.get("data", {}) raw_data = data_info.get("data", b"") if first_fragment: if len(raw_data) < 4: return result # Parse SZL header: ID (2) + Index (2) result["szl_id"] = struct.unpack(">H", raw_data[0:2])[0] result["szl_index"] = struct.unpack(">H", raw_data[2:4])[0] result["data"] = raw_data[4:] else: # Follow-up fragments don't include SZL header result["data"] = raw_data return result
[docs] def build_get_clock_request(self) -> bytes: """ Build USER_DATA request for reading PLC clock. Returns: Complete S7 PDU for get clock request """ # Parameter section for USER_DATA clock request param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x47, # Type (4=request) | Group (7=grClock) S7UserDataSubfunction.GET_CLOCK, # Subfunction (0x01) 0x00, # DataRef (0x00 for initial request) ) # Data section: empty for get clock data_section = struct.pack( ">BBH", 0x0A, # Return value (request) 0x00, # Transport size 0x0000, # Length (0 bytes) ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def build_set_clock_request(self, dt: "datetime") -> bytes: """ Build USER_DATA request for setting PLC clock. Args: dt: Datetime to set Returns: Complete S7 PDU for set clock request """ # Convert datetime to BCD format # BCD encoding: each decimal digit is stored in a nibble def to_bcd(value: int) -> int: return ((value // 10) << 4) | (value % 10) year = dt.year % 100 # Only last 2 digits bcd_time = struct.pack( ">BBBBBBBB", 0x00, # Reserved to_bcd(year), # Year (BCD) to_bcd(dt.month), # Month (BCD) to_bcd(dt.day), # Day (BCD) to_bcd(dt.hour), # Hour (BCD) to_bcd(dt.minute), # Minute (BCD) to_bcd(dt.second), # Second (BCD) (dt.weekday() + 1) & 0x0F, # Day of week (1=Monday) ) # Parameter section for USER_DATA clock request param_data = struct.pack( ">BBBBBBBB", 0x00, # Reserved 0x01, # Parameter count 0x12, # Type/length header 0x04, # Length of following data 0x11, # Method (0x11 = request) 0x47, # Type (4=request) | Group (7=grClock) S7UserDataSubfunction.SET_CLOCK, # Subfunction (0x02) 0x00, # DataRef (0x00 for initial request) ) # Data section with BCD time data_section = ( struct.pack( ">BBH", 0x0A, # Return value (request) 0x00, # Transport size len(bcd_time), # Length ) + bcd_time ) # S7 header for USER_DATA header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.USERDATA, # PDU type (0x07) 0x0000, # Reserved self._next_sequence(), # Sequence len(param_data), # Parameter length len(data_section), # Data length ) return header + param_data + data_section
[docs] def parse_get_clock_response(self, response: Dict[str, Any]) -> "datetime": """ Parse get clock response. Args: response: Parsed S7 response Returns: Datetime from PLC """ from datetime import datetime as dt_class data_info = response.get("data", {}) raw_data = data_info.get("data", b"") if len(raw_data) < 8: # Return current time if no valid data return dt_class.now().replace(microsecond=0) # Parse BCD time def from_bcd(value: int) -> int: return ((value >> 4) * 10) + (value & 0x0F) # Skip first byte (reserved) year = from_bcd(raw_data[1]) month = from_bcd(raw_data[2]) day = from_bcd(raw_data[3]) hour = from_bcd(raw_data[4]) minute = from_bcd(raw_data[5]) second = from_bcd(raw_data[6]) # Determine century (assume 2000s for years 0-99) full_year = 2000 + year if year < 90 else 1900 + year try: return dt_class(full_year, month, day, hour, minute, second) except ValueError: return dt_class.now().replace(microsecond=0)
[docs] def build_cpu_state_request(self) -> bytes: """ Build CPU state request. Returns: Complete S7 PDU for CPU state query """ # Simple CPU state request - in real S7 this would be a userdata function header = struct.pack( ">BBHHHH", 0x32, # Protocol ID S7PDUType.REQUEST, # PDU type 0x0000, # Reserved self._next_sequence(), # Sequence 0x0001, # Parameter length 0x0000, # Data length ) # Use a custom function code for CPU state parameters = struct.pack(">B", 0x04) # Use READ_AREA function for simplicity return header + parameters
[docs] def extract_cpu_state(self, response: Dict[str, Any]) -> str: """ Extract CPU state from response. Args: response: Parsed S7 response Returns: CPU state string in S7CpuStatus format (e.g., 'S7CpuStatusRun') """ # Map internal states to S7 status format for API compatibility with master branch # The cpu_statuses dict in type.py uses: {0: "S7CpuStatusUnknown", 4: "S7CpuStatusStop", 8: "S7CpuStatusRun"} return "S7CpuStatusRun" # Default state for pure Python server
[docs] def parse_response(self, pdu: bytes) -> Dict[str, Any]: """ Parse S7 response PDU. Args: pdu: Complete S7 PDU Returns: Parsed response data """ if len(pdu) < 10: raise S7ProtocolError("PDU too short for S7 response header") # First peek at PDU type to determine header size pdu_type = pdu[1] if pdu_type == S7PDUType.USERDATA: # USERDATA PDUs have a 10-byte header (no error_class/error_code in header) if len(pdu) < 10: raise S7ProtocolError("PDU too short for USERDATA header") header = struct.unpack(">BBHHHH", pdu[:10]) protocol_id, pdu_type, reserved, sequence, param_len, data_len = header error_class = 0 error_code = 0 offset = 10 else: # ACK/ACK_DATA PDUs have a 12-byte header (with error_class/error_code) if len(pdu) < 12: raise S7ProtocolError("PDU too short for ACK/ACK_DATA header") header = struct.unpack(">BBHHHHBB", pdu[:12]) protocol_id, pdu_type, reserved, sequence, param_len, data_len, error_class, error_code = header offset = 12 if protocol_id != 0x32: raise S7ProtocolError(f"Invalid protocol ID: {protocol_id:#02x}") # Accept ACK (write responses), ACK_DATA (read responses), and USERDATA response types if pdu_type not in (S7PDUType.ACK, S7PDUType.ACK_DATA, S7PDUType.USERDATA): raise S7ProtocolError(f"Expected response PDU, got {pdu_type}") combined_error = (error_class << 8) | error_code if error_class != 0: error_msg = get_protocol_error_message(combined_error) raise S7ProtocolError( f"S7 protocol error (class={error_class:#04x}, code={error_code:#04x}): {error_msg}", error_code=combined_error, ) response = { "sequence": sequence, "param_length": param_len, "data_length": data_len, "parameters": None, "data": None, "error_code": combined_error, } # Parse parameters if present if param_len > 0: if offset + param_len > len(pdu): raise S7ProtocolError("Parameter section extends beyond PDU") param_data = pdu[offset : offset + param_len] response["parameters"] = self._parse_parameters(param_data) offset += param_len # Parse data if present if data_len > 0: if offset + data_len > len(pdu): raise S7ProtocolError("Data section extends beyond PDU") data_section = pdu[offset : offset + data_len] response["data"] = self._parse_data_section(data_section) return response
def _parse_parameters(self, param_data: bytes) -> Dict[str, Any]: """Parse S7 parameter section.""" if len(param_data) < 1: return {} # Detect USERDATA response parameters: # byte 0 = 0x00 (reserved), len >= 12, byte 2 = 0x12, byte 4 = 0x12 (method=response) if param_data[0] == 0x00 and len(param_data) >= 12 and param_data[2] == 0x12 and param_data[4] == 0x12: return self._parse_userdata_response_params(param_data) function_code = param_data[0] if function_code == S7Function.READ_AREA: return self._parse_read_response_params(param_data) elif function_code == S7Function.WRITE_AREA: return self._parse_write_response_params(param_data) elif function_code == S7Function.SETUP_COMMUNICATION: return self._parse_setup_comm_response_params(param_data) else: return {"function_code": function_code} def _parse_read_response_params(self, param_data: bytes) -> Dict[str, Any]: """Parse read area response parameters.""" if len(param_data) < 2: raise S7ProtocolError("Read response parameters too short") function_code = param_data[0] item_count = param_data[1] return {"function_code": function_code, "item_count": item_count} def _parse_write_response_params(self, param_data: bytes) -> Dict[str, Any]: """Parse write area response parameters.""" if len(param_data) < 2: raise S7ProtocolError("Write response parameters too short") function_code = param_data[0] item_count = param_data[1] return {"function_code": function_code, "item_count": item_count} def _parse_setup_comm_response_params(self, param_data: bytes) -> Dict[str, Any]: """Parse setup communication response parameters.""" if len(param_data) < 8: raise S7ProtocolError("Setup communication response parameters too short") function_code, reserved, max_amq_caller, max_amq_callee, pdu_length = struct.unpack(">BBHHH", param_data[:8]) return { "function_code": function_code, "max_amq_caller": max_amq_caller, "max_amq_callee": max_amq_callee, "pdu_length": pdu_length, } def _parse_userdata_response_params(self, param_data: bytes) -> Dict[str, Any]: """Parse USERDATA response parameter section (12 bytes). Layout: [0] Reserved (0x00) [1] Parameter count (0x01) [2] Type header (0x12) [3] Length of following data (0x08 for response) [4] Method (0x12 = response) [5] Type (high nibble) | Group (low nibble) [6] Subfunction [7] Sequence number (used as DataRef in follow-up) [8] Data unit reference [9] Last data unit (0x00 = last, non-zero = more) [10-11] Error code """ type_group = param_data[5] group = type_group & 0x0F subfunction = param_data[6] sequence_number = param_data[7] last_data_unit = param_data[9] error_code = struct.unpack(">H", param_data[10:12])[0] if error_code != 0: error_msg = get_protocol_error_message(error_code) logger.warning(f"USERDATA response error {error_code:#06x}: {error_msg}") return { "group": group, "subfunction": subfunction, "sequence_number": sequence_number, "last_data_unit": last_data_unit, "error_code": error_code, } def _parse_data_section(self, data_section: bytes) -> Dict[str, Any]: """Parse S7 data section.""" if len(data_section) == 1: # Simple return code (for write responses) return {"return_code": data_section[0], "transport_size": 0, "data_length": 0, "data": b""} elif len(data_section) >= 4: # Full data header return_code = data_section[0] transport_size = data_section[1] data_length = struct.unpack(">H", data_section[2:4])[0] # Extract actual data - length interpretation depends on transport_size # Transport size 0x09 (octet string): byte length (USERDATA responses) # Transport size 0x00: byte length (USERDATA requests) # Transport size 0x04 (byte): bit length (READ_AREA responses) if transport_size in (0x00, 0x09): # USERDATA uses byte length directly actual_data = data_section[4 : 4 + data_length] else: # READ_AREA responses use bit length actual_data = data_section[4 : 4 + (data_length // 8)] return {"return_code": return_code, "transport_size": transport_size, "data_length": data_length, "data": actual_data} else: return {"raw_data": data_section}
[docs] def extract_read_data(self, response: Dict[str, Any], word_len: S7WordLen, count: int) -> List[Any]: """ Extract and decode data from read response. Args: response: Parsed S7 response word_len: Expected data word length count: Expected number of items Returns: List of decoded values """ if not response.get("data"): raise S7ProtocolError("No data in response") data_info = response["data"] return_code = data_info.get("return_code", 0) if return_code != 0xFF: # 0xFF = Success desc = get_return_code_description(return_code) raise S7ProtocolError(f"Read operation failed: {desc} (0x{return_code:02x})") raw_data = data_info.get("data", b"") # Return raw bytes directly - caller handles type conversion return list(raw_data)
[docs] def check_write_response(self, response: Dict[str, Any]) -> None: """ Check write operation response for errors. Args: response: Parsed S7 response Raises: S7ProtocolError: If write operation failed """ # First check for errors in the response header # S7-1200/1500 returns error codes in the header for write failures header_error = response.get("error_code", 0) if header_error != 0: error_msg = f"Write operation failed with S7 error code: {header_error:#06x}" raise S7ProtocolError(error_msg) # For successful writes, check the data section return code if present if response.get("data"): data_info = response["data"] return_code = data_info.get("return_code", 0xFF) # Default to success if return_code != 0xFF: # 0xFF = Success desc = get_return_code_description(return_code) raise S7ProtocolError(f"Write operation failed: {desc} (0x{return_code:02x})")
# If no data and no header error, the write was successful (ACK without data)