# Copyright (c) 2009-2010 VMware, Inc. All Rights Reserved.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import time
import generated.VSOWebControlService_client_sync as sync_client
import generated.VSOWebControlService_client_async as async_client
from types import WorkflowTokenAttribute as _WorkflowTokenAttribute
from vmw.ZSI import EvaluateException
from interfaces import ITypedValue
try:
from twisted.internet.defer import Deferred
from twisted.internet import task, reactor
except ImportError:
TWISTED_PRESENT = False
else:
TWISTED_PRESENT = True
from zope.interface import implements
[docs]class Client(object):
"""
Implementation of a vCO webservice client.
API Calls implemented:
- echo
- echoWorkflow
- answerWorkflowInput
- cancelWorkflow
- executeWorkflow
- find
- findForId
- findRelation
- hasChildrenInRelation
- getAllPlugins
- getAllWorkflows
- getWorkflowForId
- getWorkflowsWithName
- getWorkflowTokenForId
- getWorkflowTokenResult
- getWorkflowTokenStatus
- simpleExecuteWorkflow
- hasRights
- sendCustomEvent
"""
[docs] def __init__(self,
url = "http://localhost:8280/vmware-vmo-webcontrol/webservice",
username = "admin",
password = "admin",
async = False,
**kw):
"""
Build a client for target vCO server.
:param url: full URL for the target vCO SOAP service
:type url: string
:param username: user name
:type username: string
:param password: password for the user
:type password: string
:param async: whether to use asynchronous bindings
:type async: bool
:param kw: keyword arguments passed to underlying ``Binding`` object
:type kw: dict
"""
self._url = url
self._username = username
self._password = password
if async:
assert TWISTED_PRESENT, "asynchronous mode cannot be used without Twisted library"
self._mod = async_client
self.service = async_client.VSOWebControlServiceLocator().getwebservice(url, **kw)
self.service.binding.defer = async
else:
self._mod = sync_client
self.service = sync_client.VSOWebControlServiceLocator().getwebservice(url, **kw)
self._generateAccessor("getAllWorkflows",
self._getWorkflowsTrans, True)
self._generateAccessor("getWorkflowForId",
self._getWorkflowTrans, True)
self._generateAccessor("getWorkflowsWithName",
self._getWorkflowsTrans, True)
self._generateAccessor("executeWorkflow",
self._executeWorkflowTrans, True)
self._generateAccessor("simpleExecuteWorkflow",
self._executeWorkflowTrans, ('in1', 'in2'))
self._generateAccessor("answerWorkflowInput",
None, True)
self._generateAccessor("cancelWorkflow",
None, True)
self._generateAccessor("getWorkflowTokenForId",
self._getWorkflowTokenForIdTrans, True)
self._generateAccessor("getWorkflowTokenResult",
self._getWorkflowTokenResultTrans, True)
self._generateAccessor("getWorkflowTokenStatus",
self._getWorkflowTokenStatusTrans, True)
self._generateAccessor("getAllPlugins",
self._getAllPluginsTrans, True)
self._generateAccessor("hasRights",
None, True)
self._generateAccessor("sendCustomEvent",
None, True)
self._generateAccessor("find",
self._findTrans, True)
self._generateAccessor("findForId",
self._findForIdTrans, True)
self._generateAccessor("findRelation",
self._findRelationTrans, True)
self._generateAccessor("hasChildrenInRelation",
self._hasChildrenInRelationTrans, True)
self._generateAccessor("echoWorkflow", self._getWorkflowTrans)
self._generateAccessor("echo")
def _isAsynchronous(self):
return self._mod is async_client
[docs] def getAllWorkflows(self):
"""Retrieve all workflows installed in the server.
:rtype: list of :class:`vmw.vco.client.Workflow`
"""
return self._getAllWorkflows()
[docs] def getAllPlugins(self):
"""Retrieve all plugins installed in the server.
:rtype: list of :class:`vmw.vco.client.Plugin`
"""
return self._getAllPlugins()
[docs] def getWorkflowForId(self, id):
"""Retrieve workflow with specified id.
:param id: Workflow id
:type id: string
"""
return self._getWorkflowForId(workflowId=id)
[docs] def getWorkflowsWithName(self, name):
"""Retrieve all workflows with specified name (might be in different categories).
:param name: Workflow name
:type name: string
"""
return self._getWorkflowsWithName(workflowName=name)
[docs] def cancelWorkflow(self, token):
"""Cancel specified workflow token.
:param token: Token to cancel
:type token: vmw.vco.client.WorkflowToken
"""
return self._cancelWorkflow(workflowTokenId=token.id)
def __readInputs(self, inputs):
real_inputs = []
for k in inputs.keys():
i = _WorkflowTokenAttribute()
i._name = k
wrap = ITypedValue(inputs[k])
i._type = wrap.type()
i._value = wrap.value()
real_inputs.append(i)
return real_inputs
[docs] def executeWorkflow(self, wf, inputs={}):
"""Run a workflow with specified set of inputs.
:param wf: workflow to run
:type wf: vmw.vco.client.Workflow
:param inputs: inputs to run with
:type inputs: dict
"""
return self._executeWorkflow(workflowId=wf.id,
workflowInputs=self.__readInputs(inputs))
[docs] def simpleExecuteWorkflow(self, wf, encoded_input):
"""**deprecated**
Alternate way of executing a workflow. This will encode inputs in an
unsafe way, please don't use.
:param wf: workflow to run
:type wf: vmw.vco.client.Workflow
:param inputs: inputs to run with
:type inputs: dict
"""
return self._simpleExecuteWorkflow(in0=wf.id, in3=encoded_input)
[docs] def getWorkflowTokenForId(self, token_id):
"""Retrieve a workflow token from its id.
:param token_id: token id
:type token_id: string
"""
return self._getWorkflowTokenForId(workflowTokenId=token_id)
[docs] def getWorkflowTokenResult(self, token):
"""Get result for workflow token. Only applies when the workflow execution is completed.
:param token: token to extract result from
:type token: vmw.vco.client.WorkflowToken
"""
return self._getWorkflowTokenResult(workflowTokenId=token.id)
[docs] def getWorkflowTokenStatus(self, tokens):
"""Get status for specified list of workflow tokens. Returns a list of
status in the same order.
"""
return self._getWorkflowTokenStatus(workflowTokenIds=[t.id for t in tokens])
[docs] def hasRights(self, task, right):
"""Check if current user has rights over the specified task."""
return self._hasRights(taskId=task.id, right=right)
[docs] def sendCustomEvent(self, event, props):
"""Send a custom event with specified properties.
:param event: event to send
:type event: string
:param props: properties of the event
:type props: dict
"""
props = "\n".join(["%s=%s" % (str(key), str(val))
for (key, val) in props.items()])
return self._sendCustomEvent(eventName=event, serializedProperties=props)
[docs] def find(self, type, query=""):
"""Find objects of a given type, that match a plug-in dependant query."""
return self._find(type=type, query=query)
[docs] def findForId(self, type, id):
"""Find specific object, from it's type and unique id."""
try:
return self._findForId(type=type, id=id)
except EvaluateException:
return None
[docs] def findRelation(self, type, id, relation):
"""Find children of specific object (defined by its type and unique
id), according to specified relation."""
try:
return self._findRelation(parentType=type, parentId=id,
relationName=relation)
except EvaluateException:
return []
[docs] def hasChildrenInRelation(self, type, id, relation):
"""Check if specified object (defined by its type and unique id) has
any children according to specified relation."""
return self._hasChildrenInRelation(parentType=type, parentId=id,
relationName=relation)
[docs] def echo(self, msg):
"""Test method. Echo back provided message.
:param msg: Message to echo.
:type msg: string
"""
return self._echo(message=msg)
[docs] def echoWorkflow(self, wf):
"""Test method. Echo back provided workflow.
:param wf: Workflow to echo.
:type wf: vmw.vco.client.Workflow
"""
return self._echoWorkflow(workflowMessage=wf._holder)
def _getAllPluginsTrans(self, res):
return [Plugin(self, w) for w in res]
def _getWorkflowTrans(self, res):
return Workflow(self, res)
def _getWorkflowsTrans(self, res):
return [Workflow(self, w) for w in res]
def _executeWorkflowTrans(self, res):
return WorkflowToken(self, res)
def _getWorkflowTokenForIdTrans(self, res):
return WorkflowToken(self, res)
def _getWorkflowTokenResultTrans(self, res):
return dict([(t._name, TypedValue(t._type, t._value)) for t in res])
def _getWorkflowTokenStatusTrans(self, res):
translate = {"running": WorkflowToken.RUNNING,
"waiting": WorkflowToken.WAITING,
"waiting-signal": WorkflowToken.WAITING_SIGNAL,
"canceled": WorkflowToken.CANCELLED,
"completed": WorkflowToken.COMPLETED,
"failed": WorkflowToken.FAILED}
return [translate[r] for r in res]
def _findTrans(self, res):
if res._elements is None:
return []
return [FinderResult(self, r) for r in res._elements._item]
def _findForIdTrans(self, res):
return FinderResult(self, res)
def _findRelationTrans(self, res):
return [FinderResult(self, r) for r in res]
def _hasChildrenInRelationTrans(self, res):
return (res > 0)
# magic method for generating method calling generated stuff
def _generateAccessor(self, name, trans = None, auth = False):
# compute class of correct request type
cls = eval("self._mod." + name + "Request")
def _func(**keywords):
req = cls()
# authenticated methods need username/password
if auth:
if type(auth) is bool:
req._username = self._username
req._password = self._password
else:
# some broken calls have cryptic names instead...
user_field, pwd_field = auth
setattr(req, '_' + user_field, self._username)
setattr(req, '_' + pwd_field, self._password)
keys = keywords.keys()
# build the request object with all specified components
for kw in keys:
setattr(req, "_" + kw, keywords[kw])
# call the generated method, and get response
resp = getattr(self.service, name)(req)
def __extractRes(resp, name, trans):
# extract relevant value from response object
res = getattr(resp, "_" + name + "Return", None)
# optionally transform result
if trans:
return trans(res)
else:
return res
if isinstance(resp, Deferred):
resp.addCallback(__extractRes, name, trans)
return resp
else:
return __extractRes(resp, name, trans)
# add the method to this object
setattr(self, "_" + name, _func)
[docs]class TypedValue(object):
implements(ITypedValue)
[docs] def __init__(self, type, value):
"""Build a typed value
:param type: type of the value
:param value: string representation of the value
"""
self._type = type
self._value = value
def type(self):
return self._type
def value(self):
return self._value
[docs]class Plugin(object):
def __init__(self, server, holder):
self._server = server
self._holder = holder
#: Plugin name
self.name = holder._moduleName
#: Plugin description
self.description = holder._moduleDescription
self._repr = holder._moduleDisplayName
#: Plugin version
self.version = holder._moduleVersion
def __repr__(self):
return self._repr
[docs]class WorkflowAttribute(object):
def __init__(self, server, holder):
self._server = server
self._holder = holder
#: Attribute name
self.name = holder._name
#: Attribute type
self.type = holder._type
[docs]class Workflow(object):
def __init__(self, server, holder):
self._server = server
self._holder = holder
#: Workflow id
self.id = holder._id
#: Workflow name
self.name = holder._name
#: Workflow description
self.description = holder._description
#: Workflow input parameters. list
#: of :class:`vmw.vco.client.WorkflowAttribute`
self.inParameters = self.__convertAttributes(holder._inParameters)
#: Workflow output parameters. list
#: of :class:`vmw.vco.client.WorkflowAttribute`
self.outParameters = self.__convertAttributes(holder._outParameters)
#: Workflow attributes. list
#: of :class:`vmw.vco.client.WorkflowAttribute`
self.attributes = self.__convertAttributes(holder._attributes)
def __convertAttributes(self, attrs):
return [WorkflowAttribute(self._server, it) for it in attrs._item]
[docs] def execute(self, inputs={}):
"""Execute this workflow with provided input parameters.
:param inputs: input parameters
:type inputs: dict
"""
return self._server.executeWorkflow(self, inputs)
[docs]class WorkflowToken(object):
# order matters (see method isFinished())
RUNNING = 0
WAITING = 1
WAITING_SIGNAL = 2
CANCELLED = 3
COMPLETED = 4
FAILED = 5
def __init__(self, server, holder):
self._server = server
self._holder = holder
#: WorkflowToken id
self.id = holder._id
#: WorkflowToken title
self.title = holder._title
#: Related Workflow id
self.workflowId = holder._workflowId
self.currentItemName = holder._currentItemName
self.currentItemState = holder._currentItemState
#: WorkflowToken state
self.globalState = holder._globalState
self.startDate = holder._startDate
self.endDate = holder._endDate
self.xmlContent = holder._xmlContent
def cancel(self):
return self._server.cancelWorkflow(self)
def getResult(self):
return self._server.getWorkflowTokenResult(self)
def getStatus(self):
if self._server._isAsynchronous():
d = self._server.getWorkflowTokenStatus([self])
d.addCallback(lambda st: st[0])
return d
return self._server.getWorkflowTokenStatus([self])[0]
def isFinished(self):
if self._server._isAsynchronous():
return self.getStatus().addCallback(lambda st: st >= self.CANCELLED)
return self.getStatus() >= self.CANCELLED
def isWaiting(self):
waiting_states = [self.WAITING, self.WAITING_SIGNAL]
if self._server._isAsynchronous():
return self.getStatus().addCallback(lambda st: st in waiting_states)
return self.getStatus() in waiting_states
def __wait(self, pred, poll, target=None):
_async = target is not None
def __checkPred(res):
if res:
target.callback(self)
return res
else:
task.deferLater(reactor, poll, self.__wait, pred, poll, target)
return
if _async:
return pred().addCallback(__checkPred)
else:
while not pred():
time.sleep(poll)
return True
def waitResult(self, poll=1):
if self._server._isAsynchronous():
d = Deferred()
self.__wait(self.isFinished, poll, d)
return d.addCallback(lambda _: self.getResult())
else:
self.__wait(self.isFinished, poll)
return self.getResult()
def waitQuestion(self, poll=1):
if self._server._isAsynchronous():
d = Deferred()
self.__wait(self.isWaiting, poll, d)
return d.addCallback(lambda _: self)
else:
self.__wait(self.isWaiting, poll)
return self
def answer(self, inputs):
return self._server.answerWorkflowInput(self, inputs)
class Rights(object):
READ = ord('r')
WRITE = ord('c')
EXECUTE = ord('x')
[docs]class FinderResult(object):
implements(ITypedValue)
def __init__(self, server, holder):
self._server = server
self._holder = holder
#: FinderResult id
self.id = holder._id
#: FinderResult uri
self.uri = holder._dunesUri
#: FinderResult properties (dict)
self.properties = self.__readProperties(holder._properties)
def __readProperties(self, props):
res = {}
for i in props._item:
res[i._name] = i._value
return res
def type(self):
return self._holder._type
def value(self):
return self.uri