source: trunk/morituri/common/task.py @ 437

Revision 437, 13.8 KB checked in by thomas, 2 years ago (diff)
  • morituri/common/task.py: Add more debug.
Line 
1# -*- Mode: Python; test-case-name: morituri.test.test_common_task -*-
2# vi:si:et:sw=4:sts=4:ts=4
3
4# Morituri - for those about to RIP
5
6# Copyright (C) 2009 Thomas Vander Stichele
7
8# This file is part of morituri.
9#
10# morituri is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# morituri is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with morituri.  If not, see <http://www.gnu.org/licenses/>.
22
23import sys
24
25import gobject
26
27from morituri.common import log
28
class TaskException(Exception):
    """
    I am raised to report an exception that occurred while a task ran.

    @ivar exception:        the original exception that I wrap
    @ivar exceptionMessage: an optional message describing the original
                            exception
    """

    # the wrapped exception; None on the class, set per instance
    exception = None

    def __init__(self, exception, message=None):
        # expose both values through the standard Exception.args as well
        self.args = (exception, message)
        self.exceptionMessage = message
        self.exception = exception
40
class Task(object, log.Loggable):
    """
    I wrap a task in an asynchronous interface.
    I can be listened to for starting, stopping, description changes
    and progress updates.

    I communicate an error by setting self.exception to an exception and
    stopping myself from running.
    The listener can then handle the Task.exception.

    @ivar  description:        what am I doing
    @ivar  progress:           last progress value notified to listeners
    @ivar  increment:          minimum progress change that triggers a
                               notification
    @ivar  running:            True between start() and stop()
    @ivar  runner:             the runner passed to start(); None when
                               not running
    @ivar  exception:          set if an exception happened during the task
                               execution.  Will be raised through run() at
                               the end.
    @ivar  exceptionMessage:   message describing the exception
    @ivar  exceptionTraceback: formatted traceback or call stack recorded
                               when the exception was set
    """
    logCategory = 'Task'

    description = 'I am doing something.'

    progress = 0.0
    increment = 0.01
    running = False
    runner = None
    exception = None
    exceptionMessage = None
    exceptionTraceback = None

    # list of listeners; created lazily by addListener
    _listeners = None


    ### subclass methods
    def start(self, runner):
        """
        Start the task.

        Subclasses should chain up to me at the beginning.

        @param runner: the runner running me; stored as self.runner
        """
        self.debug('starting')
        self.setProgress(self.progress)
        self.running = True
        self.runner = runner
        self._notifyListeners('started')

    def stop(self):
        """
        Stop the task.

        Subclasses should chain up to me at the end.

        Listeners will get notified that the task is stopped,
        whether successfully or with an exception.
        """
        self.debug('stopping')
        self.running = False
        self.runner = None
        self._notifyListeners('stopped')

    ### base class methods
    def setProgress(self, value):
        """
        Notify about progress changes bigger than the increment.
        Called by subclass implementations as the task progresses.

        Values of 0.0, and values of 1.0 or more, are always notified.

        @type  value: float
        @param value: the new progress
        """
        changed = value - self.progress > self.increment
        if changed or value >= 1.0 or value == 0.0:
            self.progress = value
            self._notifyListeners('progressed', value)
            self.log('notifying progress: %r', value)

    def setDescription(self, description):
        """
        Change my description and notify listeners if it differs from the
        current one.

        @type  description: str
        @param description: the new description
        """
        if description != self.description:
            # store the new value before notifying, so that listeners which
            # read task.description from their callback see the new one
            self.description = description
            self._notifyListeners('described', description)

    # FIXME: does not actually raise
    def setAndRaiseException(self, exception):
        """
        Call this to set a synthetically created exception (and not one
        that was actually raised and caught)

        The exception message records the location of my caller, and the
        call stack leading up to my caller is stored as the traceback.

        @param exception: the exception instance to set
        """
        import traceback

        # drop my own frame so that the last entry is my caller
        stack = traceback.extract_stack()[:-1]
        (filename, lineno, func, _) = stack[-1]

        # a shortcut to extract a useful message out of most exceptions
        # for now
        msg = ""
        if str(exception):
            msg = ": %s" % str(exception)

        self.exception = exception
        self.exceptionMessage = "exception %s at %s:%s: %s()%s" % (
            exception.__class__.__name__, filename, lineno, func, msg)
        # no exception is being handled here, so traceback.format_exc()
        # has nothing to format; record the call stack instead
        self.exceptionTraceback = "".join(traceback.format_list(stack))
        self.debug('set exception, %r', self.exceptionMessage)

    def setException(self, exception):
        """
        Set an exception that was raised and caught.

        Call me from inside the except block, since the traceback is
        taken from the exception currently being handled.

        @param exception: the caught exception
        """
        import traceback

        self.exception = exception
        self.exceptionMessage = log.getExceptionMessage(exception)
        self.exceptionTraceback = traceback.format_exc()
        self.debug('set exception, %r', self.exceptionMessage)

    def addListener(self, listener):
        """
        Add a listener for task status changes.

        Listeners should implement started, stopped, described and
        progressed.
        """
        self.debug('Adding listener %r', listener)
        if not self._listeners:
            self._listeners = []
        self._listeners.append(listener)

    def _notifyListeners(self, methodName, *args, **kwargs):
        """
        Call the given method on every listener, passing myself as the
        first argument followed by the given arguments.
        """
        if self._listeners:
            for listener in self._listeners:
                getattr(listener, methodName)(self, *args, **kwargs)
