source: trunk/morituri/common/encode.py @ 484

Revision 484, 14.1 KB checked in by thomas, 2 years ago (diff)
  • morituri/rip/Makefile.am:
  • morituri/rip/main.py:
  • morituri/rip/debug.py (added): Add helper command to debug tasks.
  • morituri/common/encode.py: Add debug.
Line 
1# -*- Mode: Python; test-case-name: morituri.test.test_common_encode -*-
2# vi:si:et:sw=4:sts=4:ts=4
3
4# Morituri - for those about to RIP
5
6# Copyright (C) 2009 Thomas Vander Stichele
7
8# This file is part of morituri.
9#
10# morituri is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# morituri is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with morituri.  If not, see <http://www.gnu.org/licenses/>.
22
23import math
24import os
25import shutil
26import tempfile
27
28from morituri.common import common, task, log, gstreamer
29
30class Profile(object):
31    name = None
32    extension = None
33    pipeline = None
34    losless = None
35
36    def test(self):
37        """
38        Test if this profile will work.
39        Can check for elements, ...
40        """
41        pass
42
43class FlacProfile(Profile):
44    name = 'flac'
45    extension = 'flac'
46    pipeline = 'flacenc name=tagger quality=8'
47    lossless = True
48
49    # FIXME: we should do something better than just printing ERRORS
50    def test(self):
51
52        # here to avoid import gst eating our options
53        import gst
54
55        plugin = gst.registry_get_default().find_plugin('flac')
56        if not plugin:
57            print 'ERROR: cannot find flac plugin'
58            return False
59
60        versionTuple = tuple([int(x) for x in plugin.get_version().split('.')])
61        if len(versionTuple) < 4:
62            versionTuple = versionTuple + (0, )
63        if versionTuple > (0, 10, 9, 0) and versionTuple <= (0, 10, 15, 0):
64            print 'ERROR: flacenc between 0.10.9 and 0.10.15 has a bug'
65            return False
66
67        return True
68
69# FIXME: ffenc_alac does not have merge_tags
70class AlacProfile(Profile):
71    name = 'alac'
72    extension = 'alac'
73    pipeline = 'ffenc_alac'
74    lossless = True
75
76# FIXME: wavenc does not have merge_tags
77class WavProfile(Profile):
78    name = 'wav'
79    extension = 'wav'
80    pipeline = 'wavenc'
81    lossless = True
82
83class WavpackProfile(Profile):
84    name = 'wavpack'
85    extension = 'wv'
86    pipeline = 'wavpackenc bitrate=0 name=tagger'
87    lossless = True
88
89class MP3Profile(Profile):
90    name = 'mp3'
91    extension = 'mp3'
92    pipeline = 'lame name=tagger quality=0 ! id3v2mux'
93    lossless = False
94
95class MP3VBRProfile(Profile):
96    name = 'mp3vbr'
97    extension = 'mp3'
98    pipeline = 'lame name=tagger quality=0 vbr=new vbr-mean-bitrate=192 ! id3v2mux'
99    lossless = False
100
101
102class VorbisProfile(Profile):
103    name = 'vorbis'
104    extension = 'oga'
105    pipeline = 'audioconvert ! vorbisenc name=tagger ! oggmux'
106    lossless = False
107
108
109PROFILES = {
110    'wav':     WavProfile,
111    'flac':    FlacProfile,
112    'alac':    AlacProfile,
113    'wavpack': WavpackProfile,
114}
115
116LOSSY_PROFILES = {
117    'mp3':     MP3Profile,
118    'mp3vbr':  MP3VBRProfile,
119    'vorbis':  VorbisProfile,
120}
121
122ALL_PROFILES = PROFILES.copy()
123ALL_PROFILES.update(LOSSY_PROFILES)
124
125class EncodeTask(gstreamer.GstPipelineTask):
126    """
127    I am a task that encodes a .wav file.
128    I set tags too.
129    I also calculate the peak level of the track.
130
131    @param peak: the peak volume, from 0.0 to 1.0.  This is the sqrt of the
132                 peak power.
133    @type  peak: float
134    """
135
136    logCategory = 'EncodeTask'
137
138    description = 'Encoding'
139    peak = None
140
141    def __init__(self, inpath, outpath, profile, taglist=None, what="track"):
142        """
143        @param profile: encoding profile
144        @type  profile: L{Profile}
145        """
146        assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
147        assert type(outpath) is unicode, \
148            "outpath %r is not unicode" % outpath
149       
150        self._inpath = inpath
151        self._outpath = outpath
152        self._taglist = taglist
153
154        self._level = None
155        self._peakdB = None
156        self._profile = profile
157
158        self.description = "Encoding %s" % what
159        self._profile.test()
160
161    def getPipelineDesc(self):
162        return '''
163            filesrc location="%s" !
164            decodebin name=decoder !
165            audio/x-raw-int,width=16,depth=16,channels=2 !
166            level name=level !
167            %s ! identity name=identity !
168            filesink location="%s" name=sink''' % (
169                common.quoteParse(self._inpath).encode('utf-8'),
170                self._profile.pipeline,
171                common.quoteParse(self._outpath).encode('utf-8'))
172
173    def parsed(self):
174        tagger = self.pipeline.get_by_name('tagger')
175
176        # set tags
177        if tagger and self._taglist:
178            # FIXME: under which conditions do we not have merge_tags ?
179            # See for example comment saying wavenc did not have it.
180            try:
181                tagger.merge_tags(self._taglist, self.gst.TAG_MERGE_APPEND)
182            except AttributeError, e:
183                self.warning('Could not merge tags: %r',
184                    log.getExceptionMessage(e))
185
186    def paused(self):
187        # get length
188        identity = self.pipeline.get_by_name('identity')
189        self.debug('query duration')
190        try:
191            length, qformat = identity.query_duration(self.gst.FORMAT_DEFAULT)
192        except self.gst.QueryError, e:
193            self.setException(e)
194            self.stop()
195            return
196
197        # wavparse 0.10.14 returns in bytes
198        if qformat == self.gst.FORMAT_BYTES:
199            self.debug('query returned in BYTES format')
200            length /= 4
201        self.debug('total length: %r', length)
202        self._length = length
203
204        # set up level callbacks
205        # FIXME: publicize bus and reuse it instead of regetting and adding ?
206        bus = self.pipeline.get_bus()
207        bus.add_signal_watch()
208
209        bus.connect('message::element', self._message_element_cb)
210        self._level = self.pipeline.get_by_name('level')
211        # add a probe so we can track progress
212        # we connect to level because this gives us offset in samples
213        srcpad = self._level.get_static_pad('src')
214        self.gst.debug('adding srcpad buffer probe to %r' % srcpad)
215        ret = srcpad.add_buffer_probe(self._probe_handler)
216        self.gst.debug('added srcpad buffer probe to %r: %r' % (srcpad, ret))
217
218    def _probe_handler(self, pad, buffer):
219        # update progress based on buffer offset (expected to be in samples)
220        # versus length in samples
221        # marshal to main thread
222        self.schedule(0, self.setProgress,
223            float(buffer.offset) / self._length)
224
225        # don't drop the buffer
226        return True
227
228    def bus_eos_cb(self, bus, message):
229        self.debug('eos, scheduling stop')
230        self.schedule(0, self.stop)
231
232    def _message_element_cb(self, bus, message):
233        if message.src != self._level:
234            return
235
236        s = message.structure
237        if s.get_name() != 'level':
238            return
239
240
241        if self._peakdB is None:
242            self._peakdB = s['peak'][0]
243
244        for p in s['peak']:
245            if self._peakdB < p:
246                self.log('higher peakdB found, now %r', self._peakdB)
247                self._peakdB = p
248
249    def stopped(self):
250        if self._peakdB is not None:
251            self.debug('peakdB %r', self._peakdB)
252            self.peak = math.sqrt(math.pow(10, self._peakdB / 10.0))
253        else:
254            self.warning('No peak found, something went wrong!')
255
256class TagReadTask(gstreamer.GstPipelineTask):
257    """
258    I am a task that reads tags.
259
260    @ivar  taglist: the tag list read from the file.
261    @type  taglist: L{gst.TagList}
262    """
263
264    logCategory = 'TagReadTask'
265
266    description = 'Reading tags'
267
268    taglist = None
269
270    def __init__(self, path):
271        """
272        """
273        assert type(path) is unicode, "path %r is not unicode" % path
274       
275        self._path = path
276
277    def getPipelineDesc(self):
278        return '''
279            filesrc location="%s" !
280            decodebin name=decoder !
281            fakesink''' % (
282                common.quoteParse(self._path).encode('utf-8'))
283
284    def bus_eos_cb(self, bus, message):
285        self.debug('eos, scheduling stop')
286        self.schedule(0, self.stop)
287
288    def bus_tag_cb(self, bus, message):
289        taglist = message.parse_tag()
290        self.taglist = taglist
291
292class TagWriteTask(task.Task):
293    """
294    I am a task that retags an encoded file.
295    """
296
297    logCategory = 'TagWriteTask'
298
299    description = 'Writing tags'
300
301    def __init__(self, inpath, outpath, taglist=None):
302        """
303        """
304        assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
305        assert type(outpath) is unicode, "outpath %r is not unicode" % outpath
306       
307        self._inpath = inpath
308        self._outpath = outpath
309        self._taglist = taglist
310
311    def start(self, runner):
312        task.Task.start(self, runner)
313
314        # here to avoid import gst eating our options
315        import gst
316
317        self._pipeline = gst.parse_launch('''
318            filesrc location="%s" !
319            flactag name=tagger !
320            filesink location="%s"''' % (
321                common.quoteParse(self._inpath).encode('utf-8'),
322                common.quoteParse(self._outpath).encode('utf-8')))
323
324        # set tags
325        tagger = self._pipeline.get_by_name('tagger')
326        if self._taglist:
327            tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)
328
329        self.debug('pausing pipeline')
330        self._pipeline.set_state(gst.STATE_PAUSED)
331        self._pipeline.get_state()
332        self.debug('paused pipeline')
333
334        # add eos handling
335        bus = self._pipeline.get_bus()
336        bus.add_signal_watch()
337        bus.connect('message::eos', self._message_eos_cb)
338
339        self.debug('scheduling setting to play')
340        # since set_state returns non-False, adding it as timeout_add
341        # will repeatedly call it, and block the main loop; so
342        #   gobject.timeout_add(0L, self._pipeline.set_state,
343        #       gst.STATE_PLAYING)
344        # would not work.
345
346        def play():
347            self._pipeline.set_state(gst.STATE_PLAYING)
348            return False
349        self.schedule(0, play)
350
351        #self._pipeline.set_state(gst.STATE_PLAYING)
352        self.debug('scheduled setting to play')
353
354    def _message_eos_cb(self, bus, message):
355        self.debug('eos, scheduling stop')
356        self.schedule(0, self.stop)
357
358    def stop(self):
359        # here to avoid import gst eating our options
360        import gst
361
362        self.debug('stopping')
363        self.debug('setting state to NULL')
364        self._pipeline.set_state(gst.STATE_NULL)
365        self.debug('set state to NULL')
366        task.Task.stop(self)
367
368class SafeRetagTask(task.MultiSeparateTask):
369    """
370    I am a task that retags an encoded file safely in place.
371    First of all, if the new tags are the same as the old ones, it doesn't
372    do anything.
373    If the tags are not the same, then the file gets retagged, but only
374    if the decodes of the original and retagged file checksum the same.
375
376    @ivar changed: True if the tags have changed (and hence an output file is
377                   generated)
378    """
379
380    logCategory = 'SafeRetagTask'
381
382    description = 'Retagging'
383
384    changed = False
385
386    def __init__(self, path, taglist=None):
387        """
388        """
389        assert type(path) is unicode, "path %r is not unicode" % path
390
391        task.MultiSeparateTask.__init__(self)
392       
393        self._path = path
394        self._taglist = taglist.copy()
395
396        self.tasks = [TagReadTask(path), ]
397
398    def stopped(self, taskk):
399        from morituri.common import checksum
400
401        if not taskk.exception:
402            # Check if the tags are different or not
403            if taskk == self.tasks[0]:
404                taglist = taskk.taglist.copy()
405                if common.tagListEquals(taglist, self._taglist):
406                    self.debug('tags are already fine: %r',
407                        common.tagListToDict(taglist))
408                else:
409                    # need to retag
410                    self.debug('tags need to be rewritten')
411                    self.debug('Current tags: %r, new tags: %r',
412                        common.tagListToDict(taglist),
413                        common.tagListToDict(self._taglist))
414                    assert common.tagListToDict(taglist) != common.tagListToDict(self._taglist)
415                    self.tasks.append(checksum.CRC32Task(self._path))
416                    self._fd, self._tmppath = tempfile.mkstemp(
417                        dir=os.path.dirname(self._path), suffix=u'.morituri')
418                    self.tasks.append(TagWriteTask(self._path,
419                        self._tmppath, self._taglist))
420                    self.tasks.append(checksum.CRC32Task(self._tmppath))
421                    self.tasks.append(TagReadTask(self._tmppath))
422            elif len(self.tasks) > 1 and taskk == self.tasks[4]:
423                if common.tagListEquals(self.tasks[4].taglist, self._taglist):
424                    self.debug('tags written successfully')
425                    c1 = self.tasks[1].checksum
426                    c2 = self.tasks[3].checksum
427                    self.debug('comparing checksums %08x and %08x' % (c1, c2))
428                    if c1 == c2:
429                        # data is fine, so we can now move
430                        # but first, copy original mode to our temporary file
431                        shutil.copymode(self._path, self._tmppath)
432                        self.debug('moving temporary file to %r' % self._path)
433                        os.rename(self._tmppath, self._path)
434                        self.changed = True
435                    else:
436                        # FIXME: don't raise TypeError
437                        e = TypeError("Checksums failed")
438                        self.setAndRaiseException(e)
439                else:
440                    self.debug('failed to update tags, only have %r',
441                        common.tagListToDict(self.tasks[4].taglist))
442                    os.unlink(self._tmppath)
443                    e = TypeError("Tags not written")
444                    self.setAndRaiseException(e)
445                   
446        task.MultiSeparateTask.stopped(self, taskk)
447
448     
Note: See TracBrowser for help on using the repository browser.