"""
S7 data types and conversion utilities.
Handles S7-specific data types, endianness conversion, and address encoding.
"""
import struct
from enum import IntEnum
from typing import List, NoReturn, Sequence, Tuple, Union
def _assert_never(value: NoReturn) -> NoReturn:
"""Exhaustive type check helper (equivalent to typing.assert_never for Python <3.11)."""
raise AssertionError(f"Unhandled value: {value!r}")
def _validate_bit_addr(bit_addr: int) -> None:
"""Validate that a bit address is in the valid range 0-7."""
if not 0 <= bit_addr <= 7:
raise ValueError(f"Bit address must be 0-7, got {bit_addr}")
[docs]
class S7Area(IntEnum):
"""S7 memory area identifiers."""
PE = 0x81 # Process Input (Peripheral Input)
PA = 0x82 # Process Output (Peripheral Output)
MK = 0x83 # Memory/Merkers (Flags)
DB = 0x84 # Data Blocks
CT = 0x1C # Counters
TM = 0x1D # Timers
[docs]
class S7WordLen(IntEnum):
"""S7 data word length identifiers."""
BIT = 0x01 # Single bit
BYTE = 0x02 # 8-bit byte
CHAR = 0x03 # 8-bit character
WORD = 0x04 # 16-bit word
INT = 0x05 # 16-bit signed integer
DWORD = 0x06 # 32-bit double word
DINT = 0x07 # 32-bit signed integer
REAL = 0x08 # 32-bit IEEE float
COUNTER = 0x1C # Counter value
TIMER = 0x1D # Timer value
[docs]
class S7DataTypes:
"""S7 data type conversion utilities."""
# Word length to byte size mapping
WORD_LEN_SIZE = {
S7WordLen.BIT: 1, # Bit operations use 1 byte
S7WordLen.BYTE: 1, # 1 byte
S7WordLen.CHAR: 1, # 1 byte
S7WordLen.WORD: 2, # 2 bytes
S7WordLen.INT: 2, # 2 bytes
S7WordLen.DWORD: 4, # 4 bytes
S7WordLen.DINT: 4, # 4 bytes
S7WordLen.REAL: 4, # 4 bytes
S7WordLen.COUNTER: 2, # 2 bytes
S7WordLen.TIMER: 2, # 2 bytes
}
[docs]
@staticmethod
def get_size_bytes(word_len: S7WordLen, count: int = 1) -> int:
"""Get total size in bytes for given word length and count."""
return S7DataTypes.WORD_LEN_SIZE[word_len] * count
[docs]
@staticmethod
def encode_address(area: S7Area, db_number: int, start: int, word_len: S7WordLen, count: int) -> bytes:
"""
Encode S7 address into parameter format.
Returns 12-byte parameter section for read/write operations.
"""
# Parameter format for read/write operations
# Byte 0: Specification type (0x12 for address specification)
# Byte 1: Length of following address specification (0x0A = 10 bytes)
# Byte 2: Syntax ID (0x10 = S7-Any)
# Byte 3: Transport size (word length)
# Bytes 4-5: Count (number of items)
# Bytes 6-7: DB number (for DB area) or 0
# Bytes 8: Area code
# Bytes 9-11: Start address (byte.bit format)
if start < 0:
raise ValueError(f"Start address must be non-negative, got {start}")
# Convert start address to byte.bit format
if word_len == S7WordLen.BIT:
# For bit access: byte address + bit offset
byte_addr = start // 8
bit_addr = start % 8
address = (byte_addr << 3) | bit_addr
else:
# For word access: convert to bit address
address = start * 8
address_bytes = struct.pack(">I", address)[1:] # 3-byte address (big-endian)
return struct.pack(
">BBBBHHB3s",
0x12, # Specification type
0x0A, # Length of address spec
0x10, # Syntax ID (S7-Any)
word_len, # Transport size
count, # Count
db_number if area == S7Area.DB else 0, # DB number
area, # Area code
address_bytes, # 3-byte address (big-endian)
)
[docs]
@staticmethod
def decode_s7_data(data: bytes, word_len: S7WordLen, count: int) -> List[Union[bool, int, float]]:
"""
Decode S7 data from bytes to Python values.
Handles Siemens big-endian byte order.
"""
values: List[Union[bool, int, float]] = []
offset = 0
for i in range(count):
if word_len == S7WordLen.BIT:
# Extract single bit
byte_val = data[offset]
values.append(bool(byte_val))
offset += 1
elif word_len == S7WordLen.BYTE or word_len == S7WordLen.CHAR:
# 8-bit values
values.append(data[offset])
offset += 1
elif word_len == S7WordLen.WORD or word_len == S7WordLen.COUNTER or word_len == S7WordLen.TIMER:
# 16-bit unsigned values (big-endian)
value = struct.unpack(">H", data[offset : offset + 2])[0]
values.append(value)
offset += 2
elif word_len == S7WordLen.INT:
# 16-bit signed values (big-endian)
value = struct.unpack(">h", data[offset : offset + 2])[0]
values.append(value)
offset += 2
elif word_len == S7WordLen.DWORD:
# 32-bit unsigned values (big-endian)
value = struct.unpack(">I", data[offset : offset + 4])[0]
values.append(value)
offset += 4
elif word_len == S7WordLen.DINT:
# 32-bit signed values (big-endian)
value = struct.unpack(">i", data[offset : offset + 4])[0]
values.append(value)
offset += 4
elif word_len == S7WordLen.REAL:
# 32-bit IEEE float (big-endian)
value = struct.unpack(">f", data[offset : offset + 4])[0]
values.append(value)
offset += 4
else:
_assert_never(word_len)
return values
[docs]
@staticmethod
def encode_s7_data(values: Sequence[Union[bool, int, float]], word_len: S7WordLen) -> bytes:
"""
Encode Python values to S7 data bytes.
Handles Siemens big-endian byte order.
"""
data = bytearray()
for value in values:
if word_len == S7WordLen.BIT:
# Single bit to byte
data.append(0x01 if value else 0x00)
elif word_len == S7WordLen.BYTE or word_len == S7WordLen.CHAR:
# 8-bit values
data.append(int(value) & 0xFF)
elif word_len == S7WordLen.WORD or word_len == S7WordLen.COUNTER or word_len == S7WordLen.TIMER:
# 16-bit unsigned values (big-endian)
data.extend(struct.pack(">H", int(value) & 0xFFFF))
elif word_len == S7WordLen.INT:
# 16-bit signed values (big-endian)
data.extend(struct.pack(">h", int(value)))
elif word_len == S7WordLen.DWORD:
# 32-bit unsigned values (big-endian)
data.extend(struct.pack(">I", int(value) & 0xFFFFFFFF))
elif word_len == S7WordLen.DINT:
# 32-bit signed values (big-endian)
data.extend(struct.pack(">i", int(value)))
elif word_len == S7WordLen.REAL:
# 32-bit IEEE float (big-endian)
data.extend(struct.pack(">f", float(value)))
else:
_assert_never(word_len)
return bytes(data)
[docs]
@staticmethod
def parse_address(address_str: str) -> Tuple[S7Area, int, int]:
"""
Parse S7 address string to area, DB number, and offset.
Examples:
- "DB1.DBX0.0" -> (DB, 1, 0)
- "M10.5" -> (MK, 0, 85) # bit 5 of byte 10 = bit 85
- "IW20" -> (PE, 0, 20)
"""
address_str = address_str.upper().strip()
# Data Block addresses: DB1.DBX0.0, DB1.DBW10, etc.
if address_str.startswith("DB"):
db_part, addr_part = address_str.split(".", 1)
db_number = int(db_part[2:])
if addr_part.startswith("DBX"):
# Bit address: DBX10.5
if "." in addr_part:
byte_addr, bit_addr = addr_part[3:].split(".")
_validate_bit_addr(int(bit_addr))
offset = int(byte_addr) * 8 + int(bit_addr)
else:
offset = int(addr_part[3:]) * 8
elif addr_part.startswith("DBB"):
# Byte address: DBB10
offset = int(addr_part[3:])
elif addr_part.startswith("DBW"):
# Word address: DBW10
offset = int(addr_part[3:])
elif addr_part.startswith("DBD"):
# Double word address: DBD10
offset = int(addr_part[3:])
else:
raise ValueError(f"Invalid DB address format: {address_str}")
return S7Area.DB, db_number, offset
# Memory/Flag addresses: M10.5, MW20, etc.
elif address_str.startswith("M"):
if "." in address_str:
# Bit address: M10.5
byte_addr, bit_addr = address_str[1:].split(".")
_validate_bit_addr(int(bit_addr))
offset = int(byte_addr) * 8 + int(bit_addr)
elif address_str.startswith("MW"):
# Word address: MW20
offset = int(address_str[2:])
elif address_str.startswith("MD"):
# Double word address: MD20
offset = int(address_str[2:])
else:
# Byte address: M10
offset = int(address_str[1:])
return S7Area.MK, 0, offset
# Input addresses: I0.0, IW10, etc.
elif address_str.startswith("I"):
if "." in address_str:
# Bit address: I0.0
byte_addr, bit_addr = address_str[1:].split(".")
_validate_bit_addr(int(bit_addr))
offset = int(byte_addr) * 8 + int(bit_addr)
elif address_str.startswith("IW"):
# Word address: IW10
offset = int(address_str[2:])
elif address_str.startswith("ID"):
# Double word address: ID10
offset = int(address_str[2:])
else:
# Byte address: I10
offset = int(address_str[1:])
return S7Area.PE, 0, offset
# Output addresses: Q0.0, QW10, etc.
elif address_str.startswith("Q"):
if "." in address_str:
# Bit address: Q0.0
byte_addr, bit_addr = address_str[1:].split(".")
_validate_bit_addr(int(bit_addr))
offset = int(byte_addr) * 8 + int(bit_addr)
elif address_str.startswith("QW"):
# Word address: QW10
offset = int(address_str[2:])
elif address_str.startswith("QD"):
# Double word address: QD10
offset = int(address_str[2:])
else:
# Byte address: Q10
offset = int(address_str[1:])
return S7Area.PA, 0, offset
else:
raise ValueError(f"Unsupported address format: {address_str}")