Source code for tcms.rpc.api.testrun

# -*- coding: utf-8 -*-
from django.forms.models import model_to_dict
from modernrpc.core import REQUEST_KEY, rpc_method

from tcms.management.models import Tag
from tcms.rpc import utils
from tcms.rpc.api.forms.testrun import UpdateForm, UserForm
from tcms.rpc.decorators import permissions_required
from tcms.testcases.models import TestCase
from tcms.testruns.forms import NewRunForm
from tcms.testruns.models import Property, TestExecution, TestRun


@permissions_required("testruns.add_testexecution")
@rpc_method(name="TestRun.add_case")
def add_case(run_id, case_id):
    """
    .. function:: RPC TestRun.add_case(run_id, case_id)

        Add a TestCase to the selected test run. When the test case is
        already part of the run no new execution is created and the
        existing one(s) are returned instead.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param case_id: PK of TestCase to be added
        :type case_id: int
        :return: A list of serialized
                 :class:`tcms.testruns.models.TestExecution` objects
        :rtype: list(dict)
        :raises DoesNotExist: if objects specified by the PKs don't exist
        :raises PermissionDenied: if missing *testruns.add_testexecution*
                permission
        :raises RuntimeError: if test case status is not CONFIRMED
    """
    test_run = TestRun.objects.get(pk=run_id)
    test_case = TestCase.objects.get(pk=case_id)

    # short-circuit: the case is already attached to this run
    already_attached = test_run.executions.filter(case=test_case)
    if already_attached.exists():
        return annotate_executions_with_properties(already_attached)

    if not test_case.case_status.is_confirmed:
        raise RuntimeError(f"TC-{test_case.pk} status is not confirmed")

    # new executions always go to the end of the run: 10 past the highest
    # existing sortkey, or plain 10 when the run has no executions yet
    trailing_execution = test_run.executions.order_by("sortkey").last()
    sortkey = trailing_execution.sortkey + 10 if trailing_execution else 10

    created = test_run.create_execution(case=test_case, sortkey=sortkey)
    return annotate_executions_with_properties(created)
def annotate_executions_with_properties(executions_iterable):
    """
    Serialize test executions together with their properties.

    :param executions_iterable: iterable of objects accepted by
        ``model_to_dict()`` which also provide a ``properties()`` method
        returning something with ``.values()``
    :return: one dict per execution, i.e. the ``model_to_dict()`` output
        extended with a ``properties`` key holding a list of
        ``{"name": ..., "value": ...}`` rows
    :rtype: list(dict)
    """
    return [
        {
            **model_to_dict(execution),
            "properties": list(execution.properties().values("name", "value")),
        }
        for execution in executions_iterable
    ]
@permissions_required("testruns.delete_testexecution")
@rpc_method(name="TestRun.remove_case")
def remove_case(run_id, case_id):
    """
    .. function:: RPC TestRun.remove_case(run_id, case_id)

        WARNING: this method is deprecated in favor of
        ``TestExecution.remove()``! Nothing in Kiwi TCMS uses it directly
        and it will be removed in the future!

        Deletes every TestExecution matching the given run and case.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param case_id: PK of TestCase whose execution(s) are deleted
        :type case_id: int
    """
    matching_executions = TestExecution.objects.filter(run=run_id, case=case_id)
    matching_executions.delete()
@permissions_required("testruns.view_testrun")
@rpc_method(name="TestRun.get_cases")
def get_cases(run_id):
    """
    .. function:: RPC TestRun.get_cases(run_id)

        Get the list of test cases that are attached to a test run.

        :param run_id: PK of TestRun to inspect
        :type run_id: int
        :return: Serialized list of :class:`tcms.testcases.models.TestCase`
                 objects augmented with ``execution_id`` and ``status``
                 information.
        :rtype: list(dict)
    """
    case_fields = (
        "id",
        "create_date",
        "is_automated",
        "script",
        "arguments",
        "extra_link",
        "summary",
        "requirement",
        "notes",
        "text",
        "case_status",
        "category",
        "priority",
        "author",
        "default_tester",
        "reviewer",
    )
    cases = list(
        TestCase.objects.filter(executions__run_id=run_id).values(*case_fields)
    )

    # map: TestCase PK -> execution row for this run
    # NOTE(review): the map is keyed by case, so if one case has several
    # executions in the same run only a single row is kept - confirm
    # whether that situation can occur
    execution_rows = TestExecution.objects.filter(run_id=run_id).values(
        "case", "pk", "status__name"
    )
    execution_by_case = {}
    for row in execution_rows.iterator():
        execution_by_case[row["case"]] = row

    for serialized_case in cases:
        row = execution_by_case[serialized_case["id"]]
        serialized_case.update(execution_id=row["pk"], status=row["status__name"])

    return cases
