source: trunk/morituri/common/encode.py @ 434

Revision 434, 16.1 KB checked in by thomas, 2 years ago (diff)
  • morituri/common/encode.py: wavenc does not have merge_tags, it seems. So don't call an element a tagger, don't merge tags if there is no tagger, and complain if there is no merge_tags when we think there should be.
Line 
1# -*- Mode: Python; test-case-name: morituri.test.test_common_encode -*-
2# vi:si:et:sw=4:sts=4:ts=4
3
4# Morituri - for those about to RIP
5
6# Copyright (C) 2009 Thomas Vander Stichele
7
8# This file is part of morituri.
9#
10# morituri is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# morituri is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with morituri.  If not, see <http://www.gnu.org/licenses/>.
22
23import math
24import os
25import shutil
26import tempfile
27
28from morituri.common import common, task
29
class Profile(object):
    """
    Base class for encoding profiles.

    Subclasses override the class attributes below and can override
    L{test} to check whether the profile is usable.

    @cvar name:      short name of the profile.
    @cvar extension: extension for encoded files, without a leading dot.
    @cvar pipeline:  GStreamer pipeline description of the encoding part.
    @cvar lossless:  whether the profile encodes without loss.
    @cvar losless:   misspelled former name of C{lossless}; kept only so
                     that code still reading the old name keeps working.
    """
    name = None
    extension = None
    pipeline = None
    lossless = None
    losless = None

    def test(self):
        """
        Test if this profile will work.
        Can check for elements, ...

        The base implementation has nothing to check and reports success;
        subclasses return False when the profile cannot be used.

        @rtype: bool
        """
        return True
42
class FlacProfile(Profile):
    """
    Lossless profile that encodes with flacenc.
    """
    name = 'flac'
    extension = 'flac'
    pipeline = 'flacenc name=tagger quality=8'
    lossless = True

    # FIXME: we should do something better than just printing ERRORS
    def test(self):
        """
        Check that the flac plugin is present and not a rejected version.

        @returns: False when the plugin cannot be found, or when its
                  version is higher than 0.10.9 and at most 0.10.15;
                  True otherwise.
        @rtype:   bool
        """
        # here to avoid import gst eating our options
        import gst

        flac = gst.registry_get_default().find_plugin('flac')
        if not flac:
            print('ERROR: cannot find flac plugin')
            return False

        version = tuple(map(int, flac.get_version().split('.')))
        # append a single 0 when the version has fewer than four parts
        if len(version) < 4:
            version += (0, )

        if (0, 10, 9, 0) < version <= (0, 10, 15, 0):
            print('ERROR: flacenc between 0.10.9 and 0.10.15 has a bug')
            return False

        return True
68
class AlacProfile(Profile):
    """
    Lossless profile that encodes with ffenc_alac.
    """
    lossless = True
    pipeline = 'ffenc_alac name=tagger'
    extension = 'alac'
    name = 'alac'
74
75# FIXME: wavenc does not have merge_tags
class WavProfile(Profile):
    """
    Lossless profile that writes wav files with wavenc.

    Unlike the other profiles, no element in this pipeline is named
    'tagger'.
    """
    lossless = True
    pipeline = 'wavenc'
    extension = 'wav'
    name = 'wav'
81
class WavpackProfile(Profile):
    """
    Lossless profile that encodes with wavpackenc (bitrate=0).
    """
    lossless = True
    pipeline = 'wavpackenc bitrate=0 name=tagger'
    extension = 'wv'
    name = 'wavpack'
87
class MP3Profile(Profile):
    """
    Lossy profile that encodes with lame (quality=0) and muxes the result
    with id3v2mux.
    """
    lossless = False
    pipeline = 'lame name=tagger quality=0 ! id3v2mux'
    extension = 'mp3'
    name = 'mp3'
