gallery.accords-library.com/server/szurubooru/func/images.py

117 lines
3.6 KiB
Python
Raw Normal View History

from typing import List
import logging
2016-04-30 21:17:08 +00:00
import json
import shlex
2016-04-09 19:41:10 +00:00
import subprocess
import math
2016-04-09 19:41:10 +00:00
from szurubooru import errors
from szurubooru.func import mime, util
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
2016-04-09 19:41:10 +00:00
2017-04-24 21:30:53 +00:00
# Template for an ffmpeg ``scale`` filter; fill in with
# ``.format(width=..., height=...)``.  Both output dimensions are the input
# dimensions multiplied by the same factor, max(width/iw, height/ih), so the
# aspect ratio is kept and the result is at least width x height.  The commas
# inside max() are backslash-escaped in the filter string.
_SCALE_FIT_FMT = (
    r'scale='
    r'iw*max({width}/iw\,{height}/ih)'
    r':'
    r'ih*max({width}/iw\,{height}/ih)'
)
2016-04-09 19:41:10 +00:00
class Image:
    """Image (or video) bytes processed by shelling out to ffmpeg/ffprobe.

    Attributes:
        content: the raw bytes of the media file.
        info: the JSON document ffprobe printed for ``content``, parsed into
            a dict; it always has ``format`` and a non-empty ``streams``.
    """

    def __init__(self, content: bytes) -> None:
        """Store ``content`` and probe it with ffprobe.

        Raises:
            errors.ProcessingError: if ffprobe exits with a non-zero status
                or reports no video streams.
        """
        self.content = content
        self._reload_info()

    @property
    def width(self) -> int:
        """Width of the first video stream, as reported by ffprobe."""
        return self.info['streams'][0]['width']

    @property
    def height(self) -> int:
        """Height of the first video stream, as reported by ffprobe."""
        return self.info['streams'][0]['height']

    @property
    def frames(self) -> int:
        """Value of ``nb_read_frames`` for the first video stream, as an int.

        NOTE(review): ffprobe appears to report ``nb_read_frames`` only when
        run with ``-count_frames``, which ``_reload_info`` does not pass, so
        this may raise ``KeyError`` -- confirm against the ffprobe docs.
        """
        # ffprobe's JSON output carries frame counters as strings; coerce so
        # the value matches the declared return type.
        return int(self.info['streams'][0]['nb_read_frames'])

    def resize_fill(self, width: int, height: int) -> None:
        """Replace ``content`` with a PNG of one frame scaled to fill the box.

        The frame is scaled with ``_SCALE_FIT_FMT``.  When the probed format
        has a duration above 3 (and is not ``swf``), ffmpeg is first told to
        seek to 30% of the duration.  ``info`` is refreshed afterwards.

        Raises:
            errors.ProcessingError: if ffmpeg or the follow-up probe fails.
        """
        cli = [
            '-i', '{path}',
            '-f', 'image2',
            '-vf', _SCALE_FIT_FMT.format(width=width, height=height),
            '-map', '0:v:0',
            '-vframes', '1',
            '-vcodec', 'png',
            '-',
        ]
        fmt = self.info['format']
        if 'duration' in fmt and fmt['format_name'] != 'swf':
            duration = float(fmt['duration'])
            if duration > 3:
                # Prepended, so the seek option lands before ``-i``.
                cli = ['-ss', '%d' % math.floor(duration * 0.3)] + cli
        self.content = self._execute(cli)
        assert self.content
        self._reload_info()

    def to_png(self) -> bytes:
        """Return the first video frame of ``content`` encoded as PNG.

        Raises:
            errors.ProcessingError: if ffmpeg exits with a non-zero status.
        """
        return self._execute([
            '-i', '{path}',
            '-f', 'image2',
            '-map', '0:v:0',
            '-vframes', '1',
            '-vcodec', 'png',
            '-',
        ])

    def to_jpeg(self) -> bytes:
        """Return one frame of ``content`` encoded as JPEG (mjpeg codec).

        A white canvas of the image's own size is the first input and the
        image is overlaid on it -- presumably so transparent areas come out
        white; verify against the ffmpeg overlay filter documentation.

        Raises:
            errors.ProcessingError: if ffmpeg exits with a non-zero status.
        """
        return self._execute([
            '-f', 'lavfi',
            '-i', 'color=white:s=%dx%d' % (self.width, self.height),
            '-i', '{path}',
            '-f', 'image2',
            '-filter_complex', 'overlay',
            '-map', '0:v:0',
            '-vframes', '1',
            '-vcodec', 'mjpeg',
            '-',
        ])

    def _execute(self, cli: List[str], program: str = 'ffmpeg') -> bytes:
        """Run ``program`` on a temp-file copy of ``content``; return stdout.

        Args:
            cli: arguments for the program.  Every argument is passed
                through ``str.format(path=...)``, so ``{path}`` becomes the
                temp file's name and literal braces must be doubled.
            program: executable to run.

        Returns:
            Everything the process wrote to stdout.

        Raises:
            errors.ProcessingError: if the process exits with a non-zero
                status; the message includes the process's stderr.
        """
        extension = mime.get_extension(mime.get_mime_type(self.content))
        assert extension
        with util.create_temp_file(suffix='.' + extension) as handle:
            handle.write(self.content)
            # Flush so the child process sees the full file on disk.
            handle.flush()
            cli = [program, '-loglevel', '24'] + cli
            cli = [part.format(path=handle.name) for part in cli]
            proc = subprocess.Popen(
                cli,
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(input=self.content)
            if proc.returncode != 0:
                logger.warning(
                    'Failed to execute ffmpeg command (cli=%r, err=%r)',
                    ' '.join(shlex.quote(arg) for arg in cli),
                    err)
                # stderr is not guaranteed to be valid UTF-8; decode
                # leniently so a stray byte cannot turn the intended
                # ProcessingError into a UnicodeDecodeError.
                raise errors.ProcessingError(
                    'Error while processing image.\n'
                    + err.decode('utf-8', errors='replace'))
            return out

    def _reload_info(self) -> None:
        """Re-run ffprobe on ``content`` and store the parsed JSON in ``info``.

        Only video streams are selected.

        Raises:
            errors.ProcessingError: if ffprobe fails or lists no video
                streams.
        """
        self.info = json.loads(self._execute([
            '-i', '{path}',
            '-of', 'json',
            '-select_streams', 'v',
            '-show_format',
            '-show_streams',
        ], program='ffprobe').decode('utf-8'))
        assert 'format' in self.info
        assert 'streams' in self.info
        if not self.info['streams']:
            logger.warning('The video contains no video streams.')
            raise errors.ProcessingError(
                'The video contains no video streams.')