@permissions_required("testruns.add_testruntag")
@rpc_method(name="TestRun.add_tag")
def add_tag(run_id, tag_name, **kwargs):
    """
    .. function:: RPC TestRun.add_tag(run_id, tag_name)

        Add one tag to the specified test run.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param tag_name: Tag name to add
        :type tag_name: str
        :param \\**kwargs: Dict providing access to the current request,
            protocol, entry point name and handler instance from the
            rpc method
        :return: Serialized list of :class:`tcms.management.models.Tag`
                 objects attached to the test run
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.add_testruntag*
                permission
        :raises TestRun.DoesNotExist: if object specified by PK doesn't exist
        :raises Tag.DoesNotExist: if missing *management.add_tag* permission
                and *tag_name* doesn't exist in the database!
    """
    requesting_user = kwargs.get(REQUEST_KEY).user

    # the tag is resolved before the test run is looked up
    tag, _created = Tag.get_or_create(requesting_user, tag_name)

    run = TestRun.objects.get(pk=run_id)
    run.add_tag(tag)

    return list(run.tag.values("id", "name"))
@permissions_required("testruns.delete_testruntag")
@rpc_method(name="TestRun.remove_tag")
def remove_tag(run_id, tag_name):
    """
    .. function:: RPC TestRun.remove_tag(run_id, tag_name)

        Remove a tag from the specified test run.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param tag_name: Tag name to remove
        :type tag_name: str
        :return: Serialized list of :class:`tcms.management.models.Tag`
                 objects which remain attached to the test run
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.delete_testruntag*
                permission
        :raises DoesNotExist: if objects specified don't exist
    """
    # the tag is looked up first, then the test run
    tag_to_remove = Tag.objects.get(name=tag_name)
    run = TestRun.objects.get(pk=run_id)

    run.remove_tag(tag_to_remove)

    remaining_tags = run.tag.values("id", "name")
    return list(remaining_tags)
@permissions_required("testruns.add_testrun")
@rpc_method(name="TestRun.create")
def create(values, **kwargs):
    """
    .. function:: RPC TestRun.create(values)

        Create new TestRun object and store it in the database.
        When ``default_tester`` is missing or empty it is set to the PK of
        the user making the request.

        :param values: Field values for :class:`tcms.testruns.models.TestRun`
        :type values: dict
        :param \\**kwargs: Dict providing access to the current request,
            protocol, entry point name and handler instance from the
            rpc method
        :return: Serialized :class:`tcms.testruns.models.TestRun` object
        :rtype: dict
        :raises PermissionDenied: if missing *testruns.add_testrun* permission
        :raises ValueError: if data validations fail

        Example::

            >>> values = {'build': 384,
                'manager': 137,
                'plan': 137,
                'summary': 'Testing XML-RPC for TCMS',
            }
            >>> TestRun.create(values)
    """
    if not values.get("default_tester"):
        current_user = kwargs.get(REQUEST_KEY).user
        values["default_tester"] = current_user.pk

    run_form = NewRunForm(values)
    run_form.populate(values.get("plan"))

    if not run_form.is_valid():
        raise ValueError(list(run_form.errors.items()))

    new_run = run_form.save()
    serialized = model_to_dict(new_run, exclude=["cc", "tag"])

    # start_date is copied over explicitly b/c the value is set in the DB
    # directly and if None model_to_dict() will not return it
    serialized["start_date"] = new_run.start_date
    return serialized