93
class MP3VBRProfile(Profile):
    """
    Lossy profile that encodes with lame in vbr=new mode with a mean
    bitrate setting of 192, and muxes the result with id3v2mux.
    """
    lossless = False
    pipeline = 'lame name=tagger quality=0 vbr=new vbr-mean-bitrate=192 ! id3v2mux'
    extension = 'mp3'
    name = 'mp3vbr'
99
100
class VorbisProfile(Profile):
    """
    Lossy profile that encodes with vorbisenc and muxes the result with
    oggmux.
    """
    lossless = False
    pipeline = 'audioconvert ! vorbisenc name=tagger ! oggmux'
    extension = 'oga'
    name = 'vorbis'
106
107
# profile classes that encode losslessly, keyed by profile name
PROFILES = dict(
    wav=WavProfile,
    flac=FlacProfile,
    alac=AlacProfile,
    wavpack=WavpackProfile,
)

# profile classes that encode lossily, keyed by profile name
LOSSY_PROFILES = dict(
    mp3=MP3Profile,
    mp3vbr=MP3VBRProfile,
    vorbis=VorbisProfile,
)

# the union of both dictionaries above
ALL_PROFILES = dict(PROFILES)
ALL_PROFILES.update(LOSSY_PROFILES)
123
class EncodeTask(task.Task):
    """
    I am a task that encodes a .wav file.
    I set tags too.
    I also calculate the peak level of the track.

    @ivar peak: the peak volume, from 0.0 to 1.0.  This is the sqrt of the
                peak power.  Only set once the task has stopped and a
                peak was measured.
    @type peak: float
    """

    logCategory = 'EncodeTask'

    description = 'Encoding'
    peak = None

    def __init__(self, inpath, outpath, profile, taglist=None):
        """
        @param inpath:  path to the file to encode
        @type  inpath:  unicode
        @param outpath: path to write the encoded file to
        @type  outpath: unicode
        @param profile: encoding profile
        @type  profile: L{Profile}
        @param taglist: tags to set on the encoded file, if any
        """
        assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
        assert type(outpath) is unicode, \
            "outpath %r is not unicode" % outpath

        self._inpath = inpath
        self._outpath = outpath
        self._taglist = taglist

        self._level = None
        self._peakdB = None
        self._profile = profile

        # NOTE(review): the result of the test is not looked at here
        self._profile.test()

    def start(self, runner):
        """
        Build the encoding pipeline, set the tags, query the length of the
        input and schedule the pipeline to go to PLAYING.

        If the duration query fails, the exception is set on the task and
        the task is stopped.
        """
        task.Task.start(self, runner)

        # here to avoid import gst eating our options
        import gst

        self._pipeline = gst.parse_launch('''
            filesrc location="%s" !
            decodebin name=decoder !
            audio/x-raw-int,width=16,depth=16,channels=2 !
            level name=level !
            %s !
            filesink location="%s" name=sink''' % (
                common.quoteParse(self._inpath).encode('utf-8'),
                self._profile.pipeline,
                common.quoteParse(self._outpath).encode('utf-8')))

        # not every profile names one of its elements 'tagger', so this
        # can be None
        tagger = self._pipeline.get_by_name('tagger')
        self._level = self._pipeline.get_by_name('level')

        # set tags
        if tagger and self._taglist:
            # FIXME: under which conditions do we not have merge_tags ?
            # See for example comment saying wavenc did not have it.
            try:
                tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)
            except AttributeError as e:
                # this module does not import a log module, so format
                # the exception ourselves
                self.warning('Could not merge tags: %r', str(e))

        self.debug('pausing pipeline')
        self._pipeline.set_state(gst.STATE_PAUSED)
        self._pipeline.get_state()
        self.debug('paused pipeline')

        # get length
        # Without a tagger element, query the level element instead,
        # which is always part of the pipeline built above.
        # NOTE(review): confirm level answers the duration query the same
        # way the encoder element does.
        queried = tagger
        if queried is None:
            queried = self._level

        self.debug('query duration')
        try:
            length, qformat = queried.query_duration(gst.FORMAT_DEFAULT)
        except gst.QueryError as e:
            self.setException(e)
            self.stop()
            return

        # wavparse 0.10.14 returns in bytes
        if qformat == gst.FORMAT_BYTES:
            self.debug('query returned in BYTES format')
            # presumably 4 bytes per sample: 16 bit, 2 channels
            length //= 4
        self.debug('total length: %r', length)
        self._length = length

        # add eos handling
        bus = self._pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect('message::eos', self._message_eos_cb)

        # set up level callbacks
        bus.connect('message::element', self._message_element_cb)
        # add a probe so we can track progress
        # we connect to level because this gives us offset in samples
        srcpad = self._level.get_static_pad('src')
        srcpad.add_buffer_probe(self._probe_handler)

        self.debug('scheduling setting to play')
        # since set_state returns non-False, adding it as timeout_add
        # will repeatedly call it, and block the main loop; so
        #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
        # would not work.

        def play():
            self._pipeline.set_state(gst.STATE_PLAYING)
            return False
        self.runner.schedule(0, play)

        self.debug('scheduled setting to play')

    def _probe_handler(self, pad, buffer):
        """
        Buffer probe on the src pad of the level element; updates the
        progress of the task.

        @returns: True, so that the buffer is not dropped.
        """
        # update progress based on buffer offset (expected to be in samples)
        # versus length in samples
        # a length that is zero or negative cannot be used as a divisor
        # for a meaningful progress value, so skip the update then
        if self._length > 0:
            # marshal to main thread
            self.runner.schedule(0, self.setProgress,
                float(buffer.offset) / self._length)

        # don't drop the buffer
        return True

    def _message_eos_cb(self, bus, message):
        """
        Schedule stopping the task when the pipeline reaches end of stream.
        """
        self.debug('eos, scheduling stop')
        self.runner.schedule(0, self.stop)

    def _message_element_cb(self, bus, message):
        """
        Track the highest peak value seen in the 'level' messages posted
        by our level element.
        """
        if message.src != self._level:
            return

        s = message.structure
        if s.get_name() != 'level':
            return

        if self._peakdB is None:
            self._peakdB = s['peak'][0]

        for p in s['peak']:
            if self._peakdB < p:
                # update first, so that the log shows the new value
                self._peakdB = p
                self.log('higher peakdB found, now %r', self._peakdB)

    def stop(self):
        """
        Set the pipeline to NULL, stop the task and calculate L{peak}
        from the highest peak value seen.
        """
        # here to avoid import gst eating our options
        import gst

        self.debug('stopping')
        self.debug('setting state to NULL')
        self._pipeline.set_state(gst.STATE_NULL)
        self.debug('set state to NULL')
        task.Task.stop(self)

        if self._peakdB is not None:
            self.debug('peakdB %r', self._peakdB)
            # 10 ^ (dB / 10) is the peak power; peak is its square root
            self.peak = math.sqrt(math.pow(10, self._peakdB / 10.0))
        else:
            self.warning('No peak found, something went wrong!')
