Source code for tcms.rpc.api.user

# -*- coding: utf-8 -*-

from attachments.models import Attachment
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.exceptions import PermissionDenied
from django.forms.models import model_to_dict
from modernrpc.core import REQUEST_KEY, rpc_method

from tcms.rpc import utils
from tcms.rpc.decorators import permissions_required

User = get_user_model()  # pylint: disable=invalid-name


__all__ = (
    "update",
    "filter",
    "join_group",
    "add_attachment",
)


def _get_user_dict(user):
    user_dict = model_to_dict(user)

    for field in (
        "password",
        "groups",
        "user_permissions",
        "date_joined",
        "last_login",
    ):
        if field in user_dict:
            del user_dict[field]

    return user_dict


[docs]@permissions_required("auth.view_user") @rpc_method(name="User.filter") def filter(query=None, **kwargs): # pylint: disable=redefined-builtin """ .. function:: RPC User.filter(query) Search and return the resulting list of users. :param query: Field lookups for :class:`django.contrib.auth.models.User` :type query: dict :param kwargs: Dict providing access to the current request, protocol entry point name and handler instance from the rpc method :return: Serialized :class:`django.contrib.auth.models.User` object without the password field! :rtype: dict .. note:: If query is ``None`` will return the user issuing the RPC request. """ if not query: query = {"pk": kwargs.get(REQUEST_KEY).user.pk} return list( User.objects.filter(**query) .values( "email", "first_name", "id", "is_active", "is_staff", "is_superuser", "last_name", "username", ) .distinct() )
[docs]@rpc_method(name="User.update") def update( user_id, values, **kwargs ): # pylint: disable=missing-api-permissions-required """ .. function:: RPC User.update(user_id, values) Updates the fields of the selected user. Can be used to update password as well! :param user_id: PK of user to update :type user_id: int :param values: Field values for :class:`django.contrib.auth.models.User` :type values: dict :param kwargs: Dict providing access to the current request, protocol entry point name and handler instance from the rpc method :return: Serialized :class:`django.contrib.auth.models.User` object :rtype: dict :raises PermissionDenied: if missing the *auth.change_user* permission when updating another user or when passwords don't match. .. note:: If ``user_id`` is None will update the user issuing the RPC request. .. warning:: Changing the password for another user via RPC is not allowed! """ request = kwargs.get(REQUEST_KEY) if user_id: user_being_updated = User.objects.get(pk=user_id) else: user_being_updated = request.user editable_fields = ("first_name", "last_name", "email", "password") can_change_user = request.user.has_perm("auth.change_user") is_updating_other = request.user != user_being_updated # If changing other's attributes, current user must have proper permission if is_updating_other and not can_change_user: raise PermissionDenied("Permission denied") update_fields = [] for field in editable_fields: if not values.get(field): continue update_fields.append(field) if field == "password": if is_updating_other: raise PermissionDenied( "Password updates for other users are not allowed via RPC!" ) old_password = values.get("old_password") if not old_password: raise PermissionDenied("Old password is required") if not user_being_updated.check_password(old_password): raise PermissionDenied("Password is incorrect") user_being_updated.set_password(values["password"]) else: setattr(user_being_updated, field, values[field]) user_being_updated.save(update_fields=update_fields) return _get_user_dict(user_being_updated)
[docs]@permissions_required("auth.change_user") @rpc_method(name="User.join_group") def join_group(username, groupname): """ .. function:: RPC User.join_group(username, groupname) Add user to a group specified by name. :param username: Username to modify :type username: str :param groupname: Name of group to join, must exist! :type groupname: str :raises PermissionDenied: if missing *auth.change_user* permission """ user = User.objects.get(username=username) group = Group.objects.get(name=groupname) user.groups.add(group)
[docs]@permissions_required("attachments.add_attachment") @rpc_method(name="User.add_attachment") def add_attachment(filename, b64content, **kwargs): """ .. function:: RPC User.add_attachment(filename, b64content) Attach a file under the currently logged-in user! This method is meant to be used by SimpleMDE combined with post_save processing for various models like TestPlan and TestCase. While files uploaded by this method will be attached and available (if you know their URL), there is no UI to see all of the files uploaded by a certain user or manage them! :param filename: File name of attachment, e.g. 'logs.txt' :type filename: str :param b64content: Base64 encoded content :type b64content: str :param kwargs: Dict providing access to the current request, protocol entry point name and handler instance from the rpc method :return: Information about the attachment :rtype: dict """ user = kwargs.get(REQUEST_KEY).user utils.add_attachment( user.pk, settings.AUTH_USER_MODEL, kwargs.get(REQUEST_KEY).user, filename, b64content, ) # take the last attachment for this user and return information about it attachment = ( Attachment.objects.attachments_for_object(user).order_by("created").last() ) return { "url": attachment.attachment_file.url, "filename": attachment.filename, }