Python X2Go Broker API Documentation (v0.0.4.4)

x2gobroker.utils

Contents

Source code for x2gobroker.utils

# -*- coding: utf-8 -*-
# vim:fenc=utf-8

# Copyright (C) 2012-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
Here you find a collection of tools that are used by X2Go Session
Broker's code internally.

Everything that is not directly related to specific broker code and
potentially reusable at other places in the code tree, is placed into
this module.

"""

import os
import sys
import locale
import operator
import re
import distutils.version
import pwd, grp
import socket
import binascii
import time

import netaddr

def _checkConfigFileDefaults(data_structure):
    """\
    Check an ini-file-like data structure.

    :param data_structure: an ini-file-like data structure
    :type data_structure: ``dict`` of ``dict``s

    :returns: ``True`` if ``data_structure`` matches that of an ini file data structure
    :rtype: ``bool``

    """
    if data_structure is None:
        return False
    if type(data_structure) is not dict:
        return False
    for sub_dict in list(data_structure.values()):
        if type(sub_dict) is not dict:
            return False
    return True


def touch_file(filename, mode='a'):
    """\
    Imitates the behaviour of the GNU/touch command.

    Missing parent directories of ``filename`` are created with
    permissions ``0o700``. The file is then opened with the given mode
    and closed again right away, no data is written.

    :param filename: name of the file to touch
    :type filename: ``str``
    :param mode: the file mode (as used for Python file objects)
    :type mode: ``str``

    """
    parent_dir = os.path.dirname(filename)
    # a bare file name has an empty directory part; os.makedirs('') would
    # raise, so only create directories when there is something to create
    if parent_dir and not os.path.isdir(parent_dir):
        os.makedirs(parent_dir, mode=0o700)
    with open(filename, mode=mode):
        pass
def get_encoding():
    """\
    Detect systems default character encoding.

    The encoding reported by ``locale.getdefaultlocale()`` is preferred.
    If that lookup fails or yields ``None``, Python's default string
    encoding (``sys.getdefaultencoding()``) is used, with ``'ascii'`` as
    the last resort.

    :returns: The system's local character encoding.
    :rtype: ``str``

    """
    try:
        encoding = locale.getdefaultlocale()[1]
    except Exception:
        # best effort: any lookup problem just triggers the fallback below,
        # but KeyboardInterrupt / SystemExit are no longer swallowed
        encoding = None

    if encoding is None:
        try:
            encoding = sys.getdefaultencoding()
        except Exception:
            encoding = 'ascii'
    return encoding
# Comparison operators accepted by compare_versions(), keyed by their
# string representation.
_VERSION_COMPARATORS = {
    '<': operator.lt,
    '<=': operator.le,
    '==': operator.eq,
    '!=': operator.ne,
    '>=': operator.ge,
    '>': operator.gt,
}


def _loose_version_components(version):
    """\
    Split a version string into a list of comparable components.

    Fallback for Python versions that do not ship ``distutils`` anymore.
    The string is tokenized into runs of digits, runs of lower-case
    letters and anything in between; dots are dropped and numeric tokens
    are converted to ``int`` (the same parsing rules as used by
    ``distutils.version.LooseVersion``).

    :param version: a version string
    :type version: ``str``

    :returns: the components of the version string
    :rtype: ``list``

    """
    components = []
    for token in re.split(r'(\d+|[a-z]+|\.)', version):
        if not token or token == '.':
            continue
        try:
            components.append(int(token))
        except ValueError:
            components.append(token)
    return components


def compare_versions(version_a, op, version_b):
    """\
    Compare <version_a> with <version_b> using operator <op>.

    In the background ``distutils.version.LooseVersion`` is used for the
    comparison operation. Where ``distutils`` is not available (it has
    been removed from Python's standard library in version 3.12), the
    version strings are split with an equivalent built-in parser.

    :param version_a: a version string
    :type version_a: ``str``
    :param op: an operator provide as string, one of
        '<', '<=', '==', '!=', '>=' or '>'
    :type op: ``str``
    :param version_b: another version string that is to be compared with <version_a>
    :type version_b: ``str``

    :returns: if the comparison is ``True`` or ``False``
    :rtype: ``bool``

    :raises ValueError: if ``op`` is not one of the supported operators

    """
    # look the operator up in a fixed table instead of passing it to eval(),
    # so that a crafted operator string cannot execute arbitrary code
    try:
        compare = _VERSION_COMPARATORS[str(op).strip()]
    except KeyError:
        raise ValueError('unsupported version comparison operator: %r' % (op,)) from None

    try:
        from distutils.version import LooseVersion as parse_version
    except ImportError:
        parse_version = _loose_version_components

    ### FIXME: this comparison is not reliable with beta et al. version strings
    return compare(parse_version(version_a), parse_version(version_b))
def normalize_hostnames(servers):
    """\
    Reduce server names to their short hostnames, if they share a domain.

    Every server name is split at its first dot into a hostname and a
    domain part. The result is always a 2-tuple consisting of the
    (possibly normalized) servers and the list of distinct domain parts
    that were found. Entries that are IPv4 or IPv6 addresses are skipped,
    they do not show up in the normalized result.

    If more than one domain is in use, the hostnames cannot be shortened
    safely and the servers are handed back unmodified (a ``tuple`` will
    have been converted to a ``list``, though).

    E.g., for servers provided as a ``list`` (a ``tuple`` is ok, too)::

        ['server1', 'server2'] -> (['server1', 'server2'], [])
        ['server1.domain1', 'server2.domain1'] -> (['server1', 'server2'], ['domain1'])
        ['server1.domain1', 'server2.domain2'] -> (['server1.domain1', 'server2.domain2'], ['domain1', 'domain2'])

    E.g., for servers provided as a ``dict``::

        {'server1': <params>, 'server2': <params>} -> ({'server1': <params>, 'server2': <params>}, [])
        {'server1.domain1': <params>, 'server2.domain1': <params>} -> ({'server1': <params>, 'server2': <params>}, ['domain1'])
        {'server1.domain1': <params>, 'server2.domain2': <params>} -> ({'server1.domain1': <params>, 'server2.domain2': <params>}, ['domain1', 'domain2'])

    :param servers: a ``list``, ``tuple`` or ``dict`` hash with either server hostnames as items or dictionary keys
    :type servers: ``list``, ``tuple`` or ``dict``

    :returns: the servers (with domains stripped off the items / keys where possible)
        and the list of domains found
    :rtype: ``tuple`` of (``list`` or ``dict``, ``list``)

    :raises ValueError: if ``servers`` is neither a ``list``, a ``tuple`` nor a ``dict``

    """
    container_type = type(servers)
    if container_type not in (dict, list, tuple):
        raise ValueError('only lists, tuples and dictionaries are valid for x2gobroker.utils.normalize_hostnames()')

    is_mapping = container_type is dict
    if container_type is tuple:
        servers = list(servers)

    shortened = {} if is_mapping else []
    domains = []

    for name in servers:

        # IP addresses have no domain part to strip, leave them out
        if netaddr.valid_ipv4(name) or netaddr.valid_ipv6(name):
            continue

        # everything before the first dot is the hostname, the rest
        # (possibly empty) is the domain
        short_name, _, domain = name.partition('.')

        if is_mapping:
            shortened[short_name] = servers[name]
        else:
            shortened.append(short_name)

        # remember every distinct domain we come across
        if domain and domain not in domains:
            domains.append(domain)

    # several domains in use: short names would be ambiguous, so give
    # back the servers as they were passed in
    if len(domains) > 1:
        return servers, domains

    return shortened, domains
def matching_hostnames(server_list_a, server_list_b):
    """\
    Find the server names that two lists of servers have in common.

    This function tries to smoothly ship around asymmetric usage of FQDN
    hostnames and short hostnames: if neither list uses more than one
    domain and at least one of the lists uses no domain at all, the lists
    are compared by their short hostnames. If that is not possible (or
    does not yield a match), the server names are compared as given.

    :param server_list_a: list of servers
    :type server_list_a: ``list`` of ``str``
    :param server_list_b: list of servers to compare the first list with
    :type server_list_b: ``list`` of ``str``

    :returns: a sorted list of matching server hostnames (hostnames that
        appear in both provided server lists)
    :rtype: ``list`` of ``str``

    """
    common = []

    ### reduce the names in list A to short hostnames, if possible
    short_names_a, domains_a = normalize_hostnames(server_list_a)

    ### list B is only looked at, if list A does not mix several domains
    if len(domains_a) <= 1:
        short_names_b, domains_b = normalize_hostnames(server_list_b)

        # short names are only comparable, if at most one of the two
        # lists carries a domain
        if len(domains_b) <= 1 and not (domains_a and domains_b):
            common = list(set(short_names_a) & set(short_names_b))

    # fall back to comparing the names exactly as they were given
    if not common:
        common = list(set(server_list_a) & set(server_list_b))

    return sorted(common)
def drop_privileges(uid, gid):
    """\
    Drop privileges from super-user root to the given user and group.

    Nothing happens (and ``None`` is returned) if the current process is
    not running as root.

    When running as root, the process gives up all supplementary groups,
    then switches to the given group and user. Afterwards the umask is
    set to ``0o077`` and the environment's ``HOME`` variable is pointed
    to the home directory of the account we dropped privileges to.

    :param uid: name of the user account to drop privileges to (looked up via ``pwd.getpwnam()``)
    :type uid: ``str``
    :param gid: name of the group to drop privileges to (looked up via ``grp.getgrnam()``)
    :type gid: ``str``

    """
    if os.getuid() == 0:

        # resolve the given names to numeric IDs
        target_uid = pwd.getpwnam(uid).pw_uid
        target_gid = grp.getgrnam(gid).gr_gid

        # get rid of all supplementary groups
        os.setgroups([])

        # switch group first, then user
        os.setgid(target_gid)
        os.setuid(target_uid)

        # make sure newly created files are private to the new account
        os.umask(0o077)

        # let $HOME point to the new account's home directory
        os.environ['HOME'] = pwd.getpwnam(uid).pw_dir
def split_host_address(host, default_address=None, default_port=22):
    """\
    Try to split a ``<host_addr>:<port>`` expression into hostname and port.

    This function is supposed to work with DNS hostnames, IPv4 and IPv6
    address. Both parts (<host_addr> and <port>) can be omitted in the
    given ``host`` string. If so, ``default_address`` and ``default_port``
    come into play.

    Before parsing, surrounding whitespace as well as leading ``*`` and
    ``:`` characters are stripped off ``host``. A ``host`` that consists
    of a number only is interpreted as a port. If ``host`` starts with an
    opening square bracket, the returned address is enclosed in square
    brackets.

    :param host: an expression like ``<host_addr>:<port>`` (where either
        the host address or the port can be optional)
    :type host: ``str``
    :param default_address: a fallback host address to be used (default: None)
    :type default_address: ``str``
    :param default_port: a fallback port to be used (default: 22)
    :type default_port: ``int``

    :returns: a tuple of host address and port
    :rtype: ``tuple(<host_addr>, <port>)``

    """
    # a plain integer is treated like a string that contains a port number only
    if type(host) is int:
        host = str(host)
    # do some stripping first...
    host = host.strip()
    host = host.lstrip('*')
    host = host.lstrip(':')

    bind_address = None
    bind_port = None

    # a leading '[' marks a bracketed IPv6 address; the brackets get
    # re-added to the result at the very end
    is_ipv6 = None
    if host and host[0] == '[':
        is_ipv6 = True

    if ':' in host:
        # assume that whatever follows the last colon is the port
        bind_address, bind_port = host.rsplit(':', 1)
        try:
            bind_port = int(bind_port)
        except ValueError:
            # obviously we split an IPv6 address
            bind_address = host
            bind_port = int(default_port)
    else:
        try:
            # in host we find a port number only
            bind_port = int(host)
        except ValueError:
            # NOTE(review): for an empty host the address becomes '0.0.0.0'
            # right here, so default_address is not applied further below --
            # confirm that this is intended
            if host:
                bind_address = host
            else:
                bind_address = '0.0.0.0'
            if type(default_port) is int:
                # use the given default, in host, there is an IP address or hostname
                bind_port = default_port
            else:
                # setting a hard-coded port
                bind_port = 22

    if bind_address is None:
        # in "host" we found the bind_port, now we assign the bind_address
        bind_address = '0.0.0.0'
        if default_address:
            bind_address = default_address

    # normalize the brackets: strip any that are present, then add exactly
    # one pair if the input started with '['
    bind_address = bind_address.lstrip('[').rstrip(']')
    if is_ipv6:
        bind_address = '[{address}]'.format(address=bind_address)

    return bind_address, bind_port
def portscan(addr, port=22):
    """\
    Perform a port scan to the requested hostname.

    A TCP connection to ``addr`` is attempted on the given port. All
    addresses that ``addr`` resolves to (IPv6 as well as IPv4) are tried
    in turn, each with a timeout of two seconds, so a service that is
    only reachable via one of the address families is still detected.

    :param addr: address (IPv4, IPv6 or hostname) of the host we want to probe
    :type addr: ``str``
    :param port: port number (default: 22)
    :type port: ``int``

    :returns: ``True`` if the port is in use, else ``False`` (also on errors)
    :rtype: ``bool``

    """
    try:
        # the connection is only needed as a probe, the context manager
        # closes the socket again right away
        with socket.create_connection((addr, port), timeout=2):
            return True
    except OSError:
        # covers name resolution failures (socket.gaierror), timeouts
        # and refused / unreachable connections
        return False
def get_key_fingerprint(key):
    """\
    Retrieve the host key fingerprint of the server to be validated.

    The fingerprint is obtained from the key's ``get_fingerprint()``
    method and rendered as a string of hexadecimal digits.

    :param key: a Python Paramiko :class:`PKey` object
    :type key: ``PKey``

    :returns: host key fingerprint
    :rtype: ``str``

    """
    raw_fingerprint = key.get_fingerprint()
    hex_fingerprint = binascii.hexlify(raw_fingerprint)
    return hex_fingerprint.decode()
def get_key_fingerprint_with_colons(key):
    """\
    Retrieve the (colonized) host key fingerprint of the server to be validated.

    The hexadecimal fingerprint is cut into groups of two characters,
    which are joined by colons.

    :param key: a Python Paramiko :class:`PKey` object
    :type key: ``PKey``

    :returns: host key fingerprint (with colons)
    :rtype: ``str``

    """
    fingerprint = get_key_fingerprint(key)
    # two hex digits represent one byte of the fingerprint
    octets = [fingerprint[pos:pos + 2] for pos in range(0, len(fingerprint), 2)]
    return ':'.join(octets)
def delayed_execution(agent_func, delay, *args, **kwargs):
    """\
    Delay execution of a function.

    The function is run in a forked child process, so this call returns
    immediately in the calling process. The child closes its stdin,
    stdout and stderr, sleeps for the given delay (in steps of one
    second), calls ``agent_func`` and then always terminates: with exit
    code 0 if ``agent_func`` returned, with exit code 1 if anything went
    wrong. The calling process does not wait for the child.

    :param agent_func: function to be executed.
    :type agent_func: ``func``
    :param delay: delay of the function start in seconds
    :type delay: ``int``
    :param args: arg parameters to be handed over to the to-be-delayed function
    :type args: ``list``
    :param kwargs: kwarg parameters to be handed over to the to-be-delayed function
    :type kwargs: ``dict``

    """
    if os.fork() != 0:
        # calling process: nothing more to do here
        return

    # From here on we are in the forked process. It must never return
    # into the caller's code (that would leave us with two processes
    # doing the caller's work), hence the unconditional os._exit() below.
    exit_code = 1
    try:
        # close stdin, stdout and stderr in the forked process...
        for fd in (0, 1, 2):
            try:
                os.close(fd)
            except OSError:
                # this descriptor was not open in the first place
                pass

        # wait for the given delay period
        waited = 0
        while waited < delay:
            time.sleep(1)
            waited += 1

        # execute the function requested
        agent_func(*args, **kwargs)
        exit_code = 0
    finally:
        os._exit(exit_code)

Contents