282
class TagReadTask(task.Task):
    """
    I am a task that reads the tags of a file.

    @ivar  taglist: the tag list read from the file.
    @type  taglist: L{gst.TagList}
    """

    logCategory = 'TagReadTask'
    description = 'Reading tags'
    taglist = None

    def __init__(self, path):
        """
        @param path: path to the file to read tags from
        @type  path: unicode
        """
        assert type(path) is unicode, "path %r is not unicode" % path

        self._path = path

    def start(self, runner):
        """
        Build a pipeline that decodes the file into a fakesink, watch its
        bus for tag and eos messages, and schedule it to go to PLAYING.
        """
        task.Task.start(self, runner)

        # here to avoid import gst eating our options
        import gst

        location = common.quoteParse(self._path).encode('utf-8')
        launchLine = '''
            filesrc location="%s" !
            decodebin name=decoder !
            fakesink''' % location
        self._pipeline = gst.parse_launch(launchLine)

        self.debug('pausing pipeline')
        self._pipeline.set_state(gst.STATE_PAUSED)
        self._pipeline.get_state()
        self.debug('paused pipeline')

        # handle end of stream first, then tags
        messageBus = self._pipeline.get_bus()
        messageBus.add_signal_watch()
        for signalName, handler in (
            ('message::eos', self._message_eos_cb),
            ('message::tag', self._message_tag_cb)):
            messageBus.connect(signalName, handler)

        self.debug('scheduling setting to play')

        # set_state returns non-False, which would make a timeout call it
        # over and over; wrap it so the scheduled call returns False.
        def setPlaying():
            self._pipeline.set_state(gst.STATE_PLAYING)
            return False

        self.runner.schedule(0, setPlaying)
        self.debug('scheduled setting to play')

    def _message_eos_cb(self, bus, message):
        """
        Schedule stopping the task at end of stream.
        """
        self.debug('eos, scheduling stop')
        self.runner.schedule(0, self.stop)

    def _message_tag_cb(self, bus, message):
        """
        Store the tag list of the tag message, replacing any stored before.
        """
        self.taglist = message.parse_tag()

    def stop(self):
        """
        Set the pipeline to NULL and stop the task.
        """
        # here to avoid import gst eating our options
        import gst

        self.debug('stopping')
        self.debug('setting state to NULL')
        self._pipeline.set_state(gst.STATE_NULL)
        self.debug('set state to NULL')
        task.Task.stop(self)
