"""
Pure Python S7 server implementation.
Provides a complete S7 server emulator without dependencies on the Snap7 C library.
"""
import socket
import struct
import threading
import time
import logging
from typing import Dict, Optional, List, Callable, Any, Tuple, Type, Union
from types import TracebackType
from enum import IntEnum
from ctypes import Array, c_char
from ..s7protocol import S7Protocol, S7Function, S7PDUType, S7UserDataGroup, S7UserDataSubfunction
from ..datatypes import S7Area, S7WordLen
from ..error import S7ConnectionError, S7ProtocolError
from ..type import SrvArea, SrvEvent, Parameter
logger = logging.getLogger(__name__)
[docs]
class ServerState(IntEnum):
"""S7 server states."""
STOPPED = 0
RUNNING = 1
ERROR = 2
[docs]
class CPUState(IntEnum):
"""S7 CPU states."""
UNKNOWN = 0
RUN = 8
STOP = 4
[docs]
class Server:
"""
Pure Python S7 server implementation.
Emulates a Siemens S7 PLC for testing and development purposes.
Examples:
>>> import snap7
>>> server = snap7.Server()
>>> server.start()
>>> # ... register areas and handle clients
>>> server.stop()
"""
[docs]
def __init__(self, log: bool = True, **kwargs: object) -> None:
"""
Initialize S7 server.
Args:
log: Enable event logging
**kwargs: Ignored. Kept for backwards compatibility.
"""
self.server_socket: Optional[socket.socket] = None
self.server_thread: Optional[threading.Thread] = None
self.running = False
self.port = 102
self.host = "0.0.0.0"
# Server state
self.state = ServerState.STOPPED
self.cpu_state = CPUState.STOP
self.client_count = 0
# Memory areas
self.memory_areas: Dict[Tuple[S7Area, int], bytearray] = {}
self.area_locks: Dict[Tuple[S7Area, int], threading.Lock] = {}
# Protocol handler
self.protocol = S7Protocol()
# Event callbacks
self.event_callback: Optional[Callable[[SrvEvent], None]] = None
self.read_callback: Optional[Callable[[SrvEvent], None]] = None
# Client connections
self.clients: List[threading.Thread] = []
self.client_lock = threading.Lock()
# Event queue for pick_event
self._event_queue: List[SrvEvent] = []
# Logging
self._log_enabled = log
if log:
self._set_log_callback()
logger.info("S7Server initialized (pure Python implementation)")
[docs]
def create(self) -> None:
"""Create the server (no-op for compatibility)."""
pass
[docs]
def destroy(self) -> None:
"""Destroy the server."""
self.stop()
[docs]
def start(self, tcp_port: int = 102) -> int:
"""
Start the S7 server.
Args:
tcp_port: TCP port to listen on
Returns:
0 on success
"""
if self.running:
raise S7ConnectionError("Server is already running")
self.port = tcp_port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Try to use SO_REUSEPORT if available (Linux, macOS) for faster port reuse
if hasattr(socket, "SO_REUSEPORT"):
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
self.state = ServerState.RUNNING
self.cpu_state = CPUState.RUN
# Start server thread
self.server_thread = threading.Thread(target=self._server_loop, daemon=True)
self.server_thread.start()
# Add startup event to queue
startup_event = SrvEvent()
startup_event.EvtCode = 0x00010000 # Server started
self._event_queue.append(startup_event)
logger.info(f"S7 Server started on {self.host}:{self.port}")
return 0
except Exception as e:
self.running = False
self.state = ServerState.ERROR
if self.server_socket:
self.server_socket.close()
self.server_socket = None
raise S7ConnectionError(f"Failed to start server: {e}")
[docs]
def stop(self) -> int:
"""
Stop the S7 server.
Returns:
0 on success
"""
if not self.running:
return 0
self.running = False
self.state = ServerState.STOPPED
self.cpu_state = CPUState.STOP
# Close server socket
if self.server_socket:
self.server_socket.close()
self.server_socket = None
# Wait for server thread to finish
if self.server_thread and self.server_thread.is_alive():
self.server_thread.join(timeout=5.0)
# Close all client connections
with self.client_lock:
for client_thread in self.clients[:]:
if client_thread.is_alive():
client_thread.join(timeout=1.0)
self.clients.clear()
self.client_count = 0
logger.info("S7 Server stopped")
return 0
[docs]
def register_area(self, area: SrvArea, index: int, userdata: Union[bytearray, "Array[c_char]"]) -> int:
"""
Register a memory area with the server.
Args:
area: Memory area type
index: Area index/number
userdata: Initial data for the area (bytearray or ctypes array)
Returns:
0 on success
"""
# Map SrvArea to S7Area
area_mapping = {
SrvArea.PE: S7Area.PE,
SrvArea.PA: S7Area.PA,
SrvArea.MK: S7Area.MK,
SrvArea.DB: S7Area.DB,
SrvArea.CT: S7Area.CT,
SrvArea.TM: S7Area.TM,
}
s7_area = area_mapping.get(area)
if s7_area is None:
raise ValueError(f"Unsupported area: {area}")
# Convert ctypes array to bytearray if needed
if isinstance(userdata, bytearray):
data = userdata
else:
data = bytearray(userdata)
area_key = (s7_area, index)
self.memory_areas[area_key] = data
self.area_locks[area_key] = threading.Lock()
logger.info(f"Registered area {area.name} index {index}, size {len(data)}")
return 0
[docs]
def unregister_area(self, area: SrvArea, index: int) -> int:
"""
Unregister a memory area.
Args:
area: Memory area type
index: Area index
Returns:
0 on success
"""
area_mapping = {
SrvArea.PE: S7Area.PE,
SrvArea.PA: S7Area.PA,
SrvArea.MK: S7Area.MK,
SrvArea.DB: S7Area.DB,
SrvArea.CT: S7Area.CT,
SrvArea.TM: S7Area.TM,
}
s7_area = area_mapping.get(area)
if s7_area is None:
return 0
area_key = (s7_area, index)
if area_key in self.memory_areas:
del self.memory_areas[area_key]
del self.area_locks[area_key]
logger.info(f"Unregistered area {area.name} index {index}")
return 0
[docs]
def lock_area(self, area: SrvArea, index: int) -> int:
"""
Lock a memory area.
Args:
area: Memory area type
index: Area index
Returns:
0 on success
Raises:
RuntimeError: If area is not registered
"""
area_mapping = {
SrvArea.PE: S7Area.PE,
SrvArea.PA: S7Area.PA,
SrvArea.MK: S7Area.MK,
SrvArea.DB: S7Area.DB,
SrvArea.CT: S7Area.CT,
SrvArea.TM: S7Area.TM,
}
s7_area = area_mapping.get(area)
if s7_area is None:
raise RuntimeError(f"Invalid area: {area}")
area_key = (s7_area, index)
if area_key not in self.area_locks:
raise RuntimeError(f"Area {area.name} index {index} not registered")
self.area_locks[area_key].acquire()
return 0
[docs]
def unlock_area(self, area: SrvArea, index: int) -> int:
"""
Unlock a memory area.
Args:
area: Memory area type
index: Area index
Returns:
0 on success
"""
area_mapping = {
SrvArea.PE: S7Area.PE,
SrvArea.PA: S7Area.PA,
SrvArea.MK: S7Area.MK,
SrvArea.DB: S7Area.DB,
SrvArea.CT: S7Area.CT,
SrvArea.TM: S7Area.TM,
}
s7_area = area_mapping.get(area)
if s7_area is None:
return 1
area_key = (s7_area, index)
if area_key in self.area_locks:
try:
self.area_locks[area_key].release()
except RuntimeError:
pass # Lock not held
return 0
[docs]
def get_status(self) -> Tuple[str, str, int]:
"""
Get server status.
Returns:
Tuple of (server_status, cpu_status, client_count)
"""
server_status_names = {ServerState.STOPPED: "Stopped", ServerState.RUNNING: "Running", ServerState.ERROR: "Error"}
cpu_status_names = {CPUState.UNKNOWN: "Unknown", CPUState.RUN: "Run", CPUState.STOP: "Stop"}
return (
server_status_names.get(self.state, "Unknown"),
cpu_status_names.get(self.cpu_state, "Unknown"),
self.client_count,
)
[docs]
def set_events_callback(self, callback: Callable[[SrvEvent], Any]) -> int:
"""
Set callback for server events.
Args:
callback: Event callback function
Returns:
0 on success
"""
self.event_callback = callback
logger.info("Event callback set")
return 0
[docs]
def set_read_events_callback(self, callback: Callable[[SrvEvent], Any]) -> int:
"""
Set callback for read events.
Args:
callback: Read event callback function
Returns:
0 on success
"""
self.read_callback = callback
logger.info("Read event callback set")
return 0
[docs]
def set_rw_area_callback(self, callback: Callable[[Any], int]) -> int:
"""
Set callback for read/write area operations.
This is a stub for API compatibility with the C library's Srv_SetRWAreaCallback.
In the native implementation, read/write operations are handled directly.
Args:
callback: RW area callback function
Returns:
0 on success
"""
logger.debug("set_rw_area_callback called (stub for API compatibility)")
return 0
[docs]
def event_text(self, event: SrvEvent) -> str:
"""
Get event text description.
Args:
event: Server event
Returns:
Event description string
"""
event_texts = {
0x00004000: "Read operation completed",
0x00004001: "Write operation completed",
0x00008000: "Client connected",
0x00008001: "Client disconnected",
}
return event_texts.get(event.EvtCode, f"Event code: {event.EvtCode:#08x}")
[docs]
def get_mask(self, mask_kind: int) -> int:
"""
Get event mask.
Args:
mask_kind: Mask type (0=Event, 1=Log)
Returns:
Event mask value
"""
if mask_kind == 0: # mkEvent
return 0xFFFFFFFF
elif mask_kind == 1: # mkLog
return 0xFFFFFFFF
else:
raise ValueError(f"Invalid mask kind: {mask_kind}")
[docs]
def set_mask(self, kind: int = 0, mask: int = 0) -> int:
"""
Set event mask.
Args:
kind: Mask type (0=Event, 1=Log)
mask: Mask value
Returns:
0 on success
"""
logger.debug(f"Set mask {kind} = {mask:#08x}")
return 0
[docs]
def set_param(self, param: Parameter, value: int) -> int:
"""
Set server parameter.
Args:
param: Parameter type
value: Parameter value
Returns:
0 on success
"""
if param == Parameter.LocalPort:
self.port = value
logger.debug(f"Set parameter {param} = {value}")
return 0
[docs]
def get_param(self, param: Parameter) -> int:
"""
Get server parameter.
Args:
param: Parameter type
Returns:
Parameter value
Raises:
RuntimeError: If parameter is not valid for server
"""
# Client-only parameters should raise exception
client_only = [
Parameter.RemotePort,
Parameter.PingTimeout,
Parameter.SendTimeout,
Parameter.RecvTimeout,
Parameter.SrcRef,
Parameter.DstRef,
Parameter.SrcTSap,
Parameter.PDURequest,
]
if param in client_only:
raise RuntimeError(f"Parameter {param} not valid for server")
param_values = {
Parameter.LocalPort: self.port,
Parameter.WorkInterval: 100,
Parameter.MaxClients: 1024,
}
return param_values.get(param, 0)
[docs]
def start_to(self, ip: str, tcp_port: int = 102) -> int:
"""
Start server on a specific interface.
Args:
ip: IP address to bind to
tcp_port: TCP port to listen on
Returns:
0 on success
"""
# Validate IP address
try:
socket.inet_aton(ip)
except socket.error:
raise ValueError(f"Invalid IP address: {ip}")
# If already running, stop first
if self.running:
self.stop()
self.host = ip
return self.start(tcp_port if tcp_port != 102 else self.port)
[docs]
def set_cpu_status(self, status: int) -> int:
"""
Set CPU status.
Args:
status: CPU status code (0=Unknown, 4=Stop, 8=Run)
Returns:
0 on success
Raises:
ValueError: If status is invalid
"""
if status not in [0, 4, 8]:
raise ValueError(f"Invalid CPU status: {status}")
if status == 8: # RUN
self.cpu_state = CPUState.RUN
elif status == 4: # STOP
self.cpu_state = CPUState.STOP
else:
self.cpu_state = CPUState.UNKNOWN
return 0
[docs]
def pick_event(self) -> Union[SrvEvent, bool]:
"""
Pick an event from the queue.
Returns:
Server event if available, False if no events
"""
if self._event_queue:
return self._event_queue.pop(0)
return False
[docs]
def clear_events(self) -> int:
"""
Clear event queue.
Returns:
0 on success
"""
self._event_queue.clear()
return 0
def _set_log_callback(self) -> None:
"""Set up default logging callback."""
def log_callback(event: SrvEvent) -> None:
event_text = self.event_text(event)
logger.info(f"Server event: {event_text}")
self.set_events_callback(log_callback)
def _server_loop(self) -> None:
"""Main server loop to accept client connections."""
try:
while self.running and self.server_socket:
try:
self.server_socket.settimeout(0.1) # Short timeout for responsive shutdown
client_socket, address = self.server_socket.accept()
logger.info(f"Client connected from {address}")
# Start client handler thread
client_thread = threading.Thread(target=self._handle_client, args=(client_socket, address), daemon=True)
with self.client_lock:
self.clients.append(client_thread)
self.client_count += 1
client_thread.start()
except socket.timeout:
continue # Check running flag again
except OSError:
if self.running: # Only log if we're supposed to be running
logger.warning("Server socket error in accept loop")
break
except Exception as e:
logger.error(f"Server loop error: {e}")
finally:
self.running = False
self.state = ServerState.STOPPED
def _handle_client(self, client_socket: socket.socket, address: Tuple[str, int]) -> None:
"""Handle a single client connection."""
try:
# Create ISO connection wrapper and establish connection
connection = ServerISOConnection(client_socket)
# Handle ISO connection setup
if not connection.accept_connection():
logger.warning(f"Failed to establish ISO connection with {address}")
return
logger.info(f"ISO connection established with {address}")
while self.running:
try:
# Receive S7 request
request_data = connection.receive_data()
# Process request and generate response
response_data = self._process_request(request_data, address)
# Send response
if response_data:
connection.send_data(response_data)
except socket.timeout:
continue
except (ConnectionResetError, ConnectionAbortedError):
logger.info(f"Client {address} disconnected")
break
except Exception as e:
logger.error(f"Error handling client {address}: {e}")
break
except Exception as e:
logger.error(f"Client handler error for {address}: {e}")
finally:
try:
client_socket.close()
except OSError:
pass
with self.client_lock:
current_thread = threading.current_thread()
if current_thread in self.clients:
self.clients.remove(current_thread)
self.client_count = max(0, self.client_count - 1)
logger.info(f"Client {address} handler finished")
def _process_request(self, request_data: bytes, client_address: Tuple[str, int]) -> Optional[bytes]:
"""
Process an S7 request and generate response.
Args:
request_data: Raw S7 PDU data
client_address: Client address for logging
Returns:
Response PDU data or None
"""
try:
# Parse S7 request
request = self._parse_request(request_data)
# Check PDU type first
pdu_type = request.get("pdu_type", S7PDUType.REQUEST)
if pdu_type == S7PDUType.USERDATA:
# Handle USER_DATA PDU (block info, SZL, clock, etc.)
return self._handle_userdata(request, client_address)
# Handle REQUEST PDU (read/write areas, setup, control)
# Extract function code from parameters
if not request.get("parameters"):
return None
params = request["parameters"]
function_code = params.get("function_code")
if function_code == S7Function.SETUP_COMMUNICATION:
return self._handle_setup_communication(request)
elif function_code == S7Function.READ_AREA:
return self._handle_read_area(request, client_address)
elif function_code == S7Function.WRITE_AREA:
return self._handle_write_area(request, client_address)
elif function_code == S7Function.PLC_CONTROL:
return self._handle_plc_control(request, client_address)
elif function_code == S7Function.PLC_STOP:
return self._handle_plc_stop(request, client_address)
elif function_code == S7Function.START_UPLOAD:
return self._handle_start_upload(request, client_address)
elif function_code == S7Function.UPLOAD:
return self._handle_upload(request, client_address)
elif function_code == S7Function.END_UPLOAD:
return self._handle_end_upload(request, client_address)
elif function_code == S7Function.REQUEST_DOWNLOAD:
return self._handle_request_download(request, client_address)
elif function_code == S7Function.DOWNLOAD_BLOCK:
return self._handle_download_block(request, client_address)
elif function_code == S7Function.DOWNLOAD_ENDED:
return self._handle_download_ended(request, client_address)
else:
logger.warning(f"Unsupported function code: {function_code}")
return self._build_error_response(request, 0x8001) # Function not supported
except Exception as e:
logger.error(f"Error processing request: {e}")
return None
def _handle_setup_communication(self, request: Dict[str, Any]) -> bytes:
"""Handle setup communication request."""
params = request["parameters"]
pdu_length = params.get("pdu_length", 480)
# Build response with error bytes
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence (echo)
0x0008, # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
parameters = struct.pack(
">BBHHH",
S7Function.SETUP_COMMUNICATION, # Function code
0x00, # Reserved
1, # Max AMQ caller
1, # Max AMQ callee
min(pdu_length, 480), # PDU length (limited)
)
return header + parameters
def _handle_read_area(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""Handle read area request."""
try:
# Parse address specification from request parameters
addr_info = self._parse_read_address(request)
if not addr_info:
return self._build_error_response(request, 0x8001) # Invalid address
area, db_number, start, count = addr_info
# Read data from registered memory area
read_data = self._read_from_memory_area(area, db_number, start, count)
if read_data is None:
return self._build_error_response(request, 0x8404) # Area not found
# Calculate data length - need to include transport header + data
data_len = 4 + len(read_data) # Transport header (4 bytes) + data
# Build successful response
# S7 response header includes error class + error code
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence (echo)
0x0002, # Parameter length
data_len, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
# Parameters
parameters = struct.pack(
">BB",
S7Function.READ_AREA, # Function code
0x01, # Item count
)
# Data section
data_section = (
struct.pack(
">BBH",
0xFF, # Return code (success)
0x04, # Transport size (04 = byte data)
len(read_data) * 8, # Data length in bits
)
+ read_data
)
# Trigger read event callback
if self.read_callback:
event = SrvEvent()
event.EvtTime = int(time.time())
event.EvtSender = 0
event.EvtCode = 0x00004000 # Read event
event.EvtRetCode = 0
event.EvtParam1 = 1 # Area
event.EvtParam2 = 0 # Offset
event.EvtParam3 = len(read_data) # Size
event.EvtParam4 = 0
try:
self.read_callback(event)
except Exception as e:
logger.error(f"Error in read callback: {e}")
return header + parameters + data_section
except Exception as e:
logger.error(f"Error handling read request: {e}")
return self._build_error_response(request, 0x8000)
def _parse_read_address(self, request: Dict[str, Any]) -> Optional[Tuple[S7Area, int, int, int]]:
"""
Parse read address from request parameters.
Returns:
Tuple of (area, db_number, start, byte_count) or None if invalid
"""
try:
params = request.get("parameters", {})
if params.get("function_code") != S7Function.READ_AREA:
return None
# Check if we have parsed address specification
addr_spec = params.get("address_spec", {})
if addr_spec:
area = addr_spec.get("area", S7Area.DB)
db_number = addr_spec.get("db_number", 1)
start = addr_spec.get("start", 0)
count = addr_spec.get("count", 4)
word_len = addr_spec.get("word_len", S7WordLen.BYTE)
# Convert count to bytes based on word length
if word_len in [S7WordLen.TIMER, S7WordLen.COUNTER, S7WordLen.WORD]:
byte_count = count * 2 # 16-bit items
elif word_len in [S7WordLen.DWORD, S7WordLen.REAL]:
byte_count = count * 4 # 32-bit items
elif word_len == S7WordLen.BIT:
byte_count = 1 # Single bit needs at least 1 byte
else:
byte_count = count # Bytes
logger.debug(
f"Parsed address: area={area}, db={db_number}, start={start}, count={count}, word_len={word_len}, byte_count={byte_count}"
)
return (area, db_number, start, byte_count)
# Fallback to defaults if parsing failed
logger.warning("Using default address values - address parsing may have failed")
return (S7Area.DB, 1, 0, 4)
except Exception as e:
logger.error(f"Error parsing read address: {e}")
return None
def _read_from_memory_area(self, area: S7Area, db_number: int, start: int, count: int) -> Optional[bytearray]:
"""
Read data from registered memory area.
Args:
area: Memory area to read from
db_number: DB number (for DB areas)
start: Start offset
count: Number of bytes to read
Returns:
Data read from memory area or None if area not found
"""
try:
area_key = (area, db_number)
if area_key not in self.memory_areas:
logger.warning(f"Memory area {area}#{db_number} not registered")
# Return dummy data if area not found (for compatibility)
return bytearray([0x42, 0xFF, 0x12, 0x34])[:count]
# Get area data with thread safety
with self.area_locks[area_key]:
area_data = self.memory_areas[area_key]
# Check bounds
if start >= len(area_data):
logger.warning(f"Start address {start} beyond area size {len(area_data)}")
return bytearray([0x00] * count)
# Read requested data, padding with zeros if needed
end = min(start + count, len(area_data))
read_data = bytearray(area_data[start:end])
# Pad with zeros if we didn't read enough
if len(read_data) < count:
read_data.extend([0x00] * (count - len(read_data)))
logger.debug(f"Read {len(read_data)} bytes from {area}#{db_number} at offset {start}")
return read_data
except Exception as e:
logger.error(f"Error reading from memory area: {e}")
return bytearray([0x00] * count)
def _handle_write_area(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""Handle write area request."""
try:
# Parse address specification from request parameters
addr_info = self._parse_write_address(request)
if not addr_info:
return self._build_error_response(request, 0x8001) # Invalid address
area, db_number, start, count, write_data = addr_info
# Write data to registered memory area
success = self._write_to_memory_area(area, db_number, start, write_data)
if not success:
return self._build_error_response(request, 0x8404) # Area not found or write error
# Build successful response with error bytes
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence (echo)
0x0002, # Parameter length
0x0001, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
# Parameters
parameters = struct.pack(
">BB",
S7Function.WRITE_AREA, # Function code
0x01, # Item count
)
# Data section (write response)
data_section = b"\xff" # Success return code
return header + parameters + data_section
except Exception as e:
logger.error(f"Error handling write request: {e}")
return self._build_error_response(request, 0x8000)
def _handle_plc_control(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""Handle PLC control request (start, compress, copy_ram_to_rom)."""
try:
params = request.get("parameters", {})
pi_service = params.get("pi_service", b"")
# Check for PI service operations
if pi_service == b"_MSZL":
file_id = params.get("file_id", b"")
if file_id == b"P":
# Copy RAM to ROM
logger.info(f"Copy RAM to ROM requested from {client_address}")
else:
# Compress memory
logger.info(f"Compress memory requested from {client_address}")
elif len(params) >= 2:
# Has restart type parameter - start operation
restart_type = params.get("restart_type", 1)
if restart_type == 1:
logger.info("PLC Hot Start requested")
else:
logger.info("PLC Cold Start requested")
# Set CPU to running state
self.cpu_state = CPUState.RUN
else:
logger.info("PLC Start requested")
self.cpu_state = CPUState.RUN
# Build successful response
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence (echo)
0x0001, # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
parameters = struct.pack(">B", S7Function.PLC_CONTROL)
return header + parameters
except Exception as e:
logger.error(f"Error handling PLC control request: {e}")
return self._build_error_response(request, 0x8000)
def _handle_plc_stop(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""Handle PLC stop request."""
try:
logger.info("PLC Stop requested")
# Set CPU to stopped state
self.cpu_state = CPUState.STOP
# Build successful response with error bytes
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence (echo)
0x0001, # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
parameters = struct.pack(">B", S7Function.PLC_STOP)
return header + parameters
except Exception as e:
logger.error(f"Error handling PLC stop request: {e}")
return self._build_error_response(request, 0x8000)
def _parse_write_address(self, request: Dict[str, Any]) -> Optional[Tuple[S7Area, int, int, int, bytearray]]:
"""
Parse write address from request parameters and data.
Returns:
Tuple of (area, db_number, start, count, write_data) or None if invalid
"""
try:
params = request.get("parameters", {})
if params.get("function_code") != S7Function.WRITE_AREA:
return None
# Check if we have parsed address specification
addr_spec = params.get("address_spec", {})
if not addr_spec:
logger.warning("No address specification in write request")
return None
area = addr_spec.get("area", S7Area.DB)
db_number = addr_spec.get("db_number", 1)
start = addr_spec.get("start", 0)
count = addr_spec.get("count", 0)
# Extract write data from request data section
data_info = request.get("data", {})
write_data = data_info.get("data", b"")
if not write_data:
logger.warning("No write data in request")
return None
logger.debug(
f"Parsed write address: area={area}, db={db_number}, start={start}, count={count}, data_len={len(write_data)}"
)
return (area, db_number, start, count, bytearray(write_data))
except Exception as e:
logger.error(f"Error parsing write address: {e}")
return None
def _write_to_memory_area(self, area: S7Area, db_number: int, start: int, write_data: bytearray) -> bool:
"""
Write data to registered memory area.
Args:
area: Memory area to write to
db_number: DB number (for DB areas)
start: Start offset
write_data: Data to write
Returns:
True if write succeeded, False otherwise
"""
try:
area_key = (area, db_number)
if area_key not in self.memory_areas:
logger.warning(f"Memory area {area}#{db_number} not registered for write")
return False
# Write to area data with thread safety
with self.area_locks[area_key]:
area_data = self.memory_areas[area_key]
# Check bounds
if start >= len(area_data):
logger.warning(f"Write start address {start} beyond area size {len(area_data)}")
return False
# Calculate write range
end = min(start + len(write_data), len(area_data))
actual_write_len = end - start
# Write the data
area_data[start:end] = write_data[:actual_write_len]
logger.debug(f"Wrote {actual_write_len} bytes to {area}#{db_number} at offset {start}")
# If we didn't write all data due to bounds, return error
if actual_write_len < len(write_data):
logger.warning(f"Only wrote {actual_write_len} of {len(write_data)} bytes due to area bounds")
return False
return True
except Exception as e:
logger.error(f"Error writing to memory area: {e}")
return False
def _parse_request(self, pdu: bytes) -> Dict[str, Any]:
"""
Parse S7 request PDU.
Args:
pdu: Complete S7 PDU
Returns:
Parsed request data
"""
if len(pdu) < 10:
raise S7ProtocolError("PDU too short for S7 header")
# Parse S7 header
header = struct.unpack(">BBHHHH", pdu[:10])
protocol_id, pdu_type, reserved, sequence, param_len, data_len = header
if protocol_id != 0x32:
raise S7ProtocolError(f"Invalid protocol ID: {protocol_id:#02x}")
request: Dict[str, Any] = {
"pdu_type": pdu_type,
"sequence": sequence,
"param_length": param_len,
"data_length": data_len,
"parameters": None,
"data": None,
"error_code": 0,
}
offset = 10
# Parse parameters if present
if param_len > 0:
if offset + param_len > len(pdu):
raise S7ProtocolError("Parameter section extends beyond PDU")
param_data = pdu[offset : offset + param_len]
# Store raw parameters for all request types (needed for upload/download parsing)
request["raw_parameters"] = param_data
if pdu_type == S7PDUType.USERDATA:
request["parameters"] = self._parse_userdata_request_parameters(param_data)
else:
request["parameters"] = self._parse_request_parameters(param_data)
offset += param_len
# Parse data if present
if data_len > 0:
if offset + data_len > len(pdu):
raise S7ProtocolError("Data section extends beyond PDU")
data_section = pdu[offset : offset + data_len]
request["data"] = self._parse_data_section(data_section)
return request
def _parse_request_parameters(self, param_data: bytes) -> Dict[str, Any]:
"""Parse S7 request parameter section."""
if len(param_data) < 1:
return {}
function_code = param_data[0]
if function_code == S7Function.SETUP_COMMUNICATION:
if len(param_data) >= 8:
function_code, reserved, max_amq_caller, max_amq_callee, pdu_length = struct.unpack(">BBHHH", param_data[:8])
return {
"function_code": function_code,
"max_amq_caller": max_amq_caller,
"max_amq_callee": max_amq_callee,
"pdu_length": pdu_length,
}
elif function_code == S7Function.READ_AREA:
# Parse read area parameters
if len(param_data) >= 14: # Minimum for read area request
# Function code (1) + item count (1) + address spec (12)
item_count = param_data[1]
# Parse address specification starting at byte 2
if len(param_data) >= 14:
addr_spec = param_data[2:14] # 12 bytes of address specification
logger.debug(f"Extracted address spec from params: {addr_spec.hex()}")
parsed_addr = self._parse_address_specification(addr_spec)
return {"function_code": function_code, "item_count": item_count, "address_spec": parsed_addr}
elif function_code == S7Function.WRITE_AREA:
# Parse write area parameters (same format as read)
if len(param_data) >= 14: # Minimum for write area request
# Function code (1) + item count (1) + address spec (12)
item_count = param_data[1]
# Parse address specification starting at byte 2
if len(param_data) >= 14:
addr_spec = param_data[2:14] # 12 bytes of address specification
logger.debug(f"Extracted write address spec from params: {addr_spec.hex()}")
parsed_addr = self._parse_address_specification(addr_spec)
return {"function_code": function_code, "item_count": item_count, "address_spec": parsed_addr}
elif function_code == S7Function.PLC_CONTROL:
# Parse PLC control parameters
# Format varies: simple start or PI service (compress/copy_ram_to_rom)
if len(param_data) >= 2:
# Check for restart type (simple start)
restart_type = param_data[1]
if restart_type in (1, 2):
return {"function_code": function_code, "restart_type": restart_type}
# Check for PI service (compress/copy_ram_to_rom)
# Format: func(1) + reserved(7) + pi_len(1) + pi_service
# Or: func(1) + reserved(6) + file_id_len(1) + pi_len(1) + file_id + pi_service
if len(param_data) >= 10:
# Look for PI service
pi_len = param_data[8]
if pi_len > 0 and len(param_data) >= 9 + pi_len:
pi_service = param_data[9 : 9 + pi_len]
# Check for file_id (copy_ram_to_rom)
file_id_len = param_data[7]
file_id = b""
if file_id_len > 0 and len(param_data) >= 9 + file_id_len + pi_len:
# Reparse with file_id
file_id = param_data[9 : 9 + file_id_len]
pi_service = param_data[9 + file_id_len : 9 + file_id_len + pi_len]
return {"function_code": function_code, "pi_service": pi_service, "file_id": file_id}
return {"function_code": function_code}
def _parse_userdata_request_parameters(self, param_data: bytes) -> Dict[str, Any]:
"""
Parse USER_DATA request parameters.
USER_DATA parameter format (from C s7_types.h TReqFunTypedParams):
- Byte 0: Reserved (0x00)
- Byte 1: Parameter count (usually 0x01)
- Byte 2: Type/length header (0x12)
- Byte 3: Length (0x04 or 0x08)
- Byte 4: Method (0x11 = request, 0x12 = response)
- Byte 5: Type (high nibble 0x4=req, 0x8=resp) | Group (low nibble)
- Byte 6: Subfunction
- Byte 7: Sequence number
Args:
param_data: Raw parameter bytes
Returns:
Dictionary with parsed USER_DATA parameters
"""
if len(param_data) < 8:
logger.debug(f"USER_DATA parameters too short: {len(param_data)} bytes")
return {}
try:
# Parse USER_DATA header
# Bytes 0-3 are header (reserved, param_count, type_len_header, length)
method = param_data[4]
type_group = param_data[5]
subfunction = param_data[6]
sequence = param_data[7]
# Extract type (high nibble) and group (low nibble)
req_type = (type_group >> 4) & 0x0F
group = type_group & 0x0F
logger.debug(
f"USER_DATA params: method={method:#02x}, type={req_type}, group={group}, subfunc={subfunction}, seq={sequence}"
)
return {
"method": method,
"type": req_type,
"group": group,
"subfunction": subfunction,
"sequence": sequence,
}
except Exception as e:
logger.error(f"Error parsing USER_DATA parameters: {e}")
return {}
def _parse_address_specification(self, addr_spec: bytes) -> Dict[str, Any]:
"""
Parse S7 address specification.
Args:
addr_spec: 12-byte address specification from client request
Returns:
Dictionary with parsed address information
"""
try:
if len(addr_spec) < 12:
logger.error(f"Address spec too short: {len(addr_spec)} bytes, need 12")
return {}
logger.debug(f"Parsing address spec: {addr_spec.hex()} (length: {len(addr_spec)})")
# Address specification format:
# Byte 0: Specification type (0x12)
# Byte 1: Length of following address specification (0x0A = 10 bytes)
# Byte 2: Syntax ID (0x10 = S7-Any)
# Byte 3: Transport size (word length)
# Bytes 4-5: Count (number of items)
# Bytes 6-7: DB number (for DB area) or 0
# Byte 8: Area code
# Bytes 9-11: Start address (3 bytes, big-endian)
spec_type, length, syntax_id, word_len, count, db_number, area_code, address_bytes = struct.unpack(
">BBBBHHB3s", addr_spec
)
# Extract 3-byte address (big-endian)
address = struct.unpack(">I", b"\x00" + address_bytes)[0] # Pad to 4 bytes
# Convert bit address to byte address
if word_len == S7WordLen.BIT:
byte_addr = address // 8
start_address = byte_addr
else:
start_address = address // 8 # Convert bit address to byte address
return {
"area": S7Area(area_code),
"db_number": db_number,
"start": start_address,
"count": count,
"word_len": word_len,
"spec_type": spec_type,
"syntax_id": syntax_id,
}
except Exception as e:
logger.error(f"Error parsing address specification: {e}")
return {}
def _parse_data_section(self, data_section: bytes) -> Dict[str, Any]:
"""Parse S7 data section."""
if len(data_section) == 1:
# Simple return code (for write responses)
return {"return_code": data_section[0], "transport_size": 0, "data_length": 0, "data": b""}
elif len(data_section) >= 4:
# Full data header (for read responses)
return_code = data_section[0]
transport_size = data_section[1]
data_length = struct.unpack(">H", data_section[2:4])[0]
# Extract actual data - length interpretation depends on transport_size
# Transport size 0x09 (octet string): byte length (USERDATA responses)
# Transport size 0x00: byte length (USERDATA requests)
# Transport size 0x04 (byte): bit length (READ_AREA responses)
if transport_size in (0x00, 0x09):
# USERDATA uses byte length directly
actual_data = data_section[4 : 4 + data_length]
else:
# READ_AREA responses use bit length
actual_data = data_section[4 : 4 + (data_length // 8)]
return {"return_code": return_code, "transport_size": transport_size, "data_length": data_length, "data": actual_data}
else:
return {"raw_data": data_section}
def _build_error_response(self, request: Dict[str, Any], error_code: int) -> bytes:
"""Build an error response PDU.
Uses PDU type ACK (0x02) for error responses without data,
matching real S7-1200/1500 PLC behavior.
"""
error_class = (error_code >> 8) & 0xFF
error_byte = error_code & 0xFF
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK, # PDU type (ACK for errors without data)
0x0000, # Reserved
request.get("sequence", 0), # Sequence (echo)
0x0000, # Parameter length
0x0000, # Data length
error_class, # Error class
error_byte, # Error code
)
return header
# ========================================================================
# USER_DATA PDU Handlers (Chunk 1 of protocol implementation)
# ========================================================================
def _handle_userdata(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle USER_DATA PDU requests.
USER_DATA PDUs are used for:
- Block operations (list, info)
- SZL (System Status List) requests
- Clock operations (get/set time)
- Security operations (password)
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU data
"""
try:
# Parse USER_DATA specific parameters
userdata_params = self._parse_userdata_parameters(request)
if not userdata_params:
logger.warning(f"Failed to parse USER_DATA parameters from {client_address}")
return self._build_userdata_error_response(request, 0x8104) # Object does not exist
group = userdata_params.get("group", 0)
subfunction = userdata_params.get("subfunction", 0)
logger.debug(f"USER_DATA request: group={group:#04x}, subfunction={subfunction:#02x}")
# Route to appropriate handler based on group
if group == S7UserDataGroup.BLOCK_INFO:
return self._handle_block_info(request, userdata_params, client_address)
elif group == S7UserDataGroup.SZL:
return self._handle_szl(request, userdata_params, client_address)
elif group == S7UserDataGroup.TIME:
return self._handle_clock(request, userdata_params, client_address)
elif group == S7UserDataGroup.SECURITY:
return self._handle_security(request, userdata_params, client_address)
else:
logger.warning(f"Unsupported USER_DATA group: {group:#04x}")
return self._build_userdata_error_response(request, 0x8104)
except Exception as e:
logger.error(f"Error handling USER_DATA request: {e}")
return self._build_userdata_error_response(request, 0x8000)
def _parse_userdata_parameters(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse USER_DATA specific parameters.
USER_DATA parameter format (from C s7_types.h):
- Byte 0-2: Parameter header
- Byte 3: Parameter length
- Byte 4: Method (0x11 = request, 0x12 = response)
- Byte 5 (high nibble): Type (0x4 = request, 0x8 = response)
- Byte 5 (low nibble): Function group
- Byte 6: Subfunction
- Byte 7: Sequence number
Args:
request: Parsed S7 request
Returns:
Dictionary with parsed USER_DATA parameters
"""
try:
params = request.get("parameters")
if not params:
# Try to get raw parameter data from request
return {}
# If we have raw parameter data in the request, parse it
raw_params = request.get("raw_parameters", b"")
if not raw_params and isinstance(params, dict):
# Already parsed - check if it has userdata fields
if "group" in params:
return params
return {}
if len(raw_params) < 8:
logger.debug(f"USER_DATA parameters too short: {len(raw_params)} bytes")
return {}
# Parse USER_DATA parameter format
# Skip first 4 bytes (header), then:
method = raw_params[4]
type_group = raw_params[5]
subfunction = raw_params[6]
sequence = raw_params[7]
# Extract type (high nibble) and group (low nibble)
req_type = (type_group >> 4) & 0x0F
group = type_group & 0x0F
return {
"method": method,
"type": req_type,
"group": group,
"subfunction": subfunction,
"sequence": sequence,
}
except Exception as e:
logger.error(f"Error parsing USER_DATA parameters: {e}")
return {}
def _handle_block_info(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle block info group requests (grBlocksInfo).
Subfunctions:
- SFun_ListAll (0x01): List all block counts
- SFun_ListBoT (0x02): List blocks of type
- SFun_BlkInfo (0x03): Get block info
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU
"""
subfunction = userdata_params.get("subfunction", 0)
if subfunction == S7UserDataSubfunction.LIST_ALL:
return self._handle_list_all_blocks(request, userdata_params, client_address)
elif subfunction == S7UserDataSubfunction.LIST_BLOCKS_OF_TYPE:
return self._handle_list_blocks_of_type(request, userdata_params, client_address)
elif subfunction == S7UserDataSubfunction.BLOCK_INFO:
return self._handle_get_block_info(request, userdata_params, client_address)
else:
logger.warning(f"Unsupported block info subfunction: {subfunction:#02x}")
return self._build_userdata_error_response(request, 0x8104)
def _handle_szl(self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle SZL (System Status List) requests.
SZL provides system status information about the PLC.
Common SZL IDs:
- 0x001C: Component identification (for get_cpu_info)
- 0x0011: Module identification (for get_order_code)
- 0x0131: Communication parameters (for get_cp_info)
- 0x0232: Protection level (for get_protection)
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU with SZL data
"""
# Extract SZL ID and index from request data
data_section = request.get("data", {})
raw_data = data_section.get("data", b"")
# SZL request data: return_code (1) + transport (1) + length (2) + SZL_ID (2) + Index (2)
if len(raw_data) >= 4:
szl_id = struct.unpack(">H", raw_data[0:2])[0]
szl_index = struct.unpack(">H", raw_data[2:4])[0]
else:
szl_id = 0
szl_index = 0
logger.debug(f"SZL request from {client_address}: ID={szl_id:#06x}, Index={szl_index:#06x}")
# Get SZL data for the requested ID
szl_data = self._get_szl_data(szl_id, szl_index)
if szl_data is None:
logger.debug(f"SZL ID {szl_id:#06x} not available")
return self._build_userdata_error_response(request, 0x8104)
# Build response with SZL header: SZL_ID (2) + Index (2) + data
response_data = struct.pack(">HH", szl_id, szl_index) + szl_data
return self._build_userdata_success_response(request, userdata_params, response_data)
def _get_szl_data(self, szl_id: int, szl_index: int) -> Optional[bytes]:
"""
Get SZL data for a specific ID and index.
Args:
szl_id: SZL identifier
szl_index: SZL index
Returns:
SZL data bytes or None if not available
"""
# SZL 0x001C: Component identification (S7CpuInfo)
if szl_id == 0x001C:
# S7CpuInfo structure fields (each is a null-terminated string)
module_type = b"CPU 315-2 PN/DP\x00"
serial_number = b"S C-C2UR28922012\x00"
as_name = b"SNAP7-SERVER\x00"
copyright_info = b"Original Siemens Equipment\x00"
module_name = b"CPU 315-2 PN/DP\x00"
# Pad to fixed sizes (from C structure)
module_type = module_type.ljust(32, b"\x00")[:32]
serial_number = serial_number.ljust(24, b"\x00")[:24]
as_name = as_name.ljust(24, b"\x00")[:24]
copyright_info = copyright_info.ljust(26, b"\x00")[:26]
module_name = module_name.ljust(24, b"\x00")[:24]
return module_type + serial_number + as_name + copyright_info + module_name
# SZL 0x0011: Module identification (S7OrderCode)
elif szl_id == 0x0011:
order_code = b"6ES7 315-2EH14-0AB0\x00"
version = b"V3.3\x00"
order_code = order_code.ljust(20, b"\x00")[:20]
version = version.ljust(4, b"\x00")[:4]
return order_code + version
# SZL 0x0131: Communication parameters (S7CpInfo)
elif szl_id == 0x0131:
# S7CpInfo structure
max_pdu = 480
max_connections = 32
max_mpi = 12
max_bus = 12
return struct.pack(">HHHH", max_pdu, max_connections, max_mpi, max_bus)
# SZL 0x0232: Protection level (S7Protection)
elif szl_id == 0x0232:
# S7Protection structure
# sch_schal: 1=no password, 2=password level 1, 3=password level 2
# sch_par: protection level during runtime
# sch_rel: protection level during download
# bart_sch: startup protection level
# anl_sch: factory setting protection
return struct.pack(">HHHHH", 1, 0, 0, 0, 0) # No protection
# SZL 0x0000: SZL list
elif szl_id == 0x0000:
# Return list of available SZL IDs
available_ids = [0x0000, 0x0011, 0x001C, 0x0131, 0x0232]
data = b""
for id_val in available_ids:
data += struct.pack(">H", id_val)
return data
return None
def _handle_clock(self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle clock requests (get/set time).
Supports:
- GET_CLOCK (0x01): Returns current server time in BCD format
- SET_CLOCK (0x02): Accepts time setting (logs but doesn't persist)
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU with clock data
"""
subfunction = userdata_params.get("subfunction", 0)
if subfunction == 0x01: # GET_CLOCK
return self._handle_get_clock(request, userdata_params, client_address)
elif subfunction == 0x02: # SET_CLOCK
return self._handle_set_clock(request, userdata_params, client_address)
else:
logger.warning(f"Unknown clock subfunction: {subfunction:#04x}")
return self._build_userdata_error_response(request, 0x8104)
def _handle_get_clock(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle get clock request - returns current server time.
Returns time in BCD format (8 bytes):
- Byte 0: Reserved (0x00)
- Byte 1: Year (BCD, 0-99)
- Byte 2: Month (BCD, 1-12)
- Byte 3: Day (BCD, 1-31)
- Byte 4: Hour (BCD, 0-23)
- Byte 5: Minute (BCD, 0-59)
- Byte 6: Second (BCD, 0-59)
- Byte 7: Day of week (1=Monday)
"""
from datetime import datetime
now = datetime.now()
def to_bcd(value: int) -> int:
return ((value // 10) << 4) | (value % 10)
year = now.year % 100
bcd_time = struct.pack(
">BBBBBBBB",
0x00, # Reserved
to_bcd(year), # Year (BCD)
to_bcd(now.month), # Month (BCD)
to_bcd(now.day), # Day (BCD)
to_bcd(now.hour), # Hour (BCD)
to_bcd(now.minute), # Minute (BCD)
to_bcd(now.second), # Second (BCD)
(now.weekday() + 1) & 0x0F, # Day of week (1=Monday)
)
logger.debug(f"Get clock from {client_address}: returning {now}")
return self._build_userdata_success_response(request, userdata_params, bcd_time)
def _handle_set_clock(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle set clock request - accepts time setting.
The emulator logs the time but doesn't persist it (always returns current time on get).
"""
data_section = request.get("data", {})
raw_data = data_section.get("data", b"")
if len(raw_data) >= 8:
def from_bcd(value: int) -> int:
return ((value >> 4) * 10) + (value & 0x0F)
year = from_bcd(raw_data[1])
month = from_bcd(raw_data[2])
day = from_bcd(raw_data[3])
hour = from_bcd(raw_data[4])
minute = from_bcd(raw_data[5])
second = from_bcd(raw_data[6])
logger.info(
f"Set clock from {client_address}: 20{year:02d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
)
else:
logger.debug(f"Set clock from {client_address}: no time data provided")
# Return success (empty response data)
return self._build_userdata_success_response(request, userdata_params, b"")
def _handle_security(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle security requests (password operations).
Stub implementation - returns success (no password required).
"""
logger.debug(f"Security request from {client_address} (returning success)")
# Return success - emulator doesn't require password
return self._build_userdata_success_response(request, userdata_params, b"")
def _handle_list_all_blocks(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle list all blocks request (SFun_ListAll).
Returns count of each block type (OB, FB, FC, DB, SDB, SFC, SFB).
Response data format (TDataFunListAll):
For each block type (7 types):
- Byte 0: 0x30 (indicator)
- Byte 1: Block type code
- Bytes 2-3: Block count (big-endian)
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU with block counts
"""
logger.debug(f"List all blocks request from {client_address}")
# Count registered DB areas
db_count = sum(1 for (area, _) in self.memory_areas.keys() if area == S7Area.DB)
# Block type codes (from C s7_types.h)
BLOCK_OB = 0x38 # Organization Block
BLOCK_DB = 0x41 # Data Block
BLOCK_SDB = 0x42 # System Data Block
BLOCK_FC = 0x43 # Function
BLOCK_SFC = 0x44 # System Function
BLOCK_FB = 0x45 # Function Block
BLOCK_SFB = 0x46 # System Function Block
# Build response data - 4 bytes per block type, 7 block types
# Format: 0x30 | block_type | count (2 bytes big-endian)
data = b""
for block_type, count in [
(BLOCK_OB, 0), # No OBs in emulator
(BLOCK_FB, 0), # No FBs
(BLOCK_FC, 0), # No FCs
(BLOCK_DB, db_count), # Registered DBs
(BLOCK_SDB, 0), # No SDBs
(BLOCK_SFC, 0), # No SFCs
(BLOCK_SFB, 0), # No SFBs
]:
data += struct.pack(">BBH", 0x30, block_type, count)
logger.debug(f"List all blocks: DB count = {db_count}")
return self._build_userdata_success_response(request, userdata_params, data)
def _handle_list_blocks_of_type(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle list blocks of type request (SFun_ListBoT).
Returns list of block numbers for a specific block type.
Request data contains:
- Block type code to query
Response data format:
- 2 bytes per block: block number (big-endian)
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU with block numbers
"""
logger.debug(f"List blocks of type request from {client_address}")
# Get requested block type from request data section
data_section = request.get("data", {})
raw_data = data_section.get("data", b"")
# Block type code constants
block_db = 0x41 # Data Block
# Handle both new format [0x30, type] and old format [type]
if len(raw_data) >= 2 and raw_data[0] == 0x30:
requested_type = raw_data[1]
elif len(raw_data) > 0:
requested_type = raw_data[0]
else:
requested_type = block_db
# Currently only support DB type (others not implemented in emulator)
if requested_type == block_db:
# Get all registered DB numbers
db_numbers = sorted([idx for (area, idx) in self.memory_areas.keys() if area == S7Area.DB])
# Build response data - 4 bytes per block (TDataFunGetBotItem:
# BlockNum(2) + Unknown(1) + BlockLang(1))
data = b""
for db_num in db_numbers:
data += struct.pack(">HBB", db_num, 0, 0)
logger.debug(f"List blocks of type DB: {db_numbers}")
return self._build_userdata_success_response(request, userdata_params, data)
else:
# Other block types not available in emulator
logger.debug(f"Block type {requested_type:#02x} not available")
return self._build_userdata_success_response(request, userdata_params, b"")
def _handle_get_block_info(
self, request: Dict[str, Any], userdata_params: Dict[str, Any], client_address: Tuple[str, int]
) -> bytes:
"""
Handle get block info request (SFun_BlkInfo).
Returns information about a specific block.
Request data contains:
- Block type code
- Block number
- Block language (optional)
Response data format (TS7BlockInfo):
- Various block metadata fields
Args:
request: Parsed S7 request
userdata_params: Parsed USER_DATA parameters
client_address: Client address
Returns:
Response PDU with block info
"""
logger.debug(f"Get block info request from {client_address}")
# Get requested block from request data section
data_section = request.get("data", {})
raw_data = data_section.get("data", b"")
# Block type code constants
block_db = 0x41 # Data Block
# Parse request: handle new format [0x30, type, 'A', ASCII_num(5)]
# and old format [type, num(2), 0x41]
if len(raw_data) >= 8 and raw_data[0] == 0x30:
# New format: 0x30 + type + 'A' + 5-digit ASCII number
requested_type = raw_data[1]
try:
block_number = int(raw_data[3:8].decode("ascii"))
except (ValueError, UnicodeDecodeError):
block_number = 1
elif len(raw_data) >= 3:
# Old format: type(1) + number(2) + filesystem(1)
requested_type = raw_data[0]
block_number = struct.unpack(">H", raw_data[1:3])[0]
else:
# Default values
requested_type = block_db
block_number = 1
# Check if block exists
if requested_type == block_db:
area_key = (S7Area.DB, block_number)
if area_key in self.memory_areas:
block_size = len(self.memory_areas[area_key])
# Build TResDataBlockInfo structure (78 bytes)
# Layout per Snap7 C s7_types.h
data = bytearray(78)
data[0] = 0x30 # Const
data[1] = requested_type # BlkType
data[9] = 0 # BlkFlags
data[10] = 0 # BlkLang
data[11] = requested_type # SubBlkType
struct.pack_into(">H", data, 12, block_number) # BlkNumber
struct.pack_into(">I", data, 14, block_size) # LoadSize
struct.pack_into(">H", data, 34, 0) # SBBLength
struct.pack_into(">H", data, 38, 0) # LocalData
struct.pack_into(">H", data, 40, block_size) # MC7Size
# Author (8 bytes at offset 42)
data[42:50] = b"SNAP7EMU"
# Family (8 bytes at offset 50)
data[50:58] = b"EMULATOR"
# Header (8 bytes at offset 58)
data[58:60] = b"DB"
data[66] = 1 # Version
logger.debug(f"Get block info for DB{block_number}: size={block_size}")
return self._build_userdata_success_response(request, userdata_params, bytes(data))
else:
logger.debug(f"Block DB{block_number} not found")
return self._build_userdata_error_response(request, 0x8104) # Object not found
else:
# Other block types not available
logger.debug(f"Block type {requested_type:#02x} not available")
return self._build_userdata_error_response(request, 0x8104)
def _build_userdata_error_response(self, request: Dict[str, Any], error_code: int) -> bytes:
"""
Build USER_DATA error response PDU.
Args:
request: Original request
error_code: S7 error code
Returns:
Error response PDU
"""
# USER_DATA response format is different from standard response
# Parameter section (12-byte format per TS7Params7)
param_data = struct.pack(
">BBBBBBBBBBBB",
0x00, # Reserved
0x01, # Parameter count
0x12, # Type/length header
0x08, # Length (8 bytes following)
0x12, # Method (response)
0x84, # Type (8=response) | Group (4=SZL, but used for error)
0x01, # Subfunction
0x00, # Sequence
0x00, # Data unit reference
0x00, # Last data unit
0x00, # Error code high
0x00, # Error code low
)
# Data section: return code only (error code in transport format)
data_section = struct.pack(">BBH", (error_code >> 8) & 0xFF, 0x00, 0)
# Build S7 header for USERDATA (10 bytes, no error_class/error_code in header)
header = struct.pack(
">BBHHHH",
0x32, # Protocol ID
S7PDUType.USERDATA, # PDU type
0x0000, # Reserved
request.get("sequence", 0), # Sequence
len(param_data), # Parameter length
len(data_section), # Data length
)
return header + param_data + data_section
def _build_userdata_success_response(self, request: Dict[str, Any], userdata_params: Dict[str, Any], data: bytes) -> bytes:
"""
Build USER_DATA success response PDU.
Args:
request: Original request
userdata_params: Parsed USER_DATA parameters
data: Response data
Returns:
Success response PDU
"""
group = userdata_params.get("group", 0)
subfunction = userdata_params.get("subfunction", 0)
seq = userdata_params.get("sequence", 0)
# Parameter section for success response (12-byte format per TS7Params7)
param_data = struct.pack(
">BBBBBBBBBBBB",
0x00, # Reserved
0x01, # Parameter count
0x12, # Type/length header
0x08, # Length (8 bytes following)
0x12, # Method (response)
0x80 | group, # Type (8=response) | Group
subfunction, # Subfunction
seq, # Sequence
0x00, # Data unit reference
0x00, # Last data unit
0x00, # Error code high
0x00, # Error code low
)
# Data section: return code (0xFF = success) + data
data_section = struct.pack(">BBH", 0xFF, 0x09, len(data)) + data
# Build S7 header for USERDATA (10 bytes, no error_class/error_code in header)
header = struct.pack(
">BBHHHH",
0x32, # Protocol ID
S7PDUType.USERDATA, # PDU type
0x0000, # Reserved
request.get("sequence", 0), # Sequence
len(param_data), # Parameter length
len(data_section), # Data length
)
return header + param_data + data_section
# ========================================================================
# Block Transfer Handlers (Upload/Download/Delete)
# ========================================================================
def _handle_start_upload(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle start upload request.
Parses the block address and returns upload ID and block length.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU with upload ID and block length
"""
try:
raw_params = request.get("raw_parameters", b"")
# Parse block address from parameters
# Format: function + status + reserved + upload_id + block_addr_len + block_addr
block_type = 0x41 # Default to DB
block_num = 1
if len(raw_params) >= 10:
addr_len = raw_params[9]
if len(raw_params) >= 10 + addr_len:
block_addr = raw_params[10 : 10 + addr_len]
# Parse block address: type (2 hex) + num (5 digits) + filesystem
try:
block_type = int(block_addr[0:2], 16)
block_num = int(block_addr[2:7])
except (ValueError, IndexError):
pass
logger.info(f"Start upload request from {client_address}: type={block_type:#02x}, num={block_num}")
# Generate upload ID and get block length
upload_id = 1 # Simple upload ID
block_length = 0
# Check if block exists
if block_type == 0x41: # DB
area_key = (S7Area.DB, block_num)
if area_key in self.memory_areas:
block_length = len(self.memory_areas[area_key])
# Store upload context for this client
if not hasattr(self, "_upload_contexts"):
self._upload_contexts: Dict[Tuple[str, int], Dict[str, Any]] = {}
self._upload_contexts[client_address] = {
"upload_id": upload_id,
"block_type": block_type,
"block_num": block_num,
"offset": 0,
}
# Build response: function + status + reserved + upload_id + block_len_string_len + block_len_string
block_len_str = f"{block_length:06d}".encode("ascii")
param_data = (
struct.pack(
">BBBIB",
S7Function.START_UPLOAD,
0x00, # Status
0x00, # Reserved
upload_id,
len(block_len_str),
)
+ block_len_str
)
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data
except Exception as e:
logger.error(f"Error handling start upload: {e}")
return self._build_error_response(request, 0x8000)
def _handle_upload(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle upload request - return block data.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU with block data
"""
try:
# Get upload context for this client
if not hasattr(self, "_upload_contexts") or client_address not in self._upload_contexts:
logger.warning(f"Upload request without start_upload from {client_address}")
return self._build_error_response(request, 0x8104)
ctx = self._upload_contexts[client_address]
block_type = ctx["block_type"]
block_num = ctx["block_num"]
# Get block data
block_data = b""
if block_type == 0x41: # DB
area_key = (S7Area.DB, block_num)
if area_key in self.memory_areas:
with self.area_locks[area_key]:
block_data = bytes(self.memory_areas[area_key])
logger.info(f"Upload request from {client_address}: sending {len(block_data)} bytes")
# Build response with data
# Status: 0x00 = more data, 0x01 = last packet
param_data = struct.pack(
">BBBI",
S7Function.UPLOAD,
0x01, # Status: last packet
0x00, # Reserved
ctx["upload_id"],
)
# Data section: length (2 bytes) + unknown (2 bytes) + data
data_section = struct.pack(">HH", len(block_data), 0x00FB) + block_data
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
len(data_section), # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data + data_section
except Exception as e:
logger.error(f"Error handling upload: {e}")
return self._build_error_response(request, 0x8000)
def _handle_end_upload(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle end upload request.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU acknowledging end of upload
"""
try:
# Clean up upload context
if hasattr(self, "_upload_contexts") and client_address in self._upload_contexts:
del self._upload_contexts[client_address]
logger.info(f"End upload from {client_address}")
# Build simple response
param_data = struct.pack(">B", S7Function.END_UPLOAD)
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data
except Exception as e:
logger.error(f"Error handling end upload: {e}")
return self._build_error_response(request, 0x8000)
def _handle_request_download(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle request download - acknowledge download request.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU acknowledging download request
"""
try:
raw_params = request.get("raw_parameters", b"")
# Parse block address from parameters
block_type = 0x41 # Default to DB
block_num = 1
if len(raw_params) >= 6:
addr_len = raw_params[5]
if len(raw_params) >= 6 + addr_len:
block_addr = raw_params[6 : 6 + addr_len]
try:
block_type = int(block_addr[0:2], 16)
block_num = int(block_addr[2:7])
except (ValueError, IndexError):
pass
logger.info(f"Request download from {client_address}: type={block_type:#02x}, num={block_num}")
# Store download context
if not hasattr(self, "_download_contexts"):
self._download_contexts: Dict[Tuple[str, int], Dict[str, Any]] = {}
self._download_contexts[client_address] = {
"block_type": block_type,
"block_num": block_num,
"data": bytearray(),
}
# Build response acknowledging download
param_data = struct.pack(">B", S7Function.REQUEST_DOWNLOAD)
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data
except Exception as e:
logger.error(f"Error handling request download: {e}")
return self._build_error_response(request, 0x8000)
def _handle_download_block(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle download block - receive block data.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU acknowledging data receipt
"""
try:
# Get download context
if not hasattr(self, "_download_contexts") or client_address not in self._download_contexts:
logger.warning(f"Download block without request_download from {client_address}")
return self._build_error_response(request, 0x8104)
ctx = self._download_contexts[client_address]
# Extract data from request
data_info = request.get("data", {})
block_data = data_info.get("data", b"")
# Append data to context
ctx["data"].extend(block_data)
logger.info(f"Download block from {client_address}: received {len(block_data)} bytes")
# Build response
param_data = struct.pack(">B", S7Function.DOWNLOAD_BLOCK)
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data
except Exception as e:
logger.error(f"Error handling download block: {e}")
return self._build_error_response(request, 0x8000)
def _handle_download_ended(self, request: Dict[str, Any], client_address: Tuple[str, int]) -> bytes:
"""
Handle download ended - finalize block storage.
Args:
request: Parsed S7 request
client_address: Client address for logging
Returns:
Response PDU confirming download complete
"""
try:
# Get download context
if not hasattr(self, "_download_contexts") or client_address not in self._download_contexts:
logger.warning(f"Download ended without download_block from {client_address}")
return self._build_error_response(request, 0x8104)
ctx = self._download_contexts[client_address]
block_type = ctx["block_type"]
block_num = ctx["block_num"]
block_data = ctx["data"]
# Store block data
if block_type == 0x41: # DB
area_key = (S7Area.DB, block_num)
if area_key in self.memory_areas:
# Update existing area - copy data into existing area without resizing
with self.area_locks[area_key]:
existing_area = self.memory_areas[area_key]
copy_len = min(len(block_data), len(existing_area))
existing_area[0:copy_len] = block_data[0:copy_len]
else:
# Create new area
self.memory_areas[area_key] = bytearray(block_data)
self.area_locks[area_key] = threading.Lock()
logger.info(f"Download ended from {client_address}: stored {len(block_data)} bytes to {block_type:#02x}:{block_num}")
# Clean up context
del self._download_contexts[client_address]
# Build response
param_data = struct.pack(">B", S7Function.DOWNLOAD_ENDED)
header = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA, # PDU type
0x0000, # Reserved
request["sequence"], # Sequence
len(param_data), # Parameter length
0x0000, # Data length
0x00, # Error class (success)
0x00, # Error code (success)
)
return header + param_data
except Exception as e:
logger.error(f"Error handling download ended: {e}")
return self._build_error_response(request, 0x8000)
[docs]
def __enter__(self) -> "Server":
"""Context manager entry."""
return self
[docs]
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Context manager exit."""
self.destroy()
[docs]
class ServerISOConnection:
"""ISO connection wrapper for server-side communication."""
# COTP PDU types
COTP_CR = 0xE0 # Connection Request
COTP_CC = 0xD0 # Connection Confirm
COTP_DR = 0x80 # Disconnect Request
COTP_DC = 0xC0 # Disconnect Confirm
COTP_DT = 0xF0 # Data Transfer
[docs]
def __init__(self, client_socket: socket.socket):
"""Initialize server ISO connection."""
self.socket = client_socket
self.socket.settimeout(5.0)
self.connected = False
self.src_ref = 0x0001 # Server reference
self.dst_ref = 0x0000 # Client reference (assigned during handshake)
[docs]
def accept_connection(self) -> bool:
"""Accept ISO connection from client."""
try:
# Receive COTP Connection Request
tpkt_header = self._recv_exact(4)
version, reserved, length = struct.unpack(">BBH", tpkt_header)
if version != 3:
logger.error(f"Invalid TPKT version: {version}")
return False
payload = self._recv_exact(length - 4)
# Parse COTP Connection Request
if not self._parse_cotp_cr(payload):
return False
# Send COTP Connection Confirm
cc_pdu = self._build_cotp_cc()
tpkt_frame = self._build_tpkt(cc_pdu)
self.socket.sendall(tpkt_frame)
self.connected = True
logger.debug("ISO connection established")
return True
except Exception as e:
logger.error(f"Error accepting ISO connection: {e}")
return False
[docs]
def receive_data(self) -> bytes:
"""Receive data from client."""
# Receive TPKT header (4 bytes)
tpkt_header = self._recv_exact(4)
# Parse TPKT header
version, reserved, length = struct.unpack(">BBH", tpkt_header)
if version != 3:
raise S7ConnectionError(f"Invalid TPKT version: {version}")
# Receive remaining data
remaining = length - 4
if remaining <= 0:
raise S7ConnectionError("Invalid TPKT length")
payload = self._recv_exact(remaining)
# Parse COTP header and extract data
return self._parse_cotp_data(payload)
[docs]
def send_data(self, data: bytes) -> None:
"""Send data to client."""
# Wrap data in COTP Data Transfer PDU
cotp_data = self._build_cotp_dt(data)
# Wrap in TPKT frame
tpkt_frame = self._build_tpkt(cotp_data)
# Send over TCP
self.socket.sendall(tpkt_frame)
def _parse_cotp_cr(self, data: bytes) -> bool:
"""Parse COTP Connection Request."""
if len(data) < 7:
logger.error("COTP CR too short")
return False
pdu_len, pdu_type, dst_ref, src_ref, class_opt = struct.unpack(">BBHHB", data[:7])
if pdu_type != self.COTP_CR:
logger.error(f"Expected COTP CR, got {pdu_type:#02x}")
return False
# Store client reference
self.dst_ref = src_ref
logger.debug(f"Received COTP CR from client ref {src_ref}")
return True
def _build_cotp_cc(self) -> bytes:
"""Build COTP Connection Confirm."""
# Basic COTP CC
base_pdu = struct.pack(
">BBHHB",
6, # PDU length
self.COTP_CC, # PDU type
self.dst_ref, # Destination reference (client's source ref)
self.src_ref, # Source reference (our ref)
0x00, # Class/option
)
return struct.pack(">B", 6) + base_pdu[1:]
def _recv_exact(self, size: int) -> bytes:
"""Receive exactly the specified number of bytes."""
data = bytearray()
while len(data) < size:
chunk = self.socket.recv(size - len(data))
if not chunk:
raise ConnectionResetError("Connection closed by peer")
data.extend(chunk)
return bytes(data)
def _build_tpkt(self, payload: bytes) -> bytes:
"""Build TPKT frame."""
length = len(payload) + 4
return struct.pack(">BBH", 3, 0, length) + payload
def _build_cotp_dt(self, data: bytes) -> bytes:
"""Build COTP Data Transfer PDU."""
header = struct.pack(">BBB", 2, self.COTP_DT, 0x80)
return header + data
def _parse_cotp_data(self, cotp_pdu: bytes) -> bytes:
"""Parse COTP Data Transfer PDU and extract S7 data."""
if len(cotp_pdu) < 3:
raise S7ConnectionError("Invalid COTP DT: too short")
pdu_len, pdu_type, eot_num = struct.unpack(">BBB", cotp_pdu[:3])
if pdu_type != self.COTP_DT:
raise S7ConnectionError(f"Expected COTP DT, got {pdu_type:#02x}")
return cotp_pdu[3:] # Return data portion
[docs]
def mainloop(tcp_port: int = 1102, init_standard_values: bool = False) -> None:
"""
Initialize a pure Python S7 server with default values.
Args:
tcp_port: Port that the server will listen on
init_standard_values: If True, initialize some default values
"""
server = Server()
# Create standard memory areas - need at least 600 bytes for test data
db_size = 600
db_data = bytearray(db_size)
pa_data = bytearray(100)
pe_data = bytearray(100)
mk_data = bytearray(100)
tm_data = bytearray(100)
ct_data = bytearray(100)
# Register memory areas
# DB 0 for test_mainloop.py, DB 1 for other tests
server.register_area(SrvArea.DB, 0, db_data)
server.register_area(SrvArea.DB, 1, bytearray(db_size))
# Register at index 0 (used by most tests) and index 1
server.register_area(SrvArea.PA, 0, pa_data)
server.register_area(SrvArea.PA, 1, bytearray(100))
server.register_area(SrvArea.PE, 0, pe_data)
server.register_area(SrvArea.PE, 1, bytearray(100))
server.register_area(SrvArea.MK, 0, mk_data)
server.register_area(SrvArea.MK, 1, bytearray(100))
server.register_area(SrvArea.TM, 0, tm_data)
server.register_area(SrvArea.TM, 1, bytearray(100))
server.register_area(SrvArea.CT, 0, ct_data)
server.register_area(SrvArea.CT, 1, bytearray(100))
if init_standard_values:
logger.info("Initializing with standard values for tests")
# test_read_booleans: offset 0, expects 0xAA (alternating False/True: 0,1,0,1,0,1,0,1)
db_data[0] = 0xAA # Binary: 10101010
# test_read_small_int: offset 10, expects -128, 0, 100, 127 (signed bytes)
db_data[10] = 0x80 # -128 as signed byte
db_data[11] = 0x00 # 0
db_data[12] = 100 # 100
db_data[13] = 127 # 127
# test_read_unsigned_small_int: offset 20, expects 0, 255
db_data[20] = 0 # 0
db_data[21] = 255 # 255
# test_read_int: offset 30, expects -32768, -1234, 0, 1234, 32767 (signed 16-bit, big-endian)
struct.pack_into(">h", db_data, 30, -32768)
struct.pack_into(">h", db_data, 32, -1234)
struct.pack_into(">h", db_data, 34, 0)
struct.pack_into(">h", db_data, 36, 1234)
struct.pack_into(">h", db_data, 38, 32767)
# test_read_double_int: offset 40, expects -2147483648, -32768, 0, 32767, 2147483647 (signed 32-bit)
struct.pack_into(">i", db_data, 40, -2147483648)
struct.pack_into(">i", db_data, 44, -32768)
struct.pack_into(">i", db_data, 48, 0)
struct.pack_into(">i", db_data, 52, 32767)
struct.pack_into(">i", db_data, 56, 2147483647)
# test_read_real: offset 60, expects various float values (9 floats = 36 bytes)
struct.pack_into(">f", db_data, 60, -3.402823e38)
struct.pack_into(">f", db_data, 64, -3.402823e12)
struct.pack_into(">f", db_data, 68, -175494351e-38)
struct.pack_into(">f", db_data, 72, -1.175494351e-12)
struct.pack_into(">f", db_data, 76, 0.0)
struct.pack_into(">f", db_data, 80, 1.175494351e-38)
struct.pack_into(">f", db_data, 84, 1.175494351e-12)
struct.pack_into(">f", db_data, 88, 3.402823466e12)
struct.pack_into(">f", db_data, 92, 3.402823466e38)
# test_read_string: offset 100, expects "the brown fox jumps over the lazy dog"
# S7 string format: max_len (1 byte), actual_len (1 byte), then string data
test_string = "the brown fox jumps over the lazy dog"
db_data[100] = 254 # Max length
db_data[101] = len(test_string) # Actual length
db_data[102 : 102 + len(test_string)] = test_string.encode("ascii")
# test_read_word: offset 400, expects 0x0000, 0x1234, 0xABCD, 0xFFFF (unsigned 16-bit)
struct.pack_into(">H", db_data, 400, 0x0000)
struct.pack_into(">H", db_data, 404, 0x1234)
struct.pack_into(">H", db_data, 408, 0xABCD)
struct.pack_into(">H", db_data, 412, 0xFFFF)
# test_read_double_word: offset 500, expects 0x00000000, 0x12345678, 0x1234ABCD, 0xFFFFFFFF (unsigned 32-bit)
struct.pack_into(">I", db_data, 500, 0x00000000)
struct.pack_into(">I", db_data, 508, 0x12345678)
struct.pack_into(">I", db_data, 516, 0x1234ABCD)
struct.pack_into(">I", db_data, 524, 0xFFFFFFFF)
# Start server
server.start(tcp_port)
try:
logger.info(f"Pure Python S7 server running on port {tcp_port}")
logger.info("Press Ctrl+C to stop")
# Keep server running
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Stopping server...")
finally:
server.stop()
server.destroy()