"""
Python equivalent for snap7 specific types.
"""
from _ctypes import Array
from ctypes import (
c_int16,
c_int8,
c_int32,
c_void_p,
c_ubyte,
c_uint64,
c_uint16,
c_uint32,
Structure,
POINTER,
c_char,
c_byte,
c_int,
c_uint8,
)
from datetime import datetime, date, timedelta
from enum import IntEnum
from typing import Dict, Union, Literal
CDataArrayType = Union[
Array[c_byte], Array[c_int], Array[c_int16], Array[c_int32], Array[c_uint8], Array[c_uint16], Array[c_uint32]
]
CDataType = Union[type[c_int8], type[c_int16], type[c_int32], type[c_uint8], type[c_uint16], type[c_uint32]]
ValueType = Union[int, float, str, datetime, bytearray, bytes, date, timedelta]
Context = Literal["client", "server", "partner"]
S7Object = c_void_p
buffer_size = 65536
# noinspection PyTypeChecker
buffer_type = c_ubyte * buffer_size
time_t = c_uint64
word = c_uint16
longword = c_uint32
# mask types
mkEvent = 0
mkLog = 1
[docs]
class Parameter(IntEnum):
"""
The snap7 parameter types
"""
LocalPort = 1
RemotePort = 2
PingTimeout = 3
SendTimeout = 4
RecvTimeout = 5
WorkInterval = 6
SrcRef = 7
DstRef = 8
SrcTSap = 9
PDURequest = 10
MaxClients = 11
BSendTimeout = 12
BRecvTimeout = 13
RecoveryTime = 14
KeepAliveTime = 15
@property
def ctype(self) -> CDataType:
map_: Dict[int, CDataType] = {
self.LocalPort: c_uint16,
self.RemotePort: c_uint16,
self.PingTimeout: c_int32,
self.SendTimeout: c_int32,
self.RecvTimeout: c_int32,
self.WorkInterval: c_int32,
self.SrcRef: c_uint16,
self.DstRef: c_uint16,
self.SrcTSap: c_uint16,
self.PDURequest: c_int32,
self.MaxClients: c_int32,
self.BSendTimeout: c_int32,
self.BRecvTimeout: c_int32,
self.RecoveryTime: c_uint32,
self.KeepAliveTime: c_uint32,
}
return map_[self]
# Area ID
# Word Length
[docs]
class WordLen(IntEnum):
"""
The snap7 word length types
"""
Bit = 0x01
Byte = 0x02
Char = 0x03
Word = 0x04
Int = 0x05
DWord = 0x06
DInt = 0x07
Real = 0x08
Counter = 0x1C
Timer = 0x1D
@property
def ctype(self) -> CDataType:
map_: Dict[WordLen, CDataType] = {
WordLen.Bit: c_int16,
WordLen.Byte: c_int8,
WordLen.Word: c_int16,
WordLen.DWord: c_int32,
WordLen.Real: c_int32,
WordLen.Counter: c_int16,
WordLen.Timer: c_int16,
}
return map_[self]
[docs]
class Area(IntEnum):
"""
The snap7 area types
"""
PE = 0x81
PA = 0x82
MK = 0x83
DB = 0x84
CT = 0x1C
TM = 0x1D
def wordlen(self) -> WordLen:
if self == Area.TM:
return WordLen.Timer
elif self == Area.CT:
return WordLen.Counter
return WordLen.Byte
# backwards compatible alias
Areas = Area
[docs]
class SrvArea(IntEnum):
"""
The snap7 server area types
NOTE: these values are DIFFERENT from the normal area IDs.
"""
PE = 0
PA = 1
MK = 2
CT = 3
TM = 4
DB = 5
[docs]
class Block(IntEnum):
"""
The snap7 block type
"""
OB = 0x38
DB = 0x41
SDB = 0x42
FC = 0x43
SFC = 0x44
FB = 0x45
SFB = 0x46
@property
def ctype(self) -> c_int:
return c_int(self)
server_statuses = {
0: "SrvStopped",
1: "SrvRunning",
2: "SrvError",
}
cpu_statuses = {
0: "S7CpuStatusUnknown",
4: "S7CpuStatusStop",
8: "S7CpuStatusRun",
}
[docs]
class SrvEvent(Structure):
"""
The snap7 server event structure
"""
_fields_ = [
("EvtTime", time_t),
("EvtSender", c_int),
("EvtCode", longword),
("EvtRetCode", word),
("EvtParam1", word),
("EvtParam2", word),
("EvtParam3", word),
("EvtParam4", word),
]
def __str__(self) -> str:
return (
f"<event time: {self.EvtTime} sender: {self.EvtSender} code: {self.EvtCode} "
f"return code: {self.EvtRetCode} param1: {self.EvtParam1} param2:{self.EvtParam2} "
f"param3: {self.EvtParam3} param4: {self.EvtParam4}>"
)
[docs]
class BlocksList(Structure):
"""
The snap7 block list structure
"""
_fields_ = [
("OBCount", c_int32),
("FBCount", c_int32),
("FCCount", c_int32),
("SFBCount", c_int32),
("SFCCount", c_int32),
("DBCount", c_int32),
("SDBCount", c_int32),
]
def __str__(self) -> str:
return (
f"<block list count OB: {self.OBCount} FB: {self.FBCount} FC: {self.FCCount} SFB: {self.SFBCount} "
f"SFC: {hex(self.SFCCount)} DB: {self.DBCount} SDB: {self.SDBCount}>"
)
# noinspection PyTypeChecker
[docs]
class TS7BlockInfo(Structure):
_fields_ = [
("BlkType", c_int32),
("BlkNumber", c_int32),
("BlkLang", c_int32),
("BlkFlags", c_int32),
("MC7Size", c_int32),
("LoadSize", c_int32),
("LocalData", c_int32),
("SBBLength", c_int32),
("CheckSum", c_int32),
("Version", c_int32),
("CodeDate", c_char * 11),
("IntfDate", c_char * 11),
("Author", c_char * 9),
("Family", c_char * 9),
("Header", c_char * 9),
]
def __str__(self) -> str:
return f"""\
Block type: {self.BlkType}
Block number: {self.BlkNumber}
Block language: {self.BlkLang}
Block flags: {self.BlkFlags}
MC7Size: {self.MC7Size}
Load memory size: {self.LoadSize}
Local data: {self.LocalData}
SBB Length: {self.SBBLength}
Checksum: {self.CheckSum}
Version: {self.Version}
Code date: {self.CodeDate}
Interface date: {self.IntfDate}
Author: {self.Author}
Family: {self.Family}
Header: {self.Header}"""
class S7DataItem(Structure):
""" """
_pack_ = 1
_layout_ = "ms"
_fields_ = [
("Area", c_int32),
("WordLen", c_int32),
("Result", c_int32),
("DBNumber", c_int32),
("Start", c_int32),
("Amount", c_int32),
("pData", POINTER(c_uint8)),
]
def __str__(self) -> str:
return (
f"<S7DataItem Area: {self.Area} WordLen: {self.WordLen} Result: {self.Result} "
f"DBNumber: {self.DBNumber} Start: {self.Start} Amount: {self.Amount} pData: {self.pData}>"
)
# noinspection PyTypeChecker
[docs]
class S7CpuInfo(Structure):
"""
S7CpuInfo class for handling CPU with :
- ModuleTypeName => Model of S7-CPU
- SerialNumber => SN of the S7-CPU
- ASName => Family Class of the S7-CPU
- Copyright => Siemens Copyright
- ModuleName => TIA project name or for other S7-CPU, same as ModuleTypeName
"""
_fields_ = [
("ModuleTypeName", c_char * 33),
("SerialNumber", c_char * 25),
("ASName", c_char * 25),
("Copyright", c_char * 27),
("ModuleName", c_char * 25),
]
def __str__(self) -> str:
return (
f"<S7CpuInfo ModuleTypeName: {self.ModuleTypeName} SerialNumber: {self.SerialNumber} "
f"ASName: {self.ASName} Copyright: {self.Copyright} ModuleName: {self.ModuleName}>"
)
[docs]
class S7SZL(Structure):
"""See §33.1 of System Software for S7-300/400 System and Standard Functions"""
_fields_ = [("Header", S7SZLHeader), ("Data", c_ubyte * (0x4000 - 4))]
def __str__(self) -> str:
return f"<S7SZL Header: {self.S7SZHeader}, Data: {self.Data}>"
[docs]
class S7SZLList(Structure):
_fields_ = [("Header", S7SZLHeader), ("List", word * (0x4000 - 2))]
[docs]
class S7OrderCode(Structure):
_fields_ = [("OrderCode", c_char * 21), ("V1", c_ubyte), ("V2", c_ubyte), ("V3", c_ubyte)]
[docs]
class S7CpInfo(Structure):
"""
S7 Cp class for Communication Information :
- MaxPduLength => Size of the maximum PDU length in bytes
- MaxConnections => Max connection allowed to S7-CPU or Server
- MaxMpiRate => MPI rate (MPI use is deprecated)
- MaxBusRate => Profibus rate
Every data packet exchanged with a PLC must fit within the PDU size,
whose is fixed from 240 up to 960 bytes.
"""
_fields_ = [
("MaxPduLength", c_uint16),
("MaxConnections", c_uint16),
("MaxMpiRate", c_uint16),
("MaxBusRate", c_uint16),
]
def __str__(self) -> str:
return (
f"<S7CpInfo MaxPduLength: {self.MaxPduLength} MaxConnections: {self.MaxConnections} "
f"MaxMpiRate: {self.MaxMpiRate} MaxBusRate: {self.MaxBusRate}>"
)
[docs]
class S7Protection(Structure):
"""See §33.19 of System Software for S7-300/400 System and Standard Functions"""
_fields_ = [
("sch_schal", word),
("sch_par", word),
("sch_rel", word),
("bart_sch", word),
("anl_sch", word),
]