160
161# FIXME: should this become a real interface, like in zope ?
class ITaskListener(object):
    """
    I describe the callbacks an object needs in order to listen to a task.
    All of my callbacks do nothing by default.
    """
    ### listener callbacks
    def progressed(self, task, value):
        """
        Override me to be told when the task's progress changes.

        @param task:  the task that progressed
        @type  value: float
        @param value: progress, from 0.0 to 1.0
        """

    def described(self, task, description):
        """
        Override me to be told when the task's description changes.

        @param task:        the task that was described
        @type  description: str
        @param description: description
        """

    def started(self, task):
        """
        Override me to be told when the task starts.

        @param task: the task that started
        """

    def stopped(self, task):
        """
        Override me to be told when the task stops.
        If the task had an error, task.exception will be set.

        @param task: the task that stopped
        """
193
194
195
196# this is a Dummy task that can be used to test if this works at all
class DummyTask(Task):
    """
    I am a task without real work: I advance my progress by 0.1 on each
    call scheduled through the runner, and stop once I reach 1.0.
    """
    def start(self, runner):
        Task.start(self, runner)
        # first tick after one second
        self.runner.schedule(1.0, self._wind)

    def _wind(self):
        """
        Advance progress by one step; stop when complete, otherwise
        schedule the next step.
        """
        advanced = min(self.progress + 0.1, 1.0)
        self.setProgress(advanced)

        if self.progress >= 1.0:
            self.stop()
        else:
            self.runner.schedule(1.0, self._wind)
