source: trunk/morituri/common/checksum.py @ 442

Revision 442, 14.1 KB checked in by thomas, 2 years ago (diff)
  • morituri/common/checksum.py:
  • morituri/common/encode.py:
  • morituri/image/table.py:
  • morituri/rip/cd.py:
  • morituri/rip/image.py:
  • morituri/rip/offset.py:
  • morituri/test/test_common_accurip.py:
  • morituri/test/test_common_checksum.py:
  • morituri/test/test_image_cue.py:
  • morituri/test/test_image_table.py: Pychecker fixes.
Line 
1# -*- Mode: Python; test-case-name: morituri.test.test_common_checksum -*-
2# vi:si:et:sw=4:sts=4:ts=4
3
4# Morituri - for those about to RIP
5
6# Copyright (C) 2009 Thomas Vander Stichele
7
8# This file is part of morituri.
9#
10# morituri is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# morituri is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with morituri.  If not, see <http://www.gnu.org/licenses/>.
22
23import os
24import struct
25import zlib
26
27import gst
28
29from morituri.common import common, task
30
31# checksums are not CRCs; a CRC is a specific type of checksum.
32
33# FIXME: probably this should move higher up the module hierarchy and
34# be used wider
class GstException(Exception):
    """
    I wrap an error message posted on a GStreamer bus.

    @ivar gerror: the first item returned by the message's parse_error()
    @ivar debug:  the second item returned by the message's parse_error()
    """

    def __init__(self, gerror, debug):
        self.gerror, self.debug = gerror, debug
        # Exception.__init__ is not called; args is filled in by hand
        self.args = (self.gerror, self.debug)
40
41# FIXME: this should move up too; other tasks might have use for it.
class GstPipelineTask(task.Task):
    """
    I am a base class for tasks that use a GStreamer pipeline.

    I handle errors and raise them appropriately.

    Subclasses must implement L{getPipelineDesc}. They can hook into the
    start-up sequence through L{parsed} and L{paused}, and react to bus
    messages by overriding L{bus_eos_cb} and L{bus_tag_cb}.

    @ivar pipeline: the pipeline built from L{getPipelineDesc}; set in
                    L{start}
    """

    def start(self, runner):
        """
        Build the pipeline, hook up the bus, and bring it to PAUSED.

        If an exception has been recorded on self.exception by the time
        the pipeline has paused (presumably by L{bus_error_cb} through
        setAndRaiseException), the pipeline is set back to NULL and that
        exception is raised; otherwise L{paused} is called.
        """
        task.Task.start(self, runner)
        desc = self.getPipelineDesc()

        self.debug('creating pipeline %r', desc)
        self.pipeline = gst.parse_launch(desc)

        self._bus = self.pipeline.get_bus()
        gst.debug('got bus %r' % self._bus)

        # a signal watch calls callbacks from an idle loop
        # self._bus.add_signal_watch()

        # sync emission triggers sync-message signals which calls callbacks
        # from the thread that signals, but happens immediately
        self._bus.enable_sync_message_emission()
        self._bus.connect('sync-message::eos', self.bus_eos_cb)
        self._bus.connect('sync-message::tag', self.bus_tag_cb)
        self._bus.connect('sync-message::error', self.bus_error_cb)

        self.parsed()

        self.debug('pausing pipeline')
        self.pipeline.set_state(gst.STATE_PAUSED)
        # wait for the state change to finish before checking for errors
        self.pipeline.get_state()
        self.debug('paused pipeline')

        if self.exception:
            # don't leave a half-started pipeline (open file, streaming
            # threads) behind when bailing out
            self.debug('error while pausing, setting state to NULL')
            self.pipeline.set_state(gst.STATE_NULL)
            raise self.exception

        self.paused()

    def getPipelineDesc(self):
        """
        Return the gst.parse_launch description of the pipeline to run.

        Subclasses must implement this.
        """
        raise NotImplementedError

    def parsed(self):
        """
        Called after parsing the pipeline but before setting it to paused.
        """
        pass

    def paused(self):
        """
        Called after pipeline is paused
        """
        pass

    def bus_eos_cb(self, bus, message):
        """
        Called (from the posting thread) on an EOS bus message.
        """
        pass

    def bus_tag_cb(self, bus, message):
        """
        Called (from the posting thread) on a tag bus message.
        """
        pass

    def bus_error_cb(self, bus, message):
        """
        Called (from the posting thread) on an error bus message; wraps it
        in a L{GstException} and hands it to setAndRaiseException.
        """
        exc = GstException(*message.parse_error())
        # log before handing off: if setAndRaiseException raises, nothing
        # after it in this callback runs
        gst.debug('error, scheduling stop')
        self.setAndRaiseException(exc)
        #self.runner.schedule(0, self.stop)
