"""
Read and write EcoTaxa archives.
"`EcoTaxa`_ is a web application dedicated to the visual exploration
and the taxonomic annotation of images that illustrate the
beauty of planktonic biodiversity."
.. _EcoTaxa: https://ecotaxa.obs-vlfr.fr/
"""
import fnmatch
import io
import os.path
import pathlib
import tarfile
import zipfile
from abc import ABC, abstractproperty
from shutil import copyfileobj
from typing import (
IO,
BinaryIO,
Callable,
List,
Mapping,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
import pandas as pd
import PIL
import PIL.Image
from morphocut import Node, Output, RawOrVariable, ReturnOutputs, closing_if_closable
from morphocut.core import Stream
from morphocut.utils import StreamEstimator, stream_groupby
# Generic placeholder used by the alias definitions below.
T = TypeVar("T")
# Either a bare ``T`` or a tuple holding a ``T``.
# NOTE(review): ``Tuple[T]`` denotes a tuple of exactly one element; if tuples of
# arbitrary length are intended this should presumably be ``Tuple[T, ...]`` -- confirm.
MaybeTuple = Union[T, Tuple[T]]
# Either a bare ``T`` or a list of ``T``.
MaybeList = Union[T, List[T]]
def dtype_to_ecotaxa(dtype):
    """
    Map a dtype to the type tag used in the second header row of an EcoTaxa TSV.

    Args:
        dtype: A dtype accepted by :py:func:`numpy.issubdtype`.

    Returns:
        str: ``"[f]"`` if ``dtype`` is a sub-dtype of ``numpy.number``, ``"[t]"`` otherwise.

    Raises:
        TypeError: Re-raised from NumPy (after printing the offending type)
            if ``dtype`` cannot be interpreted.
    """
    try:
        is_numeric = np.issubdtype(dtype, np.number)
    except TypeError:  # pragma: no cover
        # Show what kind of object confused NumPy before propagating the error.
        print(type(dtype))
        raise

    return "[f]" if is_numeric else "[t]"
class MemberNotFoundError(Exception):
    """Signals that a requested member does not exist in an archive."""
class UnknownArchiveError(Exception):
    """Signals that no archive handler is available for a given file."""
class Archive:
    """
    A generic archive reader and writer for ZIP and TAR archives.

    Instantiating ``Archive`` itself dispatches to one of its direct subclasses:
    when reading, the first subclass whose :py:meth:`is_readable` accepts the file;
    when writing, the first subclass that lists a matching filename extension.
    A concrete subclass can also be instantiated directly.

    Instances are context managers; leaving the ``with`` block calls :py:meth:`close`.

    Args:
        archive_fn (str or pathlib.Path): Location of the archive.
        mode (str): Mode whose first character is one of ``"r"``, ``"a"``, ``"w"`` or ``"x"``.

    Raises:
        ValueError: if ``mode`` is empty or starts with an unsupported character.
        UnknownArchiveError: if no subclass can handle ``archive_fn``.
    """

    # Filename extensions a subclass is able to write.
    extensions: List[str] = []

    def __new__(cls, archive_fn: Union[str, pathlib.Path], mode: str = "r"):
        archive_fn = str(archive_fn)

        # Slicing (instead of indexing) keeps an empty mode from raising IndexError.
        mode_kind = mode[:1]
        if mode_kind not in ("r", "a", "w", "x"):
            # Falling through would silently return None instead of an archive.
            raise ValueError(f"Invalid mode: {mode!r}")

        if cls is not Archive:
            # A concrete handler was requested explicitly: nothing to dispatch.
            return super(Archive, cls).__new__(cls)

        if mode_kind == "r":
            for subclass in cls.__subclasses__():
                if subclass.is_readable(archive_fn):
                    return super(Archive, subclass).__new__(subclass)

            raise UnknownArchiveError(f"No handler found to read {archive_fn}")

        # Writing modes: choose the handler by filename extension.
        for subclass in cls.__subclasses__():
            if archive_fn.endswith(tuple(subclass.extensions)):
                return super(Archive, subclass).__new__(subclass)

        raise UnknownArchiveError(f"No handler found to write {archive_fn}")

    @staticmethod
    def is_readable(archive_fn) -> bool:
        """Return whether this handler can read ``archive_fn``. Implemented by subclasses."""
        raise NotImplementedError()

    def __init__(self, archive_fn: Union[str, pathlib.Path], mode: str = "r"):
        raise NotImplementedError()

    def read_member(self, member_fn) -> IO:
        """
        Open a member of the archive for reading. Implemented by subclasses.

        Raises:
            MemberNotFoundError: if a member was not found
        """
        raise NotImplementedError()

    def write_member(
        self, member_fn, fileobj_or_bytes: Union[IO, bytes], compress_hint=True
    ):
        """Store ``fileobj_or_bytes`` under the name ``member_fn``. Implemented by subclasses."""
        raise NotImplementedError()

    def find(self, pattern) -> List[str]:
        """Return the names of all members matching the :py:mod:`fnmatch` ``pattern``."""
        return fnmatch.filter(self.members(), pattern)

    def members(self) -> List[str]:
        """Return the names of all members. Implemented by subclasses."""
        raise NotImplementedError()

    def close(self):  # pragma: no cover
        """Release resources held by the archive. The base implementation does nothing."""
        pass

    def __enter__(self):
        return self

    def __exit__(self, *_, **__):
        self.close()
class TarArchive(Archive):
    """
    Archive handler for TAR files, backed by :py:mod:`tarfile`.

    NOTE(review): ``mode`` is handed to :py:func:`tarfile.open` unchanged, so a plain
    ``"w"`` presumably writes an uncompressed archive even for names like ``.tar.gz``
    -- confirm whether the compression should be derived from the extension.
    """

    extensions = [
        ".tar",
        ".tar.bz2",
        ".tb2",
        ".tbz",
        ".tbz2",
        ".tz2",
        ".tar.gz",
        ".taz",
        ".tgz",
        ".tar.lzma",
        ".tlz",
    ]

    @staticmethod
    def is_readable(archive_fn):
        """Return whether ``archive_fn`` is a TAR archive that :py:mod:`tarfile` can read."""
        return tarfile.is_tarfile(archive_fn)

    def __init__(self, archive_fn: Union[str, pathlib.Path], mode: str = "r"):
        self._tar = tarfile.open(archive_fn, mode)
        # Lazily built mapping of member name -> TarInfo (see _load_members).
        self._members = None

    def close(self):
        """Close the underlying TAR file."""
        self._tar.close()

    def read_member(self, member_fn):
        """
        Open a member for reading.

        Args:
            member_fn (str or tarfile.TarInfo): Name of the member or its ``TarInfo``.

        Returns:
            A binary file object for the member's contents.

        Raises:
            MemberNotFoundError: if the member does not exist or has no readable contents.
        """
        tar_info = self._resolve_member(member_fn)
        member_fp = self._tar.extractfile(tar_info)
        if member_fp is None:
            # extractfile() yields None for members without file data (e.g. directories).
            raise MemberNotFoundError(
                f"{tar_info.name} is not a readable file in {self._tar.name}"
            )
        return member_fp

    def _load_members(self):
        """Build the name -> TarInfo cache if necessary and return it."""
        if self._members is None:
            self._members = {
                tar_info.name: tar_info for tar_info in self._tar.getmembers()
            }
        return self._members

    def _resolve_member(self, member):
        """Translate a member name into its ``TarInfo``; a ``TarInfo`` is passed through."""
        if isinstance(member, tarfile.TarInfo):
            return member

        try:
            return self._load_members()[member]
        except KeyError as exc:
            # Report missing members the same way the other archive handlers do.
            raise MemberNotFoundError(f"{member} not in {self._tar.name}") from exc

    def write_member(
        self, member_fn: str, fileobj_or_bytes: Union[IO, bytes], compress_hint=True
    ):
        """
        Append a member to the archive.

        Args:
            member_fn (str): Name of the member inside the archive.
            fileobj_or_bytes: Contents, either as ``bytes`` or as a file object.
                Anything but ``bytes``/``io.BytesIO`` is passed to
                :py:meth:`tarfile.TarFile.gettarinfo` to build the member header.
            compress_hint: Accepted for interface compatibility; not used here.
        """
        if isinstance(fileobj_or_bytes, bytes):
            fileobj_or_bytes = io.BytesIO(fileobj_or_bytes)

        if isinstance(fileobj_or_bytes, io.BytesIO):
            tar_info = tarfile.TarInfo(member_fn)
            # addfile() copies `size` bytes starting at the current position,
            # so only count what is left to read.
            remaining = fileobj_or_bytes.getbuffer().nbytes - fileobj_or_bytes.tell()
            tar_info.size = max(0, remaining)
        else:
            tar_info = self._tar.gettarinfo(arcname=member_fn, fileobj=fileobj_or_bytes)

        self._tar.addfile(tar_info, fileobj=fileobj_or_bytes)

        # The name cache no longer reflects the archive's contents.
        self._members = None

    def members(self):
        """Return the names of all members."""
        return self._tar.getnames()
class ZipArchive(Archive):
    """Archive handler for ZIP files, backed by :py:mod:`zipfile`."""

    extensions = [".zip"]

    @staticmethod
    def is_readable(archive_fn):
        """Return whether ``archive_fn`` is recognized as a ZIP file."""
        return zipfile.is_zipfile(archive_fn)

    def __init__(self, archive_fn: Union[str, pathlib.Path], mode: str = "r"):
        self._zip = zipfile.ZipFile(archive_fn, mode)

    def members(self):
        """Return the names of all members."""
        return self._zip.namelist()

    def read_member(self, member):
        """
        Open a member for reading.

        Raises:
            MemberNotFoundError: if the member is not part of the archive.
        """
        try:
            member_fp = self._zip.open(member)
        except KeyError as exc:
            raise MemberNotFoundError(f"{member} not in {self._zip.filename}") from exc
        return member_fp

    def write_member(
        self, member_fn: str, fileobj_or_bytes: Union[IO, bytes], compress_hint=True
    ):
        """
        Store a member in the archive.

        Args:
            member_fn (str): Name of the member inside the archive.
            fileobj_or_bytes: Contents, either as ``bytes`` or as a readable file object.
            compress_hint (bool): Deflate the member if true, store it uncompressed otherwise.
        """
        if compress_hint:
            compress_type = zipfile.ZIP_DEFLATED
        else:
            compress_type = zipfile.ZIP_STORED

        # TODO: Optimize for on-disk files and BytesIO (.getvalue())

        if isinstance(fileobj_or_bytes, bytes):
            payload = fileobj_or_bytes
        else:
            # File objects are read completely into memory before being stored.
            payload = fileobj_or_bytes.read()

        self._zip.writestr(member_fn, payload, compress_type=compress_type)

    def close(self):
        """Close the underlying ZIP file."""
        self._zip.close()
def split_path(path: str) -> Tuple[str, str]:
    """
    Split an archive-internal path at its last ``"/"``.

    Returns:
        A ``(head, tail)`` pair. ``head`` is empty if ``path`` contains no ``"/"``.
    """
    # rpartition yields ("", "", path) when there is no separator at all.
    head, _, tail = path.rpartition("/")
    return head, tail
@ReturnOutputs
class EcotaxaWriter(Node):
    """
    Create an archive of images and metadata that is importable to EcoTaxa.

    Args:
        archive_fn (str): Location of the output file.
        fnames_images (Tuple, Variable, or a list thereof):
            Tuple of ``(filename, image)`` or a list of such tuples.
            ``filename`` is the name in the archive. ``image`` is a NumPy array
            or a binary stream that is stored as is.
            The file extension has to be one of ``".jpg"``, ``".png"`` or ``".gif"``
            to meet the specifications of EcoTaxa.
        meta (Mapping or Variable, optional): Metadata to store in the TSV file.
            Each key corresponds to a column in the resulting file.
        object_meta (Mapping or Variable, optional): Metadata stored with ``object_`` prefix.
        acq_meta (Mapping or Variable, optional): Metadata stored with ``acq_`` prefix.
        process_meta (Mapping or Variable, optional): Metadata stored with ``process_`` prefix.
        sample_meta (Mapping or Variable, optional): Metadata stored with ``sample_`` prefix.
        meta_fn (str, optional): TSV file. Must start with ``ecotaxa``.
        store_types (bool, optional): Whether to add a row with types after the header.
            Defaults to `True`, according to EcoTaxa's specifications.

    The TSV file will have the following columns by default:

    - ``img_file_name``: Name of the image file (including extension)
    - ``img_rank``: Rank of image to be displayed. Starts at 1.

    Other columns are read from ``meta``. The file will contain one row for each
    image of each object in the stream (or a single row for an object without images).

    Example:
        .. code-block:: python

            with Pipeline() as pipeline:
                image_fn = ...
                image = ImageReader(image_fn)
                meta = ... # Calculate some meta-data
                EcotaxaWriter("path/to/archive.zip", (image_fn, image), meta)
            pipeline.transform_stream()

    .. seealso::
        For more information about the metadata fields, see the project import page of EcoTaxa.
    """

    def __init__(
        self,
        archive_fn: RawOrVariable[str],
        fnames_images: MaybeList[RawOrVariable[Tuple[str, ...]]],
        meta: Optional[RawOrVariable[Mapping]] = None,
        object_meta: Optional[RawOrVariable[Mapping]] = None,
        acq_meta: Optional[RawOrVariable[Mapping]] = None,
        process_meta: Optional[RawOrVariable[Mapping]] = None,
        sample_meta: Optional[RawOrVariable[Mapping]] = None,
        meta_fn: RawOrVariable[str] = "ecotaxa_export.tsv",
        store_types: bool = True,
    ):
        super().__init__()
        self.archive_fn = archive_fn

        # A single (filename, image) tuple is shorthand for a one-element list.
        if isinstance(fnames_images, tuple):
            fnames_images = [fnames_images]

        if not isinstance(fnames_images, list):
            raise ValueError(
                "Unexpected type for fnames_images: needs to be a tuple or a list of tuples"
            )

        self.fnames_images = fnames_images
        self.meta = meta
        self.object_meta = object_meta
        self.acq_meta = acq_meta
        self.process_meta = process_meta
        self.sample_meta = sample_meta
        self.meta_fn = meta_fn
        self.store_types = store_types

    @classmethod
    def _prepare_images(
        cls, fnames_images, archive: Archive, pil_extensions, meta_prefix, meta
    ):
        """
        Store the images of one object in ``archive`` and yield one TSV row per image.

        Args:
            fnames_images: Iterable of ``(filename, image)`` pairs.
            archive: Archive that receives the image files.
            pil_extensions: Mapping of file extension to PIL format name.
            meta_prefix: Prefix prepended to each filename inside the archive.
            meta: Metadata repeated in every yielded row.
        """
        for img_rank, (fname, img) in enumerate(fnames_images, start=1):
            if isinstance(img, io.IOBase):
                # Stream: stored as is.
                img_fp = img
            else:
                # Array: encoded in the format implied by the file extension.
                img = np.asarray(img)
                img_ext = os.path.splitext(fname)[1]
                pil_format = pil_extensions[img_ext]
                img = PIL.Image.fromarray(img)
                img_fp = io.BytesIO()
                try:
                    img.save(img_fp, format=pil_format)
                except Exception:  # pragma: no cover
                    print(f"EcotaxaWriter: Error writing {fname}")
                    raise

                # Rewind
                img_fp.seek(0)

            # Do not compress image files as already compressed
            archive.write_member(meta_prefix + fname, img_fp, compress_hint=False)

            yield {
                **meta,
                "img_file_name": fname,
                "img_rank": img_rank,
            }

    def transform_stream(self, stream):
        """
        Write all objects of ``stream`` to their archives and pass them on unchanged.

        Objects are grouped by archive filename and, within an archive, by TSV filename.
        One TSV file is written per group after its last object was processed.
        """
        PIL.Image.init()
        pil_extensions = PIL.Image.registered_extensions()

        with closing_if_closable(stream):
            for archive_fn, archive_group in stream_groupby(stream, by=self.archive_fn):
                i = 0
                with Archive(archive_fn, "w") as archive:
                    for meta_fn, meta_group in stream_groupby(
                        archive_group, by=self.meta_fn
                    ):
                        meta_fn: str

                        dataframe = []

                        # Images are stored next to the TSV file that references them.
                        meta_prefix = split_path(meta_fn)[0]
                        if meta_prefix:
                            meta_prefix = meta_prefix + "/"

                        for obj in meta_group:
                            (
                                fnames_images,
                                meta,
                                object_meta,
                                acq_meta,
                                process_meta,
                                sample_meta,
                            ) = self.prepare_input(
                                obj,
                                (
                                    "fnames_images",
                                    "meta",
                                    "object_meta",
                                    "acq_meta",
                                    "process_meta",
                                    "sample_meta",
                                ),
                            )  # type: ignore

                            # Work on a copy: the prefixed entries added below must not
                            # leak into the mapping supplied by the caller.
                            meta = {} if meta is None else dict(meta)

                            for prefix, extra_meta in (
                                ("object_", object_meta),
                                ("acq_", acq_meta),
                                ("process_", process_meta),
                                ("sample_", sample_meta),
                            ):
                                if extra_meta is not None:
                                    meta.update(
                                        (prefix + k, v) for k, v in extra_meta.items()
                                    )

                            if fnames_images:
                                # Metadata and images: Store image and repeat meta for each individual image
                                dataframe.extend(
                                    self._prepare_images(
                                        fnames_images,
                                        archive,
                                        pil_extensions,
                                        meta_prefix,
                                        meta,
                                    )
                                )
                            else:
                                # Only metadata: Write only meta
                                dataframe.append(meta)

                            yield obj

                            i += 1

                        dataframe = pd.DataFrame(dataframe)

                        # An empty frame has no columns to annotate.
                        if self.store_types and dataframe.size:
                            # Insert types into header
                            type_header = [
                                dtype_to_ecotaxa(dt) for dt in dataframe.dtypes
                            ]
                            dataframe.columns = pd.MultiIndex.from_tuples(
                                list(zip(dataframe.columns, type_header))
                            )

                        archive.write_member(
                            meta_fn,
                            io.BytesIO(
                                dataframe.to_csv(
                                    sep="\t", encoding="utf-8", index=False
                                ).encode()
                            ),
                        )

                        print(
                            f"EcotaxaWriter: Wrote {len(dataframe):,d} entries to {meta_fn}."
                        )

                print(f"EcotaxaWriter: Wrote {i:,d} objects to {archive_fn}.")
class EcotaxaObject:
    """
    An object read from an EcoTaxa archive: its metadata and its raw image data.

    Images are only decoded when requested via :py:meth:`get_image` or :py:attr:`image`.

    Attributes:
        meta (Mapping): Object metadata
        index_fn (str, optional): Source index filename inside the archive.
        archive_fn (str, optional): Source archive filename.
        default_mode (str, optional): Default mode for image loading.
            See `PIL Modes <https://pillow.readthedocs.io/en/stable/handbook/concepts.html#modes>`_.
    """

    __slots__ = ["meta", "_image_data", "index_fn", "archive_fn", "default_mode"]

    def __init__(
        self, meta, image_data, archive_fn=None, index_fn=None, default_mode=None
    ):
        self.meta = meta
        # Mapping of image rank -> raw (encoded) image data.
        self._image_data = image_data
        self.archive_fn = archive_fn
        self.index_fn = index_fn
        self.default_mode = default_mode

    @property
    def image(self):
        """The first image, decoded using ``default_mode``."""
        return self.get_image()

    @property
    def image_data(self):
        """The raw data of the first image."""
        return self.get_image_data()

    @property
    def object_id(self):
        """The ``object_id`` metadata entry, or a placeholder if it is missing."""
        return self.meta.get("object_id", "<unidentified object>")

    def get_image_data(self, img_rank=None):
        """
        Get the raw image data for the object.

        Args:
            img_rank: Rank of the image. (Default: The first one.)

        Raises:
            ValueError: if no rank was given and the object has no image data.
            KeyError: if there is no image with the given rank.
        """
        if img_rank is None:
            try:
                return next(iter(self._image_data.values()))
            except StopIteration:
                # A leaking StopIteration would be misinterpreted by enclosing generators.
                raise ValueError(f"{self.object_id} has no image data") from None

        try:
            return self._image_data[img_rank]
        except KeyError:
            print(
                f"Unknown rank {img_rank}. Available:",
                list(self._image_data.keys()),
            )
            raise

    def get_image(self, img_rank=None, mode=None):
        """
        Get an image for the object.

        Args:
            img_rank: Rank of the image. (Default: The first one.)
            mode: Mode to load the image. (Default: ``default_mode``.)
                See `PIL Modes <https://pillow.readthedocs.io/en/stable/handbook/concepts.html#modes>`_.

        Returns:
            numpy.ndarray: The decoded image.
        """
        data = self.get_image_data(img_rank)

        if mode is None:
            mode = self.default_mode

        try:
            image = PIL.Image.open(data)
            if mode is not None:
                image = image.convert(mode)
            return np.array(image)
        except PIL.UnidentifiedImageError:
            # Show the first bytes to help diagnose what was stored instead of an image.
            head = data.getvalue()[:20]
            print(f"Error reading {self.object_id}: {head}")
            raise
@ReturnOutputs
@Output("object")
class EcotaxaReader(Node):
    """
    |stream| Read an archive of images and metadata that is importable to EcoTaxa.

    Args:
        archive_fn (str, Variable): Location of the archive file.
        query (str, Variable, optional): A query string to select a subset of the data.
            See :py:meth:`pandas.DataFrame.query`.
        prepare_data (callable, optional): A function that receives a :py:class:`pandas.DataFrame`
            and returns a modified version.
        verbose (bool, optional): Print a message for every archive and index file processed.
        keep_going (bool, optional): Report unreadable archives and missing image files
            and skip them instead of raising.
        print_summary (bool, optional): Print the number of objects read at the end.
        encoding (str, optional): Encoding of the index (TSV) files.
        index_pattern (str, optional): :py:mod:`fnmatch` pattern that selects the index
            files inside the archive.
        columns (list, optional): :py:mod:`fnmatch` patterns that select the columns
            to load from the index files. (Default: all columns.)
        image_default_mode (str, optional): Default mode for image loading of the
            produced :class:`EcotaxaObject` instances.

    Returns:
        object: An :class:`EcotaxaObject` that allows to access image(s) and metadata.

    The TSV file needs an ``object_id`` column; rows sharing an ``object_id`` form one object.
    An ``img_file_name`` column, if present, provides the name of the image file of each row.
    If ``img_rank`` is missing, it is set to 1 for every row.
    The metadata of an object is taken from its first row.

    The TSV file MAY contain a row of types after the header
    (``"[f]"`` for numeric columns, ``"[t]"`` else).

    Example:
        .. code-block:: python

            with Pipeline() as p:
                obj = EcotaxaReader("path/to/archive.zip")
            p.transform_stream()
    """

    def __init__(
        self,
        archive_fn: RawOrVariable[str],
        *,
        query: RawOrVariable[Optional[str]] = None,
        prepare_data: Optional[Callable[["pd.DataFrame"], "pd.DataFrame"]] = None,
        verbose=False,
        keep_going=False,
        print_summary=False,
        encoding="utf-8",
        index_pattern="*ecotaxa_*",
        columns: Optional[List] = None,
        image_default_mode=None,
    ):
        super().__init__()
        self.archive_fn = archive_fn
        self.query = query
        self.prepare_data = prepare_data
        self.verbose = verbose
        self.keep_going = keep_going
        self.print_summary = print_summary
        self.encoding = encoding
        self.index_pattern = index_pattern
        self.columns = columns
        self.image_default_mode = image_default_mode

    def transform_stream(self, stream: Stream):
        """
        Emit one stream object per EcoTaxa object found in the archive of each incoming object.
        """
        n_indices_received = 0
        n_indices_processed = 0
        n_objects_received = 0
        n_objects_read = 0

        # Errors that were skipped because of keep_going, repeated at the end.
        messages = []

        if self.columns is not None:
            # Load only columns matching at least one of the configured patterns.
            usecols = lambda c: any(fnmatch.fnmatch(c, p) for p in self.columns)
        else:
            usecols = None

        with closing_if_closable(stream):
            stream_estimator = StreamEstimator()

            for obj in stream:
                archive_fn, query = self.prepare_input(obj, ("archive_fn", "query"))

                if self.verbose:
                    print(f"EcotaxaReader: Processing archive {archive_fn}...")

                try:
                    archive = Archive(archive_fn)
                except UnknownArchiveError as exc:
                    if self.keep_going:
                        print(exc)
                        messages.append(str(exc))
                        continue
                    raise

                with archive:
                    index_fns = sorted(archive.find(self.index_pattern))

                    with stream_estimator.consume(
                        obj.n_remaining_hint, est_n_emit=len(index_fns)
                    ) as incoming_archive:
                        n_indices_received += len(index_fns)
                        archive_estimator = StreamEstimator()

                        for index_fn in index_fns:
                            if self.verbose:
                                print(f"EcotaxaReader: Processing index {index_fn}...")

                            # Image filenames are relative to the location of the index file.
                            index_base = os.path.dirname(index_fn)
                            n_indices_processed += 1

                            with archive.read_member(index_fn) as index_fp:
                                dataframe = pd.read_csv(
                                    index_fp,
                                    sep="\t",
                                    low_memory=False,
                                    encoding=self.encoding,
                                    usecols=usecols,
                                )

                            dataframe = self._fix_types(dataframe)

                            if self.prepare_data is not None:
                                dataframe = self.prepare_data(dataframe)

                            if "img_rank" not in dataframe.columns:
                                dataframe["img_rank"] = 1

                            if "object_id" not in dataframe.columns:
                                raise ValueError(
                                    f"object_id missing in {archive_fn}/{index_fn}\n"
                                    f"Columns: {dataframe.columns}"
                                )

                            if query is not None:
                                dataframe = dataframe.query(query)

                            dataframe_by_object_id = dataframe.groupby("object_id")

                            with archive_estimator.consume(
                                incoming_archive.emit(),
                                est_n_emit=len(dataframe_by_object_id),
                            ) as incoming_index:
                                index_estimator = StreamEstimator()
                                n_objects_received += len(dataframe_by_object_id)

                                for (object_id, group) in dataframe_by_object_id:
                                    with index_estimator.consume(
                                        incoming_index.emit(), est_n_emit=1
                                    ) as incoming_object:
                                        if "img_file_name" in dataframe.columns:
                                            try:
                                                image_data = {
                                                    row[
                                                        "img_rank"
                                                    ]: self._load_image(
                                                        archive,
                                                        index_base,
                                                        row["img_file_name"],
                                                    )
                                                    for _, row in group.iterrows()
                                                }
                                            except MemberNotFoundError as exc:
                                                if self.keep_going:
                                                    print(exc)
                                                    messages.append(str(exc))
                                                    continue
                                                raise
                                        else:
                                            image_data = {}

                                        # Object-level metadata comes from the first row.
                                        meta = group.iloc[0].to_dict()

                                        yield self.prepare_output(
                                            obj.copy(),
                                            EcotaxaObject(
                                                meta=meta,
                                                image_data=image_data,
                                                index_fn=index_fn,
                                                default_mode=self.image_default_mode,
                                                archive_fn=archive_fn,
                                            ),
                                            n_remaining_hint=incoming_object.emit(),
                                        )

                                        n_objects_read += 1

        if self.print_summary:
            print(f"EcotaxaReader: Read {n_objects_read} objects.")

        if messages:
            print("EcotaxaReader: Messages")
            for msg in messages:
                print(msg)

    @staticmethod
    def _load_image(archive: Archive, index_base, image_fn):
        """
        Read an image file from ``archive`` into memory.

        Args:
            archive: Archive to read from.
            index_base: Directory (inside the archive) of the index file.
            image_fn: Image filename, relative to ``index_base``.

        Returns:
            io.BytesIO: The raw image data, positioned at its start.
        """
        # Archive member names use "/" on every platform, unlike os.path.join.
        import posixpath

        image_fn = posixpath.join(index_base, image_fn)

        image_data = io.BytesIO()
        with archive.read_member(image_fn) as image_fp:
            copyfileobj(image_fp, image_data)

        # Rewind so that consumers can read the data right away.
        image_data.seek(0)

        return image_data

    @staticmethod
    def _fix_types(dataframe):
        """
        Remove an optional type row and convert the columns it marks as numeric.

        If the first row consists only of ``"[f]"`` and ``"[t]"``, it is dropped and
        the ``"[f]"`` columns are converted with :py:func:`pandas.to_numeric`
        (unparseable values become NaN). Otherwise ``dataframe`` is returned unchanged.
        """
        if len(dataframe) == 0:
            # Header-only file: there is no row that could be a type row.
            return dataframe

        first_row = dataframe.iloc[0]

        num_cols = []
        for c, v in first_row.items():
            if v == "[f]":
                num_cols.append(c)
            elif v == "[t]":
                continue
            else:
                # If the first row contains other values than [f] or [t],
                # it is not a type header and the dataframe doesn't need to be changed.
                return dataframe

        dataframe = dataframe.iloc[1:].copy()

        if num_cols:
            # NOTE(review): axis=1 converts row by row; a column-wise conversion would
            # presumably be faster but may yield different dtypes -- confirm before changing.
            dataframe[num_cols] = dataframe[num_cols].apply(
                pd.to_numeric, errors="coerce", axis=1
            )

        return dataframe