210
class BaseMultiTask(Task, ITaskListener):
    """
    I perform multiple tasks, one after the other.
    I listen to each task I start, and start the next one when it stops.

    @ivar tasks: the tasks to run
    @type tasks: list of L{Task}
    """

    description = 'Doing various tasks'
    tasks = None

    # number of tasks started so far; also the index of the next task.
    # Defined on the class so that subclasses which do not chain up to
    # __init__ still work, like addTask already allows for tasks.
    _task = 0

    def __init__(self):
        self.tasks = []
        self._task = 0

    def addTask(self, task):
        """
        Add a task.

        @type task: L{Task}
        """
        if self.tasks is None:
            self.tasks = []
        self.tasks.append(task)

    def start(self, runner):
        """
        Start tasks.

        Tasks can still be added while running.  For example,
        a first task can determine how many additional tasks to run.
        """
        Task.start(self, runner)

        # initialize task tracking
        if not self.tasks:
            self.warning('no tasks')
        self._generic = self.description

        self.next()

    def next(self):
        """
        Start the next task.

        Any exception raised while picking or starting the task is set
        on me, after which I stop.
        """
        try:
            # start next task
            task = self.tasks[self._task]
            self._task += 1
            self.debug('BaseMultiTask.next(): starting task %d of %d: %r',
                self._task, len(self.tasks), task)
            self.setDescription("%s (%d of %d) ..." % (
                task.description, self._task, len(self.tasks)))
            task.addListener(self)
            task.start(self.runner)
        except Exception as e:
            self.setException(e)
            self.debug('Got exception during next: %r', self.exceptionMessage)
            self.stop()

    ### ITaskListener methods
    def started(self, task):
        pass

    def progressed(self, task, value):
        pass

    def stopped(self, task):
        """
        Subclasses should chain up to me at the end of their implementation.
        They should fall through to chaining up if there is an exception.

        If the stopped task had an exception, I take it over and stop.
        If all tasks were started, I stop.  Otherwise I start the next task.
        """
        self.log('BaseMultiTask.stopped: task %r', task)
        if task.exception:
            self.log('BaseMultiTask.stopped: exception %r',
                task.exceptionMessage)
            # take over everything known about the failure, including the
            # traceback, so that whoever inspects me gets the details
            self.exception = task.exception
            self.exceptionMessage = task.exceptionMessage
            self.exceptionTraceback = task.exceptionTraceback
            self.stop()
            return

        if self._task == len(self.tasks):
            self.log('BaseMultiTask.stopped: all tasks done')
            self.stop()
            return

        # pick another
        self.log('BaseMultiTask.stopped: pick next task')
        self.next()
301
302
class MultiSeparateTask(BaseMultiTask):
    """
    I perform multiple tasks.
    I track progress of each individual task, going back to 0 for each task.
    """
    description = 'Doing various tasks separately'

    def start(self, runner):
        self.debug('MultiSeparateTask.start()')
        BaseMultiTask.start(self, runner)

    def next(self):
        """
        Reset my progress and start the next task.
        """
        self.debug('MultiSeparateTask.next()')
        # start next task
        self.progress = 0.0 # reset progress for each task
        BaseMultiTask.next(self)

    ### ITaskListener methods
    def progressed(self, task, value):
        # my progress mirrors the progress of the task currently running
        self.setProgress(value)

    def described(self, task, description):
        """
        Take over the new description of the running task, adding its
        position in my list of tasks.

        Listeners are called with the task as first argument, so I need
        to accept it even though I do not use it.

        @param task:        the task that was described
        @type  description: str
        @param description: the task's new description
        """
        self.setDescription("%s (%d of %d) ..." % (
            description, self._task, len(self.tasks)))
327
class MultiCombinedTask(BaseMultiTask):
    """
    I perform multiple tasks.
    My progress is the combined progress over all of my tasks, where
    each task counts for an equal share.
    """

    description = 'Doing various tasks combined'
    # number of tasks that have stopped so far
    _stopped = 0

    ### ITaskListener methods
    def progressed(self, task, value):
        # stopped tasks count fully, the running task for its own progress
        total = len(self.tasks)
        self.setProgress(float(self._stopped + value) / total)

    def stopped(self, task):
        self._stopped += 1
        fraction = float(self._stopped) / len(self.tasks)
        self.setProgress(fraction)
        BaseMultiTask.stopped(self, task)
345
class TaskRunner(object, log.Loggable):
    """
    I am the base class that task runners derive from.
    Task runners should be reusable.
    """
    logCategory = 'TaskRunner'

    def run(self, task):
        """
        Run the given task.

        Subclasses should implement this.

        @type  task: Task
        """
        raise NotImplementedError()

    ### methods for tasks to call
    def schedule(self, delta, callable, *args, **kwargs):
        """
        Schedule a single future call of the given callable with the
        given arguments.

        Subclasses should implement this.

        @type  delta: float
        @param delta: time in the future to schedule call for, in seconds.
        """
        raise NotImplementedError()
