"""
Snap7 server used for mimicking a siemens 7 server.
"""
import re
import time
from ctypes import (
c_char,
byref,
sizeof,
c_int,
c_int32,
c_uint32,
c_void_p,
CFUNCTYPE,
POINTER,
)
from _ctypes import CFuncPtr
import struct
import logging
from typing import Any, Callable, Optional, Tuple, cast, Type
from types import TracebackType
from ..common import ipv4, load_library
from ..error import check_error, error_wrap
from ..protocol import Snap7CliProtocol
from ..type import SrvEvent, Parameter, cpu_statuses, server_statuses, SrvArea, longword, WordLen, S7Object, CDataArrayType
logger = logging.getLogger(__name__)
[docs]
class Server:
"""
A fake S7 server.
"""
_lib: Snap7CliProtocol
_s7_server: S7Object
_read_callback = None
_callback: Optional[Callable[..., Any]] = None
[docs]
def __init__(self, log: bool = True):
"""Create a fake S7 server. set log to false if you want to disable
event logging to python logging.
Args:
log: `True` for enabling the event logging.
"""
self._lib: Snap7CliProtocol = load_library()
self.create()
if log:
self._set_log_callback()
def __enter__(self) -> "Server":
return self
def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.destroy()
def __del__(self) -> None:
self.destroy()
[docs]
def event_text(self, event: SrvEvent) -> str:
"""Returns a textual explanation of a given event object
Args:
event: an PSrvEvent struct object
Returns:
The error string
"""
logger.debug(f"error text for {hex(event.EvtCode)}")
len_ = 1024
text_type = c_char * len_
text = text_type()
error = self._lib.Srv_EventText(byref(event), byref(text), len_)
check_error(error)
return text.value.decode("ascii")
[docs]
def create(self) -> None:
"""Create the server."""
logger.info("creating server")
self._lib.Srv_Create.restype = S7Object # type: ignore[attr-defined]
self._s7_server = S7Object(self._lib.Srv_Create())
@error_wrap(context="server")
def register_area(self, area: SrvArea, index: int, userdata: CDataArrayType) -> int:
"""Shares a memory area with the server. That memory block will be
visible by the clients.
Args:
area: memory area to register.
index: number of area to write.
userdata: buffer with the data to write.
Returns:
Error code from snap7 library.
"""
size = sizeof(userdata)
logger.info(f"registering area {area}, index {index}, size {size}")
return self._lib.Srv_RegisterArea(self._s7_server, area.value, index, byref(userdata), size)
@error_wrap(context="server")
def set_events_callback(self, call_back: Callable[..., Any]) -> int:
"""Sets the user callback that the Server object has to call when an
event is created.
"""
logger.info("setting event callback")
callback_wrap: Callable[..., Any] = CFUNCTYPE(None, c_void_p, POINTER(SrvEvent), c_int)
def wrapper(_: Optional[c_void_p], event: SrvEvent, __: int) -> int:
"""Wraps python function into a ctypes function
Args:
_: not used
event: pointer to snap7 event struct
__: not used
Returns:
Should return an int
"""
logger.info(f"callback event: {self.event_text(event.contents)}")
call_back(event.contents)
return 0
self._callback = cast(type[CFuncPtr], callback_wrap(wrapper))
data = c_void_p()
return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, data)
@error_wrap(context="server")
def set_read_events_callback(self, call_back: Callable[..., Any]) -> int:
"""Sets the user callback that the Server object has to call when a Read
event is created.
Args:
call_back: a callback function that accepts an event argument.
"""
logger.info("setting read event callback")
callback_wrapper: Callable[..., Any] = CFUNCTYPE(None, c_void_p, POINTER(SrvEvent), c_int)
def wrapper(_: Optional[c_void_p], event: SrvEvent, __: int) -> int:
"""Wraps python function into a ctypes function
Args:
_: data, not used
event: pointer to snap7 event struct
__: size, not used
Returns:
Should return an int
"""
logger.info(f"callback event: {self.event_text(event.contents)}")
call_back(event.contents)
return 0
self._read_callback = callback_wrapper(wrapper)
return self._lib.Srv_SetReadEventsCallback(self._s7_server, self._read_callback)
def _set_log_callback(self) -> None:
"""Sets a callback that logs the events"""
logger.debug("setting up event logger")
def log_callback(event: SrvEvent) -> None:
logger.info(f"callback event: {self.event_text(event)}")
self.set_events_callback(log_callback)
@error_wrap(context="server")
def start(self, tcp_port: int = 102) -> int:
"""Starts the server.
Args:
tcp_port: port that the server will listen. Optional.
"""
if tcp_port != 102:
logger.info(f"setting server TCP port to {tcp_port}")
self.set_param(Parameter.LocalPort, tcp_port)
logger.info(f"starting server on 0.0.0.0:{tcp_port}")
return self._lib.Srv_Start(self._s7_server)
@error_wrap(context="server")
def stop(self) -> int:
"""Stop the server."""
logger.info("stopping server")
return self._lib.Srv_Stop(self._s7_server)
[docs]
def destroy(self) -> None:
"""Destroy the server."""
logger.info("destroying server")
if self._lib and self._s7_server is not None:
return self._lib.Srv_Destroy(byref(self._s7_server))
self._s7_server = None # type: ignore[assignment]
return None
[docs]
def get_status(self) -> Tuple[str, str, int]:
"""Reads the server status, the Virtual CPU status and the number of
the clients connected.
Returns:
Server status, cpu status, client count
"""
logger.debug("get server status")
server_status = c_int()
cpu_status = c_int()
clients_count = c_int()
error = self._lib.Srv_GetStatus(self._s7_server, byref(server_status), byref(cpu_status), byref(clients_count))
check_error(error)
logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}")
return server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value
@error_wrap(context="server")
def unregister_area(self, area: SrvArea, index: int) -> int:
"""Unregisters a memory area previously registered with Srv_RegisterArea().
Notes:
That memory block will be no longer visible by the clients.
Args:
area: memory area.
index: number of the memory area.
Returns:
Error code from snap7 library.
"""
return self._lib.Srv_UnregisterArea(self._s7_server, area.value, index)
@error_wrap(context="server")
def unlock_area(self, area: SrvArea, index: int) -> int:
"""Unlocks a previously locked shared memory area.
Args:
area: memory area.
index: number of the memory area.
Returns:
Error code from snap7 library.
"""
logger.debug(f"unlocking area code {area} index {index}")
return self._lib.Srv_UnlockArea(self._s7_server, area.value, index)
@error_wrap(context="server")
def lock_area(self, area: SrvArea, index: int) -> int:
"""Locks a shared memory area.
Args:
area: memory area.
index: number of the memory area.
Returns:
Error code from snap7 library.
"""
logger.debug(f"locking area code {area} index {index}")
return self._lib.Srv_LockArea(self._s7_server, area.value, index)
@error_wrap(context="server")
def start_to(self, ip: str, tcp_port: int = 102) -> int:
"""Start server on a specific interface.
Args:
ip: IPV4 address where the server is located.
tcp_port: port that the server will listen on.
Raises:
:obj:`ValueError`: if the `ivp4` is not a valid IPV4
"""
if tcp_port != 102:
logger.info(f"setting server TCP port to {tcp_port}")
self.set_param(Parameter.LocalPort, tcp_port)
if not re.match(ipv4, ip):
raise ValueError(f"{ip} is invalid ipv4")
logger.info(f"starting server to {ip}:102")
return self._lib.Srv_StartTo(self._s7_server, ip.encode())
@error_wrap(context="server")
def set_param(self, parameter: Parameter, value: int) -> int:
"""Sets an internal Server object parameter.
Args:
parameter: the parameter to set
value: value to be set.
Returns:
Error code from snap7 library.
"""
logger.debug(f"setting param number {parameter} to {value}")
return self._lib.Srv_SetParam(self._s7_server, parameter, byref(c_int(value)))
@error_wrap(context="server")
def set_mask(self, kind: int, mask: int) -> int:
"""Writes the specified filter mask.
Args:
kind:
mask:
Returns:
Error code from snap7 library.
"""
logger.debug(f"setting mask kind {kind} to {mask}")
return self._lib.Srv_SetMask(self._s7_server, kind, mask)
@error_wrap(context="server")
def set_cpu_status(self, status: int) -> int:
"""Sets the Virtual CPU status.
Args:
status: :obj:`cpu_statuses` object type.
Returns:
Error code from snap7 library.
Raises:
:obj:`ValueError`: if `status` is not in :obj:`cpu_statuses`.
"""
if status not in cpu_statuses:
raise ValueError(f"The cpu state ({status}) is invalid")
logger.debug(f"setting cpu status to {status}")
return self._lib.Srv_SetCpuStatus(self._s7_server, status)
[docs]
def pick_event(self) -> Optional[SrvEvent]:
"""Extracts an event (if available) from the Events queue.
Returns:
Server event.
"""
logger.debug("checking event queue")
event = SrvEvent()
ready = c_int32()
code = self._lib.Srv_PickEvent(self._s7_server, byref(event), byref(ready))
check_error(code)
if ready:
logger.debug(f"one event ready: {event}")
return event
logger.debug("no events ready")
return None
[docs]
def get_param(self, number: int) -> int:
"""Reads an internal Server object parameter.
Args:
number: number of the parameter to be set.
Returns:
Value of the parameter.
"""
logger.debug(f"retrieving param number {number}")
value = c_int()
code = self._lib.Srv_GetParam(self._s7_server, number, byref(value))
check_error(code)
return value.value
[docs]
def get_mask(self, kind: int) -> c_uint32:
"""Reads the specified filter mask.
Args:
kind:
Returns:
Mask
"""
logger.debug(f"retrieving mask kind {kind}")
mask = longword()
code = self._lib.Srv_GetMask(self._s7_server, kind, byref(mask))
check_error(code)
return mask
@error_wrap(context="server")
def clear_events(self) -> int:
"""Empties the Event queue.
Returns:
Error code from snap7 library.
"""
logger.debug("clearing event queue")
return self._lib.Srv_ClearEvents(self._s7_server)
[docs]
def mainloop(tcp_port: int = 1102, init_standard_values: bool = False) -> None:
"""Init a fake Snap7 server with some default values.
Args:
tcp_port: port that the server will listen.
init_standard_values: if `True` will init some defaults values to be read on DB0.
"""
server = Server()
size = 100
db_data: CDataArrayType = (WordLen.Byte.ctype * size)()
pa_data: CDataArrayType = (WordLen.Byte.ctype * size)()
tm_data: CDataArrayType = (WordLen.Byte.ctype * size)()
ct_data: CDataArrayType = (WordLen.Byte.ctype * size)()
server.register_area(SrvArea.DB, 1, db_data)
server.register_area(SrvArea.PA, 1, pa_data)
server.register_area(SrvArea.TM, 1, tm_data)
server.register_area(SrvArea.CT, 1, ct_data)
if init_standard_values:
logger.info("initialising with standard values")
ba = _init_standard_values()
userdata = WordLen.Byte.ctype * len(ba)
server.register_area(SrvArea.DB, 0, userdata.from_buffer(ba))
server.start(tcp_port=tcp_port)
while True:
while True:
event = server.pick_event()
if event:
logger.info(server.event_text(event))
else:
break
time.sleep(1)
def _init_standard_values() -> bytearray:
"""Standard values
* Boolean
BYTE BIT VALUE
0 0 True
0 1 False
0 2 True
0 3 False
0 4 True
0 5 False
0 6 True
0 7 False
* Small int
BYTE VALUE
10 -128
11 0
12 100
13 127
* Unsigned small int
BYTE VALUE
20 0
21 255
* Int
BYTE VALUE
30 -32768
32 -1234
34 0
36 1234
38 32767
* Double int
BYTE VALUE
40 -2147483648
44 -32768
48 0
52 32767
56 2147483647
* Real
BYTE VALUE
60 -3.402823e38
64 -3.402823e12
68 -175494351e-38
72 -1.175494351e-12
76 0.0
80 1.175494351e-38
84 1.175494351e-12
88 3.402823466e12
92 3.402823466e38
* String
BYTE VALUE
100 254|37|the brown fox jumps over the lazy dog
* Word
BYTE VALUE
400 \x00\x00
404 \x12\x34
408 \xab\xcd
412 \xff\xff
* Double Word
BYTE VALUE
500 \x00\x00\x00\x00
508 \x12\x34\x56\x78
516 \x12\x34\xab\xcd
524 \xff\xff\xff\xff
"""
ba = bytearray(1000)
# 1. Bool 1 byte
ba[0] = 0b10101010
# 2. Small int 1 byte
ba[10 : 10 + 1] = struct.pack(">b", -128)
ba[11 : 11 + 1] = struct.pack(">b", 0)
ba[12 : 12 + 1] = struct.pack(">b", 100)
ba[13 : 13 + 1] = struct.pack(">b", 127)
# 3. Unsigned small int 1 byte
ba[20 : 20 + 1] = struct.pack("B", 0)
ba[21 : 21 + 1] = struct.pack("B", 255)
# 4. Int 2 bytes
ba[30 : 30 + 2] = struct.pack(">h", -32768)
ba[32 : 32 + 2] = struct.pack(">h", -1234)
ba[34 : 34 + 2] = struct.pack(">h", 0)
ba[36 : 36 + 2] = struct.pack(">h", 1234)
ba[38 : 38 + 2] = struct.pack(">h", 32767)
# 5. DInt 4 bytes
ba[40 : 40 + 4] = struct.pack(">i", -2147483648)
ba[44 : 44 + 4] = struct.pack(">i", -32768)
ba[48 : 48 + 4] = struct.pack(">i", 0)
ba[52 : 52 + 4] = struct.pack(">i", 32767)
ba[56 : 56 + 4] = struct.pack(">i", 2147483647)
# 6. Real 4 bytes
ba[60 : 60 + 4] = struct.pack(">f", -3.402823e38)
ba[64 : 64 + 4] = struct.pack(">f", -3.402823e12)
ba[68 : 68 + 4] = struct.pack(">f", -175494351e-38)
ba[72 : 72 + 4] = struct.pack(">f", -1.175494351e-12)
ba[76 : 76 + 4] = struct.pack(">f", 0.0)
ba[80 : 80 + 4] = struct.pack(">f", 1.175494351e-38)
ba[84 : 84 + 4] = struct.pack(">f", 1.175494351e-12)
ba[88 : 88 + 4] = struct.pack(">f", 3.402823466e12)
ba[92 : 92 + 4] = struct.pack(">f", 3.402823466e38)
# 7. String 1 byte per char
string = "the brown fox jumps over the lazy dog" # len = 37
ba[100] = 254
ba[101] = len(string)
for letter, i in zip(string, range(102, 102 + len(string) + 1)):
ba[i] = ord(letter)
# 8. WORD 4 bytes
ba[400 : 400 + 4] = b"\x00\x00"
ba[404 : 404 + 4] = b"\x12\x34"
ba[408 : 408 + 4] = b"\xab\xcd"
ba[412 : 412 + 4] = b"\xff\xff"
# # 9 DWORD 8 bytes
ba[500 : 500 + 8] = b"\x00\x00\x00\x00"
ba[508 : 508 + 8] = b"\x12\x34\x56\x78"
ba[516 : 516 + 8] = b"\x12\x34\xab\xcd"
ba[524 : 524 + 8] = b"\xff\xff\xff\xff"
return ba