Attention

This document was written for an old version of Plone, Plone 3, and was last updated 898 days ago.

To learn how to upgrade to the current version of Plone, read the upgrade manual.

Time based workflow transitions

by Mikko Ohtamaa last modified Dec 06, 2009 09:27 PM
How to make workflows to do something based on time

Purpose

Sometimes expiring content is not flexible enough approach for workflows. You might want to retract content automatically if it's not reviewed in a certain period of time. This how to explains how to make workflows react to a time.

This example show how to make content automatically retract after 14 days if it has not been pushed forward. Also an email is sent to the review list in this case (a content rule). The example is quite advanced level and shows only the critical parts of the code - you need to able to understand and apply principles shown here.

Prerequisities

You need to be able to

You will learn

  • Manage time based events
  • Create templated content rule emails
  • Create workflows with time based triggers
  • Unit testing time base events
  • Unit testing workflow security
  • Unit testing outgoing email

Products used

Following add on products are used:

Step by step

Create a workflow. This is done by hand crafting profiles/default/workflows/yourworkflow/definition.xml. Here we define auto-retract transition. We use Manage portal permission so that only admin (clock event) can trigger the transition:

<state state_id="doctor" title="State with auto retract">
  <description>Pending for reviewer input</description>
  <exit-transition transition_id="reject"/>
  <exit-transition transition_id="auto_retract"/>
  ...
</state>
<transition transition_id="auto_retract"
           title="Send back to the author"
           new_state="initial" trigger="USER"
           before_script="" after_script="">
  <description>Send back to the orignal author if review board does not take action.</description>
  <action url="%(content_url)s/content_status_modify?workflow_action=auto_retract"
        category="workflow">Send back to the orignal author automatically</action>
 <guard>
 <guard-permission>Manage portal</guard-permission>
 </guard>
</transition>

In profiles/default/contentrules.xml we define a content rule action which will send email to the reviewer mailing list if there was no action taken:

 <!-- auto expire -->
 <rule name="mail_when_review_expired"
        title="Email review board if no one reviewed the content"
    description=""
    enabled="True"
        event="Products.CMFCore.interfaces.IActionSucceededEvent"
    stop-after="False">

  <conditions>
   <condition type="plone.conditions.WorkflowTransition">
    <property name="wf_transitions">
     <element>auto_retract</element>
    </property>
   </condition>
  </conditions>

  <actions>
   <action type="collective.easytemplate.actions.Mail">
    <property name="source">info@reviewboard.fi</property>
    <property name="message">
Content $title was not reviewed.

The content has been sent back to its orignal author.

To see the content click:

${object_url}

    </property>
    <property name="recipients">info@reviewboard.fi</property>
    <property name="subject">Content review timeout: $title</property>
   </action>
  </actions>
 </rule>

<assignment
    location="/"
    name="mail_when_review_expired"
    enabled="True"
    bubbles="True"
    />

Create a clock event subscriber which will check whether the transition is needed:

<configure
    xmlns="http://namespaces.zope.org/zope"
    xmlns="http://namespaces.zope.org/browser"
    i18n_domain="my.app">

    <subscriber handler=".tickers.auto_expire"
        for="collective.timedevents.interfaces.ITickEvent"
        />

</configure>

The subscriber Python code example:

"""

    Example how to trigger workflow transitions based on time.

    http://www.twinapex.com

"""

__author__ = "Mikko Ohtamaa <mikko.ohtamaa@twinapex.com>"


# Python imports
import logging

# Zope imports
from zope.app.component.hooks import getSite
from DateTime import DateTime

# Plone imports
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone.log import logger

# Local imports
from harvinaiset.app.interfaces import IDescriptionBase

# Design for testing:
# Number of items last call expired
triggered_count = 0