360
class TagWriteTask(task.Task):
    """
    I am a task that retags an encoded file.
    I write the retagged result to a different path, using flactag.
    """

    logCategory = 'TagWriteTask'
    description = 'Writing tags'

    def __init__(self, inpath, outpath, taglist=None):
        """
        @param inpath:  path to the file to read
        @type  inpath:  unicode
        @param outpath: path to write the retagged file to
        @type  outpath: unicode
        @param taglist: tags to merge in; nothing is merged when empty
        """
        assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
        assert type(outpath) is unicode, "outpath %r is not unicode" % outpath

        self._inpath = inpath
        self._outpath = outpath
        self._taglist = taglist

    def start(self, runner):
        """
        Build the retagging pipeline, merge in the tags, watch the bus for
        eos, and schedule the pipeline to go to PLAYING.
        """
        task.Task.start(self, runner)

        # here to avoid import gst eating our options
        import gst

        source = common.quoteParse(self._inpath).encode('utf-8')
        target = common.quoteParse(self._outpath).encode('utf-8')
        launchLine = '''
            filesrc location="%s" !
            flactag name=tagger !
            filesink location="%s"''' % (source, target)
        self._pipeline = gst.parse_launch(launchLine)

        # set tags
        flactag = self._pipeline.get_by_name('tagger')
        if self._taglist:
            flactag.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)

        self.debug('pausing pipeline')
        self._pipeline.set_state(gst.STATE_PAUSED)
        self._pipeline.get_state()
        self.debug('paused pipeline')

        # add eos handling
        messageBus = self._pipeline.get_bus()
        messageBus.add_signal_watch()
        messageBus.connect('message::eos', self._message_eos_cb)

        self.debug('scheduling setting to play')

        # set_state returns non-False, which would make a timeout call it
        # over and over; wrap it so the scheduled call returns False.
        def setPlaying():
            self._pipeline.set_state(gst.STATE_PLAYING)
            return False

        self.runner.schedule(0, setPlaying)
        self.debug('scheduled setting to play')

    def _message_eos_cb(self, bus, message):
        """
        Schedule stopping the task at end of stream.
        """
        self.debug('eos, scheduling stop')
        self.runner.schedule(0, self.stop)

    def stop(self):
        """
        Set the pipeline to NULL and stop the task.
        """
        # here to avoid import gst eating our options
        import gst

        self.debug('stopping')
        self.debug('setting state to NULL')
        self._pipeline.set_state(gst.STATE_NULL)
        self.debug('set state to NULL')
        task.Task.stop(self)
