root/xmppony/dispatcher.py @ 0:2b766784ee99

Revision 0:2b766784ee99, 17.6 kB (checked in by elghinn, 18 months ago)

* initial import

Line 
1##   transports.py
2##
3##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
4##
5##   This program is free software; you can redistribute it and/or modify
6##   it under the terms of the GNU General Public License as published by
7##   the Free Software Foundation; either version 2, or (at your option)
8##   any later version.
9##
10##   This program is distributed in the hope that it will be useful,
11##   but WITHOUT ANY WARRANTY; without even the implied warranty of
12##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13##   GNU General Public License for more details.
14
15# $Id: dispatcher.py,v 1.42 2007/05/18 23:18:36 normanr Exp $
16
17"""
18Main xmpppy mechanism. Provides library with methods to assign different handlers
19to different XMPP stanzas.
20Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that
21Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
22"""
23
24import simplexml,time,sys
25from protocol import *
26from client import PlugIn
27
28DefaultTimeout=25
29ID=0
30
31class Dispatcher(PlugIn):
32    """ Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
33        Can be plugged out/in to restart these headers (used for SASL f.e.). """
34    def __init__(self):
35        PlugIn.__init__(self)
36        DBG_LINE='dispatcher'
37        self.handlers={}
38        self._expected={}
39        self._defaultHandler=None
40        self._pendingExceptions=[]
41        self._eventHandler=None
42        self._cycleHandlers=[]
43        self._exported_methods=[self.Process,self.RegisterHandler,self.RegisterDefaultHandler,\
44        self.RegisterEventHandler,self.UnregisterCycleHandler,self.RegisterCycleHandler,\
45        self.RegisterHandlerOnce,self.UnregisterHandler,self.RegisterProtocol,\
46        self.WaitForResponse,self.SendAndWaitForResponse,self.send,self.disconnect,\
47        self.SendAndCallForResponse, ]
48
49    def dumpHandlers(self):
50        """ Return set of user-registered callbacks in it's internal format.
51            Used within the library to carry user handlers set over Dispatcher replugins. """
52        return self.handlers
53    def restoreHandlers(self,handlers):
54        """ Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
55            Used within the library to carry user handlers set over Dispatcher replugins. """
56        self.handlers=handlers
57
58    def _init(self):
59        """ Registers default namespaces/protocols/handlers. Used internally.  """
60        self.RegisterNamespace('unknown')
61        self.RegisterNamespace(NS_STREAMS)
62        self.RegisterNamespace(self._owner.defaultNamespace)
63        self.RegisterProtocol('iq',Iq)
64        self.RegisterProtocol('presence',Presence)
65        self.RegisterProtocol('message',Message)
66        self.RegisterDefaultHandler(self.returnStanzaHandler)
67        self.RegisterHandler('error',self.streamErrorHandler,xmlns=NS_STREAMS)
68
69    def plugin(self, owner):
70        """ Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally."""
71        self._init()
72        for method in self._old_owners_methods:
73            if method.__name__=='send': self._owner_send=method; break
74        self._owner.lastErrNode=None
75        self._owner.lastErr=None
76        self._owner.lastErrCode=None
77        self.StreamInit()
78
79    def plugout(self):
80        """ Prepares instance to be destructed. """
81        self.Stream.dispatch=None
82        self.Stream.DEBUG=None
83        self.Stream.features=None
84        self.Stream.destroy()
85
86    def StreamInit(self):
87        """ Send an initial stream header. """
88        self.Stream=simplexml.NodeBuilder()
89        self.Stream._dispatch_depth=2
90        self.Stream.dispatch=self.dispatch
91        self.Stream.stream_header_received=self._check_stream_start
92        self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
93        self.Stream.DEBUG=self._owner.DEBUG
94        self.Stream.features=None
95        self._metastream=Node('stream:stream')
96        self._metastream.setNamespace(self._owner.Namespace)
97        self._metastream.setAttr('version','1.0')
98        self._metastream.setAttr('xmlns:stream',NS_STREAMS)
99        self._metastream.setAttr('to',self._owner.Server)
100        self._owner.send("<?xml version='1.0'?>%s>"%str(self._metastream)[:-2])
101
102    def _check_stream_start(self,ns,tag,attrs):
103        if ns<>NS_STREAMS or tag<>'stream':
104            raise ValueError('Incorrect stream start: (%s,%s). Terminating.'%(tag,ns))
105
106    def Process(self, timeout=0):
107        """ Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
108            Returns:
109            1) length of processed data if some data were processed;
110            2) '0' string if no data were processed but link is alive;
111            3) 0 (zero) if underlying connection is closed.
112            Take note that in case of disconnection detect during Process() call
113            disconnect handlers are called automatically.
114        """
115        for handler in self._cycleHandlers: handler(self)
116        if len(self._pendingExceptions) > 0:
117            _pendingException = self._pendingExceptions.pop()
118            raise _pendingException[0], _pendingException[1], _pendingException[2]
119        if self._owner.Connection.pending_data(timeout):
120            try: data=self._owner.Connection.receive()
121            except IOError: return
122            self.Stream.Parse(data)
123            if len(self._pendingExceptions) > 0:
124                _pendingException = self._pendingExceptions.pop()
125                raise _pendingException[0], _pendingException[1], _pendingException[2]
126            if data: return len(data)
127        return '0'      # It means that nothing is received but link is alive.
128       
129    def RegisterNamespace(self,xmlns,order='info'):
130        """ Creates internal structures for newly registered namespace.
131            You can register handlers for this namespace afterwards. By default one namespace
132            already registered (jabber:client or jabber:component:accept depending on context. """
133        self.DEBUG('Registering namespace "%s"'%xmlns,order)
134        self.handlers[xmlns]={}
135        self.RegisterProtocol('unknown',Protocol,xmlns=xmlns)
136        self.RegisterProtocol('default',Protocol,xmlns=xmlns)
137
138    def RegisterProtocol(self,tag_name,Proto,xmlns=None,order='info'):
139        """ Used to declare some top-level stanza name to dispatcher.
140           Needed to start registering handlers for such stanzas.
141           Iq, message and presence protocols are registered by default. """
142        if not xmlns: xmlns=self._owner.defaultNamespace
143        self.DEBUG('Registering protocol "%s" as %s(%s)'%(tag_name,Proto,xmlns), order)
144        self.handlers[xmlns][tag_name]={type:Proto, 'default':[]}
145
146    def RegisterNamespaceHandler(self,xmlns,handler,typ='',ns='', makefirst=0, system=0):
147        """ Register handler for processing all stanzas for specified namespace. """
148        self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system)
149
150    def RegisterHandler(self,name,handler,typ='',ns='',xmlns=None, makefirst=0, system=0):
151        """Register user callback as stanzas handler of declared type. Callback must take
152           (if chained, see later) arguments: dispatcher instance (for replying), incomed
153           return of previous handlers.
154           The callback must raise xmpp.NodeProcessed just before return if it want preven
155           callbacks to be called with the same stanza as argument _and_, more importantly
156           library from returning stanza to sender with error set (to be enabled in 0.2 ve
157            Arguments:
158              "name" - name of stanza. F.e. "iq".
159              "handler" - user callback.
160              "typ" - value of stanza's "type" attribute. If not specified any value match
161              "ns" - namespace of child that stanza must contain.
162              "chained" - chain together output of several handlers.
163              "makefirst" - insert handler in the beginning of handlers list instead of
164                adding it to the end. Note that more common handlers (i.e. w/o "typ" and "
165                will be called first nevertheless.
166              "system" - call handler even if NodeProcessed Exception were raised already.
167            """
168        if not xmlns: xmlns=self._owner.defaultNamespace
169        self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)'%(handler,name,typ,ns,xmlns), 'info')
170        if not typ and not ns: typ='default'
171        if not self.handlers.has_key(xmlns): self.RegisterNamespace(xmlns,'warn')
172        if not self.handlers[xmlns].has_key(name): self.RegisterProtocol(name,Protocol,xmlns,'warn')
173        if not self.handlers[xmlns][name].has_key(typ+ns): self.handlers[xmlns][name][typ+ns]=[]
174        if makefirst: self.handlers[xmlns][name][typ+ns].insert(0,{'func':handler,'system':system})
175        else: self.handlers[xmlns][name][typ+ns].append({'func':handler,'system':system})
176
177    def RegisterHandlerOnce(self,name,handler,typ='',ns='',xmlns=None,makefirst=0, system=0):
178        """ Unregister handler after first call (not implemented yet). """
179        if not xmlns: xmlns=self._owner.defaultNamespace
180        self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
181
182    def UnregisterHandler(self,name,handler,typ='',ns='',xmlns=None):
183        """ Unregister handler. "typ" and "ns" must be specified exactly the same as with registering."""
184        if not xmlns: xmlns=self._owner.defaultNamespace
185        if not self.handlers.has_key(xmlns): return
186        if not typ and not ns: typ='default'
187        for pack in self.handlers[xmlns][name][typ+ns]:
188            if handler==pack['func']: break
189        else: pack=None
190        try: self.handlers[xmlns][name][typ+ns].remove(pack)
191        except ValueError: pass
192
193    def RegisterDefaultHandler(self,handler):
194        """ Specify the handler that will be used if no NodeProcessed exception were raised.
195            This is returnStanzaHandler by default. """
196        self._defaultHandler=handler
197
198    def RegisterEventHandler(self,handler):
199        """ Register handler that will process events. F.e. "FILERECEIVED" event. """
200        self._eventHandler=handler
201
202    def returnStanzaHandler(self,conn,stanza):
203        """ Return stanza back to the sender with <feature-not-implemennted/> error set. """
204        if stanza.getType() in ['get','set']:
205            conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED))
206
207    def streamErrorHandler(self,conn,error):
208        name,text='error',error.getData()
209        for tag in error.getChildren():
210            if tag.getNamespace()==NS_XMPP_STREAMS:
211                if tag.getName()=='text': text=tag.getData()
212                else: name=tag.getName()
213        if name in stream_exceptions.keys(): exc=stream_exceptions[name]
214        else: exc=StreamError
215        raise exc((name,text))
216
217    def RegisterCycleHandler(self,handler):
218        """ Register handler that will be called on every Dispatcher.Process() call. """
219        if handler not in self._cycleHandlers: self._cycleHandlers.append(handler)
220
221    def UnregisterCycleHandler(self,handler):
222        """ Unregister handler that will is called on every Dispatcher.Process() call."""
223        if handler in self._cycleHandlers: self._cycleHandlers.remove(handler)
224
225    def Event(self,realm,event,data):
226        """ Raise some event. Takes three arguments:
227            1) "realm" - scope of event. Usually a namespace.
228            2) "event" - the event itself. F.e. "SUCESSFULL SEND".
229            3) data that comes along with event. Depends on event."""
230        if self._eventHandler: self._eventHandler(realm,event,data)
231
232    def dispatch(self,stanza,session=None,direct=0):
233        """ Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
234            Called internally. """
235        if not session: session=self
236        session.Stream._mini_dom=None
237        name=stanza.getName()
238
239        if not direct and self._owner._route:
240            if name == 'route':
241                if stanza.getAttr('error') == None:
242                    if len(stanza.getChildren()) == 1:
243                        stanza = stanza.getChildren()[0]
244                        name=stanza.getName()
245                    else:
246                        for each in stanza.getChildren():
247                            self.dispatch(each,session,direct=1)
248                        return
249            elif name == 'presence':
250                return
251            elif name in ('features','bind'):
252                pass
253            else:
254                raise UnsupportedStanzaType(name)
255
256        if name=='features': session.Stream.features=stanza
257
258        xmlns=stanza.getNamespace()
259        if not self.handlers.has_key(xmlns):
260            self.DEBUG("Unknown namespace: " + xmlns,'warn')
261            xmlns='unknown'
262        if not self.handlers[xmlns].has_key(name):
263            self.DEBUG("Unknown stanza: " + name,'warn')
264            name='unknown'
265        else:
266            self.DEBUG("Got %s/%s stanza"%(xmlns,name), 'ok')
267
268        if stanza.__class__.__name__=='Node': stanza=self.handlers[xmlns][name][type](node=stanza)
269
270        typ=stanza.getType()
271        if not typ: typ=''
272        stanza.props=stanza.getProperties()
273        ID=stanza.getID()
274
275        session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID),'ok')
276
277        list=['default']                                                     # we will use all handlers:
278        if self.handlers[xmlns][name].has_key(typ): list.append(typ)                # from very common...
279        for prop in stanza.props:
280            if self.handlers[xmlns][name].has_key(prop): list.append(prop)
281            if typ and self.handlers[xmlns][name].has_key(typ+prop): list.append(typ+prop)  # ...to very particular
282
283        chain=self.handlers[xmlns]['default']['default']
284        for key in list:
285            if key: chain = chain + self.handlers[xmlns][name][key]
286
287        output=''
288        if session._expected.has_key(ID):
289            user=0
290            if type(session._expected[ID])==type(()):
291                cb,args=session._expected[ID]
292                session.DEBUG("Expected stanza arrived. Callback %s(%s) found!"%(cb,args),'ok')
293                try: cb(session,stanza,**args)
294                except Exception, typ:
295                    if typ.__class__.__name__<>'NodeProcessed': raise
296            else:
297                session.DEBUG("Expected stanza arrived!",'ok')
298                session._expected[ID]=stanza
299        else: user=1
300        for handler in chain:
301            if user or handler['system']:
302                try:
303                    handler['func'](session,stanza)
304                except Exception, typ:
305                    if typ.__class__.__name__<>'NodeProcessed':
306                        self._pendingExceptions.insert(0, sys.exc_info())
307                        return
308                    user=0
309        if user and self._defaultHandler: self._defaultHandler(session,stanza)
310
311    def WaitForResponse(self, ID, timeout=DefaultTimeout):
312        """ Block and wait until stanza with specific "id" attribute will come.
313            If no such stanza is arrived within timeout, return None.
314            If operation failed for some reason then owner's attributes
315            lastErrNode, lastErr and lastErrCode are set accordingly. """
316        self._expected[ID]=None
317        has_timed_out=0
318        abort_time=time.time() + timeout
319        self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID,timeout),'wait')
320        while not self._expected[ID]:
321            if not self.Process(0.04):
322                self._owner.lastErr="Disconnect"
323                return None
324            if time.time() > abort_time:
325                self._owner.lastErr="Timeout"
326                return None
327        response=self._expected[ID]
328        del self._expected[ID]
329        if response.getErrorCode():
330            self._owner.lastErrNode=response
331            self._owner.lastErr=response.getError()
332            self._owner.lastErrCode=response.getErrorCode()
333        return response
334
335    def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
336        """ Put stanza on the wire and wait for recipient's response to it. """
337        return self.WaitForResponse(self.send(stanza),timeout)
338
339    def SendAndCallForResponse(self, stanza, func, args={}):
340        """ Put stanza on the wire and call back when recipient replies.
341            Additional callback arguments can be specified in args. """
342        self._expected[self.send(stanza)]=(func,args)
343
344    def send(self,stanza):
345        """ Serialise stanza and put it on the wire. Assign an unique ID to it before send.
346            Returns assigned ID."""
347        if type(stanza) in [type(''), type(u'')]: return self._owner_send(stanza)
348        if not isinstance(stanza,Protocol): _ID=None
349        elif not stanza.getID():
350            global ID
351            ID+=1
352            _ID=`ID`
353            stanza.setID(_ID)
354        else: _ID=stanza.getID()
355        if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from',self._owner._registered_name)
356        if self._owner._route and stanza.getName()!='bind':
357            to=self._owner.Server
358            if stanza.getTo() and stanza.getTo().getDomain():
359                to=stanza.getTo().getDomain()
360            frm=stanza.getFrom()
361            if frm.getDomain():
362                frm=frm.getDomain()
363            route=Protocol('route',to=to,frm=frm,payload=[stanza])
364            stanza=route
365        stanza.setNamespace(self._owner.Namespace)
366        stanza.setParent(self._metastream)
367        self._owner_send(stanza)
368        return _ID
369
370    def disconnect(self):
371        """ Send a stream terminator and and handle all incoming stanzas before stream closure. """
372        self._owner_send('</stream:stream>')
373        while self.Process(1): pass
Note: See TracBrowser for help on using the browser.