372
373
class SyncRunner(TaskRunner, ITaskListener):
    """
    I run the task synchronously in a gobject MainLoop.
    I report the task's description and progress on stdout when verbose.
    """
    def __init__(self, verbose=True):
        """
        @param verbose: whether to report progress on stdout by default
        """
        self._verbose = verbose
        self._longest = 0 # longest string shown; for clearing

    def run(self, task, verbose=None, skip=False):
        """
        Run the given task in a main loop, returning when it has stopped.

        @type  task:    Task
        @param verbose: if not None, overrides the verbose setting given
                        at creation for this run
        @param skip:    if True, leave the description at 100 % on the
                        current line when done instead of clearing it

        @raise TaskException: if the task set an exception
        """
        self.debug('run task %r', task)
        self._task = task
        self._verboseRun = self._verbose
        if verbose is not None:
            self._verboseRun = verbose
        self._skip = skip

        self._loop = gobject.MainLoop()
        self._task.addListener(self)
        # only start the task after going into the mainloop,
        # otherwise the task might complete before we are in it
        gobject.timeout_add(0, self._startWrap, self._task)
        self.debug('run loop')
        self._loop.run()

        self.debug('done running task %r', task)
        if task.exception:
            # catch the exception message
            # FIXME: this gave a traceback in the logging module
            # NOTE(review): the values are now passed as log arguments
            # instead of being formatted into the format string, like the
            # other debug calls do; confirm that this avoids the traceback
            self.debug('raising TaskException for %r, %r',
                task.exceptionMessage, task.exceptionTraceback)
            msg = task.exceptionMessage
            if task.exceptionTraceback:
                # the message can be None if only the exception was set
                msg = "%s\n%s" % (msg or '', task.exceptionTraceback)
            raise TaskException(task.exception, message=msg)

    def _startWrap(self, task):
        """
        Start the task from inside the main loop.
        """
        # wrap task start such that we can report any exceptions and
        # never hang
        try:
            self.debug('start task %r', task)
            task.start(self)
        except Exception as e:
            # getExceptionMessage uses global exception state that doesn't
            # hang around, so store the message
            task.setException(e)
            self.debug('exception during start: %r', task.exceptionMessage)
            self.stopped(task)


    def schedule(self, delta, callable, *args, **kwargs):
        def c():
            callable(*args, **kwargs)
            # returning False makes sure the timeout only fires once
            return False
        # gobject timeouts are in milliseconds
        gobject.timeout_add(int(delta * 1000), c)

    ### ITaskListener methods
    def progressed(self, task, value):
        if not self._verboseRun:
            return

        self._report()

        if value >= 1.0:
            if self._skip:
                self._output('%s %3d %%' % (
                    self._task.description, 100.0))
            else:
                # clear with whitespace
                sys.stdout.write("%s\r" % (' ' * self._longest, ))

    def _output(self, what, newline=False, ret=True):
        """
        Write the given text to stdout, padded with spaces up to the
        length of the longest text written so far.

        @param what:    the text to write
        @param newline: whether to end with a newline
        @param ret:     whether to end with a carriage return, so that the
                        next output overwrites this one
        """
        sys.stdout.write(what)
        sys.stdout.write(' ' * (self._longest - len(what)))
        if ret:
            sys.stdout.write('\r')
        if newline:
            sys.stdout.write('\n')
        sys.stdout.flush()
        if len(what) > self._longest:
            self._longest = len(what)

    def described(self, task, description):
        if self._verboseRun:
            self._report()

    def stopped(self, task):
        self.debug('stopped task %r', task)
        # make sure the output is finished off before leaving the loop
        self.progressed(task, 1.0)
        self._loop.quit()

    def _report(self):
        """
        Output the description and progress percentage of the task I run.
        """
        self._output('%s %3d %%' % (
            self._task.description, self._task.progress * 100.0))
468
if __name__ == '__main__':
    # when run as a script, run the dummy task as a quick manual check
    runner = SyncRunner()
    task = DummyTask()
    runner.run(task)
Note: See TracBrowser for help on using the repository browser.