class TransitionTriggerTicker:
    """ Perform automatic transitions for workflows after certain amount of time is reached in some content attribute.

    Usage instructions:

    1. Subclass this

    2. Override settings variables

    3. Install subscriber::

        <subscriber
          handler=".tickers.myhandler"
          for="collective.timedevents.interfaces.ITickEvent"
        />

    4. Install collective.timedevents
    """

    # Which workflow state the object must be in - string, id
    state = None

    # workflow name
    workflow = None

    # Which workflow transition we perform - string, id
    transition = None

     # Interface which marks the content types to be checked, interface object
    marker_interface = None

    # Which WF attribute we check for the expiration
    # http://localhost:8080/Plone/portal_workflow/plone_workflow/variables/manage_workspace
    time_variable = "time"

    # After how many days (floating point) we trigger the transition
    delta = None

    def __call__(self, event):
        """

        @param event: zope event object
        """
        global triggered_count

        now = event.date_time

        site = getSite()
        logger.debug("Running timed workflow transitions checks at %s, site is: %s, transition is %s" % (str(now), str(site), self.transition))

        pct = getToolByName(site, 'portal_catalog')
        wf = getToolByName(site, 'portal_workflow')

        query = {}
        query["object_provides"]=self.marker_interface.__identifier__
        # Assumes WF stores its statein review_state variable
        query["review_state"]=self.state
        # Expires expression
        #query[self.time_index]={'query':DateTime() - self.delta, 'range':'min'}
        catalog_data = pct.unrestrictedSearchResults(**query)

        # The following loop is a bit expensive, since it
        # wakes up every object to get last transition time
        for item in catalog_data:

            obj = item.getObject()
            status = wf.getStatusOf(self.workflow, obj)

            logger.debug("Checking autotransition for:" + str(obj))

            if now >= status[self.time_variable] + self.delta:
                # Perform workflow transitions.
                # This triggers content rule based actions,
                # so we can put the actual action logic in the content rule
                logger.info("Triggering auto transition for:" + str(obj) + " state:" + status["review_state"] + " transition:" + self.transition)

                wf.doActionFor(obj, self.transition)

                triggered_count += 1

class AutoExpireDocument(TransitionTriggerTicker):
    """ Define a sample time based trigger. """

    workflow = "myworkflow"

    state = "need_review"

    transition = "auto_retract"

    marker_interface = IMyContentInterface

    delta = 14.0 # Two weeks


auto_expire = AutoExpireDocument()

Buildout

The following sample bits show what you need to have in your buildout.cfg:

eggs =
   ...
   collective.timedevents
   collective.easytemplate

...and...:

[instance]
...
zcml =
    collective.easytemplate
    collective.timedevents

...
zope-conf-additional =
  <clock-server>
      method /yourinstancename/@@tick
      period 15
      host localhost
      user admin
      password adminpassword
  </clock-server>

Unit test

The following sample unit test code can be used to check if this all is succesfully pulled together:

from DateTime import DateTime
from Products.SecureMailHost.SecureMailHost import SecureMailHost
from my.app.browser import tickers

__docformat__ = "epytext"
__author__ = "Mikko Ohtamaa <mikko.ohtamaa@twinapex.com>"
__license__ = "BSD"

class DummySecureMailHost(SecureMailHost):
    meta_type = 'Dummy secure Mail Host'
    def __init__(self, id):
        self.id = id
        self.sent = []

        self.mto = None

    def _send(self, mfrom, mto, messageText, debug=False):
        print "Sending message"
        self.sent.append(messageText)
        self.mto = mto


