Changeset 426 for trunk/morituri/common/checksum.py
- Timestamp:
- 15-03-11 23:44:05 (2 years ago)
- File:
-
- 1 edited
-
trunk/morituri/common/checksum.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/morituri/common/checksum.py
r375 r426 31 31 # checksums are not CRC's. a CRC is a specific type of checksum. 32 32 33 class ChecksumTask(task.Task): 33 # FIXME: probably this should move higher up the module hierarchy and 34 # be used wider 35 class GstException(Exception): 36 def __init__(self, gerror, debug): 37 self.args = (gerror, debug, ) 38 self.gerror = gerror 39 self.debug = debug 40 41 # FIXME: this should move up too; other tasks might have use for it. 42 class GstPipelineTask(task.Task): 43 """ 44 I am a base class for tasks that use a GStreamer pipeline. 45 46 I handle errors and raise them appropriately. 47 """ 48 def start(self, runner): 49 task.Task.start(self, runner) 50 desc = self.getPipelineDesc() 51 52 self.debug('creating pipeline %r', desc) 53 self.pipeline = gst.parse_launch(desc) 54 55 self._bus = self.pipeline.get_bus() 56 gst.debug('got bus %r' % self._bus) 57 58 # a signal watch calls callbacks from an idle loop 59 # self._bus.add_signal_watch() 60 61 # sync emission triggers sync-message signals which calls callbacks 62 # from the thread that signals, but happens immediately 63 self._bus.enable_sync_message_emission() 64 self._bus.connect('sync-message::eos', self.bus_eos_cb) 65 self._bus.connect('sync-message::tag', self.bus_tag_cb) 66 self._bus.connect('sync-message::error', self.bus_error_cb) 67 68 self.parsed() 69 70 self.debug('pausing pipeline') 71 self.pipeline.set_state(gst.STATE_PAUSED) 72 self.pipeline.get_state() 73 self.debug('paused pipeline') 74 75 if not self.exception: 76 self.paused() 77 78 def getPipelineDesc(self): 79 raise NotImplementedError 80 81 def parsed(self): 82 """ 83 Called after parsing the pipeline but before setting it to paused. 84 """ 85 pass 86 87 def paused(self): 88 """ 89 Called after pipeline is paused 90 """ 91 pass 92 93 def bus_eos_cb(self, bus, message): 94 pass 95 96 def bus_tag_cb(self, bus, message): 97 pass 98 99 def bus_error_cb(self, bus, message): 100 exc = GstException(*message.parse_error()) 101 self.setAndRaiseException(exc) 102 gst.debug('error, scheduling stop') 103 #self.runner.schedule(0, self.stop) 104 105 106 class ChecksumTask(GstPipelineTask): 34 107 """ 35 108 I am a task that calculates a checksum of the decoded audio data. … … 73 146 self.checksum = None # result 74 147 75 def start(self, runner): 76 task.Task.start(self, runner) 77 self._pipeline = gst.parse_launch(''' 148 def getPipelineDesc(self): 149 return ''' 78 150 filesrc location="%s" ! 79 151 decodebin ! audio/x-raw-int ! 80 appsink name=sink sync=False emit-signals=True''' % 81 common.quoteParse(self._path).encode('utf-8')) 82 83 self.debug('pausing pipeline') 84 self._pipeline.set_state(gst.STATE_PAUSED) 85 self._pipeline.get_state() 86 self.debug('paused pipeline') 87 88 sink = self._pipeline.get_by_name('sink') 152 appsink name=sink sync=False emit-signals=True 153 ''' % common.quoteParse(self._path).encode('utf-8') 154 155 def paused(self): 156 sink = self.pipeline.get_by_name('sink') 89 157 90 158 if self._frameLength < 0: … … 123 191 # everything for flac; fixed in recent -good 124 192 result = sink.send_event(event) 125 self.debug('event sent') 126 self.debug(result) 193 self.debug('event sent, result %r', result) 127 194 sink.connect('new-buffer', self._new_buffer_cb) 128 195 sink.connect('eos', self._eos_cb) … … 131 198 # since set_state returns non-False, adding it as timeout_add 132 199 # will repeatedly call it, and block the main loop; so 133 # gobject.timeout_add(0L, self. _pipeline.set_state, gst.STATE_PLAYING)200 # gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) 134 201 # would not work. 135 202 136 203 def play(): 137 self. _pipeline.set_state(gst.STATE_PLAYING)204 self.pipeline.set_state(gst.STATE_PLAYING) 138 205 return False 139 206 self.runner.schedule(0, play) 140 207 141 #self. _pipeline.set_state(gst.STATE_PLAYING)208 #self.pipeline.set_state(gst.STATE_PLAYING) 142 209 self.debug('scheduled setting to play') 143 210 … … 186 253 self.debug('stopping') 187 254 self.debug('setting state to NULL') 188 self. _pipeline.set_state(gst.STATE_NULL)255 self.pipeline.set_state(gst.STATE_NULL) 189 256 190 257 if not self._last: … … 273 340 return checksum 274 341 275 class TRMTask( task.Task):342 class TRMTask(GstPipelineTask): 276 343 """ 277 344 I calculate a MusicBrainz TRM fingerprint. … … 289 356 self.path = path 290 357 self._trm = None 291 self._pipeline = None292 358 self._bus = None 293 359 294 def start(self, runner): 295 task.Task.start(self, runner) 296 self._pipeline = gst.parse_launch(''' 360 def getPipelineDesc(self): 361 return ''' 297 362 filesrc location="%s" ! 298 363 decodebin ! audioconvert ! audio/x-raw-int ! 299 364 trm name=trm ! 300 appsink name=sink sync=False emit-signals=True''' % self.path) 301 self._bus = self._pipeline.get_bus() 302 self._bus.add_signal_watch() 303 self._bus.connect('message::eos', self._bus_eos_cb) 304 self._bus.connect('message::tag', self._bus_tag_cb) 305 self._bus.connect('message::error', self._bus_error_cb) 306 sink = self._pipeline.get_by_name('sink') 365 appsink name=sink sync=False emit-signals=True''' % self.path 366 367 368 def parsed(self): 369 sink = self.pipeline.get_by_name('sink') 307 370 sink.connect('new-buffer', self._new_buffer_cb) 308 371 309 gst.debug('pausing') 310 self._pipeline.set_state(gst.STATE_PAUSED) 311 gst.debug('paused') 312 self._pipeline.get_state() 313 gst.debug('paused') 314 372 def paused(self): 315 373 gst.debug('query duration') 316 sink = self. _pipeline.get_by_name('sink')317 318 self._length, qformat = self. _pipeline.query_duration(gst.FORMAT_TIME)374 sink = self.pipeline.get_by_name('sink') 375 376 self._length, qformat = self.pipeline.query_duration(gst.FORMAT_TIME) 319 377 gst.debug('total length: %r' % self._length) 320 378 gst.debug('scheduling setting to play') 321 379 # since set_state returns non-False, adding it as timeout_add 322 380 # will repeatedly call it, and block the main loop; so 323 # gobject.timeout_add(0L, self. _pipeline.set_state, gst.STATE_PLAYING)381 # gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) 324 382 # would not work. 325 383 326 384 def play(): 327 self. _pipeline.set_state(gst.STATE_PLAYING)385 self.pipeline.set_state(gst.STATE_PLAYING) 328 386 return False 329 387 self.runner.schedule(0, play) 330 388 331 #self. _pipeline.set_state(gst.STATE_PLAYING)389 #self.pipeline.set_state(gst.STATE_PLAYING) 332 390 gst.debug('scheduled setting to play') 333 391 334 def _bus_eos_cb(self, bus, message): 392 # FIXME: can't move this to base class because it triggers too soon 393 # in the case of checksum 394 def bus_eos_cb(self, bus, message): 335 395 gst.debug('eos, scheduling stop') 336 396 self.runner.schedule(0, self.stop) 337 397 338 def _bus_tag_cb(self, bus, message): 398 399 def bus_tag_cb(self, bus, message): 339 400 taglist = message.parse_tag() 340 401 if 'musicbrainz-trmid' in taglist.keys(): 341 402 self._trm = taglist['musicbrainz-trmid'] 342 343 def _bus_error_cb(self, bus, message):344 error = message.parse_error()345 # FIXME: handle properly346 print error347 403 348 404 def _new_buffer_cb(self, sink): … … 357 413 gst.debug('stopping') 358 414 gst.debug('setting state to NULL') 359 self. _pipeline.set_state(gst.STATE_NULL)415 self.pipeline.set_state(gst.STATE_NULL) 360 416 361 417 # publicize and stop
Note: See TracChangeset
for help on using the changeset viewer.
