server/posts: automatically detect sound in video post uploads
This commit is contained in:
		
							parent
							
								
									3879c2ec20
								
							
						
					
					
						commit
						8a10fc8ffd
					
				@ -65,6 +65,7 @@ def create_post(
 | 
			
		||||
    posts.update_post_relations(post, relations)
 | 
			
		||||
    posts.update_post_notes(post, notes)
 | 
			
		||||
    posts.update_post_flags(post, flags)
 | 
			
		||||
    posts.test_sound(post, content)
 | 
			
		||||
    if ctx.has_file('thumbnail'):
 | 
			
		||||
        posts.update_post_thumbnail(post, ctx.get_file('thumbnail'))
 | 
			
		||||
    ctx.session.add(post)
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,7 @@ import json
 | 
			
		||||
import shlex
 | 
			
		||||
import subprocess
 | 
			
		||||
import math
 | 
			
		||||
import re
 | 
			
		||||
from szurubooru import errors
 | 
			
		||||
from szurubooru.func import mime, util
 | 
			
		||||
 | 
			
		||||
@ -141,17 +142,43 @@ class Image:
 | 
			
		||||
            with open(mp4_temp_path, 'rb') as mp4_temp:
 | 
			
		||||
                return mp4_temp.read()
 | 
			
		||||
 | 
			
		||||
    def check_for_sound(self) -> bool:
 | 
			
		||||
        audioinfo = json.loads(self._execute([
 | 
			
		||||
            '-i', '{path}',
 | 
			
		||||
            '-of', 'json',
 | 
			
		||||
            '-select_streams', 'a',
 | 
			
		||||
            '-show_streams',
 | 
			
		||||
        ], program='ffprobe').decode('utf-8'))
 | 
			
		||||
        assert 'streams' in audioinfo
 | 
			
		||||
        if len(audioinfo['streams']) < 1:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        log = self._execute([
 | 
			
		||||
            '-i', '{path}',
 | 
			
		||||
            '-af', 'volumedetect',
 | 
			
		||||
            '-f', 'null',
 | 
			
		||||
            '-y', '/dev/null',
 | 
			
		||||
        ], get_logs=True).decode('utf-8')
 | 
			
		||||
        log_match = re.search(r'.*volumedetect.*mean_volume: (.*) dB', log)
 | 
			
		||||
        assert log_match
 | 
			
		||||
        assert log_match.groups()
 | 
			
		||||
        meanvol = float(log_match.groups()[0])
 | 
			
		||||
 | 
			
		||||
        # -91.0 dB is the minimum for 16-bit audio, assume sound if > -80.0 dB
 | 
			
		||||
        return meanvol > -80.0
 | 
			
		||||
 | 
			
		||||
    def _execute(
 | 
			
		||||
            self,
 | 
			
		||||
            cli: List[str],
 | 
			
		||||
            program: str = 'ffmpeg',
 | 
			
		||||
            ignore_error_if_data: bool = False) -> bytes:
 | 
			
		||||
            ignore_error_if_data: bool = False,
 | 
			
		||||
            get_logs: bool = False) -> bytes:
 | 
			
		||||
        extension = mime.get_extension(mime.get_mime_type(self.content))
 | 
			
		||||
        assert extension
 | 
			
		||||
        with util.create_temp_file(suffix='.' + extension) as handle:
 | 
			
		||||
            handle.write(self.content)
 | 
			
		||||
            handle.flush()
 | 
			
		||||
            cli = [program, '-loglevel', '24'] + cli
 | 
			
		||||
            cli = [program, '-loglevel', '32' if get_logs else '24'] + cli
 | 
			
		||||
            cli = [part.format(path=handle.name) for part in cli]
 | 
			
		||||
            proc = subprocess.Popen(
 | 
			
		||||
                cli,
 | 
			
		||||
@ -169,7 +196,7 @@ class Image:
 | 
			
		||||
                    raise errors.ProcessingError(
 | 
			
		||||
                        'Error while processing image.\n'
 | 
			
		||||
                        + err.decode('utf-8'))
 | 
			
		||||
            return out
 | 
			
		||||
            return err if get_logs else out
 | 
			
		||||
 | 
			
		||||
    def _reload_info(self) -> None:
 | 
			
		||||
        self.info = json.loads(self._execute([
 | 
			
		||||
 | 
			
		||||
@ -474,6 +474,17 @@ def generate_alternate_formats(post: model.Post, content: bytes) \
 | 
			
		||||
    return new_posts
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_sound(post: model.Post, content: bytes) -> None:
 | 
			
		||||
    assert post
 | 
			
		||||
    assert content
 | 
			
		||||
    if mime.is_video(mime.get_mime_type(content)):
 | 
			
		||||
        if images.Image(content).check_for_sound():
 | 
			
		||||
            flags = [x for x in post.flags.split(',') if x]
 | 
			
		||||
            if model.Post.FLAG_SOUND not in flags:
 | 
			
		||||
                flags.append(model.Post.FLAG_SOUND)
 | 
			
		||||
            update_post_flags(post, flags)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
 | 
			
		||||
    assert post
 | 
			
		||||
    if not content:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user