mirror of https://github.com/wanadev/yoga.git
Refactoring: split encoders (one per file), moves the format detection code in the dedicate dencoder
This commit is contained in:
parent
2e46e47ed7
commit
4803e3f760
|
@ -2,7 +2,7 @@
|
|||
|
||||
import sys
|
||||
|
||||
from yoga.image.helpers import get_riff_structure
|
||||
from yoga.image.encoders.webp import get_riff_structure
|
||||
|
||||
|
||||
def print_riff_info(input_path):
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import pytest
|
||||
|
||||
from yoga.image.encoders import webp
|
||||
|
||||
|
||||
class Test_little_endian_unint32_bytes_to_python_int(object):
|
||||
def test_uint32_value(self):
|
||||
assert (
|
||||
webp.little_endian_unint32_bytes_to_python_int(b"\x78\x56\x34\x12")
|
||||
== 305419896
|
||||
)
|
||||
|
||||
|
||||
class Test_get_riff_structure(object):
|
||||
@pytest.fixture
|
||||
def webp_image(self):
|
||||
return open("test/images/alpha.lossless.metadata.webp", "rb").read()
|
||||
|
||||
def test_riff_structure(object, webp_image):
|
||||
riff = webp.get_riff_structure(webp_image)
|
||||
assert riff["formtype"] == "WEBP"
|
||||
assert riff["size"] == 11868
|
||||
assert len(riff["chunks"]) == 5
|
||||
assert riff["chunks"][0]["type"] == "VP8X"
|
||||
assert riff["chunks"][1]["type"] == "ICCP"
|
||||
assert riff["chunks"][2]["type"] == "VP8L"
|
||||
assert riff["chunks"][3]["type"] == "EXIF"
|
||||
assert riff["chunks"][4]["type"] == "XMP "
|
|
@ -37,33 +37,6 @@ class Test_image_have_alpha(object):
|
|||
assert not helpers.image_have_alpha(image, threshold)
|
||||
|
||||
|
||||
class Test_little_endian_unint32_bytes_to_python_int(object):
|
||||
def test_uint32_value(self):
|
||||
assert (
|
||||
helpers.little_endian_unint32_bytes_to_python_int(
|
||||
b"\x78\x56\x34\x12"
|
||||
)
|
||||
== 305419896
|
||||
)
|
||||
|
||||
|
||||
class Test_get_riff_structure(object):
|
||||
@pytest.fixture
|
||||
def webp_image(self):
|
||||
return open("test/images/alpha.lossless.metadata.webp", "rb").read()
|
||||
|
||||
def test_riff_structure(object, webp_image):
|
||||
riff = helpers.get_riff_structure(webp_image)
|
||||
assert riff["formtype"] == "WEBP"
|
||||
assert riff["size"] == 11868
|
||||
assert len(riff["chunks"]) == 5
|
||||
assert riff["chunks"][0]["type"] == "VP8X"
|
||||
assert riff["chunks"][1]["type"] == "ICCP"
|
||||
assert riff["chunks"][2]["type"] == "VP8L"
|
||||
assert riff["chunks"][3]["type"] == "EXIF"
|
||||
assert riff["chunks"][4]["type"] == "XMP "
|
||||
|
||||
|
||||
class Test_guess_image_format(object):
|
||||
@pytest.mark.parametrize(
|
||||
"image_path, expected_format",
|
||||
|
|
|
@ -125,7 +125,8 @@ API
|
|||
|
||||
from PIL import Image
|
||||
|
||||
from . import encoders
|
||||
from .encoders.jpeg import optimize_jpeg
|
||||
from .encoders.png import optimize_png
|
||||
from .options import normalize_options
|
||||
from .helpers import image_have_alpha
|
||||
|
||||
|
@ -173,9 +174,9 @@ def optimize(input_file, output_file, options={}, verbose=False, quiet=False):
|
|||
# convert / optimize
|
||||
output_image_bytes = None
|
||||
if output_format == "jpeg":
|
||||
output_image_bytes = encoders.jpeg(image, options["jpeg_quality"])
|
||||
output_image_bytes = optimize_jpeg(image, options["jpeg_quality"])
|
||||
elif output_format == "png":
|
||||
output_image_bytes = encoders.png(image)
|
||||
output_image_bytes = optimize_png(image)
|
||||
else:
|
||||
raise ValueError("Invalid output format %s" % output_format)
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
import pyguetzli
|
||||
|
||||
|
||||
def is_jpeg(file_bytes):
|
||||
"""Whether or not the given bytes represent a JPEG file.
|
||||
|
||||
:params bytes file_bytes: The bytes of the file to check.
|
||||
|
||||
:rtype: bool
|
||||
:return: ``True`` if the bytes represent a JPEG file, ``False`` else.
|
||||
"""
|
||||
JPEG_MAGICS = [
|
||||
b"\xFF\xD8\xFF\xDB",
|
||||
b"\xFF\xD8\xFF\xE0\x00\x10\x4A\x46\x49\x46\x00\x01", # JFIF format
|
||||
b"\xFF\xD8\xFF\xEE",
|
||||
b"\xFF\xD8\xFF\xE1", # xx xx 45 78 69 66 00 00 / Exif format
|
||||
]
|
||||
for magic in JPEG_MAGICS:
|
||||
if file_bytes.startswith(magic):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def optimize_jpeg(image, quality):
|
||||
"""Encode image to JPEG using Guetzli.
|
||||
|
||||
:param PIL.Image image: The image to encode.
|
||||
:param float quality: The output JPEG quality (from ``0.00``to ``1.00``).
|
||||
|
||||
:returns: The encoded image's bytes.
|
||||
"""
|
||||
if not 0.00 <= quality <= 1.00:
|
||||
raise ValueError("JPEG quality value must be between 0.00 and 1.00")
|
||||
return pyguetzli.process_pil_image(image, int(quality * 100))
|
|
@ -1,23 +1,20 @@
|
|||
import io
|
||||
|
||||
import pyguetzli
|
||||
import zopfli
|
||||
|
||||
|
||||
def jpeg(image, quality):
|
||||
"""Encode image to JPEG using Guetzli.
|
||||
def is_png(file_bytes):
|
||||
"""Whether or not the given bytes represent a PNG file.
|
||||
|
||||
:param PIL.Image image: The image to encode.
|
||||
:param float quality: The output JPEG quality (from ``0.00``to ``1.00``).
|
||||
:params bytes file_bytes: The bytes of the file to check.
|
||||
|
||||
:returns: The encoded image's bytes.
|
||||
:rtype: bool
|
||||
:return: ``True`` if the bytes represent a PNG file, ``False`` else.
|
||||
"""
|
||||
if not 0.00 <= quality <= 1.00:
|
||||
raise ValueError("JPEG quality value must be between 0.00 and 1.00")
|
||||
return pyguetzli.process_pil_image(image, int(quality * 100))
|
||||
return file_bytes.startswith(b"\x89PNG\r\n")
|
||||
|
||||
|
||||
def png(image):
|
||||
def optimize_png(image):
|
||||
"""Encode image to PNG using ZopfliPNG.
|
||||
|
||||
:param PIL.Image image: The image to encode.
|
|
@ -0,0 +1,66 @@
|
|||
import struct
|
||||
|
||||
|
||||
def little_endian_unint32_bytes_to_python_int(bytes_):
|
||||
return struct.unpack("<L", bytes_)[0]
|
||||
|
||||
|
||||
def get_riff_structure(data):
|
||||
if data[0:4] != b"RIFF":
|
||||
raise ValueError("Unvalid RIFF: Not a RIFF file")
|
||||
|
||||
result = {
|
||||
"formtype": data[8:12].decode(),
|
||||
"size": little_endian_unint32_bytes_to_python_int(data[4:8]),
|
||||
"chunks": [],
|
||||
}
|
||||
|
||||
if result["size"] + 8 != len(data):
|
||||
raise ValueError("Unvalid RIFF: Truncated data")
|
||||
|
||||
offset = 12 # RIFF header length
|
||||
|
||||
while offset < len(data):
|
||||
chunk = {
|
||||
"type": data[offset : offset + 4].decode(),
|
||||
"data_offset": offset + 8,
|
||||
"size": little_endian_unint32_bytes_to_python_int(
|
||||
data[offset + 4 : offset + 8]
|
||||
),
|
||||
}
|
||||
result["chunks"].append(chunk)
|
||||
offset += 8 + chunk["size"] + chunk["size"] % 2
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def is_riff(file_bytes):
|
||||
"""Whether or not the given bytes represent a RIFF file.
|
||||
|
||||
:params bytes file_bytes: The bytes of the file to check.
|
||||
|
||||
:rtype: bool
|
||||
:return: ``True`` if the bytes represent a RIFF file, ``False`` else.
|
||||
"""
|
||||
return file_bytes.startswith(b"RIFF")
|
||||
|
||||
|
||||
def is_lossy_webp(file_bytes):
|
||||
"""Whether or not the given bytes represent a lossy WEBP file.
|
||||
|
||||
:params bytes file_bytes: The bytes of the file to check.
|
||||
|
||||
:rtype: bool
|
||||
:return: ``True`` if the bytes represent a lossy WEBP file, ``False`` else.
|
||||
"""
|
||||
if not is_riff(file_bytes):
|
||||
return False
|
||||
|
||||
riff = get_riff_structure(file_bytes)
|
||||
|
||||
if riff["formtype"] == "WEBP":
|
||||
chunks = [chunk["type"] for chunk in riff["chunks"]]
|
||||
if "VP8 " in chunks:
|
||||
return True
|
||||
|
||||
return False
|
|
@ -0,0 +1,23 @@
|
|||
from .webp import is_riff, get_riff_structure
|
||||
|
||||
|
||||
def is_lossless_webp(file_bytes):
|
||||
"""Whether or not the given bytes represent a lossless WEBP file.
|
||||
|
||||
:params bytes file_bytes: The bytes of the file to check.
|
||||
|
||||
:rtype: bool
|
||||
:return: ``True`` if the bytes represent a lossless WEBP file, ``False``
|
||||
else.
|
||||
"""
|
||||
if not is_riff(file_bytes):
|
||||
return False
|
||||
|
||||
riff = get_riff_structure(file_bytes)
|
||||
|
||||
if riff["formtype"] == "WEBP":
|
||||
chunks = [chunk["type"] for chunk in riff["chunks"]]
|
||||
if "VP8L" in chunks:
|
||||
return True
|
||||
|
||||
return False
|
|
@ -1,4 +1,7 @@
|
|||
import struct
|
||||
from .encoders.jpeg import is_jpeg
|
||||
from .encoders.png import is_png
|
||||
from .encoders.webp import is_lossy_webp
|
||||
from .encoders.webp_lossless import is_lossless_webp
|
||||
|
||||
|
||||
def image_have_alpha(image, threshold=0xFE):
|
||||
|
@ -14,53 +17,16 @@ def image_have_alpha(image, threshold=0xFE):
|
|||
return False
|
||||
|
||||
|
||||
def little_endian_unint32_bytes_to_python_int(bytes_):
|
||||
return struct.unpack("<L", bytes_)[0]
|
||||
|
||||
|
||||
def get_riff_structure(data):
|
||||
if data[0:4] != b"RIFF":
|
||||
raise ValueError("Unvalid RIFF: Not a RIFF file")
|
||||
|
||||
result = {
|
||||
"formtype": data[8:12].decode(),
|
||||
"size": little_endian_unint32_bytes_to_python_int(data[4:8]),
|
||||
"chunks": [],
|
||||
def guess_image_format(image_bytes):
|
||||
FORMATS = {
|
||||
"jpeg": is_jpeg,
|
||||
"png": is_png,
|
||||
"webp": is_lossy_webp,
|
||||
"webpl": is_lossless_webp,
|
||||
}
|
||||
|
||||
if result["size"] + 8 != len(data):
|
||||
raise ValueError("Unvalid RIFF: Truncated data")
|
||||
|
||||
offset = 12 # RIFF header length
|
||||
|
||||
while offset < len(data):
|
||||
chunk = {
|
||||
"type": data[offset : offset + 4].decode(),
|
||||
"data_offset": offset + 8,
|
||||
"size": little_endian_unint32_bytes_to_python_int(
|
||||
data[offset + 4 : offset + 8]
|
||||
),
|
||||
}
|
||||
result["chunks"].append(chunk)
|
||||
offset += 8 + chunk["size"] + chunk["size"] % 2
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def guess_image_format(image_bytes):
|
||||
if image_bytes.startswith(b"\xFF\xD8\xFF\xE0"):
|
||||
return "jpeg"
|
||||
|
||||
if image_bytes.startswith(b"\x89PNG\r\n"):
|
||||
return "png"
|
||||
|
||||
if image_bytes.startswith(b"RIFF"):
|
||||
riff = get_riff_structure(image_bytes)
|
||||
if riff["formtype"] == "WEBP":
|
||||
chunks = [chunk["type"] for chunk in riff["chunks"]]
|
||||
if "VP8 " in chunks:
|
||||
return "webp"
|
||||
if "VP8L" in chunks:
|
||||
return "webpl"
|
||||
for format_, checker in FORMATS.items():
|
||||
if checker(image_bytes):
|
||||
return format_
|
||||
|
||||
raise ValueError("Unsupported image format")
|
||||
|
|
Loading…
Reference in New Issue