435
class SafeRetagTask(task.MultiSeparateTask):
    """
    I am a task that retags an encoded file safely in place.
    First of all, if the new tags are the same as the old ones, it doesn't
    do anything.
    If the tags are not the same, then the file gets retagged, but only
    if the decodes of the original and retagged file checksum the same.

    When retagging is needed, my tasks are, in order: read the current
    tags, checksum the original, write a retagged temporary file,
    checksum the temporary file, read the tags of the temporary file.

    @ivar changed: True if the tags have changed (and hence an output file is
                   generated)
    """

    logCategory = 'SafeRetagTask'

    description = 'Retagging'

    changed = False

    def __init__(self, path, taglist=None):
        """
        @param path:    path to the file to retag
        @type  path:    unicode
        @param taglist: the tags the file should end up with.  A copy is
                        taken unconditionally, so despite the default
                        this cannot be None.
        """
        assert type(path) is unicode, "path %r is not unicode" % path

        task.MultiSeparateTask.__init__(self)

        self._path = path
        self._taglist = taglist.copy()

        self.tasks = [TagReadTask(path), ]

    def stopped(self, taskk):
        """
        Decide what to do when one of my tasks stopped.

        After the first tag read, queue the retagging tasks if the tags
        differ.  After the tag read of the temporary file, move it over
        the original if tags and checksums are fine; otherwise remove it
        and set and raise a TypeError.

        @param taskk: the task that stopped
        """
        from morituri.common import checksum

        if not taskk.exception:
            # Check if the tags are different or not
            if taskk == self.tasks[0]:
                taglist = taskk.taglist.copy()
                if common.tagListEquals(taglist, self._taglist):
                    self.debug('tags are already fine: %r',
                        common.tagListToDict(taglist))
                else:
                    # need to retag
                    self.debug('tags need to be rewritten')
                    self.debug('Current tags: %r, new tags: %r',
                        common.tagListToDict(taglist),
                        common.tagListToDict(self._taglist))
                    assert common.tagListToDict(taglist) != common.tagListToDict(self._taglist)
                    self.tasks.append(checksum.CRC32Task(self._path))
                    fd, self._tmppath = tempfile.mkstemp(
                        dir=os.path.dirname(self._path), suffix=u'.morituri')
                    # the temporary file is only ever used through its
                    # path, so close the descriptor instead of leaking it
                    os.close(fd)
                    self.tasks.append(TagWriteTask(self._path,
                        self._tmppath, self._taglist))
                    self.tasks.append(checksum.CRC32Task(self._tmppath))
                    self.tasks.append(TagReadTask(self._tmppath))
            # index 4 only exists once all retagging tasks were queued
            elif len(self.tasks) > 4 and taskk == self.tasks[4]:
                if common.tagListEquals(self.tasks[4].taglist, self._taglist):
                    self.debug('tags written successfully')
                    c1 = self.tasks[1].checksum
                    c2 = self.tasks[3].checksum
                    self.debug('comparing checksums %08x and %08x' % (c1, c2))
                    if c1 == c2:
                        # data is fine, so we can now move
                        # but first, copy original mode to our temporary file
                        shutil.copymode(self._path, self._tmppath)
                        self.debug('moving temporary file to %r' % self._path)
                        os.rename(self._tmppath, self._path)
                        self.changed = True
                    else:
                        # the temporary file is useless; do not leave it
                        # behind next to the original
                        os.unlink(self._tmppath)
                        # FIXME: don't raise TypeError
                        e = TypeError("Checksums failed")
                        self.setAndRaiseException(e)
                else:
                    self.debug('failed to update tags, only have %r',
                        common.tagListToDict(self.tasks[4].taglist))
                    os.unlink(self._tmppath)
                    e = TypeError("Tags not written")
                    self.setAndRaiseException(e)

        task.MultiSeparateTask.stopped(self, taskk)
515
516     
Note: See TracBrowser for help on using the repository browser.