| 1 | # -*- Mode: Python; test-case-name: morituri.test.test_common_task -*- |
|---|
| 2 | # vi:si:et:sw=4:sts=4:ts=4 |
|---|
| 3 | |
|---|
| 4 | # Morituri - for those about to RIP |
|---|
| 5 | |
|---|
| 6 | # Copyright (C) 2009 Thomas Vander Stichele |
|---|
| 7 | |
|---|
| 8 | # This file is part of morituri. |
|---|
| 9 | # |
|---|
| 10 | # morituri is free software: you can redistribute it and/or modify |
|---|
| 11 | # it under the terms of the GNU General Public License as published by |
|---|
| 12 | # the Free Software Foundation, either version 3 of the License, or |
|---|
| 13 | # (at your option) any later version. |
|---|
| 14 | # |
|---|
| 15 | # morituri is distributed in the hope that it will be useful, |
|---|
| 16 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|---|
| 17 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|---|
| 18 | # GNU General Public License for more details. |
|---|
| 19 | # |
|---|
| 20 | # You should have received a copy of the GNU General Public License |
|---|
| 21 | # along with morituri. If not, see <http://www.gnu.org/licenses/>. |
|---|
| 22 | |
|---|
| 23 | import sys |
|---|
| 24 | |
|---|
| 25 | import gobject |
|---|
| 26 | |
|---|
| 27 | from morituri.common import log |
|---|
| 28 | |
|---|
| 29 | class TaskException(Exception): |
|---|
| 30 | """ |
|---|
| 31 | I wrap an exception that happened during task execution. |
|---|
| 32 | """ |
|---|
| 33 | |
|---|
| 34 | exception = None # original exception |
|---|
| 35 | |
|---|
| 36 | def __init__(self, exception, message=None): |
|---|
| 37 | self.exception = exception |
|---|
| 38 | self.exceptionMessage = message |
|---|
| 39 | self.args = (exception, message, ) |
|---|
| 40 | |
|---|
| 41 | class Task(object, log.Loggable): |
|---|
| 42 | """ |
|---|
| 43 | I wrap a task in an asynchronous interface. |
|---|
| 44 | I can be listened to for starting, stopping, description changes |
|---|
| 45 | and progress updates. |
|---|
| 46 | |
|---|
| 47 | I communicate an error by setting self.exception to an exception and |
|---|
| 48 | stopping myself from running. |
|---|
| 49 | The listener can then handle the Task.exception. |
|---|
| 50 | |
|---|
| 51 | @ivar description: what am I doing |
|---|
| 52 | @ivar exception: set if an exception happened during the task |
|---|
| 53 | execution. Will be raised through run() at the end. |
|---|
| 54 | """ |
|---|
| 55 | logCategory = 'Task' |
|---|
| 56 | |
|---|
| 57 | description = 'I am doing something.' |
|---|
| 58 | |
|---|
| 59 | progress = 0.0 |
|---|
| 60 | increment = 0.01 |
|---|
| 61 | running = False |
|---|
| 62 | runner = None |
|---|
| 63 | exception = None |
|---|
| 64 | exceptionMessage = None |
|---|
| 65 | exceptionTraceback = None |
|---|
| 66 | |
|---|
| 67 | _listeners = None |
|---|
| 68 | |
|---|
| 69 | |
|---|
| 70 | ### subclass methods |
|---|
| 71 | def start(self, runner): |
|---|
| 72 | """ |
|---|
| 73 | Start the task. |
|---|
| 74 | |
|---|
| 75 | Subclasses should chain up to me at the beginning. |
|---|
| 76 | """ |
|---|
| 77 | self.debug('starting') |
|---|
| 78 | self.setProgress(self.progress) |
|---|
| 79 | self.running = True |
|---|
| 80 | self.runner = runner |
|---|
| 81 | self._notifyListeners('started') |
|---|
| 82 | |
|---|
| 83 | def stop(self): |
|---|
| 84 | """ |
|---|
| 85 | Stop the task. |
|---|
| 86 | |
|---|
| 87 | Subclasses should chain up to me at the end. |
|---|
| 88 | |
|---|
| 89 | Listeners will get notified that the task is stopped, |
|---|
| 90 | whether successfully or with an exception. |
|---|
| 91 | """ |
|---|
| 92 | self.debug('stopping') |
|---|
| 93 | self.running = False |
|---|
| 94 | self.runner = None |
|---|
| 95 | self._notifyListeners('stopped') |
|---|
| 96 | |
|---|
| 97 | ### base class methods |
|---|
| 98 | def setProgress(self, value): |
|---|
| 99 | """ |
|---|
| 100 | Notify about progress changes bigger than the increment. |
|---|
| 101 | Called by subclass implementations as the task progresses. |
|---|
| 102 | """ |
|---|
| 103 | if value - self.progress > self.increment or value >= 1.0 or value == 0.0: |
|---|
| 104 | self.progress = value |
|---|
| 105 | self._notifyListeners('progressed', value) |
|---|
| 106 | self.log('notifying progress: %r', value) |
|---|
| 107 | |
|---|
| 108 | def setDescription(self, description): |
|---|
| 109 | if description != self.description: |
|---|
| 110 | self._notifyListeners('described', description) |
|---|
| 111 | self.description = description |
|---|
| 112 | |
|---|
| 113 | # FIXME: does not actually raise |
|---|
| 114 | def setAndRaiseException(self, exception): |
|---|
| 115 | """ |
|---|
| 116 | Call this to set a synthetically created exception (and not one |
|---|
| 117 | that was actually raised and caught) |
|---|
| 118 | """ |
|---|
| 119 | import traceback |
|---|
| 120 | |
|---|
| 121 | stack = traceback.extract_stack()[:-1] |
|---|
| 122 | (filename, line, func, text) = stack[-1] |
|---|
| 123 | exc = exception.__class__.__name__ |
|---|
| 124 | msg = "" |
|---|
| 125 | # a shortcut to extract a useful message out of most exceptions |
|---|
| 126 | # for now |
|---|
| 127 | if str(exception): |
|---|
| 128 | msg = ": %s" % str(exception) |
|---|
| 129 | line = "exception %(exc)s at %(filename)s:%(line)s: %(func)s()%(msg)s" \ |
|---|
| 130 | % locals() |
|---|
| 131 | |
|---|
| 132 | self.exception = exception |
|---|
| 133 | self.exceptionMessage = line |
|---|
| 134 | self.exceptionTraceback = traceback.format_exc() |
|---|
| 135 | self.debug('set exception, %r' % self.exceptionMessage) |
|---|
| 136 | |
|---|
| 137 | def setException(self, exception): |
|---|
| 138 | import traceback |
|---|
| 139 | |
|---|
| 140 | self.exception = exception |
|---|
| 141 | self.exceptionMessage = log.getExceptionMessage(exception) |
|---|
| 142 | self.exceptionTraceback = traceback.format_exc() |
|---|
| 143 | self.debug('set exception, %r' % self.exceptionMessage) |
|---|
| 144 | |
|---|
| 145 | def addListener(self, listener): |
|---|
| 146 | """ |
|---|
| 147 | Add a listener for task status changes. |
|---|
| 148 | |
|---|
| 149 | Listeners should implement started, stopped, and progressed. |
|---|
| 150 | """ |
|---|
| 151 | self.debug('Adding listener %r', listener) |
|---|
| 152 | if not self._listeners: |
|---|
| 153 | self._listeners = [] |
|---|
| 154 | self._listeners.append(listener) |
|---|
| 155 | |
|---|
| 156 | def _notifyListeners(self, methodName, *args, **kwargs): |
|---|
| 157 | if self._listeners: |
|---|
| 158 | for l in self._listeners: |
|---|
| 159 | getattr(l, methodName)(self, *args, **kwargs) |
|---|
| 160 | |
|---|
| 161 | # FIXME: should this become a real interface, like in zope ? |
|---|
| 162 | class ITaskListener(object): |
|---|
| 163 | """ |
|---|
| 164 | I am an interface for objects listening to tasks. |
|---|
| 165 | """ |
|---|
| 166 | ### listener callbacks |
|---|
| 167 | def progressed(self, task, value): |
|---|
| 168 | """ |
|---|
| 169 | Implement me to be informed about progress. |
|---|
| 170 | |
|---|
| 171 | @type value: float |
|---|
| 172 | @param value: progress, from 0.0 to 1.0 |
|---|
| 173 | """ |
|---|
| 174 | |
|---|
| 175 | def described(self, task, description): |
|---|
| 176 | """ |
|---|
| 177 | Implement me to be informed about description changes. |
|---|
| 178 | |
|---|
| 179 | @type description: str |
|---|
| 180 | @param description: description |
|---|
| 181 | """ |
|---|
| 182 | |
|---|
| 183 | def started(self, task): |
|---|
| 184 | """ |
|---|
| 185 | Implement me to be informed about the task starting. |
|---|
| 186 | """ |
|---|
| 187 | |
|---|
| 188 | def stopped(self, task): |
|---|
| 189 | """ |
|---|
| 190 | Implement me to be informed about the task stopping. |
|---|
| 191 | If the task had an error, task.exception will be set. |
|---|
| 192 | """ |
|---|
| 193 | |
|---|
| 194 | |
|---|
| 195 | |
|---|
| 196 | # this is a Dummy task that can be used to test if this works at all |
|---|
| 197 | class DummyTask(Task): |
|---|
| 198 | def start(self, runner): |
|---|
| 199 | Task.start(self, runner) |
|---|
| 200 | self.runner.schedule(1.0, self._wind) |
|---|
| 201 | |
|---|
| 202 | def _wind(self): |
|---|
| 203 | self.setProgress(min(self.progress + 0.1, 1.0)) |
|---|
| 204 | |
|---|
| 205 | if self.progress >= 1.0: |
|---|
| 206 | self.stop() |
|---|
| 207 | return |
|---|
| 208 | |
|---|
| 209 | self.runner.schedule(1.0, self._wind) |
|---|
| 210 | |
|---|
| 211 | class BaseMultiTask(Task, ITaskListener): |
|---|
| 212 | """ |
|---|
| 213 | I perform multiple tasks. |
|---|
| 214 | |
|---|
| 215 | @ivar tasks: the tasks to run |
|---|
| 216 | @type tasks: list of L{Task} |
|---|
| 217 | """ |
|---|
| 218 | |
|---|
| 219 | description = 'Doing various tasks' |
|---|
| 220 | tasks = None |
|---|
| 221 | |
|---|
| 222 | def __init__(self): |
|---|
| 223 | self.tasks = [] |
|---|
| 224 | self._task = 0 |
|---|
| 225 | |
|---|
| 226 | def addTask(self, task): |
|---|
| 227 | """ |
|---|
| 228 | Add a task. |
|---|
| 229 | |
|---|
| 230 | @type task: L{Task} |
|---|
| 231 | """ |
|---|
| 232 | if self.tasks is None: |
|---|
| 233 | self.tasks = [] |
|---|
| 234 | self.tasks.append(task) |
|---|
| 235 | |
|---|
| 236 | def start(self, runner): |
|---|
| 237 | """ |
|---|
| 238 | Start tasks. |
|---|
| 239 | |
|---|
| 240 | Tasks can still be added while running. For example, |
|---|
| 241 | a first task can determine how many additional tasks to run. |
|---|
| 242 | """ |
|---|
| 243 | Task.start(self, runner) |
|---|
| 244 | |
|---|
| 245 | # initialize task tracking |
|---|
| 246 | if not self.tasks: |
|---|
| 247 | self.warning('no tasks') |
|---|
| 248 | self._generic = self.description |
|---|
| 249 | |
|---|
| 250 | self.next() |
|---|
| 251 | |
|---|
| 252 | def next(self): |
|---|
| 253 | """ |
|---|
| 254 | Start the next task. |
|---|
| 255 | """ |
|---|
| 256 | try: |
|---|
| 257 | # start next task |
|---|
| 258 | task = self.tasks[self._task] |
|---|
| 259 | self._task += 1 |
|---|
| 260 | self.debug('BaseMultiTask.next(): starting task %d of %d: %r', |
|---|
| 261 | self._task, len(self.tasks), task) |
|---|
| 262 | self.setDescription("%s (%d of %d) ..." % ( |
|---|
| 263 | task.description, self._task, len(self.tasks))) |
|---|
| 264 | task.addListener(self) |
|---|
| 265 | task.start(self.runner) |
|---|
| 266 | except Exception, e: |
|---|
| 267 | self.setException(e) |
|---|
| 268 | self.debug('Got exception during next: %r', self.exceptionMessage) |
|---|
| 269 | self.stop() |
|---|
| 270 | return |
|---|
| 271 | |
|---|
| 272 | ### ITaskListener methods |
|---|
| 273 | def started(self, task): |
|---|
| 274 | pass |
|---|
| 275 | |
|---|
| 276 | def progressed(self, task, value): |
|---|
| 277 | pass |
|---|
| 278 | |
|---|
| 279 | def stopped(self, task): |
|---|
| 280 | """ |
|---|
| 281 | Subclasses should chain up to me at the end of their implementation. |
|---|
| 282 | They should fall through to chaining up if there is an exception. |
|---|
| 283 | """ |
|---|
| 284 | self.log('BaseMultiTask.stopped: task %r', task) |
|---|
| 285 | if task.exception: |
|---|
| 286 | self.log('BaseMultiTask.stopped: exception %r', |
|---|
| 287 | task.exceptionMessage) |
|---|
| 288 | self.exception = task.exception |
|---|
| 289 | self.exceptionMessage = task.exceptionMessage |
|---|
| 290 | self.stop() |
|---|
| 291 | return |
|---|
| 292 | |
|---|
| 293 | if self._task == len(self.tasks): |
|---|
| 294 | self.log('BaseMultiTask.stopped: all tasks done') |
|---|
| 295 | self.stop() |
|---|
| 296 | return |
|---|
| 297 | |
|---|
| 298 | # pick another |
|---|
| 299 | self.log('BaseMultiTask.stopped: pick next task') |
|---|
| 300 | self.next() |
|---|
| 301 | |
|---|
| 302 | |
|---|
| 303 | class MultiSeparateTask(BaseMultiTask): |
|---|
| 304 | """ |
|---|
| 305 | I perform multiple tasks. |
|---|
| 306 | I track progress of each individual task, going back to 0 for each task. |
|---|
| 307 | """ |
|---|
| 308 | description = 'Doing various tasks separately' |
|---|
| 309 | |
|---|
| 310 | def start(self, runner): |
|---|
| 311 | self.debug('MultiSeparateTask.start()') |
|---|
| 312 | BaseMultiTask.start(self, runner) |
|---|
| 313 | |
|---|
| 314 | def next(self): |
|---|
| 315 | self.debug('MultiSeparateTask.next()') |
|---|
| 316 | # start next task |
|---|
| 317 | self.progress = 0.0 # reset progress for each task |
|---|
| 318 | BaseMultiTask.next(self) |
|---|
| 319 | |
|---|
| 320 | ### ITaskListener methods |
|---|
| 321 | def progressed(self, task, value): |
|---|
| 322 | self.setProgress(value) |
|---|
| 323 | |
|---|
| 324 | def described(self, description): |
|---|
| 325 | self.setDescription("%s (%d of %d) ..." % ( |
|---|
| 326 | description, self._task, len(self.tasks))) |
|---|
| 327 | |
|---|
| 328 | class MultiCombinedTask(BaseMultiTask): |
|---|
| 329 | """ |
|---|
| 330 | I perform multiple tasks. |
|---|
| 331 | I track progress as a combined progress on all tasks on task granularity. |
|---|
| 332 | """ |
|---|
| 333 | |
|---|
| 334 | description = 'Doing various tasks combined' |
|---|
| 335 | _stopped = 0 |
|---|
| 336 | |
|---|
| 337 | ### ITaskListener methods |
|---|
| 338 | def progressed(self, task, value): |
|---|
| 339 | self.setProgress(float(self._stopped + value) / len(self.tasks)) |
|---|
| 340 | |
|---|
| 341 | def stopped(self, task): |
|---|
| 342 | self._stopped += 1 |
|---|
| 343 | self.setProgress(float(self._stopped) / len(self.tasks)) |
|---|
| 344 | BaseMultiTask.stopped(self, task) |
|---|
| 345 | |
|---|
| 346 | class TaskRunner(object, log.Loggable): |
|---|
| 347 | """ |
|---|
| 348 | I am a base class for task runners. |
|---|
| 349 | Task runners should be reusable. |
|---|
| 350 | """ |
|---|
| 351 | logCategory = 'TaskRunner' |
|---|
| 352 | |
|---|
| 353 | def run(self, task): |
|---|
| 354 | """ |
|---|
| 355 | Run the given task. |
|---|
| 356 | |
|---|
| 357 | @type task: Task |
|---|
| 358 | """ |
|---|
| 359 | raise NotImplementedError |
|---|
| 360 | |
|---|
| 361 | ### methods for tasks to call |
|---|
| 362 | def schedule(self, delta, callable, *args, **kwargs): |
|---|
| 363 | """ |
|---|
| 364 | Schedule a single future call. |
|---|
| 365 | |
|---|
| 366 | Subclasses should implement this. |
|---|
| 367 | |
|---|
| 368 | @type delta: float |
|---|
| 369 | @param delta: time in the future to schedule call for, in seconds. |
|---|
| 370 | """ |
|---|
| 371 | raise NotImplementedError |
|---|
| 372 | |
|---|
| 373 | |
|---|
| 374 | class SyncRunner(TaskRunner, ITaskListener): |
|---|
| 375 | """ |
|---|
| 376 | I run the task synchronously in a gobject MainLoop. |
|---|
| 377 | """ |
|---|
| 378 | def __init__(self, verbose=True): |
|---|
| 379 | self._verbose = verbose |
|---|
| 380 | self._longest = 0 # longest string shown; for clearing |
|---|
| 381 | |
|---|
| 382 | def run(self, task, verbose=None, skip=False): |
|---|
| 383 | self.debug('run task %r', task) |
|---|
| 384 | self._task = task |
|---|
| 385 | self._verboseRun = self._verbose |
|---|
| 386 | if verbose is not None: |
|---|
| 387 | self._verboseRun = verbose |
|---|
| 388 | self._skip = skip |
|---|
| 389 | |
|---|
| 390 | self._loop = gobject.MainLoop() |
|---|
| 391 | self._task.addListener(self) |
|---|
| 392 | # only start the task after going into the mainloop, |
|---|
| 393 | # otherwise the task might complete before we are in it |
|---|
| 394 | gobject.timeout_add(0L, self._startWrap, self._task) |
|---|
| 395 | self.debug('run loop') |
|---|
| 396 | self._loop.run() |
|---|
| 397 | |
|---|
| 398 | self.debug('done running task %r', task) |
|---|
| 399 | if task.exception: |
|---|
| 400 | # catch the exception message |
|---|
| 401 | # FIXME: this gave a traceback in the logging module |
|---|
| 402 | self.debug('raising TaskException for %r, %r' % ( |
|---|
| 403 | task.exceptionMessage, task.exceptionTraceback)) |
|---|
| 404 | msg = task.exceptionMessage |
|---|
| 405 | if task.exceptionTraceback: |
|---|
| 406 | msg += "\n" + task.exceptionTraceback |
|---|
| 407 | raise TaskException(task.exception, message=msg) |
|---|
| 408 | |
|---|
| 409 | def _startWrap(self, task): |
|---|
| 410 | # wrap task start such that we can report any exceptions and |
|---|
| 411 | # never hang |
|---|
| 412 | try: |
|---|
| 413 | self.debug('start task %r' % task) |
|---|
| 414 | task.start(self) |
|---|
| 415 | except Exception, e: |
|---|
| 416 | # getExceptionMessage uses global exception state that doesn't |
|---|
| 417 | # hang around, so store the message |
|---|
| 418 | task.setException(e) |
|---|
| 419 | self.debug('exception during start: %r', task.exceptionMessage) |
|---|
| 420 | self.stopped(task) |
|---|
| 421 | |
|---|
| 422 | |
|---|
| 423 | def schedule(self, delta, callable, *args, **kwargs): |
|---|
| 424 | def c(): |
|---|
| 425 | callable(*args, **kwargs) |
|---|
| 426 | return False |
|---|
| 427 | gobject.timeout_add(int(delta * 1000L), c) |
|---|
| 428 | |
|---|
| 429 | ### ITaskListener methods |
|---|
| 430 | def progressed(self, task, value): |
|---|
| 431 | if not self._verboseRun: |
|---|
| 432 | return |
|---|
| 433 | |
|---|
| 434 | self._report() |
|---|
| 435 | |
|---|
| 436 | if value >= 1.0: |
|---|
| 437 | if self._skip: |
|---|
| 438 | self._output('%s %3d %%' % ( |
|---|
| 439 | self._task.description, 100.0)) |
|---|
| 440 | else: |
|---|
| 441 | # clear with whitespace |
|---|
| 442 | sys.stdout.write("%s\r" % (' ' * self._longest, )) |
|---|
| 443 | |
|---|
| 444 | def _output(self, what, newline=False, ret=True): |
|---|
| 445 | sys.stdout.write(what) |
|---|
| 446 | sys.stdout.write(' ' * (self._longest - len(what))) |
|---|
| 447 | if ret: |
|---|
| 448 | sys.stdout.write('\r') |
|---|
| 449 | if newline: |
|---|
| 450 | sys.stdout.write('\n') |
|---|
| 451 | sys.stdout.flush() |
|---|
| 452 | if len(what) > self._longest: |
|---|
| 453 | #print; print 'setting longest', self._longest; print |
|---|
| 454 | self._longest = len(what) |
|---|
| 455 | |
|---|
| 456 | def described(self, task, description): |
|---|
| 457 | if self._verboseRun: |
|---|
| 458 | self._report() |
|---|
| 459 | |
|---|
| 460 | def stopped(self, task): |
|---|
| 461 | self.debug('stopped task %r', task) |
|---|
| 462 | self.progressed(task, 1.0) |
|---|
| 463 | self._loop.quit() |
|---|
| 464 | |
|---|
| 465 | def _report(self): |
|---|
| 466 | self._output('%s %3d %%' % ( |
|---|
| 467 | self._task.description, self._task.progress * 100.0)) |
|---|
| 468 | |
|---|
| 469 | if __name__ == '__main__': |
|---|
| 470 | task = DummyTask() |
|---|
| 471 | runner = SyncRunner() |
|---|
| 472 | runner.run(task) |
|---|