summaryrefslogtreecommitdiff
path: root/include/py/hikvision/isapi.py
blob: 6cc34f8c7bb65cf1732290c7fe42d8bce47cf4ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from time import time
from typing import Optional, Union
from xml.sax.saxutils import escape

import requests

from .util import xml_to_dict, sha256_hex
from ...util import validate_ipv4
from ...http import HTTPMethod


class ResponseError(RuntimeError):
    """Raised when a device response reports a non-OK status."""


class AuthError(ResponseError):
    """Raised when the device rejects a session login attempt."""


class ISAPIClient:
    """Small HTTP client for the ``/ISAPI/`` endpoints of a Hikvision device.

    Cookies returned by the device are remembered on the instance and sent
    with every subsequent request, so :meth:`auth` has to be called only once
    per client.
    """

    def __init__(self, host, timeout: Optional[float] = 30.0):
        """Create a client.

        :param host: host part of the URL (everything between ``http://`` and
            ``/ISAPI/``).
        :param timeout: timeout in seconds handed to ``requests`` for every
            call; ``None`` disables it (wait forever).
        """
        self.host = host
        self.timeout = timeout
        self.cookies = {}

    def call(self,
             path: str,
             method: HTTPMethod = HTTPMethod.GET,
             data: Optional[Union[dict, str, bytes]] = None,
             raise_for_status=True,
             check_put_response=False):
        """Perform one ISAPI request and return the response parsed by ``xml_to_dict``.

        :param path: path below ``/ISAPI/`` (may already contain a query string).
        :param method: HTTP method; its value selects the ``requests`` function.
        :param data: sent as query parameters for GET, as request body otherwise.
        :param raise_for_status: raise ``requests.exceptions.HTTPError`` on
            4xx/5xx responses.
        :param check_put_response: additionally require a ``ResponseStatus``
            document with ``statusCode`` 1 and ``statusString`` "OK"; implies
            ``raise_for_status``.
        :raises ResponseError: if ``check_put_response`` is set and the status
            document does not report success.
        """
        request = getattr(requests, method.value.lower())

        headers = {
            'X-Requested-With': 'XMLHttpRequest',
        }
        if method in (HTTPMethod.PUT, HTTPMethod.POST):
            headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        # Without an explicit timeout requests blocks indefinitely on a
        # device that accepts the connection but never answers.
        kwargs = {'timeout': self.timeout}
        if data:
            kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
        if self.cookies:
            kwargs['cookies'] = self.cookies

        r = request(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)

        if raise_for_status or check_put_response:
            r.raise_for_status()

        parsed_xml = None
        if check_put_response:
            parsed_xml = xml_to_dict(r.text)
            resp = parsed_xml['ResponseStatus']
            status_code = int(resp['statusCode'][0])
            status_string = resp['statusString'][0]
            if status_code != 1 or status_string.lower() != 'ok':
                raise ResponseError(
                    f'response status looks bad: {status_code} {status_string}')

        # Cookies are stored only after the status check above has passed.
        self.cookies.update(r.cookies.get_dict())

        if parsed_xml is None:
            parsed_xml = xml_to_dict(r.text)

        return parsed_xml

    def auth(self, username: str, password: str):
        """Log in with a challenge/response session login.

        Fetches the login capabilities for ``username``, derives the password
        digest from them and posts it to ``Security/sessionLogin``. On success
        the session cookies are kept by :meth:`call`.

        :raises AuthError: if the device answers with a ``statusValue`` other
            than 200.
        """
        xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
        caps = xml['SessionLoginCap']
        is_irreversible = caps['isIrreversible'][0].lower() == 'true'
        challenge = caps['challenge'][0]
        iterations = int(caps['iterations'][0])

        # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
        # also look into webAuth.js and utils.js

        if 'salt' in caps and is_irreversible:
            # Two hashing rounds are spent on salt and challenge, the rest
            # re-hash the digest.
            p = sha256_hex(username + caps['salt'][0] + password)
            p = sha256_hex(p + challenge)
            for _ in range(iterations - 2):
                p = sha256_hex(p)
        else:
            # Here the challenge is appended to the password hash as plain
            # text and only then the remaining rounds are applied.
            p = sha256_hex(password) + challenge
            for _ in range(iterations - 1):
                p = sha256_hex(p)

        # The user name is free text, so it must be XML-escaped; the digest
        # is hex and the remaining values are echoed back from the device.
        data = ''.join((
            '<SessionLogin>',
            f'<userName>{escape(username)}</userName>',
            f'<password>{p}</password>',
            f'<sessionID>{caps["sessionID"][0]}</sessionID>',
            '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>',
            f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>',
            '</SessionLogin>',
        ))

        resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
        status_value = int(resp['statusValue'][0])
        status_string = resp['statusString'][0]
        if status_value != 200:
            raise AuthError(f'{status_value}: {status_string}')

    def get_ntp_server(self) -> str:
        """Return the host name or IP address of the configured NTP server.

        Tries ``System/time/ntpServers/capabilities`` first and falls back to
        ``System/time/ntpServers/1`` when that request fails with an HTTP
        error.
        """
        try:
            # works on newer 1080p cams
            xml = self.call('System/time/ntpServers/capabilities')
        except requests.exceptions.HTTPError:
            # works on older 720p cams
            ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
            addr_format = ntp_server['addressingFormatType'][0]
        else:
            ntp_server = xml['NTPServerList']['NTPServer'][0]
            # in the capabilities document the value sits under '#text'
            addr_format = ntp_server['addressingFormatType'][0]['#text']

        address_key = 'hostName' if addr_format == 'hostname' else 'ipAddress'
        return ntp_server[address_key][0]

    def set_timezone(self, timezone: str = 'CST-3:00:00'):
        """Switch the device clock to NTP mode and set its time zone.

        :param timezone: value for the ``<timeZone>`` element; the default is
            the value this method used before it became a parameter.
        :raises ResponseError: if the device does not confirm the change.
        """
        data = '<?xml version="1.0" encoding="UTF-8"?>'
        data += f'<Time><timeMode>NTP</timeMode><timeZone>{escape(timezone)}</timeZone></Time>'
        self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)

    def set_ntp_server(self,
                       ntp_host: str,
                       ntp_port: int = 123,
                       sync_interval: int = 1440):
        """Test an NTP server from the device and, if reachable, configure it.

        :param ntp_host: IPv4 address or host name of the NTP server.
        :param ntp_port: NTP port.
        :param sync_interval: value for ``<synchronizeInterval>``
            (presumably minutes - confirm against the ISAPI documentation).
        :raises ResponseError: if the test reports an error or the device
            does not confirm the new setting.
        """
        # The address element has to match the addressing format: host names
        # go into <hostName>, which is also where get_ntp_server() reads them
        # from.
        if validate_ipv4(ntp_host):
            addr_format, addr_tag = 'ipaddress', 'ipAddress'
        else:
            addr_format, addr_tag = 'hostname', 'hostName'

        server_xml = (
            f'<addressingFormatType>{addr_format}</addressingFormatType>'
            f'<{addr_tag}>{escape(ntp_host)}</{addr_tag}>'
            f'<portNo>{ntp_port}</portNo>'
        )
        xml_decl = '<?xml version="1.0" encoding="UTF-8"?>'

        # test ntp server first
        data = f'{xml_decl}<NTPTestDescription>{server_xml}</NTPTestDescription>'
        resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
        error_code = int(resp['errorCode'][0])
        error_description = resp['errorDescription'][0]
        if error_code != 0 or error_description.lower() != 'ok':
            raise ResponseError(
                f'response status looks bad: {error_code} {error_description}')

        # then set it
        data = (
            f'{xml_decl}<NTPServer><id>1</id>{server_xml}'
            f'<synchronizeInterval>{sync_interval}</synchronizeInterval></NTPServer>'
        )
        self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)