class WorkflowTestCase(MyAppTestCase):
    " Test workflow access rights. "

    def afterSetUp(self):
        self.workflow = getToolByName(self.portal, 'portal_workflow')
        self.acl_users = getToolByName(self.portal, 'acl_users')
        self.types = getToolByName(self.portal, 'portal_types')
        self.registration =  getToolByName(self.portal, 'portal_registration')
        self.membership =  getToolByName(self.portal, 'portal_membership')

        # Create a normal registered portal member
        # to be used in tests
        self.registration.addMember("testmember", "secret", ["Member",], properties={ 'username': "testmember", 'email' : "foobar@foobar.com" })

        # Editors are portal review board members and can create, edit and review the content
        self.registration.addMember("editor", "secret", ["Member"], properties={ 'username': "editor", 'email' : "foobar@foobar.com" })

        self.portal.manage_setLocalRoles("editor", ["Editor", "Reviewer", "Contributor"])
        self.portal.reindexObjectSecurity()

        self.loginAsPortalOwner()
        sm = getSiteManager(self.portal)
        sm.unregisterUtility(provided=IMailHost)
        self.dummyMailHost = DummySecureMailHost('dMailhost')
        sm.registerUtility(self.dummyMailHost, IMailHost)

        # Set verbose security policy, making debugging Unauthorized
        # exceptions great deal easier in unit tests
        setSecurityPolicy(ZopeSecurityPolicy(verbose=True))

        # Enable stdout Python debug logging
        from Products.CMFPlone.log import logger
        logger.root.setLevel(logging.DEBUG)
        logger.root.addHandler(logging.StreamHandler(sys.stdout))


    def loginAsPortalMember(self, id):
        ''' Login as a normal portal member.

        @param id. username
        '''
        self.login(id)

    def _execUntrusted(self, debug, function_body, **kwargs):
        ''' Sets up a sandboxed Python environment with Zope security in place.

        Calls func() in an sandboxed environment. The security mechanism
        should catch all unauthorized function calls (declared
        with a class SecurityManager).

        Security is effective only inside the function itself -
        The function security declarations themselves are ignored.

        @param func: Function object
        @param args: Parameters delivered to func
        @param kwargs: Parameters delivered to func
        @param debug: If True, break into pdb debugger just before evaluation
        @return: Function return value
        '''

        # Create global variable environment for the sandbox
        globals = get_safe_globals()
        globals['__builtins__'] = safe_builtins

        # Zope seems to have some hacks with guaded_getattr.
        # guarded_getattr is used to check the permission when the
        # object is being traversed in the restricted code.
        # E.g. this controls function call permissions.
        from AccessControl.ImplPython import guarded_getattr as guarded_getattr_safe
        globals['_getattr_'] = guarded_getattr_safe
        #globals['getattr'] = guarded_getattr_safe
        #globals['guarded_getattr'] = guarded_getattr_safe

        #import pdb ; pdb.set_trace()
        globals.update(kwargs)

        # Our magic code

        # The following will compile the parsed Python code
        # and applies a special AST mutator
        # which will proxy __getattr__ and function calls
        # through guarded_getattr
        code = compile_restricted(function_body, "<string>", "eval")

        # Here is a good place to break in
        # if you need to do some ugly permission debugging
        if debug:
            import pdb
            pdb.set_trace()

        return eval(code, globals)

    def execUntrusted(self, func, **kwargs):
        """ Sets up a sandboxed Python environment with Zope security in place. """
        return self._execUntrusted(False, func, **kwargs)

    def execUntrustedDebug(self, func, **kwargs):
        """ Sets up a sandboxed Python debug environment with Zope security in place. """
        return self._execUntrusted(True, func, **kwargs)

    def assertUnauthorized(self, func, **kwargs):
        """ Check that calling func with currently effective roles will raise Unauthroized error. """
        try:
            self.execUntrusted(func, **kwargs)
        except Unauthorized, e:
            return

        raise AssertionError, 'Unauthorized exception was expected'

    def test_autoexpire(self):
        """ Test that the actions are properly taken when diagnose hits our special timed expire. """

        self.loginAsPortalMember("editor")
        diagnosis = self.create_sample_diagnosis_content()

        diagnosis.setSomething("editor edits text")
        # Submit item to doctors for edit
        self.execUntrusted("portal.portal_workflow.doActionFor(diagnosis, action)", portal=self.portal, diagnosis=diagnosis, action="send_to_review")
        status = self.workflow.getStatusOf("my_workflow", diagnosis)
        self.assertEqual(status["review_state"], "waits_review")
        self.logout()
        self.dummyMailHost.sent = []

        # Fire a time event - run them as an admin
        from collective.timedevents.events import TickEvent
        from zope.event import notify

        self.loginAsPortalOwner()

        tickers.triggered_count = 0
        notify(TickEvent(DateTime(), DateTime()))

        # The item is fresh and should not auto expire
        self.assertEqual(tickers.triggered_count, 0)

        # Now put the items modified 200 days to the past
        status = self.workflow.getStatusOf("my_workflow", diagnosis)
        status["time"] = DateTime() - 200

        # Tick again and we should see some results

        # We need to reset content rules applied cache, otherwise
        # rules are not evaluated - took 5 hours to find this
        # little trick out :(((
        from plone.app.contentrules import handlers
        import zope.thread
        handlers._status = zope.thread.local()

        notify(TickEvent(DateTime(), DateTime()))

        # Transition should be done and we should be back to the initial state
        self.assertEqual(tickers.triggered_count, 1)
        status = self.workflow.getStatusOf("my_workflow", diagnosis)
        self.assertEqual(status["review_state"], "initial")

        # Check that a notification email was send (as defined in contentrules.xml)
        self.assertEqual(len(self.dummyMailHost.sent), 1)

Contribute

Something wrong or out of date? Anybody can edit or create a new article in the knowledge base. Simply create an account on this site, log in, and click the Edit button to contribute.