"""
Snap7 server used for mimicking a siemens 7 server.
"""
import ctypes
import logging
import re
import snap7.snap7types
from snap7.common import check_error, load_library, ipv4
from snap7 import six
logger = logging.getLogger(__name__)
[docs]def error_wrap(func):
"""Parses a s7 error code returned the decorated function."""
def f(*args, **kw):
code = func(*args, **kw)
check_error(code, context="server")
return f
[docs]class Server(object):
"""
A fake S7 server.
"""
pointer = None
callback = None
library = None
def __init__(self, log=True):
"""
Create a fake S7 server. set log to false if you want to disable
event logging to python logging.
"""
self.library = load_library()
self.create()
if log:
self._set_log_callback()
def __del__(self):
self.destroy()
[docs] def event_text(self, event):
"""Returns a textual explanation of a given event object
:param event: an PSrvEvent struct object
:returns: the error string
"""
logger.debug("error text for %s" % hex(event.EvtCode))
len_ = 1024
text_type = ctypes.c_char * len_
text = text_type()
error = self.library.Srv_EventText(ctypes.byref(event),
ctypes.byref(text), len_)
check_error(error)
if six.PY2:
return text.value
else:
return text.value.decode('ascii')
[docs] def create(self):
"""
create the server.
"""
logger.info("creating server")
self.library.Srv_Create.restype = snap7.snap7types.S7Object
self.pointer = snap7.snap7types.S7Object(self.library.Srv_Create())
@error_wrap
def register_area(self, area_code, index, userdata):
"""Shares a memory area with the server. That memory block will be
visible by the clients.
"""
size = ctypes.sizeof(userdata)
logger.info("registering area %s, index %s, size %s" % (area_code,
index, size))
size = ctypes.sizeof(userdata)
return self.library.Srv_RegisterArea(self.pointer, area_code, index,
ctypes.byref(userdata), size)
@error_wrap
def set_events_callback(self, call_back):
"""Sets the user callback that the Server object has to call when an
event is created.
"""
logger.info("setting event callback")
callback_wrap = ctypes.CFUNCTYPE(None, ctypes.c_void_p,
ctypes.POINTER(snap7.snap7types.SrvEvent),
ctypes.c_int)
def wrapper(usrptr, pevent, size):
"""
Wraps python function into a ctypes function
:param usrptr: not used
:param pevent: pointer to snap7 event struct
:param size:
:returns: should return an int
"""
logger.info("callback event: " + self.event_text(pevent.contents))
call_back(pevent.contents)
return 0
self._callback = callback_wrap(wrapper)
usrPtr = ctypes.c_void_p()
return self.library.Srv_SetEventsCallback(self.pointer, self._callback, usrPtr)
@error_wrap
def set_read_events_callback(self, call_back):
"""
Sets the user callback that the Server object has to call when a Read
event is created.
:param call_back: a callback function that accepts a pevent argument.
"""
logger.info("setting read event callback")
callback_wrapper = ctypes.CFUNCTYPE(None, ctypes.c_void_p,
ctypes.POINTER(snap7.snap7types.SrvEvent),
ctypes.c_int)
def wrapper(usrptr, pevent, size):
"""
Wraps python function into a ctypes function
:param usrptr: not used
:param pevent: pointer to snap7 event struct
:param size:
:returns: should return an int
"""
logger.info("callback event: " + self.event_text(pevent.contents))
call_back(pevent.contents)
return 0
self._read_callback = callback_wrapper(wrapper)
return self.library.Srv_SetReadEventsCallback(self.pointer,
self._read_callback)
def _set_log_callback(self):
"""Sets a callback that logs the events
"""
logger.debug("setting up event logger")
def log_callback(event):
logger.info("callback event: " + self.event_text(event))
self.set_events_callback(log_callback)
@error_wrap
def start(self, tcpport=102):
"""
start the server.
"""
if tcpport != 102:
logger.info("setting server TCP port to %s" % tcpport)
self.set_param(snap7.snap7types.LocalPort, tcpport)
logger.info("starting server on 0.0.0.0:%s" % tcpport)
return self.library.Srv_Start(self.pointer)
@error_wrap
def stop(self):
"""
stop the server.
"""
logger.info("stopping server")
return self.library.Srv_Stop(self.pointer)
[docs] def destroy(self):
"""
destroy the server.
"""
logger.info("destroying server")
if self.library:
self.library.Srv_Destroy(ctypes.byref(self.pointer))
[docs] def get_status(self):
"""Reads the server status, the Virtual CPU status and the number of
the clients connected.
:returns: server status, cpu status, client count
"""
logger.debug("get server status")
server_status = ctypes.c_int()
cpu_status = ctypes.c_int()
clients_count = ctypes.c_int()
error = self.library.Srv_GetStatus(self.pointer, ctypes.byref(server_status),
ctypes.byref(cpu_status),
ctypes.byref(clients_count))
check_error(error)
logger.debug("status server %s cpu %s clients %s" %
(server_status.value, cpu_status.value,
clients_count.value))
return snap7.snap7types.server_statuses[server_status.value], \
snap7.snap7types.cpu_statuses[cpu_status.value], \
clients_count.value
@error_wrap
def unregister_area(self, area_code, index):
"""'Unshares' a memory area previously shared with Srv_RegisterArea().
That memory block will be no longer visible by the clients.
"""
return self.library.Srv_UnregisterArea(self.pointer, area_code, index)
@error_wrap
def unlock_area(self, code, index):
"""Unlocks a previously locked shared memory area.
"""
logger.debug("unlocking area code %s index %s" % (code, index))
return self.library.Srv_UnlockArea(self.pointer, code, index)
@error_wrap
def lock_area(self, code, index):
"""Locks a shared memory area.
"""
logger.debug("locking area code %s index %s" % (code, index))
return self.library.Srv_LockArea(self.pointer, code, index)
@error_wrap
def start_to(self, ip, tcpport=102):
"""
start server on a specific interface.
"""
if tcpport != 102:
logger.info("setting server TCP port to %s" % tcpport)
self.set_param(snap7.snap7types.LocalPort, tcpport)
assert re.match(ipv4, ip), '%s is invalid ipv4' % ip
logger.info("starting server to %s:102" % ip)
return self.library.Srv_Start(self.pointer, ip)
@error_wrap
def set_param(self, number, value):
"""Sets an internal Server object parameter.
"""
logger.debug("setting param number %s to %s" % (number, value))
return self.library.Srv_SetParam(self.pointer, number,
ctypes.byref(ctypes.c_int(value)))
@error_wrap
def set_mask(self, kind, mask):
"""Writes the specified filter mask.
"""
logger.debug("setting mask kind %s to %s" % (kind, mask))
return self.library.Srv_SetMask(self.pointer, kind, mask)
@error_wrap
def set_cpu_status(self, status):
"""Sets the Virtual CPU status.
"""
assert status in snap7.snap7types.cpu_statuses, 'unknown cpu state %s' % status
logger.debug("setting cpu status to %s" % status)
return self.library.Srv_SetCpuStatus(self.pointer, status)
[docs] def pick_event(self):
"""Extracts an event (if available) from the Events queue.
"""
logger.debug("checking event queue")
event = snap7.snap7types.SrvEvent()
ready = ctypes.c_int32()
code = self.library.Srv_PickEvent(self.pointer, ctypes.byref(event),
ctypes.byref(ready))
check_error(code)
if ready:
logger.debug("one event ready: %s" % event)
return event
logger.debug("no events ready")
[docs] def get_param(self, number):
"""Reads an internal Server object parameter.
"""
logger.debug("retreiving param number %s" % number)
value = ctypes.c_int()
code = self.library.Srv_GetParam(self.pointer, number,
ctypes.byref(value))
check_error(code)
return value.value
[docs] def get_mask(self, kind):
"""Reads the specified filter mask.
"""
logger.debug("retrieving mask kind %s" % kind)
mask = snap7.snap7types.longword()
code = self.library.Srv_GetMask(self.pointer, kind, ctypes.byref(mask))
check_error(code)
return mask
@error_wrap
def clear_events(self):
"""Empties the Event queue.
"""
logger.debug("clearing event queue")
return self.library.Srv_ClearEvents(self.pointer)