Die Subscription, die das KT versucht zu setzen besteht aus einer SOAP Anfrage mit Mandant, Clientsystem und Arbeitsplatz.
Das System sagt, dass sie übermittelten Parameter ungültig sind.
Woran kann das liegen? Ins blaue geraten: Vermutlich ein Problem mit dem Infomodell. Es besteht ggf. keine Verknüpfung mehr zwischen Arbeitsplatz und dem KT, oder es fehlt der CSAPS Eintrag für den Arbeitsplatz im Infomodell.
Da sollte Red Medical mal schauen, ob alles da ist.
Turbomed hat seine eigenarten mit der Groß- und Kleinschreibung, d.h. im Konnektor müssen soweit ich weiß alle Arbeitsplätze in "Capslock" geschrieben sein.
Es gibt zumindest bei Koco-Boxen auch Slot-Parameter, an denen definiert ist, wo Karten stecken dürfen. Wenn es das bei Rise auch gibt, sind ggf. dort nicht alle Slots angehakt?
Ich hatte mal ein Tool geschrieben, um die aktiven Subscriptions via SOAP auf dem Konnektor abzufragen. Hilft vermutlich hier nicht so ganz. Könnte dir aber z.B. dabei helfen zu prüfen, ob Mandant, Clientsystem und Arbeitsplatz korrekt im Infomodell angelegt sind.
Du kannst natürlich auch mal ein Wireshark laufen lassen und versuchen die SOAP Nachricht mitzuschneiden. Wenn du die entschlüsselt bekommst, würdest du ja sehen, welche Parameter in der Anfrage gesetzt sind.
Code: Alles auswählen
#!/usr/bin/env python3
#2024-11-11 by Christian Krause
import requests, xml.dom.minidom, ssl, sys, re, argparse, urllib3, threading
from socket import timeout
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
################## Base Functions #########################
def main(host, a):
K = Kon(host, a.mandant, a.clientsystem, a.arbeitsplatz, a.authMethod, a.proxy, a.mandantwide)
K.getSubscription(a.filter, a.write)
def parseArgs():
p = argparse.ArgumentParser(description='Konnektor2CheckMK')
p.add_argument('-v', '--version', action='version', version='%(prog)s 1.0 mit API-Version: 1.1')
p.add_argument('-d', '--debug', action='store_true', help='No Sanity-Check. Just do it!')
p.add_argument('-p', '--proxy', action='store_true', help='Do not ignore system Proxy')
p.add_argument('-i', '--ip', dest='ipArray', nargs='+', metavar=("<IP/IP-Range/IP-Ranges>"), required=True, help='IP or IP-Range of Konnektor: 10.1.1.17 or 10.1.1.20-35')
p.add_argument('-m', '--mandant', required=True, metavar=("<Mandant>"))
p.add_argument('-c', '--clientsystem', required=True, metavar=("<Clientsystem>"))
p.add_argument('-a', '--arbeitsplatz', required=True, metavar=("<Arbeitsplatz>"))
p.add_argument('-w', '--mandantwide', action='store_true', help='Set Mandantwide Option')
p.add_argument('-s', '--authmethod', dest='authMethod', default="https", nargs='+', metavar=("<Auth Mechanism> [FileName] or <Auth Mechanism [<User> <Pass>]"), help='Auth-Mechanism: http, https, auth, cert')
p.add_argument('-f', '--filter', metavar=("<String>"), help='Filter for special String')
p.add_argument('-o', '--write', action='store_true', help='Write output to disk <ip>.log')
return p.parse_args()
def initializeDicts():
# {"inputVar": ["Fehlermeldung", "Prüf-Regex"]}
global sanitizingDict
sanitizingDict = {
"ipArray": ["Ungültige IP-Adresse/IPRange! Range z.B. 192.168.0.17-24", "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(-25[0-5]|-2[0-4][0-9]|-1[0-9][0-9]|-[1-9]?[0-9])?$"],
"mandant": ["Mandant: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
"clientsystem": ["ClientSystem: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
"arbeitsplatz": ["Arbeitsplatz: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
"authMethod": ["", "^(http|https|cert|auth)"],
}
global soapBody
soapBody = """
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v7="http://ws.gematik.de/conn/EventService/v7.2" xmlns:v2="http://ws.gematik.de/conn/ConnectorContext/v2.0" xmlns:v5="http://ws.gematik.de/conn/ConnectorCommon/v5.0">
<soapenv:Header/>
<soapenv:Body>
<v7:GetSubscription mandant-wide="%s">
<v2:Context>
<v5:MandantId>%s</v5:MandantId>
<v5:ClientSystemId>%s</v5:ClientSystemId>
<v5:WorkplaceId>%s</v5:WorkplaceId>
</v2:Context>
</v7:GetSubscription>
</soapenv:Body>
</soapenv:Envelope>
"""
def ipRange(ipRanges):
ipArray = []
for ipRange in ipRanges:
if re.search('-', ipRange):
fromIP, toOctet = ipRange.rsplit("-", 1)
ipBase, fromOctet = fromIP.rsplit(".", 1)
if int(toOctet) <= int(fromOctet):
print('"Bis"-Angabe im Konnektor-Range muss größer sein als "Von"-Angabe!')
sys.exit()
ipArray += [ipBase + "." + str(i) for i in range(int(fromOctet), int(toOctet) + 1, 1)]
else:
ipArray.append(ipRange)
return ipArray
def Sanitizing():
for inputVar in vars(args):
if not (inputValue := getattr(args, inputVar)):
continue
elif not type(inputValue) == list:
inputValue = [inputValue]
# Prüfe Regex für authMethod Parameter
if inputVar == "authMethod":
message, regex = sanitizingDict.get(inputVar)
if not (re.search(regex, inputValue[0])):
print("Ungültige Authentifizierungsmethode: http, https, auth, cert")
return False
if inputValue[0] == "auth" and not len(inputValue) == 3:
print("Bitte Benutzername und Kennwort als Parameter eingeben!")
return False
elif inputValue[0] == "cert" and not len(inputValue) == 2:
print("Bitte Pfad der Zertifikatsdatei angeben (Dateiname: globalcert.pem und globalcert.txt)")
return False
# Prüfe Regex für alles andere
elif inputVar in sanitizingDict.keys():
message, regex = sanitizingDict.get(inputVar)
for inputString in inputValue:
if not (re.search(regex, inputString)):
print(message)
return False
return True
################## Kon Class #########################
class Kon:
def __init__(self, host, Mandant, ClientSystem, Arbeitsplatz, authMethod, Proxy, MandantWide):
self.host = host
self.Mandant = Mandant
self.MandantWide = "true" if MandantWide else "false"
self.ClientSystem = ClientSystem
self.Arbeitsplatz = Arbeitsplatz
self.headers = {}
self.session = requests.session()
self.session.trust_env = Proxy
# Set Auth Parameter
self.URL = f'https://{host}:443'
self.cert = None
self.Auth = None
if authMethod[0] == 'http':
self.URL = f'http://{host}:80'
elif authMethod[0] == "cert":
self.cert = authMethod[1]
elif authMethod[0] == "auth":
self.Auth = f'{authMethod[1]}:{authMethod[2]}'
self.Auth = "'Basic ' + str(b64encode(self.Auth.encode()))"
def konComm(self, Service, data=None):
Message = ""
try:
if self.Auth: self.headers.update({"Authorization": self.Auth})
self.response =\
self.session.post(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert, data=data) if data else\
self.session.get(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert)
assert self.response.status_code == 200
return True
except requests.exceptions.Timeout:
Message = "Konnektor Timeout"
except IOError:
Message = "Es ist ein Kommunikationsfehler mit dem Konnektor aufgetreten. Fehlendes Zertifikat oder TLS Verbindung fehlgeschlagen."
except AssertionError:
status_code = str(self.response.status_code)
Message = f' HTTP Status Code: {status_code}'
except:
Message = "Ein undefinierter Fehler ist aufgetreten."
if Message: print(Message)
return False
def getSubscription(self, filter, write):
data = soapBody% (self.MandantWide, self.Mandant, self.ClientSystem, self.Arbeitsplatz)
self.headers = {
"Content-Type": "text/xml; charset=""utf-8""",
"Content-Length": str(len(data)),
"SOAPAction": "http://ws.gematik.de/conn/EventService/v7.2#GetSubscription"
}
if not self.konComm('/service/systeminformationservice', data):
return False
dom = xml.dom.minidom.parseString(self.response.text)
if not filter:
if write: self.writeFile(dom.toprettyxml())
print(dom.toprettyxml())
return True
node = dom.childNodes[0].childNodes[0].childNodes[0].childNodes[1]
self.Soap = ''
for e in node.childNodes:
Found = False
# Find filter
for n in e.childNodes:
prefix, tag = xml.dom.minidom._nssplit(n.nodeName)
if n.firstChild.data == filter:
Found = True
# Print if found
if Found:
msg = ""
for n in e.childNodes:
prefix, tag = xml.dom.minidom._nssplit(n.nodeName)
if write:
msg = '%s %s'%(msg, f'{tag}: {n.firstChild.data}\n')
print(f'{tag}: {n.firstChild.data}')
if write: self.writeFile(msg)
return True
def writeFile(self, Message):
with open(f'{self.host}.log', "a") as f:
Date = datetime.now()
isodate = Date.isoformat().rsplit(':', 1)[0]
f.write(f'==================== {isodate} ====================\n')
f.write(f'{Message}\n')
################# Main Function ####################
if __name__ == "__main__":
initializeDicts()
args = parseArgs()
# Create RandomValue for this Programinstance
if args.debug or Sanitizing():
for host in ipRange(args.ipArray):
threading.Thread(target=main, args=(host, args)).start()