Ignore:
Timestamp:
15-03-11 23:44:05 (2 years ago)
Author:
thomas
Message:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/morituri/common/checksum.py

    r375 r426  
    3131# checksums are not CRC's. a CRC is a specific type of checksum. 
    3232 
    33 class ChecksumTask(task.Task): 
     33# FIXME: probably this should move higher up the module hierarchy and 
     34# be used wider 
     35class GstException(Exception): 
     36    def __init__(self, gerror, debug): 
     37        self.args = (gerror, debug, ) 
     38        self.gerror = gerror 
     39        self.debug = debug 
     40 
     41# FIXME: this should move up too; other tasks might have use for it. 
     42class GstPipelineTask(task.Task): 
     43    """ 
     44    I am a base class for tasks that use a GStreamer pipeline. 
     45 
     46    I handle errors and raise them appropriately. 
     47    """ 
     48    def start(self, runner): 
     49        task.Task.start(self, runner) 
     50        desc = self.getPipelineDesc() 
     51 
     52        self.debug('creating pipeline %r', desc) 
     53        self.pipeline = gst.parse_launch(desc) 
     54 
     55        self._bus = self.pipeline.get_bus() 
     56        gst.debug('got bus %r' % self._bus) 
     57 
     58        # a signal watch calls callbacks from an idle loop 
     59        # self._bus.add_signal_watch() 
     60 
     61        # sync emission triggers sync-message signals which calls callbacks 
     62        # from the thread that signals, but happens immediately 
     63        self._bus.enable_sync_message_emission() 
     64        self._bus.connect('sync-message::eos', self.bus_eos_cb) 
     65        self._bus.connect('sync-message::tag', self.bus_tag_cb) 
     66        self._bus.connect('sync-message::error', self.bus_error_cb) 
     67 
     68        self.parsed() 
     69 
     70        self.debug('pausing pipeline') 
     71        self.pipeline.set_state(gst.STATE_PAUSED) 
     72        self.pipeline.get_state() 
     73        self.debug('paused pipeline') 
     74 
     75        if not self.exception: 
     76            self.paused() 
     77 
     78    def getPipelineDesc(self): 
     79        raise NotImplementedError 
     80 
     81    def parsed(self): 
     82        """ 
     83        Called after parsing the pipeline but before setting it to paused. 
     84        """ 
     85        pass 
     86 
     87    def paused(self): 
     88        """ 
     89        Called after pipeline is paused 
     90        """ 
     91        pass 
     92 
     93    def bus_eos_cb(self, bus, message): 
     94        pass 
     95 
     96    def bus_tag_cb(self, bus, message): 
     97        pass 
     98 
     99    def bus_error_cb(self, bus, message): 
     100        exc = GstException(*message.parse_error()) 
     101        self.setAndRaiseException(exc) 
     102        gst.debug('error, scheduling stop') 
     103        #self.runner.schedule(0, self.stop) 
     104 
     105 
     106class ChecksumTask(GstPipelineTask): 
    34107    """ 
    35108    I am a task that calculates a checksum of the decoded audio data. 
     
    73146        self.checksum = None # result 
    74147 
    75     def start(self, runner): 
    76         task.Task.start(self, runner) 
    77         self._pipeline = gst.parse_launch(''' 
     148    def getPipelineDesc(self): 
     149        return ''' 
    78150            filesrc location="%s" ! 
    79151            decodebin ! audio/x-raw-int ! 
    80             appsink name=sink sync=False emit-signals=True''' % 
    81                 common.quoteParse(self._path).encode('utf-8')) 
    82  
    83         self.debug('pausing pipeline') 
    84         self._pipeline.set_state(gst.STATE_PAUSED) 
    85         self._pipeline.get_state() 
    86         self.debug('paused pipeline') 
    87  
    88         sink = self._pipeline.get_by_name('sink') 
     152            appsink name=sink sync=False emit-signals=True 
     153            ''' % common.quoteParse(self._path).encode('utf-8') 
     154 
     155    def paused(self): 
     156        sink = self.pipeline.get_by_name('sink') 
    89157 
    90158        if self._frameLength < 0: 
     
    123191        # everything for flac; fixed in recent -good 
    124192        result = sink.send_event(event) 
    125         self.debug('event sent') 
    126         self.debug(result) 
     193        self.debug('event sent, result %r', result) 
    127194        sink.connect('new-buffer', self._new_buffer_cb) 
    128195        sink.connect('eos', self._eos_cb) 
     
    131198        # since set_state returns non-False, adding it as timeout_add 
    132199        # will repeatedly call it, and block the main loop; so 
    133         #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING) 
     200        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) 
    134201        # would not work. 
    135202 
    136203        def play(): 
    137             self._pipeline.set_state(gst.STATE_PLAYING) 
     204            self.pipeline.set_state(gst.STATE_PLAYING) 
    138205            return False 
    139206        self.runner.schedule(0, play) 
    140207 
    141         #self._pipeline.set_state(gst.STATE_PLAYING) 
     208        #self.pipeline.set_state(gst.STATE_PLAYING) 
    142209        self.debug('scheduled setting to play') 
    143210 
     
    186253        self.debug('stopping') 
    187254        self.debug('setting state to NULL') 
    188         self._pipeline.set_state(gst.STATE_NULL) 
     255        self.pipeline.set_state(gst.STATE_NULL) 
    189256 
    190257        if not self._last: 
     
    273340        return checksum 
    274341 
    275 class TRMTask(task.Task): 
     342class TRMTask(GstPipelineTask): 
    276343    """ 
    277344    I calculate a MusicBrainz TRM fingerprint. 
     
    289356        self.path = path 
    290357        self._trm = None 
    291         self._pipeline = None 
    292358        self._bus = None 
    293359 
    294     def start(self, runner): 
    295         task.Task.start(self, runner) 
    296         self._pipeline = gst.parse_launch(''' 
     360    def getPipelineDesc(self): 
     361        return ''' 
    297362            filesrc location="%s" ! 
    298363            decodebin ! audioconvert ! audio/x-raw-int ! 
    299364            trm name=trm ! 
    300             appsink name=sink sync=False emit-signals=True''' % self.path) 
    301         self._bus = self._pipeline.get_bus() 
    302         self._bus.add_signal_watch() 
    303         self._bus.connect('message::eos', self._bus_eos_cb) 
    304         self._bus.connect('message::tag', self._bus_tag_cb) 
    305         self._bus.connect('message::error', self._bus_error_cb) 
    306         sink = self._pipeline.get_by_name('sink') 
     365            appsink name=sink sync=False emit-signals=True''' % self.path 
     366 
     367 
     368    def parsed(self): 
     369        sink = self.pipeline.get_by_name('sink') 
    307370        sink.connect('new-buffer', self._new_buffer_cb) 
    308371 
    309         gst.debug('pausing') 
    310         self._pipeline.set_state(gst.STATE_PAUSED) 
    311         gst.debug('paused') 
    312         self._pipeline.get_state() 
    313         gst.debug('paused') 
    314  
     372    def paused(self): 
    315373        gst.debug('query duration') 
    316         sink = self._pipeline.get_by_name('sink') 
    317  
    318         self._length, qformat = self._pipeline.query_duration(gst.FORMAT_TIME) 
     374        sink = self.pipeline.get_by_name('sink') 
     375 
     376        self._length, qformat = self.pipeline.query_duration(gst.FORMAT_TIME) 
    319377        gst.debug('total length: %r' % self._length) 
    320378        gst.debug('scheduling setting to play') 
    321379        # since set_state returns non-False, adding it as timeout_add 
    322380        # will repeatedly call it, and block the main loop; so 
    323         #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING) 
     381        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) 
    324382        # would not work. 
    325383 
    326384        def play(): 
    327             self._pipeline.set_state(gst.STATE_PLAYING) 
     385            self.pipeline.set_state(gst.STATE_PLAYING) 
    328386            return False 
    329387        self.runner.schedule(0, play) 
    330388 
    331         #self._pipeline.set_state(gst.STATE_PLAYING) 
     389        #self.pipeline.set_state(gst.STATE_PLAYING) 
    332390        gst.debug('scheduled setting to play') 
    333391 
    334     def _bus_eos_cb(self, bus, message): 
     392    # FIXME: can't move this to base class because it triggers too soon 
     393    # in the case of checksum 
     394    def bus_eos_cb(self, bus, message): 
    335395        gst.debug('eos, scheduling stop') 
    336396        self.runner.schedule(0, self.stop) 
    337397 
    338     def _bus_tag_cb(self, bus, message): 
     398 
     399    def bus_tag_cb(self, bus, message): 
    339400        taglist = message.parse_tag() 
    340401        if 'musicbrainz-trmid' in taglist.keys(): 
    341402            self._trm = taglist['musicbrainz-trmid'] 
    342  
    343     def _bus_error_cb(self, bus, message): 
    344         error = message.parse_error() 
    345         # FIXME: handle properly 
    346         print error 
    347403 
    348404    def _new_buffer_cb(self, sink): 
     
    357413        gst.debug('stopping') 
    358414        gst.debug('setting state to NULL') 
    359         self._pipeline.set_state(gst.STATE_NULL) 
     415        self.pipeline.set_state(gst.STATE_NULL) 
    360416 
    361417        # publicize and stop 
Note: See TracChangeset for help on using the changeset viewer.