@permissions_required("testruns.view_testrun")
@rpc_method(name="TestRun.filter")
def filter(query=None):  # pylint: disable=redefined-builtin
    """
    .. function:: RPC TestRun.filter(query)

        Perform a search and return the resulting list of test runs.
        Omitting ``query`` applies no field lookups.

        :param query: Field lookups for :class:`tcms.testruns.models.TestRun`
        :type query: dict
        :return: List of serialized :class:`tcms.testruns.models.TestRun`
                 objects
        :rtype: list(dict)
    """
    lookups = {} if query is None else query

    serialized_fields = (
        "id",
        "start_date",
        "stop_date",
        "planned_start",
        "planned_stop",
        "summary",
        "notes",
        "plan",
        "plan__name",
        "build",
        "build__name",
        "build__version",
        "build__version__value",
        "build__version__product",
        "manager",
        "manager__username",
        "default_tester",
        "default_tester__username",
    )

    matching_runs = TestRun.objects.filter(**lookups)
    return list(matching_runs.values(*serialized_fields).distinct())
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.update")
def update(run_id, values):
    """
    .. function:: RPC TestRun.update(run_id, values)

        Update the selected TestRun

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param values: Field values for :class:`tcms.testruns.models.TestRun`
        :type values: dict
        :return: Serialized :class:`tcms.testruns.models.TestRun` object
        :rtype: dict
        :raises PermissionDenied: if missing *testruns.change_testrun*
                permission
        :raises ValueError: if data validations fail
    """
    existing_run = TestRun.objects.get(pk=run_id)
    update_form = UpdateForm(values, instance=existing_run)

    # In the rare case where this TR is reassigned to another TP
    # don't validate if TR.build has a FK relationship with TP.product_version.
    # Instead all Build IDs should be valid
    plan_is_unchanged = "plan" not in values
    if plan_is_unchanged:
        update_form.populate(version_id=existing_run.plan.product_version_id)

    if not update_form.is_valid():
        raise ValueError(list(update_form.errors.items()))

    saved_run = update_form.save()
    serialized = model_to_dict(saved_run, exclude=["cc", "tag"])

    # both dates are copied over explicitly b/c the value is set in the DB
    # directly and if None model_to_dict() will not return it
    serialized["start_date"] = saved_run.start_date
    serialized["stop_date"] = saved_run.stop_date
    return serialized
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.add_cc")
def add_cc(run_id, username):
    """
    .. function:: RPC TestRun.add_cc(run_id, username)

        Add the chosen user to TestRun CC

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param username: PK, email or username
        :type username: string
        :raises DoesNotExist: if test run specified by the PK doesn't exist
        :raises PermissionDenied: if missing *testruns.change_testrun*
                permission
        :raises ValueError: if data validations fail
    """
    # the test run is fetched before the user value is validated
    run = TestRun.objects.get(pk=run_id)

    user_form = UserForm({"user": username})
    if user_form.is_valid():
        run.add_cc(user_form.cleaned_data["user"])
    else:
        raise ValueError(list(user_form.errors.items()))
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.remove_cc")
def remove_cc(run_id, username):
    """
    .. function:: RPC TestRun.remove_cc(run_id, username)

        Remove the chosen user from TestRun CC

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param username: PK, email or username
        :type username: string
        :raises DoesNotExist: if test run specified by the PK doesn't exist
        :raises PermissionDenied: if missing *testruns.change_testrun*
                permission
        :raises ValueError: if data validations fail
    """
    # the test run is fetched before the user value is validated
    run = TestRun.objects.get(pk=run_id)

    user_form = UserForm({"user": username})
    if user_form.is_valid():
        run.remove_cc(user_form.cleaned_data["user"])
    else:
        raise ValueError(list(user_form.errors.items()))
@permissions_required("testruns.view_property")
@rpc_method(name="TestRun.properties")
def properties(query=None):
    """
    .. function:: RPC TestRun.properties(query)

        Return all properties for the specified test run(s), ordered by
        run, name and value. Omitting ``query`` applies no field lookups.

        :param query: Field lookups for
                      :class:`tcms.testruns.models.Property`
        :type query: dict
        :return: Serialized list of :class:`tcms.testruns.models.Property`
                 objects.
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.view_property*
                permission
    """
    lookups = {} if query is None else query

    matching_properties = Property.objects.filter(**lookups)
    rows = (
        matching_properties.values("id", "run", "name", "value")
        .order_by("run", "name", "value")
        .distinct()
    )
    return list(rows)
@permissions_required("attachments.add_attachment")
@rpc_method(name="TestRun.add_attachment")
def add_attachment(run_id, filename, b64content, **kwargs):
    """
    .. function:: RPC TestRun.add_attachment(run_id, filename, b64content)

        Add attachment to the given TestRun.

        :param run_id: PK of TestRun
        :type run_id: int
        :param filename: File name of attachment, e.g. 'logs.txt'
        :type filename: str
        :param b64content: Base64 encoded content
        :type b64content: str
        :param \\**kwargs: Dict providing access to the current request,
            protocol, entry point name and handler instance from the
            rpc method
    """
    uploading_user = kwargs.get(REQUEST_KEY).user

    # delegate to the shared helper; its result is not returned to the caller
    utils.add_attachment(
        run_id, "testruns.TestRun", uploading_user, filename, b64content
    )