106
107
class ChecksumTask(GstPipelineTask):
    """
    I am a task that calculates a checksum of the decoded audio data.

    The decoded audio is fed to L{do_checksum_buffer} in chunks of
    common.BYTES_PER_FRAME bytes; subclasses implement the actual
    checksum there.

    @ivar checksum: the resulting checksum
    """

    logCategory = 'ChecksumTask'

    # this object needs a main loop to stop
    description = 'Calculating checksum'

    def __init__(self, path, frameStart=0, frameLength=-1):
        """
        A frame is considered a set of samples for each channel;
        ie 16 bit stereo is 4 bytes per frame.
        If frameLength < 0 it is treated as 'unknown' and calculated.

        @type  path:        unicode
        @type  frameStart:  int
        @param frameStart:  the frame to start at
        @type  frameLength: int
        @param frameLength: the number of frames to checksum, or < 0 to
                            checksum from frameStart to the end of the file

        @raises IndexError: if path does not exist
        """
        assert type(path) is unicode, "%r is not unicode" % path

        # use repr/%r because path can be unicode
        self.debug('Creating checksum task on %r from %d to %d',
            path, frameStart, frameLength)
        if not os.path.exists(path):
            raise IndexError('%r does not exist' % path)

        self._path = path
        self._frameStart = frameStart
        self._frameLength = frameLength
        self._frameEnd = None # last frame to checksum, inclusive
        self._checksum = 0
        self._bytes = 0 # number of bytes received
        self._first = None # offset of the first buffer received
        self._last = None # last buffer received
        self._adapter = gst.Adapter()

        self.checksum = None # result

    def getPipelineDesc(self):
        return '''
            filesrc location="%s" !
            decodebin ! audio/x-raw-int !
            appsink name=sink sync=False emit-signals=True
            ''' % common.quoteParse(self._path).encode('utf-8')

    def paused(self):
        """
        Work out the frame range, seek to it, and schedule playback.
        """
        sink = self.pipeline.get_by_name('sink')

        if self._frameLength < 0:
            self.debug('query duration')
            try:
                length, qformat = sink.query_duration(gst.FORMAT_DEFAULT)
            except gst.QueryError as e:
                self.setException(e)
                self.stop()
                return

            # wavparse 0.10.14 returns in bytes
            if qformat == gst.FORMAT_BYTES:
                self.debug('query returned in BYTES format')
                # 4 bytes per frame (16 bit stereo)
                length //= 4
            self.debug('total length: %r', length)
            self._frameLength = length - self._frameStart
            self.debug('audio frame length is %r', self._frameLength)
        else:
            self.debug('frameLength known, is %d', self._frameLength)
        self._frameEnd = self._frameStart + self._frameLength - 1

        self.debug('event')

        # the segment end only is respected since -good 0.10.14.1
        event = gst.event_new_seek(1.0, gst.FORMAT_DEFAULT,
            gst.SEEK_FLAG_FLUSH,
            gst.SEEK_TYPE_SET, self._frameStart,
            gst.SEEK_TYPE_SET, self._frameEnd + 1) # half-inclusive interval
        gst.debug('CRCing %r from sector %d to sector %d' % (
            self._path,
            self._frameStart // common.SAMPLES_PER_FRAME,
            (self._frameEnd + 1) // common.SAMPLES_PER_FRAME))
        # FIXME: sending it with frameEnd set screws up the seek, we don't get
        # everything for flac; fixed in recent -good
        result = sink.send_event(event)
        self.debug('event sent, result %r', result)
        sink.connect('new-buffer', self._new_buffer_cb)
        sink.connect('eos', self._eos_cb)

        self.debug('scheduling setting to play')
        # since set_state returns non-False, adding it as timeout_add
        # will repeatedly call it, and block the main loop; so
        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
        # would not work.

        def play():
            self.pipeline.set_state(gst.STATE_PLAYING)
            return False
        self.runner.schedule(0, play)

        #self.pipeline.set_state(gst.STATE_PLAYING)
        self.debug('scheduled setting to play')

    def _new_buffer_cb(self, sink):
        # emitted by appsink for every new buffer; per the comment below,
        # this does not run in the main thread
        buf = sink.emit('pull-buffer')
        gst.log('received new buffer at offset %r with length %r' % (
            buf.offset, buf.size))
        if self._first is None:
            self._first = buf.offset
            self.debug('first sample is %r', self._first)
        self._last = buf

        assert len(buf) % 4 == 0, "buffer is not a multiple of 4 bytes"

        # FIXME: gst-python 0.10.14.1 doesn't have adapter_peek/_take wrapped
        # see http://bugzilla.gnome.org/show_bug.cgi?id=576505
        self._adapter.push(buf)

        # hand the data on in fixed-size chunks; any remainder smaller than
        # one chunk stays in the adapter
        while self._adapter.available() >= common.BYTES_PER_FRAME:
            # FIXME: in 0.10.14.1, take_buffer leaks a ref
            buf = self._adapter.take_buffer(common.BYTES_PER_FRAME)

            self._checksum = self.do_checksum_buffer(buf, self._checksum)
            self._bytes += len(buf)

            # update progress; with an empty range there is nothing to
            # report and the division below would fail
            if self._frameLength > 0:
                frame = self._first + self._bytes // 4
                framesDone = frame - self._frameStart
                progress = float(framesDone) / float(self._frameLength)
                # marshall to the main thread
                self.runner.schedule(0, self.setProgress, progress)

    def do_checksum_buffer(self, buf, checksum):
        """
        Subclasses should implement this.

        @param buf:      a chunk of common.BYTES_PER_FRAME bytes of audio
        @param checksum: the checksum so far
        @returns:        the updated checksum
        """
        raise NotImplementedError

    def _eos_cb(self, sink):
        # get the last one; FIXME: why does this not get to us before ?
        #self._new_buffer_cb(sink)
        self.debug('eos, scheduling stop')
        self.runner.schedule(0, self.stop)

    def stop(self):
        """
        Shut down the pipeline, report on what was received and publish
        the checksum, reduced modulo 2 ** 32.
        """
        self.debug('stopping')
        self.debug('setting state to NULL')
        self.pipeline.set_state(gst.STATE_NULL)

        # compare against None: buffers have a length, so a zero-length
        # last buffer would be falsy
        if self._last is None:
            # see http://bugzilla.gnome.org/show_bug.cgi?id=578612
            print('ERROR: checksum: not a single buffer gotten')
            # FIXME: instead of print, do something useful
        else:
            self._checksum = self._checksum % 2 ** 32
            self.debug("last offset %r", self._last.offset)
            last = self._last.offset + len(self._last) // 4 - 1
            self.debug("last sample: %r", last)
            self.debug("frame end: %r", self._frameEnd)
            self.debug("frame length: %r", self._frameLength)
            self.debug("checksum: %08X", self._checksum)
            self.debug("bytes: %d", self._bytes)
            if self._frameEnd != last:
                print('ERROR: did not get all frames, %d missing' % (
                    self._frameEnd - last))

        # publicize and stop
        self.checksum = self._checksum
        task.Task.stop(self)
279
class CRC32Task(ChecksumTask):
    """
    I do a simple CRC32 check over the decoded audio data.

    The running value is passed back into zlib.crc32 for every chunk, so
    the chunks are checksummed as one continuous stream.
    """

    description = 'Calculating CRC'

    def do_checksum_buffer(self, buf, checksum):
        crc = zlib.crc32(buf, checksum)
        return crc
289
class AccurateRipChecksumTask(ChecksumTask):
    """
    I implement the AccurateRip checksum.

    See http://www.accuraterip.com/

    Each 4-byte frame is read as an unsigned little-endian 32-bit value and
    multiplied by its 1-based position in the track. The products are
    summed modulo 2 ** 32.

    On the first track, the first 4 CD frames are skipped and only the last
    value of the 5th is used. On the last track, the last 5 CD frames are
    skipped.
    """

    description = 'Calculating AccurateRip checksum'

    def __init__(self, path, trackNumber, trackCount, frameStart=0, frameLength=-1):
        """
        @param trackNumber: the number of this track; 1 is the first track
        @param trackCount:  the number of tracks; equal to trackNumber for
                            the last track

        The other parameters are passed on to ChecksumTask.
        """
        ChecksumTask.__init__(self, path, frameStart, frameLength)
        self._trackNumber = trackNumber
        self._trackCount = trackCount
        self._discFrameCounter = 0 # 1-based

    def __repr__(self):
        return "<AccurateRipCheckSumTask of track %d in %r>" % (
            self._trackNumber, self._path)

    def do_checksum_buffer(self, buf, checksum):
        """
        Add one CD frame worth of audio to the checksum.

        @param buf:      one chunk of common.BYTES_PER_FRAME bytes
        @param checksum: the checksum so far
        @returns:        the updated checksum, in the range [0, 2 ** 32)
        """
        self._discFrameCounter += 1

        # on first track ...
        if self._trackNumber == 1:
            # ... skip first 4 CD frames
            if self._discFrameCounter <= 4:
                self.debug('skipping frame %d', self._discFrameCounter)
                return checksum
            # ... on 5th frame, only use last value
            elif self._discFrameCounter == 5:
                values = struct.unpack("<I", buf[-4:])
                checksum += common.SAMPLES_PER_FRAME * 5 * values[0]
                checksum &= 0xFFFFFFFF
                return checksum

        # on last track, skip last 5 CD frames
        if self._trackNumber == self._trackCount:
            discFrameLength = self._frameLength // common.SAMPLES_PER_FRAME
            if self._discFrameCounter > discFrameLength - 5:
                self.debug('skipping frame %d', self._discFrameCounter)
                return checksum

        # self._bytes is updated after do_checksum_buffer, so here it still
        # counts only previous chunks; this is the 1-based position of the
        # first value in buf
        firstPosition = self._bytes // 4 + 1
        values = struct.unpack("<%dI" % (len(buf) // 4), buf)
        # Python integers don't overflow, so reducing once at the end gives
        # the same result as masking after every single addition
        checksum += sum(value * (firstPosition + i)
            for i, value in enumerate(values))
        return checksum & 0xFFFFFFFF
343
class TRMTask(GstPipelineTask):
    """
    I calculate a MusicBrainz TRM fingerprint.

    The fingerprint is taken from the 'musicbrainz-trmid' tag posted on the
    bus by the trm element.

    @ivar trm: the resulting trm
    """

    trm = None
    description = 'Calculating fingerprint'

    def __init__(self, path):
        """
        @param path: path to the audio file to fingerprint

        @raises IndexError: if path does not exist
        """
        if not os.path.exists(path):
            raise IndexError('%s does not exist' % path)

        self.path = path
        self._trm = None
        self._bus = None
        self._length = None # duration in gst.FORMAT_TIME, set in paused()

    def getPipelineDesc(self):
        # quote and encode the path the same way ChecksumTask does, so
        # the path is embedded in the parse line consistently
        path = common.quoteParse(self.path)
        if isinstance(path, unicode):
            path = path.encode('utf-8')
        return '''
            filesrc location="%s" !
            decodebin ! audioconvert ! audio/x-raw-int !
            trm name=trm !
            appsink name=sink sync=False emit-signals=True''' % path

    def parsed(self):
        sink = self.pipeline.get_by_name('sink')
        sink.connect('new-buffer', self._new_buffer_cb)

    def paused(self):
        """
        Query the duration for progress reporting and schedule playback.
        """
        gst.debug('query duration')

        try:
            self._length, qformat = self.pipeline.query_duration(
                gst.FORMAT_TIME)
        except gst.QueryError as e:
            # same handling as ChecksumTask.paused: record and stop
            self.setException(e)
            self.stop()
            return
        gst.debug('total length: %r' % self._length)
        gst.debug('scheduling setting to play')
        # since set_state returns non-False, adding it as timeout_add
        # will repeatedly call it, and block the main loop; so
        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
        # would not work.

        def play():
            self.pipeline.set_state(gst.STATE_PLAYING)
            return False
        self.runner.schedule(0, play)

        #self.pipeline.set_state(gst.STATE_PLAYING)
        gst.debug('scheduled setting to play')

    # FIXME: can't move this to base class because it triggers too soon
    # in the case of checksum
    def bus_eos_cb(self, bus, message):
        gst.debug('eos, scheduling stop')
        self.runner.schedule(0, self.stop)

    def bus_tag_cb(self, bus, message):
        taglist = message.parse_tag()
        if 'musicbrainz-trmid' in taglist.keys():
            self._trm = taglist['musicbrainz-trmid']

    def _new_buffer_cb(self, sink):
        # this is just for counting progress; the buffer must still be
        # pulled so the appsink does not back up
        buf = sink.emit('pull-buffer')
        if self._length is None or self._length <= 0:
            # no usable duration to compute progress against
            return
        if buf.timestamp == gst.CLOCK_TIME_NONE:
            # no usable position either
            return
        position = buf.timestamp
        if buf.duration != gst.CLOCK_TIME_NONE:
            position += buf.duration
        progress = float(position) / self._length
        # marshall to the main thread, as ChecksumTask does
        self.runner.schedule(0, self.setProgress, progress)

    def stop(self):
        gst.debug('stopping')
        gst.debug('setting state to NULL')
        self.pipeline.set_state(gst.STATE_NULL)

        # publicize and stop
        self.trm = self._trm
        task.Task.stop(self)
Note: See TracBrowser for help on using the repository browser.