"""
Snap7 server used for mimicking a siemens 7 server.
"""
import logging
import re
import time
import ctypes
from typing import Tuple, Optional, Callable, Any
import snap7
import snap7.types
from snap7.common import check_error, load_library, ipv4
logger = logging.getLogger(__name__)
[docs]def error_wrap(func):
"""Parses a s7 error code returned the decorated function."""
def f(*args, **kw):
code = func(*args, **kw)
check_error(code, context="server")
return f
[docs]class Server:
"""
A fake S7 server.
"""
def __init__(self, log: bool = True):
"""
Create a fake S7 server. set log to false if you want to disable
event logging to python logging.
"""
self._read_callback = None
self._callback = Optional[Callable[..., Any]]
self.pointer = None
self.library = load_library()
self.create()
if log:
self._set_log_callback()
def __del__(self):
self.destroy()
[docs] def event_text(self, event: snap7.types.SrvEvent) -> str:
"""Returns a textual explanation of a given event object
:param event: an PSrvEvent struct object
:returns: the error string
"""
logger.debug(f"error text for {hex(event.EvtCode)}")
len_ = 1024
text_type = ctypes.c_char * len_
text = text_type()
error = self.library.Srv_EventText(ctypes.byref(event),
ctypes.byref(text), len_)
check_error(error)
return text.value.decode('ascii')
[docs] def create(self):
"""
create the server.
"""
logger.info("creating server")
self.library.Srv_Create.restype = snap7.types.S7Object
self.pointer = snap7.types.S7Object(self.library.Srv_Create())
@error_wrap
def register_area(self, area_code: int, index: int, userdata):
"""Shares a memory area with the server. That memory block will be
visible by the clients.
"""
size = ctypes.sizeof(userdata)
logger.info(f"registering area {area_code}, index {index}, size {size}")
size = ctypes.sizeof(userdata)
return self.library.Srv_RegisterArea(self.pointer, area_code, index, ctypes.byref(userdata), size)
@error_wrap
def set_events_callback(self, call_back: Callable[..., Any]) -> int:
"""Sets the user callback that the Server object has to call when an
event is created.
"""
logger.info("setting event callback")
callback_wrap: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(snap7.types.SrvEvent), ctypes.c_int)
def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: snap7.types.SrvEvent, size: int) -> int:
"""
Wraps python function into a ctypes function
:param usrptr: not used
:param pevent: pointer to snap7 event struct
:param size:
:returns: should return an int
"""
logger.info(f"callback event: {self.event_text(pevent.contents)}")
call_back(pevent.contents)
return 0
self._callback = callback_wrap(wrapper)
usrPtr = ctypes.c_void_p()
return self.library.Srv_SetEventsCallback(self.pointer, self._callback, usrPtr)
@error_wrap
def set_read_events_callback(self, call_back: Callable[..., Any]):
"""
Sets the user callback that the Server object has to call when a Read
event is created.
:param call_back: a callback function that accepts a pevent argument.
"""
logger.info("setting read event callback")
callback_wrapper: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p,
ctypes.POINTER(snap7.types.SrvEvent),
ctypes.c_int)
def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: snap7.types.SrvEvent, size: int) -> int:
"""
Wraps python function into a ctypes function
:param usrptr: not used
:param pevent: pointer to snap7 event struct
:param size:
:returns: should return an int
"""
logger.info(f"callback event: {self.event_text(pevent.contents)}")
call_back(pevent.contents)
return 0
self._read_callback = callback_wrapper(wrapper)
return self.library.Srv_SetReadEventsCallback(self.pointer,
self._read_callback)
def _set_log_callback(self):
"""Sets a callback that logs the events
"""
logger.debug("setting up event logger")
def log_callback(event):
logger.info(f"callback event: {self.event_text(event)}")
self.set_events_callback(log_callback)
@error_wrap
def start(self, tcpport: int = 102):
"""
start the server.
"""
if tcpport != 102:
logger.info(f"setting server TCP port to {tcpport}")
self.set_param(snap7.types.LocalPort, tcpport)
logger.info(f"starting server on 0.0.0.0:{tcpport}")
return self.library.Srv_Start(self.pointer)
@error_wrap
def stop(self):
"""
stop the server.
"""
logger.info("stopping server")
return self.library.Srv_Stop(self.pointer)
[docs] def destroy(self):
"""
destroy the server.
"""
logger.info("destroying server")
if self.library:
self.library.Srv_Destroy(ctypes.byref(self.pointer))
[docs] def get_status(self) -> Tuple[str, str, int]:
"""Reads the server status, the Virtual CPU status and the number of
the clients connected.
:returns: server status, cpu status, client count
"""
logger.debug("get server status")
server_status = ctypes.c_int()
cpu_status = ctypes.c_int()
clients_count = ctypes.c_int()
error = self.library.Srv_GetStatus(self.pointer, ctypes.byref(server_status),
ctypes.byref(cpu_status),
ctypes.byref(clients_count))
check_error(error)
logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}")
return (
snap7.types.server_statuses[server_status.value],
snap7.types.cpu_statuses[cpu_status.value],
clients_count.value
)
@error_wrap
def unregister_area(self, area_code: int, index: int):
"""'Unshares' a memory area previously shared with Srv_RegisterArea().
That memory block will be no longer visible by the clients.
"""
return self.library.Srv_UnregisterArea(self.pointer, area_code, index)
@error_wrap
def unlock_area(self, code: int, index: int):
"""Unlocks a previously locked shared memory area.
"""
logger.debug(f"unlocking area code {code} index {index}")
return self.library.Srv_UnlockArea(self.pointer, code, index)
@error_wrap
def lock_area(self, code: int, index: int):
"""Locks a shared memory area.
"""
logger.debug(f"locking area code {code} index {index}")
return self.library.Srv_LockArea(self.pointer, code, index)
@error_wrap
def start_to(self, ip: str, tcpport: int = 102):
"""
start server on a specific interface.
"""
if tcpport != 102:
logger.info(f"setting server TCP port to {tcpport}")
self.set_param(snap7.types.LocalPort, tcpport)
assert re.match(ipv4, ip), f'{ip} is invalid ipv4'
logger.info(f"starting server to {ip}:102")
return self.library.Srv_StartTo(self.pointer, ip)
@error_wrap
def set_param(self, number: int, value: int):
"""Sets an internal Server object parameter.
"""
logger.debug(f"setting param number {number} to {value}")
return self.library.Srv_SetParam(self.pointer, number,
ctypes.byref(ctypes.c_int(value)))
@error_wrap
def set_mask(self, kind: int, mask: int):
"""Writes the specified filter mask.
"""
logger.debug(f"setting mask kind {kind} to {mask}")
return self.library.Srv_SetMask(self.pointer, kind, mask)
@error_wrap
def set_cpu_status(self, status: int):
"""Sets the Virtual CPU status.
"""
assert status in snap7.types.cpu_statuses, f'unknown cpu state {status}'
logger.debug(f"setting cpu status to {status}")
return self.library.Srv_SetCpuStatus(self.pointer, status)
[docs] def pick_event(self) -> Optional[snap7.types.SrvEvent]:
"""Extracts an event (if available) from the Events queue.
"""
logger.debug("checking event queue")
event = snap7.types.SrvEvent()
ready = ctypes.c_int32()
code = self.library.Srv_PickEvent(self.pointer, ctypes.byref(event),
ctypes.byref(ready))
check_error(code)
if ready:
logger.debug(f"one event ready: {event}")
return event
logger.debug("no events ready")
return None
[docs] def get_param(self, number) -> int:
"""Reads an internal Server object parameter.
"""
logger.debug(f"retreiving param number {number}")
value = ctypes.c_int()
code = self.library.Srv_GetParam(self.pointer, number,
ctypes.byref(value))
check_error(code)
return value.value
[docs] def get_mask(self, kind: int) -> ctypes.c_uint32:
"""Reads the specified filter mask.
"""
logger.debug(f"retrieving mask kind {kind}")
mask = snap7.types.longword()
code = self.library.Srv_GetMask(self.pointer, kind, ctypes.byref(mask))
check_error(code)
return mask
@error_wrap
def clear_events(self) -> int:
"""Empties the Event queue.
"""
logger.debug("clearing event queue")
return self.library.Srv_ClearEvents(self.pointer)
def mainloop(tcpport: int = 1102):
server = snap7.server.Server()
size = 100
DBdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
PAdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
TMdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
CTdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
server.register_area(snap7.types.srvAreaDB, 1, DBdata)
server.register_area(snap7.types.srvAreaPA, 1, PAdata)
server.register_area(snap7.types.srvAreaTM, 1, TMdata)
server.register_area(snap7.types.srvAreaCT, 1, CTdata)
server.start(tcpport=tcpport)
while True:
while True:
event = server.pick_event()
if event:
logger.info(server.event_text(event))
else:
break
time.sleep(1)