Source code for snap7.server

"""
Snap7 server used for mimicking a siemens 7 server.
"""
import re
import time
import ctypes
import struct
import logging
from typing import Any, Tuple, Callable, Optional

from ..common import ipv4, check_error, load_library
from ..types import SrvEvent, LocalPort, cpu_statuses, server_statuses
from ..types import longword, wordlen_to_ctypes, WordLen, S7Object
from ..types import srvAreaDB, srvAreaPA, srvAreaTM, srvAreaCT

logger = logging.getLogger(__name__)


[docs]def error_wrap(func): """Parses a s7 error code returned the decorated function.""" def f(*args, **kw): code = func(*args, **kw) check_error(code, context="server") return f
[docs]class Server: """ A fake S7 server. """
[docs] def __init__(self, log: bool = True): """Create a fake S7 server. set log to false if you want to disable event logging to python logging. Args: log: `True` for enabling the event logging. Optinoal. """ self._read_callback = None self._callback = Optional[Callable[..., Any]] self.pointer = None self.library = load_library() self.create() if log: self._set_log_callback()
def __del__(self): self.destroy()
[docs] def event_text(self, event: SrvEvent) -> str: """Returns a textual explanation of a given event object Args: event: an PSrvEvent struct object Returns: The error string """ logger.debug(f"error text for {hex(event.EvtCode)}") len_ = 1024 text_type = ctypes.c_char * len_ text = text_type() error = self.library.Srv_EventText(ctypes.byref(event), ctypes.byref(text), len_) check_error(error) return text.value.decode('ascii')
[docs] def create(self): """Create the server. """ logger.info("creating server") self.library.Srv_Create.restype = S7Object self.pointer = S7Object(self.library.Srv_Create())
@error_wrap def register_area(self, area_code: int, index: int, userdata): """Shares a memory area with the server. That memory block will be visible by the clients. Args: area_code: memory area to register. index: number of area to write. userdata: buffer with the data to write. Returns: Error code from snap7 library. """ size = ctypes.sizeof(userdata) logger.info(f"registering area {area_code}, index {index}, size {size}") return self.library.Srv_RegisterArea(self.pointer, area_code, index, ctypes.byref(userdata), size) @error_wrap def set_events_callback(self, call_back: Callable[..., Any]) -> int: """Sets the user callback that the Server object has to call when an event is created. """ logger.info("setting event callback") callback_wrap: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(SrvEvent), ctypes.c_int) def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: SrvEvent, size: int) -> int: """Wraps python function into a ctypes function Args: usrptr: not used pevent: pointer to snap7 event struct size: Returns: Should return an int """ logger.info(f"callback event: {self.event_text(pevent.contents)}") call_back(pevent.contents) return 0 self._callback = callback_wrap(wrapper) usrPtr = ctypes.c_void_p() return self.library.Srv_SetEventsCallback(self.pointer, self._callback, usrPtr) @error_wrap def set_read_events_callback(self, call_back: Callable[..., Any]): """Sets the user callback that the Server object has to call when a Read event is created. Args: call_back: a callback function that accepts a pevent argument. """ logger.info("setting read event callback") callback_wrapper: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(SrvEvent), ctypes.c_int) def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: SrvEvent, size: int) -> int: """Wraps python function into a ctypes function Args: usrptr: not used pevent: pointer to snap7 event struct size: Returns: Should return an int """ logger.info(f"callback event: {self.event_text(pevent.contents)}") call_back(pevent.contents) return 0 self._read_callback = callback_wrapper(wrapper) return self.library.Srv_SetReadEventsCallback(self.pointer, self._read_callback) def _set_log_callback(self): """Sets a callback that logs the events""" logger.debug("setting up event logger") def log_callback(event): logger.info(f"callback event: {self.event_text(event)}") self.set_events_callback(log_callback) @error_wrap def start(self, tcpport: int = 102): """Starts the server. Args: tcpport: port that the server will listen. Optional. """ if tcpport != 102: logger.info(f"setting server TCP port to {tcpport}") self.set_param(LocalPort, tcpport) logger.info(f"starting server on 0.0.0.0:{tcpport}") return self.library.Srv_Start(self.pointer) @error_wrap def stop(self): """Stop the server.""" logger.info("stopping server") return self.library.Srv_Stop(self.pointer)
[docs] def destroy(self): """Destroy the server.""" logger.info("destroying server") if self.library: self.library.Srv_Destroy(ctypes.byref(self.pointer))
[docs] def get_status(self) -> Tuple[str, str, int]: """Reads the server status, the Virtual CPU status and the number of the clients connected. Returns: Server status, cpu status, client count """ logger.debug("get server status") server_status = ctypes.c_int() cpu_status = ctypes.c_int() clients_count = ctypes.c_int() error = self.library.Srv_GetStatus(self.pointer, ctypes.byref(server_status), ctypes.byref(cpu_status), ctypes.byref(clients_count)) check_error(error) logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}") return ( server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value )
@error_wrap def unregister_area(self, area_code: int, index: int): """'Unshares' a memory area previously shared with Srv_RegisterArea(). Notes: That memory block will be no longer visible by the clients. Args: area_code: memory area. index: number of the memory area. Returns: Error code from snap7 library. """ return self.library.Srv_UnregisterArea(self.pointer, area_code, index) @error_wrap def unlock_area(self, code: int, index: int): """Unlocks a previously locked shared memory area. Args: code: memory area. index: number of the memory area. Returns: Error code from snap7 library. """ logger.debug(f"unlocking area code {code} index {index}") return self.library.Srv_UnlockArea(self.pointer, code, index) @error_wrap def lock_area(self, code: int, index: int): """Locks a shared memory area. Args: code: memory area. index: number of the memory area. Returns: Error code from snap7 library. """ logger.debug(f"locking area code {code} index {index}") return self.library.Srv_LockArea(self.pointer, code, index) @error_wrap def start_to(self, ip: str, tcpport: int = 102): """Start server on a specific interface. Args: ip: IPV4 address where the server is located. tcpport: port that the server will listening. Raises: :obj:`ValueError`: if the `ivp4` is not a valid IPV4 """ if tcpport != 102: logger.info(f"setting server TCP port to {tcpport}") self.set_param(LocalPort, tcpport) if not re.match(ipv4, ip): raise ValueError(f"{ip} is invalid ipv4") logger.info(f"starting server to {ip}:102") return self.library.Srv_StartTo(self.pointer, ip.encode()) @error_wrap def set_param(self, number: int, value: int): """Sets an internal Server object parameter. Args: number: number of the parameter. value: value to be set. Returns: Error code from snap7 library. """ logger.debug(f"setting param number {number} to {value}") return self.library.Srv_SetParam(self.pointer, number, ctypes.byref(ctypes.c_int(value))) @error_wrap def set_mask(self, kind: int, mask: int): """Writes the specified filter mask. Args: kind: mask: Returns: Error code from snap7 library. """ logger.debug(f"setting mask kind {kind} to {mask}") return self.library.Srv_SetMask(self.pointer, kind, mask) @error_wrap def set_cpu_status(self, status: int): """Sets the Virtual CPU status. Args: status: :obj:`cpu_statuses` object type. Returns: Error code from snap7 library. Raises: :obj:`ValueError`: if `status` is not in :obj:`cpu_statuses`. """ if status not in cpu_statuses: raise ValueError(f"The cpu state ({status}) is invalid") logger.debug(f"setting cpu status to {status}") return self.library.Srv_SetCpuStatus(self.pointer, status)
[docs] def pick_event(self) -> Optional[SrvEvent]: """Extracts an event (if available) from the Events queue. Returns: Server event. """ logger.debug("checking event queue") event = SrvEvent() ready = ctypes.c_int32() code = self.library.Srv_PickEvent(self.pointer, ctypes.byref(event), ctypes.byref(ready)) check_error(code) if ready: logger.debug(f"one event ready: {event}") return event logger.debug("no events ready") return None
[docs] def get_param(self, number) -> int: """Reads an internal Server object parameter. Args: number: number of the parameter to be set. Returns: Value of the parameter. """ logger.debug(f"retreiving param number {number}") value = ctypes.c_int() code = self.library.Srv_GetParam(self.pointer, number, ctypes.byref(value)) check_error(code) return value.value
[docs] def get_mask(self, kind: int) -> ctypes.c_uint32: """Reads the specified filter mask. Args: kind: Returns: Mask """ logger.debug(f"retrieving mask kind {kind}") mask = longword() code = self.library.Srv_GetMask(self.pointer, kind, ctypes.byref(mask)) check_error(code) return mask
@error_wrap def clear_events(self) -> int: """Empties the Event queue. Returns: Error code from snap7 library. """ logger.debug("clearing event queue") return self.library.Srv_ClearEvents(self.pointer)
[docs]def mainloop(tcpport: int = 1102, init_standard_values: bool = False): """Init a fake Snap7 server with some default values. Args: tcpport: port that the server will listen. init_standard_values: if `True` will init some defaults values to be read on DB0. """ server = Server() size = 100 DBdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() PAdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() TMdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() CTdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() server.register_area(srvAreaDB, 1, DBdata) server.register_area(srvAreaPA, 1, PAdata) server.register_area(srvAreaTM, 1, TMdata) server.register_area(srvAreaCT, 1, CTdata) if init_standard_values: ba = _init_standard_values() DBdata = wordlen_to_ctypes[WordLen.Byte.value] * len(ba) DBdata = DBdata.from_buffer(ba) server.register_area(srvAreaDB, 0, DBdata) server.start(tcpport=tcpport) while True: while True: event = server.pick_event() if event: logger.info(server.event_text(event)) else: break time.sleep(1)
def _init_standard_values() -> bytearray: ''' Standard values * Boolean BYTE BIT VALUE 0 0 True 0 1 False 0 2 True 0 3 False 0 4 True 0 5 False 0 6 True 0 7 False * Small int BYTE VALUE 10 -128 11 0 12 100 13 127 * Unsigned small int BYTE VALUE 20 0 21 255 * Int BYTE VALUE 30 -32768 32 -1234 34 0 36 1234 38 32767 * Double int BYTE VALUE 40 -2147483648 44 -32768 48 0 52 32767 56 2147483647 * Real BYTE VALUE 60 -3.402823e38 64 -3.402823e12 68 -175494351e-38 72 -1.175494351e-12 76 0.0 80 1.175494351e-38 84 1.175494351e-12 88 3.402823466e12 92 3.402823466e38 * String BYTE VALUE 100 254|37|the brown fox jumps over the lazy dog * Word BYTE VALUE 400 \x00\x00 404 \x12\x34 408 \xAB\xCD 412 \xFF\xFF * Double Word BYTE VALUE 500 \x00\x00\x00\x00 508 \x12\x34\x56\x78 516 \x12\x34\xAB\xCD 524 \xFF\xFF\xFF\xFF ''' ba = bytearray(1000) # 1. Bool 1 byte ba[0] = 0b10101010 # 2. Small int 1 byte ba[10:10 + 1] = struct.pack(">b", -128) ba[11:11 + 1] = struct.pack(">b", 0) ba[12:12 + 1] = struct.pack(">b", 100) ba[13:13 + 1] = struct.pack(">b", 127) # 3. Unsigned small int 1 byte ba[20:20 + 1] = struct.pack("B", 0) ba[21:21 + 1] = struct.pack("B", 255) # 4. Int 2 bytes ba[30:30 + 2] = struct.pack(">h", -32768) ba[32:32 + 2] = struct.pack(">h", -1234) ba[34:34 + 2] = struct.pack(">h", 0) ba[36:36 + 2] = struct.pack(">h", 1234) ba[38:38 + 2] = struct.pack(">h", 32767) # 5. DInt 4 bytes ba[40:40 + 4] = struct.pack(">i", -2147483648) ba[44:44 + 4] = struct.pack(">i", -32768) ba[48:48 + 4] = struct.pack(">i", 0) ba[52:52 + 4] = struct.pack(">i", 32767) ba[56:56 + 4] = struct.pack(">i", 2147483647) # 6. Real 4 bytes ba[60:60 + 4] = struct.pack(">f", -3.402823e38) ba[64:64 + 4] = struct.pack(">f", -3.402823e12) ba[68:68 + 4] = struct.pack(">f", -175494351e-38) ba[72:72 + 4] = struct.pack(">f", -1.175494351e-12) ba[76:76 + 4] = struct.pack(">f", 0.0) ba[80:80 + 4] = struct.pack(">f", 1.175494351e-38) ba[84:84 + 4] = struct.pack(">f", 1.175494351e-12) ba[88:88 + 4] = struct.pack(">f", 3.402823466e12) ba[92:92 + 4] = struct.pack(">f", 3.402823466e38) # 7. String 1 byte per char string = "the brown fox jumps over the lazy dog" # len = 37 ba[100] = 254 ba[101] = len(string) for letter, i in zip(string, range(102, 102 + len(string) + 1)): ba[i] = ord(letter) # 8. WORD 4 bytes ba[400:400 + 4] = b"\x00\x00" ba[404:404 + 4] = b"\x12\x34" ba[408:408 + 4] = b"\xAB\xCD" ba[412:412 + 4] = b"\xFF\xFF" # # 9 DWORD 8 bytes ba[500:500 + 8] = b"\x00\x00\x00\x00" ba[508:508 + 8] = b"\x12\x34\x56\x78" ba[516:516 + 8] = b"\x12\x34\xAB\xCD" ba[524:524 + 8] = b"\xFF\xFF\xFF\xFF" return ba